1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "process_communicator_impl.h"
17 
18 #include <logger.h>
19 
20 #include "anonymous.h"
21 
22 namespace OHOS {
23 namespace ObjectStore {
24 using namespace DistributedDB;
ProcessCommunicatorImpl()25 ProcessCommunicatorImpl::ProcessCommunicatorImpl()
26 {
27 }
28 
~ProcessCommunicatorImpl()29 ProcessCommunicatorImpl::~ProcessCommunicatorImpl()
30 {
31     LOG_ERROR("destructor.");
32 }
33 
Start(const std::string & processLabel)34 DBStatus ProcessCommunicatorImpl::Start(const std::string &processLabel)
35 {
36     LOG_INFO("init commProvider");
37     thisProcessLabel_ = processLabel;
38     PipeInfo pi = { thisProcessLabel_ };
39     Status errCode = CommunicationProvider::GetInstance().Start(pi);
40     if (errCode != Status::SUCCESS) {
41         LOG_ERROR("commProvider_ Start Fail.");
42         return DBStatus::DB_ERROR;
43     }
44     return DBStatus::OK;
45 }
46 
Stop()47 DBStatus ProcessCommunicatorImpl::Stop()
48 {
49     PipeInfo pi = { thisProcessLabel_ };
50     Status errCode = CommunicationProvider::GetInstance().Stop(pi);
51     if (errCode != Status::SUCCESS) {
52         LOG_ERROR("commProvider_ Stop Fail.");
53         return DBStatus::DB_ERROR;
54     }
55     return DBStatus::OK;
56 }
57 
RegOnDeviceChange(const OnDeviceChange & callback)58 DBStatus ProcessCommunicatorImpl::RegOnDeviceChange(const OnDeviceChange &callback)
59 {
60     {
61         std::lock_guard<std::mutex> onDeviceChangeLockGard(onDeviceChangeMutex_);
62         onDeviceChangeHandler_ = callback;
63     }
64 
65     PipeInfo pi = { thisProcessLabel_ };
66     if (callback) {
67         Status errCode = CommunicationProvider::GetInstance().StartWatchDeviceChange(this, pi);
68         if (errCode != Status::SUCCESS) {
69             LOG_ERROR("commProvider_ StartWatchDeviceChange Fail.");
70             return DBStatus::DB_ERROR;
71         }
72     } else {
73         Status errCode = CommunicationProvider::GetInstance().StopWatchDeviceChange(this, pi);
74         if (errCode != Status::SUCCESS) {
75             LOG_ERROR("commProvider_ StopWatchDeviceChange Fail.");
76             return DBStatus::DB_ERROR;
77         }
78     }
79 
80     return DBStatus::OK;
81 }
82 
RegOnDataReceive(const OnDataReceive & callback)83 DBStatus ProcessCommunicatorImpl::RegOnDataReceive(const OnDataReceive &callback)
84 {
85     {
86         std::lock_guard<std::mutex> onDataReceiveLockGard(onDataReceiveMutex_);
87         onDataReceiveHandler_ = callback;
88     }
89 
90     PipeInfo pi = { thisProcessLabel_ };
91     if (callback) {
92         Status errCode = CommunicationProvider::GetInstance().StartWatchDataChange(this, pi);
93         if (errCode != Status::SUCCESS) {
94             LOG_ERROR("commProvider_ StartWatchDataChange Fail.");
95             return DBStatus::DB_ERROR;
96         }
97     } else {
98         Status errCode = CommunicationProvider::GetInstance().StopWatchDataChange(this, pi);
99         if (errCode != Status::SUCCESS) {
100             LOG_ERROR("commProvider_ StopWatchDataChange Fail.");
101             return DBStatus::DB_ERROR;
102         }
103     }
104     return DBStatus::OK;
105 }
106 
SendData(const DeviceInfos & dstDevInfo,const uint8_t * data,uint32_t length)107 DBStatus ProcessCommunicatorImpl::SendData(const DeviceInfos &dstDevInfo, const uint8_t *data, uint32_t length)
108 {
109     uint32_t totalLength = 0;
110     return SendData(dstDevInfo, data, length, totalLength);
111 }
112 
SendData(const DeviceInfos & dstDevInfo,const uint8_t * data,uint32_t length,uint32_t totalLength)113 DBStatus ProcessCommunicatorImpl::SendData(
114     const DeviceInfos &dstDevInfo, const uint8_t *data, uint32_t length, uint32_t totalLength)
115 {
116     PipeInfo pi = { thisProcessLabel_ };
117     const DataInfo dataInfo = { const_cast<uint8_t *>(data), length };
118     DeviceId destination;
119     destination.deviceId = dstDevInfo.identifier;
120     Status errCode = CommunicationProvider::GetInstance().SendData(pi, destination, dataInfo, totalLength);
121     if (errCode != Status::SUCCESS) {
122         LOG_ERROR("commProvider_ SendData Fail.");
123         return DBStatus::DB_ERROR;
124     }
125 
126     return DBStatus::OK;
127 }
128 
GetMtuSize()129 uint32_t ProcessCommunicatorImpl::GetMtuSize()
130 {
131     return MTU_SIZE;
132 }
133 
GetMtuSize(const DeviceInfos & devInfo)134 uint32_t ProcessCommunicatorImpl::GetMtuSize(const DeviceInfos &devInfo)
135 {
136     return MTU_SIZE;
137 }
138 
GetLocalDeviceInfos()139 DeviceInfos ProcessCommunicatorImpl::GetLocalDeviceInfos()
140 {
141     DeviceInfos localDevInfos;
142     DeviceInfo devInfo = CommunicationProvider::GetInstance().GetLocalDevice();
143     localDevInfos.identifier = devInfo.deviceId;
144     return localDevInfos;
145 }
146 
GetRemoteOnlineDeviceInfosList()147 std::vector<DeviceInfos> ProcessCommunicatorImpl::GetRemoteOnlineDeviceInfosList()
148 {
149     std::vector<DeviceInfos> remoteDevInfos;
150     std::vector<DeviceInfo> devInfoVec = CommunicationProvider::GetInstance().GetDeviceList();
151     for (auto const &entry : devInfoVec) {
152         DeviceInfos remoteDev;
153         remoteDev.identifier = entry.deviceId;
154         remoteDevInfos.push_back(remoteDev);
155     }
156     return remoteDevInfos;
157 }
158 
IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos & peerDevInfo)159 bool ProcessCommunicatorImpl::IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos &peerDevInfo)
160 {
161     PipeInfo pi = { thisProcessLabel_ };
162     DeviceId di = { peerDevInfo.identifier };
163     return CommunicationProvider::GetInstance().IsSameStartedOnPeer(pi, di);
164 }
165 
OnMessage(const DeviceInfo & info,const uint8_t * ptr,const int size,const PipeInfo & pipeInfo) const166 void ProcessCommunicatorImpl::OnMessage(
167     const DeviceInfo &info, const uint8_t *ptr, const int size, __attribute__((unused)) const PipeInfo &pipeInfo) const
168 {
169     std::lock_guard<std::mutex> onDataReceiveLockGuard(onDataReceiveMutex_);
170     if (onDataReceiveHandler_ == nullptr) {
171         LOG_ERROR("onDataReceiveHandler_ invalid.");
172         return;
173     }
174     DeviceInfos devInfo;
175     devInfo.identifier = info.deviceId;
176     onDataReceiveHandler_(devInfo, ptr, static_cast<uint32_t>(size));
177 }
178 
OnDeviceChanged(const DeviceInfo & info,const DeviceChangeType & type) const179 void ProcessCommunicatorImpl::OnDeviceChanged(const DeviceInfo &info, const DeviceChangeType &type) const
180 {
181     std::lock_guard<std::mutex> onDeviceChangeLockGuard(onDeviceChangeMutex_);
182     if (onDeviceChangeHandler_ == nullptr) {
183         LOG_ERROR("onDeviceChangeHandler_ invalid.");
184         return;
185     }
186     std::vector<DeviceInfo> devices = CommunicationProvider::GetInstance().GetDeviceList();
187     for (const auto &device : devices) {
188         if (info.deviceId == device.deviceId) {
189             DeviceInfos devInfo{ info.deviceId };
190             onDeviceChangeHandler_(devInfo, (type == DeviceChangeType::DEVICE_ONLINE));
191             return;
192         }
193     }
194     LOG_WARN("Not a collaboration device, uuid: %{public}s.", Anonymous::Change(info.deviceId).c_str());
195 }
196 } // namespace ObjectStore
197 } // namespace OHOS
198