1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_aggregator.h"
17 
18 #include <sstream>
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "protocol_proto.h"
26 
27 namespace DistributedDB {
28 namespace {
29 constexpr int MAX_SEND_RETRY = 2;
30 constexpr int RETRY_TIME_SPLIT = 4;
GetThreadId()31 inline std::string GetThreadId()
32 {
33     std::stringstream stream;
34     stream << std::this_thread::get_id();
35     return stream.str();
36 }
37 }
38 
39 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
40 
CommunicatorAggregator()41 CommunicatorAggregator::CommunicatorAggregator()
42     : shutdown_(false),
43       incFrameId_(0),
44       localSourceId_(0)
45 {
46 }
47 
~CommunicatorAggregator()48 CommunicatorAggregator::~CommunicatorAggregator()
49 {
50     scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
51     adapterHandle_ = nullptr;
52     commLinker_ = nullptr;
53 }
54 
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)55 int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter)
56 {
57     if (inAdapter == nullptr) {
58         return -E_INVALID_ARGS;
59     }
60     adapterHandle_ = inAdapter;
61 
62     combiner_.Initialize();
63     retainer_.Initialize();
64     scheduler_.Initialize();
65 
66     int errCode;
67     commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter);
68     if (commLinker_ == nullptr) {
69         errCode = -E_OUT_OF_MEMORY;
70         goto ROLL_BACK;
71     }
72     commLinker_->Initialize();
73 
74     errCode = RegCallbackToAdapter();
75     if (errCode != E_OK) {
76         goto ROLL_BACK;
77     }
78 
79     errCode = adapterHandle_->StartAdapter();
80     if (errCode != E_OK) {
81         LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
82         goto ROLL_BACK;
83     }
84     GenerateLocalSourceId();
85 
86     shutdown_ = false;
87     InitSendThread();
88     dbStatusAdapter_ = statusAdapter;
89     RegDBChangeCallback();
90     return E_OK;
91 ROLL_BACK:
92     UnRegCallbackFromAdapter();
93     if (commLinker_ != nullptr) {
94         RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
95         commLinker_ = nullptr;
96     }
97     // Scheduler do not need to do finalize in this roll_back
98     retainer_.Finalize();
99     combiner_.Finalize();
100     return errCode;
101 }
102 
Finalize()103 void CommunicatorAggregator::Finalize()
104 {
105     shutdown_ = true;
106     retryCv_.notify_all();
107     {
108         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
109         wakingSignal_ = true;
110         wakingCv_.notify_one();
111     }
112     if (useExclusiveThread_) {
113         exclusiveThread_.join(); // Waiting thread to thoroughly quit
114         LOGI("[CommAggr][Final] Sub Thread Exit.");
115     } else {
116         LOGI("[CommAggr][Final] Begin wait send task exit.");
117         std::unique_lock<std::mutex> scheduleSendTaskLock(scheduleSendTaskMutex_);
118         finalizeCv_.wait(scheduleSendTaskLock, [this]() {
119             return !sendTaskStart_;
120         });
121         LOGI("[CommAggr][Final] End wait send task exit.");
122     }
123     scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
124 
125     adapterHandle_->StopAdapter();
126     UnRegCallbackFromAdapter();
127     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
128 
129     // No callback now and later, so combiner, retainer and linker can finalize or delete safely
130     RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
131     commLinker_ = nullptr;
132     retainer_.Finalize();
133     combiner_.Finalize();
134     dbStatusAdapter_ = nullptr;
135 }
136 
AllocCommunicator(uint64_t commLabel,int & outErrorNo)137 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
138 {
139     uint64_t netOrderLabel = HostToNet(commLabel);
140     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
141     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
142     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
143         realLabel[i] = eachByte[i];
144     }
145     return AllocCommunicator(realLabel, outErrorNo);
146 }
147 
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo)148 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo)
149 {
150     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
151     LOGI("[CommAggr][Alloc] Label=%.3s.", VEC_TO_STR(commLabel));
152     if (commLabel.size() != COMM_LABEL_LENGTH) {
153         outErrorNo = -E_INVALID_ARGS;
154         return nullptr;
155     }
156 
157     if (commMap_.count(commLabel) != 0) {
158         outErrorNo = -E_ALREADY_ALLOC;
159         return nullptr;
160     }
161 
162     Communicator *commPtr = new (std::nothrow) Communicator(this, commLabel);
163     if (commPtr == nullptr) {
164         outErrorNo = -E_OUT_OF_MEMORY;
165         return nullptr;
166     }
167     commMap_[commLabel] = {commPtr, false}; // Communicator is not activated when allocated
168     return commPtr;
169 }
170 
ReleaseCommunicator(ICommunicator * inCommunicator)171 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
172 {
173     if (inCommunicator == nullptr) {
174         return;
175     }
176     Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
177     LabelType commLabel = commPtr->GetCommunicatorLabel();
178     LOGI("[CommAggr][Release] Label=%.3s.", VEC_TO_STR(commLabel));
179 
180     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
181     if (commMap_.count(commLabel) == 0) {
182         LOGE("[CommAggr][Release] Not Found.");
183         return;
184     }
185     commMap_.erase(commLabel);
186     RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
187 
188     int errCode = commLinker_->DecreaseLocalLabel(commLabel);
189     if (errCode != E_OK) {
190         LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
191     }
192 }
193 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)194 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
195     const Finalizer &inOper)
196 {
197     std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
198     return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
199 }
200 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)201 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
202 {
203     std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
204     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
205     if (onConnect && errCode == E_OK) {
206         // Register action and success
207         std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
208         for (auto &entry : onlineTargets) {
209             LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
210             onConnectHandle_(entry, true);
211         }
212     }
213     return errCode;
214 }
215 
GetCommunicatorAggregatorMtuSize() const216 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
217 {
218     return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
219 }
220 
GetCommunicatorAggregatorMtuSize(const std::string & target) const221 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
222 {
223     return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
224 }
225 
GetCommunicatorAggregatorTimeout() const226 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
227 {
228     return adapterHandle_->GetTimeout();
229 }
230 
GetCommunicatorAggregatorTimeout(const std::string & target) const231 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
232 {
233     return adapterHandle_->GetTimeout(target);
234 }
235 
IsDeviceOnline(const std::string & device) const236 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
237 {
238     return adapterHandle_->IsDeviceOnline(device);
239 }
240 
GetLocalIdentity(std::string & outTarget) const241 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
242 {
243     return adapterHandle_->GetLocalIdentity(outTarget);
244 }
245 
ActivateCommunicator(const LabelType & commLabel)246 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel)
247 {
248     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
249     LOGI("[CommAggr][Activate] Label=%.3s.", VEC_TO_STR(commLabel));
250     if (commMap_.count(commLabel) == 0) {
251         LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
252         return;
253     }
254     if (commMap_.at(commLabel).second) {
255         return;
256     }
257     commMap_.at(commLabel).second = true; // Mark this communicator as activated
258 
259     // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
260     // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
261     std::set<std::string> onlineTargets;
262     int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
263     if (errCode != E_OK) {
264         LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
265         // Do not return here
266     }
267     for (auto &entry : onlineTargets) {
268         LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
269         commMap_.at(commLabel).first->OnConnectChange(entry, true);
270     }
271     // Do Redeliver, the communicator is responsible to deal with the frame
272     std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
273     for (auto &entry : framesToRedeliver) {
274         commMap_.at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
275     }
276 }
277 
278 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)279 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
280 {
281     if (onEnd) { // LCOV_EXCL_BR_LINE
282         TaskAction onSendEndTask = [onEnd, result]() {
283             LOGD("[CommAggr][SendEndTask] Before On Send End.");
284             onEnd(result, true);
285             LOGD("[CommAggr][SendEndTask] After On Send End.");
286         };
287         int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
288         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
289             LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
290         }
291     }
292 }
293 }
294 
ScheduleSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)295 int CommunicatorAggregator::ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
296     FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
297 {
298     if (inBuff == nullptr) {
299         return -E_INVALID_ARGS;
300     }
301 
302     if (!ReGenerateLocalSourceIdIfNeed()) {
303         delete inBuff;
304         inBuff = nullptr;
305         DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
306         LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
307         return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
308     }
309     bool sendLabelExchange = true;
310     if (dbStatusAdapter_ != nullptr) {
311         sendLabelExchange = dbStatusAdapter_->IsSendLabelExchange();
312     }
313     PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType,
314         sendLabelExchange};
315     int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
316     if (errCode != E_OK) {
317         LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
318         return errCode;
319     }
320     {
321         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
322         sendRecord_[info.frameId] = {};
323     }
324     SendTask task{inBuff, dstTarget, onEnd, info.frameId, true};
325     if (inConfig.nonBlock) {
326         errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
327     } else {
328         errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
329     }
330     if (errCode != E_OK) {
331         LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
332         return errCode;
333     }
334     TriggerSendData();
335     LOGI("[CommAggr][Create] Exit ok, dev=%.3s, frameId=%u", dstTarget.c_str(), info.frameId);
336     return E_OK;
337 }
338 
EnableCommunicatorNotFoundFeedback(bool isEnable)339 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
340 {
341     isCommunicatorNotFoundFeedbackEnable_ = isEnable;
342 }
343 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const344 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
345 {
346     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
347     auto pair = versionMap_.find(target);
348     if (pair == versionMap_.end()) { // LCOV_EXCL_BR_LINE
349         return -E_NOT_FOUND;
350     }
351     outVersion = pair->second;
352     return E_OK;
353 }
354 
SendDataRoutine()355 void CommunicatorAggregator::SendDataRoutine()
356 {
357     while (!shutdown_) {
358         if (scheduler_.GetNoDelayTaskCount() == 0) {
359             std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
360             LOGI("[CommAggr][Routine] Send done and sleep.");
361             wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
362             LOGI("[CommAggr][Routine] Send continue.");
363             wakingSignal_ = false;
364             continue;
365         }
366         SendOnceData();
367     }
368 }
369 
SendPacketsAndDisposeTask(const SendTask & inTask,uint32_t mtu,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket,uint32_t totalLength)370 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
371     const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength)
372 {
373     bool taskNeedFinalize = true;
374     int errCode = E_OK;
375     ResetFrameRecordIfNeed(inTask.frameId, mtu);
376     uint32_t startIndex;
377     {
378         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
379         startIndex = sendRecord_[inTask.frameId].sendIndex;
380     }
381     uint64_t currentSendSequenceId = IncreaseSendSequenceId(inTask.dstTarget);
382     for (uint32_t index = startIndex; index < static_cast<uint32_t>(eachPacket.size()) && inTask.isValid; ++index) {
383         auto &entry = eachPacket[index];
384         LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%" PRIu32
385             ", packetLength=%" PRIu32 ".", inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
386         ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
387         errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second, totalLength);
388         {
389             std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
390             sendRecord_[inTask.frameId].sendIndex = index;
391         }
392         if (errCode == -E_WAIT_RETRY) {
393             LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
394             taskNeedFinalize = false;
395             break;
396         } else if (errCode != E_OK) {
397             LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
398             break;
399         } else {
400             std::lock_guard<std::mutex> autoLock(retryCountMutex_);
401             retryCount_[inTask.dstTarget] = 0;
402         }
403     }
404     if (errCode == -E_WAIT_RETRY) {
405         RetrySendTaskIfNeed(inTask.dstTarget, currentSendSequenceId);
406     }
407     if (taskNeedFinalize) {
408         TaskFinalizer(inTask, errCode);
409         std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
410         sendRecord_.erase(inTask.frameId);
411     }
412 }
413 
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)414 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
415 {
416     int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
417     if (errCode != E_OK) {
418         bool notTimeout = true;
419         auto retryFunc = [this, inPrio, &inTask]()->bool {
420             if (this->shutdown_) {
421                 delete inTask.buffer;
422                 inTask.buffer = nullptr;
423                 return true;
424             }
425             int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
426             if (retCode != E_OK) {
427                 return false;
428             }
429             return true;
430         };
431 
432         if (timeout == 0) { // Unlimited retry
433             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
434             retryCv_.wait(retryUniqueLock, retryFunc);
435         } else {
436             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
437             notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
438         }
439 
440         if (shutdown_) {
441             return E_OK;
442         }
443         if (!notTimeout) {
444             return -E_TIMEOUT;
445         }
446     }
447     return E_OK;
448 }
449 
TaskFinalizer(const SendTask & inTask,int result)450 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
451 {
452     // Call the OnSendEnd if need
453     if (inTask.onEnd) {
454         LOGD("[CommAggr][TaskFinal] On Send End.");
455         inTask.onEnd(result, true);
456     }
457     // Finalize the task that just scheduled
458     int errCode = scheduler_.FinalizeLastScheduleTask();
459     // Notify Sendable To All Communicator If Need
460     if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
461         retryCv_.notify_all();
462     }
463     if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
464         NotifySendableToAllCommunicator();
465     }
466 }
467 
NotifySendableToAllCommunicator()468 void CommunicatorAggregator::NotifySendableToAllCommunicator()
469 {
470     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
471     for (auto &entry : commMap_) {
472         // Ignore nonactivated communicator
473         if (entry.second.second) {
474             entry.second.first->OnSendAvailable();
475         }
476     }
477 }
478 
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const std::string & userId)479 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
480     const std::string &userId)
481 {
482     ProtocolProto::DisplayPacketInformation(bytes, length);
483     ParseResult packetResult;
484     int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
485     if (errCode != E_OK) {
486         LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
487         if (errCode == -E_VERSION_NOT_SUPPORT) {
488             TriggerVersionNegotiation(srcTarget);
489         }
490         return;
491     }
492 
493     // Update version of remote target
494     SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
495     if (dbStatusAdapter_ != nullptr) {
496         dbStatusAdapter_->SetRemoteOptimizeCommunication(srcTarget, !packetResult.IsSendLabelExchange());
497     }
498     if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
499         LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
500         return;
501     }
502 
503     if (packetResult.IsFragment()) {
504         OnFragmentReceive(srcTarget, bytes, length, packetResult, userId);
505     } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
506         errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
507         if (errCode != E_OK) {
508             LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
509         }
510     } else {
511         errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userId);
512         if (errCode != E_OK) {
513             LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
514         }
515     }
516 }
517 
OnTargetChange(const std::string & target,bool isConnect)518 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
519 {
520     if (target.empty()) {
521         LOGE("[CommAggr][OnTarget] Target empty string.");
522         return;
523     }
524     // For process level target change
525     {
526         std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
527         if (onConnectHandle_) {
528             onConnectHandle_(target, isConnect);
529             LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
530         } else {
531             LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
532         }
533     }
534     std::set<LabelType> relatedLabels;
535     // For communicator level target change
536     if (isConnect) {
537         int errCode = commLinker_->TargetOnline(target, relatedLabels);
538         if (errCode != E_OK) {
539             LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
540         }
541     } else {
542         commLinker_->TargetOffline(target, relatedLabels);
543     }
544     // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
545     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
546     for (auto &entry : commMap_) {
547         // Ignore nonactivated communicator
548         if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
549             entry.second.first->OnConnectChange(target, isConnect);
550         }
551     }
552 }
553 
OnSendable(const std::string & target)554 void CommunicatorAggregator::OnSendable(const std::string &target)
555 {
556     int errCode = scheduler_.NoDelayTaskByTarget(target);
557     if (errCode != E_OK) {
558         LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
559         return;
560     }
561     TriggerSendData();
562 }
563 
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)564 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
565     const ParseResult &inResult, const std::string &userId)
566 {
567     int errorNo = E_OK;
568     ParseResult frameResult;
569     SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
570     if (errorNo != E_OK) {
571         LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
572         return;
573     }
574     if (frameBuffer == nullptr) {
575         LOGW("[CommAggr][Receive] Combine undone.");
576         return;
577     }
578 
579     int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
580     if (errCode != E_OK) {
581         LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
582         delete frameBuffer;
583         frameBuffer = nullptr;
584         if (errCode == -E_VERSION_NOT_SUPPORT) {
585             TriggerVersionNegotiation(srcTarget);
586         }
587         return;
588     }
589 
590     if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
591         errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
592         if (errCode != E_OK) {
593             LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
594         }
595         delete frameBuffer;
596         frameBuffer = nullptr;
597     } else {
598         errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userId);
599         if (errCode != E_OK) {
600             LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
601         }
602     }
603 }
604 
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)605 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
606 {
607     if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
608         int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
609             inResult.GetLabelExchangeSequenceId());
610         if (errCode != E_OK) {
611             LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
612             return errCode;
613         }
614     } else {
615         std::map<LabelType, bool> changedLabels;
616         int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
617             inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
618         if (errCode != E_OK) {
619             LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
620             return errCode;
621         }
622         NotifyConnectChange(srcTarget, changedLabels);
623     }
624     return E_OK;
625 }
626 
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)627 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
628     uint32_t length, const ParseResult &inResult, const std::string &userId)
629 {
630     SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
631     if (buffer == nullptr) {
632         LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
633         return -E_OUT_OF_MEMORY;
634     }
635     int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
636         ProtocolProto::GetAppLayerFrameHeaderLength());
637     if (errCode != E_OK) {
638         LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
639         delete buffer;
640         buffer = nullptr;
641         return -E_INTERNAL_ERROR;
642     }
643     return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userId);
644 }
645 
646 // In early time, we cover "OnAppLayerFrameReceive" totally by commMapMutex_, then search communicator, if not found,
647 // we call onCommLackHandle_ if exist to ask whether to retain this frame or not, if the answer is yes we retain this
648 // frame, otherwise we discard this frame and send out CommunicatorNotFound feedback.
649 // We design so(especially cover this function totally by commMapMutex_) to avoid current situation described below
650 // 1:This func find that target communicator not allocated or activated, so decide to retain this frame.
651 // 2:Thread switch out, the target communicator is allocated and activated, previous retained frame is fetched out.
652 // 3:Thread switch back, this frame is then retained into the retainer, no chance to be fetched out.
653 // In conclusion: the decision to retain a frame and the action to retain a frame should not be separated.
654 // Otherwise, at the action time, the retain decision may be obsolete and wrong.
655 // #### BUT #### since onCommLackHandle_ callback is go beyond DistributedDB and there is the risk that the final upper
656 // user may do something such as GetKvStore(we can prevent them to so) which could result in calling AllocCommunicator
657 // in the same callback thread finally causing DeadLock on commMapMutex_.
658 // #### SO #### we have to make a change described below
659 // 1:Search communicator under commMapMutex_, if found then deliver frame to that communicator and end.
660 // 2:Call onCommLackHandle_ if exist to ask whether to retain this frame or not, without commMapMutex_.
661 // Note: during this period, commMap_ maybe changed, and communicator not found before may exist now.
662 // 3:Search communicator under commMapMutex_ again, if found then deliver frame to that communicator and end.
663 // 4:If still not found, retain this frame if need or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const std::string & userId)664 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
665     const ParseResult &inResult, const std::string &userId)
666 {
667     LabelType toLabel = inResult.GetCommLabel();
668     {
669         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
670         int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
671         if (errCode == E_OK) { // Attention: Here is equal to E_OK
672             return E_OK;
673         }
674     }
675     LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel));
676     int errCode = -E_NOT_FOUND;
677     {
678         std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
679         if (onCommLackHandle_) {
680             errCode = onCommLackHandle_(toLabel, userId);
681             LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
682         } else {
683             LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
684         }
685     }
686     // Here we have to lock commMapMutex_ and search communicator again.
687     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
688     int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
689     if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
690         LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
691         return E_OK;
692     }
693     // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
694     if (errCode != E_OK) {
695         TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
696         delete inFrameBuffer;
697         inFrameBuffer = nullptr;
698         return errCode; // The caller will display errCode in log
699     }
700     // Do Retention, the retainer is responsible to deal with the frame
701     retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
702     inFrameBuffer = nullptr;
703     return E_OK;
704 }
705 
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel)706 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
707     SerialBuffer *&inFrameBuffer, const LabelType &toLabel)
708 {
709     // Ignore nonactivated communicator, which is regarded as inexistent
710     if (commMap_.count(toLabel) != 0 && commMap_.at(toLabel).second) {
711         commMap_.at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
712         // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
713         inFrameBuffer = nullptr;
714         return E_OK;
715     }
716     return -E_NOT_FOUND;
717 }
718 
RegCallbackToAdapter()719 int CommunicatorAggregator::RegCallbackToAdapter()
720 {
721     RefObject::IncObjRef(this); // Reference to be hold by adapter
722     int errCode = adapterHandle_->RegBytesReceiveCallback(
723         [this](const std::string &srcTarget, const uint8_t *bytes, uint32_t length, const std::string &userId) {
724             OnBytesReceive(srcTarget, bytes, length, userId);
725         }, [this]() { RefObject::DecObjRef(this); });
726     if (errCode != E_OK) {
727         RefObject::DecObjRef(this); // Rollback in case reg failed
728         return errCode;
729     }
730 
731     RefObject::IncObjRef(this); // Reference to be hold by adapter
732     errCode = adapterHandle_->RegTargetChangeCallback(
733         [this](const std::string &target, bool isConnect) { OnTargetChange(target, isConnect); },
734         [this]() { RefObject::DecObjRef(this); });
735     if (errCode != E_OK) {
736         RefObject::DecObjRef(this); // Rollback in case reg failed
737         return errCode;
738     }
739 
740     RefObject::IncObjRef(this); // Reference to be hold by adapter
741     errCode = adapterHandle_->RegSendableCallback([this](const std::string &target, int softBusErrCode) {
742             LOGI("[CommAggr] Send able dev=%.3s, softBusErrCode=%d", target.c_str(), softBusErrCode);
743             if (softBusErrCode == E_OK) {
744                 (void)IncreaseSendSequenceId(target);
745                 OnSendable(target);
746             }
747             scheduler_.SetSoftBusErrCode(target, softBusErrCode);
748         },
749         [this]() { RefObject::DecObjRef(this); });
750     if (errCode != E_OK) {
751         RefObject::DecObjRef(this); // Rollback in case reg failed
752         return errCode;
753     }
754 
755     return E_OK;
756 }
757 
UnRegCallbackFromAdapter()758 void CommunicatorAggregator::UnRegCallbackFromAdapter()
759 {
760     adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
761     adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
762     adapterHandle_->RegSendableCallback(nullptr, nullptr);
763     if (dbStatusAdapter_ != nullptr) {
764         dbStatusAdapter_->SetDBStatusChangeCallback(nullptr, nullptr, nullptr);
765     }
766 }
767 
GenerateLocalSourceId()768 void CommunicatorAggregator::GenerateLocalSourceId()
769 {
770     std::string identity;
771     adapterHandle_->GetLocalIdentity(identity);
772     // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
773     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
774     uint64_t identityHash = Hash::HashFunc(identity);
775     if (identityHash != localSourceId_) {
776         LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%" PRIu64, identity.c_str(), ULL(identityHash));
777     }
778     localSourceId_ = identityHash;
779 }
780 
ReGenerateLocalSourceIdIfNeed()781 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
782 {
783     // The deviceId will change when switch user from A to B
784     // We can't listen to the user change, because it's hard to ensure the timing is correct.
785     // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
786     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
787     GenerateLocalSourceId();
788     return (localSourceId_ != 0);
789 }
790 
TriggerVersionNegotiation(const std::string & dstTarget)791 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
792 {
793     LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
794     int errCode = E_OK;
795     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
796     if (errCode != E_OK) {
797         LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
798         return;
799     }
800 
801     TaskConfig config{true, 0, Priority::HIGH};
802     errCode = ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
803     if (errCode != E_OK) {
804         LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
805         // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
806         delete buffer;
807         buffer = nullptr;
808     }
809 }
810 
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)811 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
812     const LabelType &dstLabel, const SerialBuffer *inOriFrame)
813 {
814     if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
815         return;
816     }
817     int errCode = E_OK;
818     Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
819     if (message == nullptr) {
820         if (errCode == -E_VERSION_NOT_SUPPORT) {
821             TriggerVersionNegotiation(dstTarget);
822         }
823         return;
824     }
825     // Message is release in TriggerCommunicatorNotFoundFeedback
826     TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
827 }
828 
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)829 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
830     const LabelType &dstLabel, Message* &oriMsg)
831 {
832     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
833         LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
834         // Do not have to do feedback if the message is not a request type message
835         delete oriMsg;
836         oriMsg = nullptr;
837         return;
838     }
839 
840     LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
841     oriMsg->SetMessageType(TYPE_RESPONSE);
842     oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
843 
844     int errCode = E_OK;
845     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
846     delete oriMsg;
847     oriMsg = nullptr;
848     if (errCode != E_OK) {
849         LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
850         return;
851     }
852 
853     TaskConfig config{true, 0, Priority::HIGH};
854     errCode = ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
855     if (errCode != E_OK) {
856         LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
857         // if send fails, free buffer, otherwise buffer will be taked over by ScheduleSendTask
858         delete buffer;
859         buffer = nullptr;
860     }
861 }
862 
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)863 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
864 {
865     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
866     versionMap_[target] = version;
867 }
868 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)869 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
870 {
871     if (adapterHandle_ == nullptr) {
872         return nullptr;
873     }
874     return adapterHandle_->GetExtendHeaderHandle(paramInfo);
875 }
876 
OnRemoteDBStatusChange(const std::string & devInfo,const std::vector<DBInfo> & dbInfos)877 void CommunicatorAggregator::OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos)
878 {
879     std::map<LabelType, bool> changedLabels;
880     for (const auto &dbInfo: dbInfos) {
881         std::string label = DBCommon::GenerateHashLabel(dbInfo);
882         LabelType labelType(label.begin(), label.end());
883         changedLabels[labelType] = dbInfo.isNeedSync;
884     }
885     if (commLinker_ != nullptr) {
886         commLinker_->UpdateOnlineLabels(devInfo, changedLabels);
887     }
888     NotifyConnectChange(devInfo, changedLabels);
889 }
890 
NotifyConnectChange(const std::string & srcTarget,const std::map<LabelType,bool> & changedLabels)891 void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget,
892     const std::map<LabelType, bool> &changedLabels)
893 {
894     if (commLinker_ != nullptr && !commLinker_->IsRemoteTargetOnline(srcTarget)) {
895         LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str());
896         for (const auto &entry : changedLabels) {
897             LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
898         }
899         return;
900     }
901     // Do target change notify
902     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
903     for (auto &entry : changedLabels) {
904         // Ignore nonactivated communicator
905         if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) {
906             LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.",
907                 VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
908             commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
909         }
910     }
911 }
912 
RegDBChangeCallback()913 void CommunicatorAggregator::RegDBChangeCallback()
914 {
915     if (dbStatusAdapter_ != nullptr) {
916         dbStatusAdapter_->SetDBStatusChangeCallback(
917             [this](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
918                 OnRemoteDBStatusChange(devInfo, dbInfos);
919             },
920             [this]() {
921                 if (commLinker_ != nullptr) {
922                     (void)commLinker_->TriggerLabelExchangeEvent(false);
923                 }
924             },
925             [this](const std::string &dev) {
926                 if (commLinker_ != nullptr) {
927                     std::set<LabelType> relatedLabels;
928                     (void)commLinker_->TargetOnline(dev, relatedLabels);
929                 }
930             });
931     }
932 }
InitSendThread()933 void CommunicatorAggregator::InitSendThread()
934 {
935     if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) {
936         return;
937     }
938     exclusiveThread_ = std::thread([this] { SendDataRoutine(); });
939     useExclusiveThread_ = true;
940 }
941 
SendOnceData()942 void CommunicatorAggregator::SendOnceData()
943 {
944     SendTask taskToSend;
945     uint32_t totalLength = 0;
946     int errCode = scheduler_.ScheduleOutSendTask(taskToSend, totalLength);
947     if (errCode != E_OK) {
948         return; // Not possible to happen
949     }
950     // <vector, extendHeadSize>
951     std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
952     uint32_t mtu = adapterHandle_->GetMtuSize(taskToSend.dstTarget);
953     errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer, mtu, piecePackets);
954     if (errCode != E_OK) {
955         LOGE("[CommAggr] Split frame fail, errCode=%d.", errCode);
956         TaskFinalizer(taskToSend, errCode);
957         return;
958     }
959     // <addr, <extendHeadSize, totalLen>>
960     std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
961     if (piecePackets.empty()) {
962         // Case that no need to split a frame, just use original buffer as a packet
963         std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
964         std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
965         entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
966         entry.second.first = taskToSend.buffer->GetExtendHeadLength();
967         entry.second.second = tmpEntry.second + entry.second.first;
968         eachPacket.push_back(entry);
969     } else {
970         for (auto &entry : piecePackets) {
971             std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
972                 {entry.second, entry.first.size()}};
973             eachPacket.push_back(tmpEntry);
974         }
975     }
976 
977     SendPacketsAndDisposeTask(taskToSend, mtu, eachPacket, totalLength);
978 }
979 
TriggerSendData()980 void CommunicatorAggregator::TriggerSendData()
981 {
982     if (useExclusiveThread_) {
983         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
984         wakingSignal_ = true;
985         wakingCv_.notify_one();
986         return;
987     }
988     {
989         std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
990         if (sendTaskStart_) {
991             return;
992         }
993         sendTaskStart_ = true;
994     }
995     RefObject::IncObjRef(this);
996     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
997         LOGI("[CommAggr] Send thread start.");
998         while (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
999             SendOnceData();
1000         }
1001         {
1002             std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1003             sendTaskStart_ = false;
1004         }
1005         if (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1006             TriggerSendData(); // avoid sendTaskStart_ was mark false after trigger thread check it
1007         }
1008         finalizeCv_.notify_one();
1009         RefObject::DecObjRef(this);
1010         LOGI("[CommAggr] Send thread end.");
1011     });
1012     if (errCode != E_OK) {
1013         LOGW("[CommAggr] Trigger send data failed %d", errCode);
1014         RefObject::DecObjRef(this);
1015     }
1016 }
1017 
ResetFrameRecordIfNeed(const uint32_t frameId,const uint32_t mtu)1018 void CommunicatorAggregator::ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu)
1019 {
1020     std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
1021     if (sendRecord_[frameId].splitMtu == 0u || sendRecord_[frameId].splitMtu != mtu) {
1022         sendRecord_[frameId].splitMtu = mtu;
1023         sendRecord_[frameId].sendIndex = 0u;
1024     }
1025 }
1026 
RetrySendTaskIfNeed(const std::string & target,uint64_t sendSequenceId)1027 void CommunicatorAggregator::RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId)
1028 {
1029     if (IsRetryOutOfLimit(target)) {
1030         LOGD("[CommAggr] Retry send task is out of limit! target is %s{private}", target.c_str());
1031         scheduler_.InvalidSendTask(target);
1032         std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1033         retryCount_[target] = 0;
1034     } else {
1035         if (sendSequenceId != GetSendSequenceId(target)) {
1036             LOGD("[CommAggr] %.3s Send sequence id has changed", target.c_str());
1037             return;
1038         }
1039         scheduler_.DelayTaskByTarget(target);
1040         RetrySendTask(target, sendSequenceId);
1041     }
1042 }
1043 
RetrySendTask(const std::string & target,uint64_t sendSequenceId)1044 void CommunicatorAggregator::RetrySendTask(const std::string &target, uint64_t sendSequenceId)
1045 {
1046     int32_t currentRetryCount = 0;
1047     {
1048         std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1049         retryCount_[target]++;
1050         currentRetryCount = retryCount_[target];
1051         LOGD("[CommAggr] Target %s{private} retry count is %" PRId32, target.c_str(), currentRetryCount);
1052     }
1053     TimerId timerId = 0u;
1054     RefObject::IncObjRef(this);
1055     (void)RuntimeContext::GetInstance()->SetTimer(GetNextRetryInterval(target, currentRetryCount),
1056         [this, target, sendSequenceId](TimerId id) {
1057         if (sendSequenceId == GetSendSequenceId(target)) {
1058             OnSendable(target);
1059         } else {
1060             LOGD("[CommAggr] %.3s Send sequence id has changed in timer", target.c_str());
1061         }
1062         RefObject::DecObjRef(this);
1063         return -E_END_TIMER;
1064     }, nullptr, timerId);
1065 }
1066 
IsRetryOutOfLimit(const std::string & target)1067 bool CommunicatorAggregator::IsRetryOutOfLimit(const std::string &target)
1068 {
1069     std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1070     return retryCount_[target] >= MAX_SEND_RETRY;
1071 }
1072 
GetNextRetryInterval(const std::string & target,int32_t currentRetryCount)1073 int32_t CommunicatorAggregator::GetNextRetryInterval(const std::string &target, int32_t currentRetryCount)
1074 {
1075     uint32_t timeout = DBConstant::MIN_TIMEOUT;
1076     if (adapterHandle_ != nullptr) {
1077         timeout = adapterHandle_->GetTimeout(target);
1078     }
1079     return static_cast<int32_t>(timeout) * currentRetryCount / RETRY_TIME_SPLIT;
1080 }
1081 
GetSendSequenceId(const std::string & target)1082 uint64_t CommunicatorAggregator::GetSendSequenceId(const std::string &target)
1083 {
1084     std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1085     return sendSequence_[target];
1086 }
1087 
IncreaseSendSequenceId(const std::string & target)1088 uint64_t CommunicatorAggregator::IncreaseSendSequenceId(const std::string &target)
1089 {
1090     std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1091     return ++sendSequence_[target];
1092 }
1093 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
1094 } // namespace DistributedDB
1095