1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "virtual_communicator_aggregator.h"
16 
17 #include <cstdint>
18 #include <thread>
19 #include <utility>
20 
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "runtime_context.h"
25 
26 namespace DistributedDB {
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)27 int VirtualCommunicatorAggregator::Initialize(IAdapter *inAdapter,
28     const std::shared_ptr<DBStatusAdapter> &statusAdapter)
29 {
30     return E_OK;
31 }
32 
Finalize()33 void VirtualCommunicatorAggregator::Finalize()
34 {
35 }
36 
37 // If not success, return nullptr and set outErrorNo
AllocCommunicator(uint64_t commLabel,int & outErrorNo)38 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
39 {
40     if (isEnable_) {
41         return AllocCommunicator(remoteDeviceId_, outErrorNo);
42     }
43     return nullptr;
44 }
45 
AllocCommunicator(const LabelType & commLabel,int & outErrorNo)46 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const LabelType &commLabel, int &outErrorNo)
47 {
48     LOGI("[VirtualCommunicatorAggregator][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
49     if (commLabel.size() != COMM_LABEL_LENGTH) {
50         outErrorNo = -E_INVALID_ARGS;
51         return nullptr;
52     }
53 
54     if (isEnable_) {
55         return AllocCommunicator(remoteDeviceId_, outErrorNo);
56     }
57     return nullptr;
58 }
59 
ReleaseCommunicator(ICommunicator * inCommunicator)60 void VirtualCommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
61 {
62     // Called in main thread only
63     VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(inCommunicator);
64     OfflineDevice(communicator->GetDeviceId());
65     {
66         std::lock_guard<std::mutex> lock(communicatorsLock_);
67         communicators_.erase(communicator->GetDeviceId());
68     }
69     RefObject::KillAndDecObjRef(communicator);
70     communicator = nullptr;
71 }
72 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)73 int VirtualCommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
74     const Finalizer &inOper)
75 {
76     onCommLack_ = onCommLack;
77     return E_OK;
78 }
79 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)80 int VirtualCommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
81 {
82     onConnect_ = onConnect;
83     RunOnConnectCallback("deviceId", true);
84     return E_OK;
85 }
86 
RunCommunicatorLackCallback(const LabelType & commLabel)87 void VirtualCommunicatorAggregator::RunCommunicatorLackCallback(const LabelType &commLabel)
88 {
89     if (onCommLack_) {
90         onCommLack_(commLabel, userId_);
91     }
92 }
93 
RunOnConnectCallback(const std::string & target,bool isConnect)94 void VirtualCommunicatorAggregator::RunOnConnectCallback(const std::string &target, bool isConnect)
95 {
96     if (onConnect_) {
97         onConnect_(target, isConnect);
98     }
99 }
100 
GetLocalIdentity(std::string & outTarget) const101 int VirtualCommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
102 {
103     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
104     if (localDeviceId_.empty()) {
105         outTarget = "DEVICES_A";
106     } else {
107         outTarget = localDeviceId_;
108     }
109     return getLocalDeviceRet_;
110 }
111 
OnlineDevice(const std::string & deviceId) const112 void VirtualCommunicatorAggregator::OnlineDevice(const std::string &deviceId) const
113 {
114     if (!isEnable_) {
115         return;
116     }
117 
118     // Called in main thread only
119     for (const auto &iter : communicators_) {
120         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
121         if (iter.first != deviceId) {
122             communicatorTmp->CallbackOnConnect(deviceId, true);
123         }
124     }
125 }
126 
OfflineDevice(const std::string & deviceId) const127 void VirtualCommunicatorAggregator::OfflineDevice(const std::string &deviceId) const
128 {
129     if (!isEnable_) {
130         return;
131     }
132 
133     // Called in main thread only
134     for (const auto &iter : communicators_) {
135         VirtualCommunicator *communicatorTmp = static_cast<VirtualCommunicator *>(iter.second);
136         if (iter.first != deviceId) {
137             communicatorTmp->CallbackOnConnect(deviceId, false);
138         }
139     }
140 }
141 
AllocCommunicator(const std::string & deviceId,int & outErrorNo)142 ICommunicator *VirtualCommunicatorAggregator::AllocCommunicator(const std::string &deviceId, int &outErrorNo)
143 {
144     // Called in main thread only
145     VirtualCommunicator *communicator = new (std::nothrow) VirtualCommunicator(deviceId, this);
146     if (communicator == nullptr) {
147         outErrorNo = -E_OUT_OF_MEMORY;
148     }
149     {
150         std::lock_guard<std::mutex> lock(communicatorsLock_);
151         communicators_.insert(std::pair<std::string, VirtualCommunicator *>(deviceId, communicator));
152     }
153     OnlineDevice(deviceId);
154     return communicator;
155 }
156 
GetCommunicator(const std::string & deviceId) const157 ICommunicator *VirtualCommunicatorAggregator::GetCommunicator(const std::string &deviceId) const
158 {
159     std::lock_guard<std::mutex> lock(communicatorsLock_);
160     auto iter = communicators_.find(deviceId);
161     if (iter != communicators_.end()) {
162         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
163         return communicator;
164     }
165     return nullptr;
166 }
167 
DispatchMessage(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)168 void VirtualCommunicatorAggregator::DispatchMessage(const std::string &srcTarget, const std::string &dstTarget,
169     const Message *inMsg, const OnSendEnd &onEnd)
170 {
171     if (VirtualCommunicatorAggregator::GetBlockValue()) {
172         std::unique_lock<std::mutex> lock(blockLock_);
173         conditionVar_.wait(lock);
174     }
175 
176     if (!isEnable_) {
177         LOGD("[VirtualCommunicatorAggregator] DispatchMessage, VirtualCommunicatorAggregator is disabled");
178         delete inMsg;
179         inMsg = nullptr;
180         return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
181     }
182     if (beforeDispatch_) {
183         beforeDispatch_(dstTarget, inMsg);
184     }
185     DispatchMessageInner(srcTarget, dstTarget, inMsg, onEnd);
186 }
187 
DispatchMessageInner(const std::string & srcTarget,const std::string & dstTarget,const Message * inMsg,const OnSendEnd & onEnd)188 void VirtualCommunicatorAggregator::DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget,
189     const Message *inMsg, const OnSendEnd &onEnd)
190 {
191     std::lock_guard<std::mutex> lock(communicatorsLock_);
192     auto iter = communicators_.find(dstTarget);
193     if (iter != communicators_.end()) {
194         LOGI("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s", dstTarget.c_str());
195         VirtualCommunicator *communicator = static_cast<VirtualCommunicator *>(iter->second);
196         if (!communicator->IsEnabled()) {
197             LOGE("[VirtualCommunicatorAggregator] DispatchMessage, find dstTarget %s disabled", dstTarget.c_str());
198             delete inMsg;
199             inMsg = nullptr;
200             return CallSendEnd(-E_PERIPHERAL_INTERFACE_FAIL, onEnd);
201         }
202         uint32_t messageId = inMsg->GetMessageId();
203         Message *msg = const_cast<Message *>(inMsg);
204         msg->SetTarget(srcTarget);
205         RefObject::IncObjRef(communicator);
206         auto onDispatch = onDispatch_;
207         bool isNeedDelay = ((sendDelayTime_ > 0) && (delayTimes_ > 0) && (messageId == delayMessageId_) &&
208             (delayDevices_.count(dstTarget) > 0) && (skipTimes_ == 0));
209         uint32_t sendDelayTime = sendDelayTime_;
210         std::thread thread([communicator, srcTarget, dstTarget, msg, isNeedDelay, sendDelayTime, onDispatch]() {
211             if (isNeedDelay) {
212                 std::this_thread::sleep_for(std::chrono::milliseconds(sendDelayTime));
213             }
214             if (onDispatch) {
215                 onDispatch(dstTarget, msg);
216             }
217             communicator->CallbackOnMessage(srcTarget, msg);
218             RefObject::DecObjRef(communicator);
219         });
220         DelayTimeHandle(messageId, dstTarget);
221         thread.detach();
222         CallSendEnd(E_OK, onEnd);
223     } else {
224         LOGE("[VirtualCommunicatorAggregator] DispatchMessage, can't find dstTarget %s", dstTarget.c_str());
225         delete inMsg;
226         inMsg = nullptr;
227         CallSendEnd(-E_NOT_FOUND, onEnd);
228     }
229 }
230 
SetBlockValue(bool value)231 void VirtualCommunicatorAggregator::SetBlockValue(bool value)
232 {
233     std::unique_lock<std::mutex> lock(blockLock_);
234     isBlock_ = value;
235     if (!value) {
236         conditionVar_.notify_all();
237     }
238 }
239 
GetBlockValue() const240 bool VirtualCommunicatorAggregator::GetBlockValue() const
241 {
242     return isBlock_;
243 }
244 
Disable()245 void VirtualCommunicatorAggregator::Disable()
246 {
247     isEnable_ = false;
248 }
249 
Enable()250 void VirtualCommunicatorAggregator::Enable()
251 {
252     LOGD("[VirtualCommunicatorAggregator] enable");
253     isEnable_ = true;
254 }
255 
CallSendEnd(int errCode,const OnSendEnd & onEnd)256 void VirtualCommunicatorAggregator::CallSendEnd(int errCode, const OnSendEnd &onEnd)
257 {
258     if (commErrCodeMock_ != E_OK) {
259         errCode = commErrCodeMock_;
260     }
261     if (onEnd) {
262         (void)RuntimeContext::GetInstance()->ScheduleTask([errCode, onEnd, this]() {
263             onEnd(errCode, isDirectEnd_);
264         });
265     }
266 }
267 
RegOnDispatch(const std::function<void (const std::string &,Message * inMsg)> & onDispatch)268 void VirtualCommunicatorAggregator::RegOnDispatch(
269     const std::function<void(const std::string&, Message *inMsg)> &onDispatch)
270 {
271     onDispatch_ = onDispatch;
272 }
273 
SetCurrentUserId(const std::string & userId)274 void VirtualCommunicatorAggregator::SetCurrentUserId(const std::string &userId)
275 {
276     userId_ = userId;
277 }
278 
SetTimeout(const std::string & deviceId,uint32_t timeout)279 void VirtualCommunicatorAggregator::SetTimeout(const std::string &deviceId, uint32_t timeout)
280 {
281     std::lock_guard<std::mutex> lock(communicatorsLock_);
282     if (communicators_.find(deviceId) != communicators_.end()) {
283         communicators_[deviceId]->SetTimeout(timeout);
284     }
285 }
286 
SetDropMessageTypeByDevice(const std::string & deviceId,MessageId msgid,uint32_t dropTimes)287 void VirtualCommunicatorAggregator::SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid,
288     uint32_t dropTimes)
289 {
290     std::lock_guard<std::mutex> lock(communicatorsLock_);
291     if (communicators_.find(deviceId) != communicators_.end()) {
292         communicators_[deviceId]->SetDropMessageTypeByDevice(msgid, dropTimes);
293     }
294 }
295 
SetDeviceMtuSize(const std::string & deviceId,uint32_t mtuSize)296 void VirtualCommunicatorAggregator::SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize)
297 {
298     std::lock_guard<std::mutex> lock(communicatorsLock_);
299     if (communicators_.find(deviceId) != communicators_.end()) {
300         communicators_[deviceId]->SetCommunicatorMtuSize(mtuSize);
301     }
302 }
303 
SetSendDelayInfo(uint32_t sendDelayTime,uint32_t delayMessageId,uint32_t delayTimes,uint32_t skipTimes,std::set<std::string> & delayDevices)304 void VirtualCommunicatorAggregator::SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId,
305     uint32_t delayTimes, uint32_t skipTimes, std::set<std::string> &delayDevices)
306 {
307     sendDelayTime_ = sendDelayTime;
308     delayMessageId_ = delayMessageId;
309     delayTimes_ = delayTimes;
310     delayDevices_ = delayDevices;
311     skipTimes_ = skipTimes;
312 }
313 
ResetSendDelayInfo()314 void VirtualCommunicatorAggregator::ResetSendDelayInfo()
315 {
316     sendDelayTime_ = 0;
317     delayMessageId_ = INVALID_MESSAGE_ID;
318     delayTimes_ = 0;
319     skipTimes_ = 0;
320     delayDevices_.clear();
321 }
322 
DelayTimeHandle(uint32_t messageId,const std::string & dstTarget)323 void VirtualCommunicatorAggregator::DelayTimeHandle(uint32_t messageId, const std::string &dstTarget)
324 {
325     if ((skipTimes_ == 0) && delayTimes_ > 0 && (messageId == delayMessageId_) &&
326         (delayDevices_.count(dstTarget) > 0)) {
327         delayTimes_--;
328     }
329     if (skipTimes_ > 0 && (messageId == delayMessageId_) && (delayDevices_.count(dstTarget) > 0)) {
330         skipTimes_--;
331     }
332 }
333 
GetOnlineDevices()334 std::set<std::string> VirtualCommunicatorAggregator::GetOnlineDevices()
335 {
336     std::lock_guard<std::mutex> lock(communicatorsLock_);
337     std::set<std::string> onlineDevices;
338     for (const auto &item: communicators_) {
339         onlineDevices.insert(item.first);
340     }
341     return onlineDevices;
342 }
343 
DisableCommunicator()344 void VirtualCommunicatorAggregator::DisableCommunicator()
345 {
346     std::lock_guard<std::mutex> lock(communicatorsLock_);
347     for (const auto &communicator: communicators_) {
348         communicator.second->Disable();
349     }
350 }
351 
EnableCommunicator()352 void VirtualCommunicatorAggregator::EnableCommunicator()
353 {
354     std::lock_guard<std::mutex> lock(communicatorsLock_);
355     for (const auto &communicator: communicators_) {
356         communicator.second->Disable();
357     }
358 }
359 
RegBeforeDispatch(const std::function<void (const std::string &,const Message *)> & beforeDispatch)360 void VirtualCommunicatorAggregator::RegBeforeDispatch(
361     const std::function<void(const std::string &, const Message *)> &beforeDispatch)
362 {
363     beforeDispatch_ = beforeDispatch;
364 }
365 
SetLocalDeviceId(const std::string & deviceId)366 void VirtualCommunicatorAggregator::SetLocalDeviceId(const std::string &deviceId)
367 {
368     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
369     localDeviceId_ = deviceId;
370 }
371 
MockGetLocalDeviceRes(int mockRes)372 void VirtualCommunicatorAggregator::MockGetLocalDeviceRes(int mockRes)
373 {
374     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
375     getLocalDeviceRet_ = mockRes;
376 }
377 
MockCommErrCode(int mockErrCode)378 void VirtualCommunicatorAggregator::MockCommErrCode(int mockErrCode)
379 {
380     std::lock_guard<std::mutex> lock(localDeviceIdMutex_);
381     commErrCodeMock_ = mockErrCode;
382 }
383 
MockDirectEndFlag(bool isDirectEnd)384 void VirtualCommunicatorAggregator::MockDirectEndFlag(bool isDirectEnd)
385 {
386     isDirectEnd_ = isDirectEnd;
387 }
388 } // namespace DistributedDB
389