1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "process_communicator_impl.h"
17
18 #include <logger.h>
19
20 #include "anonymous.h"
21
22 namespace OHOS {
23 namespace ObjectStore {
24 using namespace DistributedDB;
ProcessCommunicatorImpl()25 ProcessCommunicatorImpl::ProcessCommunicatorImpl()
26 {
27 }
28
~ProcessCommunicatorImpl()29 ProcessCommunicatorImpl::~ProcessCommunicatorImpl()
30 {
31 LOG_ERROR("destructor.");
32 }
33
Start(const std::string & processLabel)34 DBStatus ProcessCommunicatorImpl::Start(const std::string &processLabel)
35 {
36 LOG_INFO("init commProvider");
37 thisProcessLabel_ = processLabel;
38 PipeInfo pi = { thisProcessLabel_ };
39 Status errCode = CommunicationProvider::GetInstance().Start(pi);
40 if (errCode != Status::SUCCESS) {
41 LOG_ERROR("commProvider_ Start Fail.");
42 return DBStatus::DB_ERROR;
43 }
44 return DBStatus::OK;
45 }
46
Stop()47 DBStatus ProcessCommunicatorImpl::Stop()
48 {
49 PipeInfo pi = { thisProcessLabel_ };
50 Status errCode = CommunicationProvider::GetInstance().Stop(pi);
51 if (errCode != Status::SUCCESS) {
52 LOG_ERROR("commProvider_ Stop Fail.");
53 return DBStatus::DB_ERROR;
54 }
55 return DBStatus::OK;
56 }
57
RegOnDeviceChange(const OnDeviceChange & callback)58 DBStatus ProcessCommunicatorImpl::RegOnDeviceChange(const OnDeviceChange &callback)
59 {
60 {
61 std::lock_guard<std::mutex> onDeviceChangeLockGard(onDeviceChangeMutex_);
62 onDeviceChangeHandler_ = callback;
63 }
64
65 PipeInfo pi = { thisProcessLabel_ };
66 if (callback) {
67 Status errCode = CommunicationProvider::GetInstance().StartWatchDeviceChange(this, pi);
68 if (errCode != Status::SUCCESS) {
69 LOG_ERROR("commProvider_ StartWatchDeviceChange Fail.");
70 return DBStatus::DB_ERROR;
71 }
72 } else {
73 Status errCode = CommunicationProvider::GetInstance().StopWatchDeviceChange(this, pi);
74 if (errCode != Status::SUCCESS) {
75 LOG_ERROR("commProvider_ StopWatchDeviceChange Fail.");
76 return DBStatus::DB_ERROR;
77 }
78 }
79
80 return DBStatus::OK;
81 }
82
RegOnDataReceive(const OnDataReceive & callback)83 DBStatus ProcessCommunicatorImpl::RegOnDataReceive(const OnDataReceive &callback)
84 {
85 {
86 std::lock_guard<std::mutex> onDataReceiveLockGard(onDataReceiveMutex_);
87 onDataReceiveHandler_ = callback;
88 }
89
90 PipeInfo pi = { thisProcessLabel_ };
91 if (callback) {
92 Status errCode = CommunicationProvider::GetInstance().StartWatchDataChange(this, pi);
93 if (errCode != Status::SUCCESS) {
94 LOG_ERROR("commProvider_ StartWatchDataChange Fail.");
95 return DBStatus::DB_ERROR;
96 }
97 } else {
98 Status errCode = CommunicationProvider::GetInstance().StopWatchDataChange(this, pi);
99 if (errCode != Status::SUCCESS) {
100 LOG_ERROR("commProvider_ StopWatchDataChange Fail.");
101 return DBStatus::DB_ERROR;
102 }
103 }
104 return DBStatus::OK;
105 }
106
SendData(const DeviceInfos & dstDevInfo,const uint8_t * data,uint32_t length)107 DBStatus ProcessCommunicatorImpl::SendData(const DeviceInfos &dstDevInfo, const uint8_t *data, uint32_t length)
108 {
109 uint32_t totalLength = 0;
110 return SendData(dstDevInfo, data, length, totalLength);
111 }
112
SendData(const DeviceInfos & dstDevInfo,const uint8_t * data,uint32_t length,uint32_t totalLength)113 DBStatus ProcessCommunicatorImpl::SendData(
114 const DeviceInfos &dstDevInfo, const uint8_t *data, uint32_t length, uint32_t totalLength)
115 {
116 PipeInfo pi = { thisProcessLabel_ };
117 const DataInfo dataInfo = { const_cast<uint8_t *>(data), length };
118 DeviceId destination;
119 destination.deviceId = dstDevInfo.identifier;
120 Status errCode = CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength);
121 if (errCode != Status::SUCCESS) {
122 LOG_ERROR("commProvider_ SendData Fail.");
123 return DBStatus::DB_ERROR;
124 }
125
126 return DBStatus::OK;
127 }
128
GetMtuSize()129 uint32_t ProcessCommunicatorImpl::GetMtuSize()
130 {
131 return MTU_SIZE;
132 }
133
GetMtuSize(const DeviceInfos & devInfo)134 uint32_t ProcessCommunicatorImpl::GetMtuSize(const DeviceInfos &devInfo)
135 {
136 return MTU_SIZE;
137 }
138
GetLocalDeviceInfos()139 DeviceInfos ProcessCommunicatorImpl::GetLocalDeviceInfos()
140 {
141 DeviceInfos localDevInfos;
142 DeviceInfo devInfo = CommunicationProvider::GetInstance().GetLocalDevice();
143 localDevInfos.identifier = devInfo.deviceId;
144 return localDevInfos;
145 }
146
GetRemoteOnlineDeviceInfosList()147 std::vector<DeviceInfos> ProcessCommunicatorImpl::GetRemoteOnlineDeviceInfosList()
148 {
149 std::vector<DeviceInfos> remoteDevInfos;
150 std::vector<DeviceInfo> devInfoVec = CommunicationProvider::GetInstance().GetDeviceList();
151 for (auto const &entry : devInfoVec) {
152 DeviceInfos remoteDev;
153 remoteDev.identifier = entry.deviceId;
154 remoteDevInfos.push_back(remoteDev);
155 }
156 return remoteDevInfos;
157 }
158
IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos & peerDevInfo)159 bool ProcessCommunicatorImpl::IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos &peerDevInfo)
160 {
161 PipeInfo pi = { thisProcessLabel_ };
162 DeviceId di = { peerDevInfo.identifier };
163 return CommunicationProvider::GetInstance().IsSameStartedOnPeer(pi, di);
164 }
165
OnMessage(const DeviceInfo & info,const uint8_t * ptr,const int size,const PipeInfo & pipeInfo) const166 void ProcessCommunicatorImpl::OnMessage(
167 const DeviceInfo &info, const uint8_t *ptr, const int size, __attribute__((unused)) const PipeInfo &pipeInfo) const
168 {
169 std::lock_guard<std::mutex> onDataReceiveLockGuard(onDataReceiveMutex_);
170 if (onDataReceiveHandler_ == nullptr) {
171 LOG_ERROR("onDataReceiveHandler_ invalid.");
172 return;
173 }
174 DeviceInfos devInfo;
175 devInfo.identifier = info.deviceId;
176 onDataReceiveHandler_(devInfo, ptr, static_cast<uint32_t>(size));
177 }
178
OnDeviceChanged(const DeviceInfo & info,const DeviceChangeType & type) const179 void ProcessCommunicatorImpl::OnDeviceChanged(const DeviceInfo &info, const DeviceChangeType &type) const
180 {
181 std::lock_guard<std::mutex> onDeviceChangeLockGuard(onDeviceChangeMutex_);
182 if (onDeviceChangeHandler_ == nullptr) {
183 LOG_ERROR("onDeviceChangeHandler_ invalid.");
184 return;
185 }
186 std::vector<DeviceInfo> devices = CommunicationProvider::GetInstance().GetDeviceList();
187 for (const auto &device : devices) {
188 if (info.deviceId == device.deviceId) {
189 DeviceInfos devInfo{ info.deviceId };
190 onDeviceChangeHandler_(devInfo, (type == DeviceChangeType::DEVICE_ONLINE));
191 return;
192 }
193 }
194 LOG_WARN("Not a collaboration device, uuid: %{public}s.", Anonymous::Change(info.deviceId).c_str());
195 }
196 } // namespace ObjectStore
197 } // namespace OHOS
198