1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_aggregator.h"
17
18 #include <sstream>
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "protocol_proto.h"
26
27 namespace DistributedDB {
namespace {
// File-local constants; their use sites are outside this part of the file.
constexpr int MAX_SEND_RETRY = 2;
constexpr int RETRY_TIME_SPLIT = 4;

// Returns the textual form of the calling thread's id, as produced by streaming std::thread::id.
// Used by the log statements of this file.
inline std::string GetThreadId()
{
    std::ostringstream idText;
    idText << std::this_thread::get_id();
    return idText.str();
}
}
38
39 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
40
CommunicatorAggregator()41 CommunicatorAggregator::CommunicatorAggregator()
42 : shutdown_(false),
43 incFrameId_(0),
44 localSourceId_(0)
45 {
46 }
47
~CommunicatorAggregator()48 CommunicatorAggregator::~CommunicatorAggregator()
49 {
50 scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
51 adapterHandle_ = nullptr;
52 commLinker_ = nullptr;
53 }
54
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)55 int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter)
56 {
57 if (inAdapter == nullptr) {
58 return -E_INVALID_ARGS;
59 }
60 adapterHandle_ = inAdapter;
61
62 combiner_.Initialize();
63 retainer_.Initialize();
64 scheduler_.Initialize();
65
66 int errCode;
67 commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter);
68 if (commLinker_ == nullptr) {
69 errCode = -E_OUT_OF_MEMORY;
70 goto ROLL_BACK;
71 }
72 commLinker_->Initialize();
73
74 errCode = RegCallbackToAdapter();
75 if (errCode != E_OK) {
76 goto ROLL_BACK;
77 }
78
79 errCode = adapterHandle_->StartAdapter();
80 if (errCode != E_OK) {
81 LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
82 goto ROLL_BACK;
83 }
84 GenerateLocalSourceId();
85
86 shutdown_ = false;
87 InitSendThread();
88 dbStatusAdapter_ = statusAdapter;
89 RegDBChangeCallback();
90 return E_OK;
91 ROLL_BACK:
92 UnRegCallbackFromAdapter();
93 if (commLinker_ != nullptr) {
94 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
95 commLinker_ = nullptr;
96 }
97 // Scheduler do not need to do finalize in this roll_back
98 retainer_.Finalize();
99 combiner_.Finalize();
100 return errCode;
101 }
102
Finalize()103 void CommunicatorAggregator::Finalize()
104 {
105 shutdown_ = true;
106 retryCv_.notify_all();
107 {
108 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
109 wakingSignal_ = true;
110 wakingCv_.notify_one();
111 }
112 if (useExclusiveThread_) {
113 exclusiveThread_.join(); // Waiting thread to thoroughly quit
114 LOGI("[CommAggr][Final] Sub Thread Exit.");
115 } else {
116 LOGI("[CommAggr][Final] Begin wait send task exit.");
117 std::unique_lock<std::mutex> scheduleSendTaskLock(scheduleSendTaskMutex_);
118 finalizeCv_.wait(scheduleSendTaskLock, [this]() {
119 return !sendTaskStart_;
120 });
121 LOGI("[CommAggr][Final] End wait send task exit.");
122 }
123 scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
124
125 adapterHandle_->StopAdapter();
126 UnRegCallbackFromAdapter();
127 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
128
129 // No callback now and later, so combiner, retainer and linker can finalize or delete safely
130 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
131 commLinker_ = nullptr;
132 retainer_.Finalize();
133 combiner_.Finalize();
134 dbStatusAdapter_ = nullptr;
135 }
136
AllocCommunicator(uint64_t commLabel,int & outErrorNo)137 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
138 {
139 uint64_t netOrderLabel = HostToNet(commLabel);
140 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
141 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
142 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
143 realLabel[i] = eachByte[i];
144 }
145 return AllocCommunicator(realLabel, outErrorNo);
146 }
147
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo)148 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo)
149 {
150 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
151 LOGI("[CommAggr][Alloc] Label=%.3s.", VEC_TO_STR(commLabel));
152 if (commLabel.size() != COMM_LABEL_LENGTH) {
153 outErrorNo = -E_INVALID_ARGS;
154 return nullptr;
155 }
156
157 if (commMap_.count(commLabel) != 0) {
158 outErrorNo = -E_ALREADY_ALLOC;
159 return nullptr;
160 }
161
162 Communicator *commPtr = new (std::nothrow) Communicator(this, commLabel);
163 if (commPtr == nullptr) {
164 outErrorNo = -E_OUT_OF_MEMORY;
165 return nullptr;
166 }
167 commMap_[commLabel] = {commPtr, false}; // Communicator is not activated when allocated
168 return commPtr;
169 }
170
ReleaseCommunicator(ICommunicator * inCommunicator)171 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
172 {
173 if (inCommunicator == nullptr) {
174 return;
175 }
176 Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
177 LabelType commLabel = commPtr->GetCommunicatorLabel();
178 LOGI("[CommAggr][Release] Label=%.3s.", VEC_TO_STR(commLabel));
179
180 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
181 if (commMap_.count(commLabel) == 0) {
182 LOGE("[CommAggr][Release] Not Found.");
183 return;
184 }
185 commMap_.erase(commLabel);
186 RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
187
188 int errCode = commLinker_->DecreaseLocalLabel(commLabel);
189 if (errCode != E_OK) {
190 LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
191 }
192 }
193
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)194 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
195 const Finalizer &inOper)
196 {
197 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
198 return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
199 }
200
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)201 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
202 {
203 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
204 int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
205 if (onConnect && errCode == E_OK) {
206 // Register action and success
207 std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
208 for (auto &entry : onlineTargets) {
209 LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
210 onConnectHandle_(entry, true);
211 }
212 }
213 return errCode;
214 }
215
GetCommunicatorAggregatorMtuSize() const216 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
217 {
218 return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
219 }
220
GetCommunicatorAggregatorMtuSize(const std::string & target) const221 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
222 {
223 return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
224 }
225
GetCommunicatorAggregatorTimeout() const226 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
227 {
228 return adapterHandle_->GetTimeout();
229 }
230
GetCommunicatorAggregatorTimeout(const std::string & target) const231 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
232 {
233 return adapterHandle_->GetTimeout(target);
234 }
235
IsDeviceOnline(const std::string & device) const236 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
237 {
238 return adapterHandle_->IsDeviceOnline(device);
239 }
240
GetLocalIdentity(std::string & outTarget) const241 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
242 {
243 return adapterHandle_->GetLocalIdentity(outTarget);
244 }
245
ActivateCommunicator(const LabelType & commLabel)246 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel)
247 {
248 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
249 LOGI("[CommAggr][Activate] Label=%.3s.", VEC_TO_STR(commLabel));
250 if (commMap_.count(commLabel) == 0) {
251 LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
252 return;
253 }
254 if (commMap_.at(commLabel).second) {
255 return;
256 }
257 commMap_.at(commLabel).second = true; // Mark this communicator as activated
258
259 // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
260 // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
261 std::set<std::string> onlineTargets;
262 int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
263 if (errCode != E_OK) {
264 LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
265 // Do not return here
266 }
267 for (auto &entry : onlineTargets) {
268 LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
269 commMap_.at(commLabel).first->OnConnectChange(entry, true);
270 }
271 // Do Redeliver, the communicator is responsible to deal with the frame
272 std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
273 for (auto &entry : framesToRedeliver) {
274 commMap_.at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
275 }
276 }
277
278 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)279 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
280 {
281 if (onEnd) { // LCOV_EXCL_BR_LINE
282 TaskAction onSendEndTask = [onEnd, result]() {
283 LOGD("[CommAggr][SendEndTask] Before On Send End.");
284 onEnd(result, true);
285 LOGD("[CommAggr][SendEndTask] After On Send End.");
286 };
287 int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
288 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
289 LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
290 }
291 }
292 }
293 }
294
ScheduleSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)295 int CommunicatorAggregator::ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
296 FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
297 {
298 if (inBuff == nullptr) {
299 return -E_INVALID_ARGS;
300 }
301
302 if (!ReGenerateLocalSourceIdIfNeed()) {
303 delete inBuff;
304 inBuff = nullptr;
305 DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
306 LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
307 return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
308 }
309 bool sendLabelExchange = true;
310 if (dbStatusAdapter_ != nullptr) {
311 sendLabelExchange = dbStatusAdapter_->IsSendLabelExchange();
312 }
313 PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType,
314 sendLabelExchange};
315 int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
316 if (errCode != E_OK) {
317 LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
318 return errCode;
319 }
320 {
321 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
322 sendRecord_[info.frameId] = {};
323 }
324 SendTask task{inBuff, dstTarget, onEnd, info.frameId, true};
325 if (inConfig.nonBlock) {
326 errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
327 } else {
328 errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
329 }
330 if (errCode != E_OK) {
331 LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
332 return errCode;
333 }
334 TriggerSendData();
335 LOGI("[CommAggr][Create] Exit ok, dev=%.3s, frameId=%u", dstTarget.c_str(), info.frameId);
336 return E_OK;
337 }
338
EnableCommunicatorNotFoundFeedback(bool isEnable)339 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
340 {
341 isCommunicatorNotFoundFeedbackEnable_ = isEnable;
342 }
343
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const344 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
345 {
346 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
347 auto pair = versionMap_.find(target);
348 if (pair == versionMap_.end()) { // LCOV_EXCL_BR_LINE
349 return -E_NOT_FOUND;
350 }
351 outVersion = pair->second;
352 return E_OK;
353 }
354
SendDataRoutine()355 void CommunicatorAggregator::SendDataRoutine()
356 {
357 while (!shutdown_) {
358 if (scheduler_.GetNoDelayTaskCount() == 0) {
359 std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
360 LOGI("[CommAggr][Routine] Send done and sleep.");
361 wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
362 LOGI("[CommAggr][Routine] Send continue.");
363 wakingSignal_ = false;
364 continue;
365 }
366 SendOnceData();
367 }
368 }
369
SendPacketsAndDisposeTask(const SendTask & inTask,uint32_t mtu,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket,uint32_t totalLength)370 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
371 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength)
372 {
373 bool taskNeedFinalize = true;
374 int errCode = E_OK;
375 ResetFrameRecordIfNeed(inTask.frameId, mtu);
376 uint32_t startIndex;
377 {
378 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
379 startIndex = sendRecord_[inTask.frameId].sendIndex;
380 }
381 uint64_t currentSendSequenceId = IncreaseSendSequenceId(inTask.dstTarget);
382 for (uint32_t index = startIndex; index < static_cast<uint32_t>(eachPacket.size()) && inTask.isValid; ++index) {
383 auto &entry = eachPacket[index];
384 LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%" PRIu32
385 ", packetLength=%" PRIu32 ".", inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
386 ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
387 errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second, totalLength);
388 {
389 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
390 sendRecord_[inTask.frameId].sendIndex = index;
391 }
392 if (errCode == -E_WAIT_RETRY) {
393 LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
394 taskNeedFinalize = false;
395 break;
396 } else if (errCode != E_OK) {
397 LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
398 break;
399 } else {
400 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
401 retryCount_[inTask.dstTarget] = 0;
402 }
403 }
404 if (errCode == -E_WAIT_RETRY) {
405 RetrySendTaskIfNeed(inTask.dstTarget, currentSendSequenceId);
406 }
407 if (taskNeedFinalize) {
408 TaskFinalizer(inTask, errCode);
409 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
410 sendRecord_.erase(inTask.frameId);
411 }
412 }
413
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)414 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
415 {
416 int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
417 if (errCode != E_OK) {
418 bool notTimeout = true;
419 auto retryFunc = [this, inPrio, &inTask]()->bool {
420 if (this->shutdown_) {
421 delete inTask.buffer;
422 inTask.buffer = nullptr;
423 return true;
424 }
425 int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
426 if (retCode != E_OK) {
427 return false;
428 }
429 return true;
430 };
431
432 if (timeout == 0) { // Unlimited retry
433 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
434 retryCv_.wait(retryUniqueLock, retryFunc);
435 } else {
436 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
437 notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
438 }
439
440 if (shutdown_) {
441 return E_OK;
442 }
443 if (!notTimeout) {
444 return -E_TIMEOUT;
445 }
446 }
447 return E_OK;
448 }
449
TaskFinalizer(const SendTask & inTask,int result)450 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
451 {
452 // Call the OnSendEnd if need
453 if (inTask.onEnd) {
454 LOGD("[CommAggr][TaskFinal] On Send End.");
455 inTask.onEnd(result, true);
456 }
457 // Finalize the task that just scheduled
458 int errCode = scheduler_.FinalizeLastScheduleTask();
459 // Notify Sendable To All Communicator If Need
460 if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
461 retryCv_.notify_all();
462 }
463 if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
464 NotifySendableToAllCommunicator();
465 }
466 }
467
NotifySendableToAllCommunicator()468 void CommunicatorAggregator::NotifySendableToAllCommunicator()
469 {
470 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
471 for (auto &entry : commMap_) {
472 // Ignore nonactivated communicator
473 if (entry.second.second) {
474 entry.second.first->OnSendAvailable();
475 }
476 }
477 }
478
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const std::string & userId)479 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
480 const std::string &userId)
481 {
482 ProtocolProto::DisplayPacketInformation(bytes, length);
483 ParseResult packetResult;
484 int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
485 if (errCode != E_OK) {
486 LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
487 if (errCode == -E_VERSION_NOT_SUPPORT) {
488 TriggerVersionNegotiation(srcTarget);
489 }
490 return;
491 }
492
493 // Update version of remote target
494 SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
495 if (dbStatusAdapter_ != nullptr) {
496 dbStatusAdapter_->SetRemoteOptimizeCommunication(srcTarget, !packetResult.IsSendLabelExchange());
497 }
498 if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
499 LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
500 return;
501 }
502
503 if (packetResult.IsFragment()) {
504 OnFragmentReceive(srcTarget, bytes, length, packetResult, userId);
505 } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
506 errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
507 if (errCode != E_OK) {
508 LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
509 }
510 } else {
511 errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userId);
512 if (errCode != E_OK) {
513 LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
514 }
515 }
516 }
517
OnTargetChange(const std::string & target,bool isConnect)518 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
519 {
520 if (target.empty()) {
521 LOGE("[CommAggr][OnTarget] Target empty string.");
522 return;
523 }
524 // For process level target change
525 {
526 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
527 if (onConnectHandle_) {
528 onConnectHandle_(target, isConnect);
529 LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
530 } else {
531 LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
532 }
533 }
534 std::set<LabelType> relatedLabels;
535 // For communicator level target change
536 if (isConnect) {
537 int errCode = commLinker_->TargetOnline(target, relatedLabels);
538 if (errCode != E_OK) {
539 LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
540 }
541 } else {
542 commLinker_->TargetOffline(target, relatedLabels);
543 }
544 // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
545 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
546 for (auto &entry : commMap_) {
547 // Ignore nonactivated communicator
548 if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
549 entry.second.first->OnConnectChange(target, isConnect);
550 }
551 }
552 }
553
OnSendable(const std::string & target)554 void CommunicatorAggregator::OnSendable(const std::string &target)
555 {
556 int errCode = scheduler_.NoDelayTaskByTarget(target);
557 if (errCode != E_OK) {
558 LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
559 return;
560 }
561 TriggerSendData();
562 }
563
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)564 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
565 const ParseResult &inResult, const std::string &userId)
566 {
567 int errorNo = E_OK;
568 ParseResult frameResult;
569 SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
570 if (errorNo != E_OK) {
571 LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
572 return;
573 }
574 if (frameBuffer == nullptr) {
575 LOGW("[CommAggr][Receive] Combine undone.");
576 return;
577 }
578
579 int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
580 if (errCode != E_OK) {
581 LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
582 delete frameBuffer;
583 frameBuffer = nullptr;
584 if (errCode == -E_VERSION_NOT_SUPPORT) {
585 TriggerVersionNegotiation(srcTarget);
586 }
587 return;
588 }
589
590 if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
591 errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
592 if (errCode != E_OK) {
593 LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
594 }
595 delete frameBuffer;
596 frameBuffer = nullptr;
597 } else {
598 errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userId);
599 if (errCode != E_OK) {
600 LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
601 }
602 }
603 }
604
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)605 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
606 {
607 if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
608 int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
609 inResult.GetLabelExchangeSequenceId());
610 if (errCode != E_OK) {
611 LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
612 return errCode;
613 }
614 } else {
615 std::map<LabelType, bool> changedLabels;
616 int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
617 inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
618 if (errCode != E_OK) {
619 LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
620 return errCode;
621 }
622 NotifyConnectChange(srcTarget, changedLabels);
623 }
624 return E_OK;
625 }
626
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)627 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
628 uint32_t length, const ParseResult &inResult, const std::string &userId)
629 {
630 SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
631 if (buffer == nullptr) {
632 LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
633 return -E_OUT_OF_MEMORY;
634 }
635 int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
636 ProtocolProto::GetAppLayerFrameHeaderLength());
637 if (errCode != E_OK) {
638 LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
639 delete buffer;
640 buffer = nullptr;
641 return -E_INTERNAL_ERROR;
642 }
643 return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userId);
644 }
645
// Originally, "OnAppLayerFrameReceive" ran entirely under commMapMutex_: it searched for the communicator and, if it
// was not found, called onCommLackHandle_ (if it exists) to ask whether to retain this frame or not. If the answer
// was yes the frame was retained, otherwise the frame was discarded and a CommunicatorNotFound feedback was sent out.
// It was designed so (especially covering the whole function by commMapMutex_) to avoid the situation described below:
// 1: This function finds that the target communicator is not allocated or activated, so it decides to retain the frame.
// 2: The thread is switched out, the target communicator is allocated and activated, and the previously retained
//    frames are fetched out.
// 3: The thread is switched back, and this frame is then put into the retainer with no chance to be fetched out.
// In conclusion: the decision to retain a frame and the action of retaining it should not be separated.
// Otherwise, at the time of the action, the retain decision may be obsolete and wrong.
// #### BUT #### the onCommLackHandle_ callback goes beyond DistributedDB, and there is the risk that the final upper
// user does something such as GetKvStore (we cannot prevent them from doing so) which could result in calling
// AllocCommunicator in the same callback thread, finally causing a deadlock on commMapMutex_.
// #### SO #### we have to make the change described below:
// 1: Search the communicator under commMapMutex_; if found, deliver the frame to that communicator and end.
// 2: Call onCommLackHandle_ (if it exists) to ask whether to retain this frame or not, without holding commMapMutex_.
//    Note: during this period commMap_ may change, and a communicator that was not found before may exist now.
// 3: Search the communicator under commMapMutex_ again; if found, deliver the frame to that communicator and end.
// 4: If still not found, retain this frame if needed, or otherwise send the CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const std::string & userId)664 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
665 const ParseResult &inResult, const std::string &userId)
666 {
667 LabelType toLabel = inResult.GetCommLabel();
668 {
669 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
670 int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
671 if (errCode == E_OK) { // Attention: Here is equal to E_OK
672 return E_OK;
673 }
674 }
675 LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel));
676 int errCode = -E_NOT_FOUND;
677 {
678 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
679 if (onCommLackHandle_) {
680 errCode = onCommLackHandle_(toLabel, userId);
681 LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
682 } else {
683 LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
684 }
685 }
686 // Here we have to lock commMapMutex_ and search communicator again.
687 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
688 int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
689 if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
690 LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
691 return E_OK;
692 }
693 // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
694 if (errCode != E_OK) {
695 TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
696 delete inFrameBuffer;
697 inFrameBuffer = nullptr;
698 return errCode; // The caller will display errCode in log
699 }
700 // Do Retention, the retainer is responsible to deal with the frame
701 retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
702 inFrameBuffer = nullptr;
703 return E_OK;
704 }
705
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel)706 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
707 SerialBuffer *&inFrameBuffer, const LabelType &toLabel)
708 {
709 // Ignore nonactivated communicator, which is regarded as inexistent
710 if (commMap_.count(toLabel) != 0 && commMap_.at(toLabel).second) {
711 commMap_.at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
712 // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
713 inFrameBuffer = nullptr;
714 return E_OK;
715 }
716 return -E_NOT_FOUND;
717 }
718
RegCallbackToAdapter()719 int CommunicatorAggregator::RegCallbackToAdapter()
720 {
721 RefObject::IncObjRef(this); // Reference to be hold by adapter
722 int errCode = adapterHandle_->RegBytesReceiveCallback(
723 [this](const std::string &srcTarget, const uint8_t *bytes, uint32_t length, const std::string &userId) {
724 OnBytesReceive(srcTarget, bytes, length, userId);
725 }, [this]() { RefObject::DecObjRef(this); });
726 if (errCode != E_OK) {
727 RefObject::DecObjRef(this); // Rollback in case reg failed
728 return errCode;
729 }
730
731 RefObject::IncObjRef(this); // Reference to be hold by adapter
732 errCode = adapterHandle_->RegTargetChangeCallback(
733 [this](const std::string &target, bool isConnect) { OnTargetChange(target, isConnect); },
734 [this]() { RefObject::DecObjRef(this); });
735 if (errCode != E_OK) {
736 RefObject::DecObjRef(this); // Rollback in case reg failed
737 return errCode;
738 }
739
740 RefObject::IncObjRef(this); // Reference to be hold by adapter
741 errCode = adapterHandle_->RegSendableCallback([this](const std::string &target, int softBusErrCode) {
742 LOGI("[CommAggr] Send able dev=%.3s, softBusErrCode=%d", target.c_str(), softBusErrCode);
743 if (softBusErrCode == E_OK) {
744 (void)IncreaseSendSequenceId(target);
745 OnSendable(target);
746 }
747 scheduler_.SetSoftBusErrCode(target, softBusErrCode);
748 },
749 [this]() { RefObject::DecObjRef(this); });
750 if (errCode != E_OK) {
751 RefObject::DecObjRef(this); // Rollback in case reg failed
752 return errCode;
753 }
754
755 return E_OK;
756 }
757
UnRegCallbackFromAdapter()758 void CommunicatorAggregator::UnRegCallbackFromAdapter()
759 {
760 adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
761 adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
762 adapterHandle_->RegSendableCallback(nullptr, nullptr);
763 if (dbStatusAdapter_ != nullptr) {
764 dbStatusAdapter_->SetDBStatusChangeCallback(nullptr, nullptr, nullptr);
765 }
766 }
767
GenerateLocalSourceId()768 void CommunicatorAggregator::GenerateLocalSourceId()
769 {
770 std::string identity;
771 adapterHandle_->GetLocalIdentity(identity);
772 // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
773 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
774 uint64_t identityHash = Hash::HashFunc(identity);
775 if (identityHash != localSourceId_) {
776 LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%" PRIu64, identity.c_str(), ULL(identityHash));
777 }
778 localSourceId_ = identityHash;
779 }
780
ReGenerateLocalSourceIdIfNeed()781 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
782 {
783 // The deviceId will change when switch user from A to B
784 // We can't listen to the user change, because it's hard to ensure the timing is correct.
785 // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
786 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
787 GenerateLocalSourceId();
788 return (localSourceId_ != 0);
789 }
790
TriggerVersionNegotiation(const std::string & dstTarget)791 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
792 {
793 LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
794 int errCode = E_OK;
795 SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
796 if (errCode != E_OK) {
797 LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
798 return;
799 }
800
801 TaskConfig config{true, 0, Priority::HIGH};
802 errCode = ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
803 if (errCode != E_OK) {
804 LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
805 // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
806 delete buffer;
807 buffer = nullptr;
808 }
809 }
810
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)811 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
812 const LabelType &dstLabel, const SerialBuffer *inOriFrame)
813 {
814 if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
815 return;
816 }
817 int errCode = E_OK;
818 Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
819 if (message == nullptr) {
820 if (errCode == -E_VERSION_NOT_SUPPORT) {
821 TriggerVersionNegotiation(dstTarget);
822 }
823 return;
824 }
825 // Message is release in TriggerCommunicatorNotFoundFeedback
826 TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
827 }
828
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)829 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
830 const LabelType &dstLabel, Message* &oriMsg)
831 {
832 if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
833 LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
834 // Do not have to do feedback if the message is not a request type message
835 delete oriMsg;
836 oriMsg = nullptr;
837 return;
838 }
839
840 LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
841 oriMsg->SetMessageType(TYPE_RESPONSE);
842 oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
843
844 int errCode = E_OK;
845 SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
846 delete oriMsg;
847 oriMsg = nullptr;
848 if (errCode != E_OK) {
849 LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
850 return;
851 }
852
853 TaskConfig config{true, 0, Priority::HIGH};
854 errCode = ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
855 if (errCode != E_OK) {
856 LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
857 // if send fails, free buffer, otherwise buffer will be taked over by ScheduleSendTask
858 delete buffer;
859 buffer = nullptr;
860 }
861 }
862
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)863 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
864 {
865 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
866 versionMap_[target] = version;
867 }
868
GetExtendHeaderHandle(const ExtendInfo & paramInfo)869 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
870 {
871 if (adapterHandle_ == nullptr) {
872 return nullptr;
873 }
874 return adapterHandle_->GetExtendHeaderHandle(paramInfo);
875 }
876
OnRemoteDBStatusChange(const std::string & devInfo,const std::vector<DBInfo> & dbInfos)877 void CommunicatorAggregator::OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos)
878 {
879 std::map<LabelType, bool> changedLabels;
880 for (const auto &dbInfo: dbInfos) {
881 std::string label = DBCommon::GenerateHashLabel(dbInfo);
882 LabelType labelType(label.begin(), label.end());
883 changedLabels[labelType] = dbInfo.isNeedSync;
884 }
885 if (commLinker_ != nullptr) {
886 commLinker_->UpdateOnlineLabels(devInfo, changedLabels);
887 }
888 NotifyConnectChange(devInfo, changedLabels);
889 }
890
NotifyConnectChange(const std::string & srcTarget,const std::map<LabelType,bool> & changedLabels)891 void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget,
892 const std::map<LabelType, bool> &changedLabels)
893 {
894 if (commLinker_ != nullptr && !commLinker_->IsRemoteTargetOnline(srcTarget)) {
895 LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str());
896 for (const auto &entry : changedLabels) {
897 LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
898 }
899 return;
900 }
901 // Do target change notify
902 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
903 for (auto &entry : changedLabels) {
904 // Ignore nonactivated communicator
905 if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) {
906 LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.",
907 VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
908 commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
909 }
910 }
911 }
912
RegDBChangeCallback()913 void CommunicatorAggregator::RegDBChangeCallback()
914 {
915 if (dbStatusAdapter_ != nullptr) {
916 dbStatusAdapter_->SetDBStatusChangeCallback(
917 [this](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
918 OnRemoteDBStatusChange(devInfo, dbInfos);
919 },
920 [this]() {
921 if (commLinker_ != nullptr) {
922 (void)commLinker_->TriggerLabelExchangeEvent(false);
923 }
924 },
925 [this](const std::string &dev) {
926 if (commLinker_ != nullptr) {
927 std::set<LabelType> relatedLabels;
928 (void)commLinker_->TargetOnline(dev, relatedLabels);
929 }
930 });
931 }
932 }
InitSendThread()933 void CommunicatorAggregator::InitSendThread()
934 {
935 if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) {
936 return;
937 }
938 exclusiveThread_ = std::thread([this] { SendDataRoutine(); });
939 useExclusiveThread_ = true;
940 }
941
SendOnceData()942 void CommunicatorAggregator::SendOnceData()
943 {
944 SendTask taskToSend;
945 uint32_t totalLength = 0;
946 int errCode = scheduler_.ScheduleOutSendTask(taskToSend, totalLength);
947 if (errCode != E_OK) {
948 return; // Not possible to happen
949 }
950 // <vector, extendHeadSize>
951 std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
952 uint32_t mtu = adapterHandle_->GetMtuSize(taskToSend.dstTarget);
953 errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer, mtu, piecePackets);
954 if (errCode != E_OK) {
955 LOGE("[CommAggr] Split frame fail, errCode=%d.", errCode);
956 TaskFinalizer(taskToSend, errCode);
957 return;
958 }
959 // <addr, <extendHeadSize, totalLen>>
960 std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
961 if (piecePackets.empty()) {
962 // Case that no need to split a frame, just use original buffer as a packet
963 std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
964 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
965 entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
966 entry.second.first = taskToSend.buffer->GetExtendHeadLength();
967 entry.second.second = tmpEntry.second + entry.second.first;
968 eachPacket.push_back(entry);
969 } else {
970 for (auto &entry : piecePackets) {
971 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
972 {entry.second, entry.first.size()}};
973 eachPacket.push_back(tmpEntry);
974 }
975 }
976
977 SendPacketsAndDisposeTask(taskToSend, mtu, eachPacket, totalLength);
978 }
979
TriggerSendData()980 void CommunicatorAggregator::TriggerSendData()
981 {
982 if (useExclusiveThread_) {
983 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
984 wakingSignal_ = true;
985 wakingCv_.notify_one();
986 return;
987 }
988 {
989 std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
990 if (sendTaskStart_) {
991 return;
992 }
993 sendTaskStart_ = true;
994 }
995 RefObject::IncObjRef(this);
996 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
997 LOGI("[CommAggr] Send thread start.");
998 while (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
999 SendOnceData();
1000 }
1001 {
1002 std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1003 sendTaskStart_ = false;
1004 }
1005 if (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1006 TriggerSendData(); // avoid sendTaskStart_ was mark false after trigger thread check it
1007 }
1008 finalizeCv_.notify_one();
1009 RefObject::DecObjRef(this);
1010 LOGI("[CommAggr] Send thread end.");
1011 });
1012 if (errCode != E_OK) {
1013 LOGW("[CommAggr] Trigger send data failed %d", errCode);
1014 RefObject::DecObjRef(this);
1015 }
1016 }
1017
ResetFrameRecordIfNeed(const uint32_t frameId,const uint32_t mtu)1018 void CommunicatorAggregator::ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu)
1019 {
1020 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
1021 if (sendRecord_[frameId].splitMtu == 0u || sendRecord_[frameId].splitMtu != mtu) {
1022 sendRecord_[frameId].splitMtu = mtu;
1023 sendRecord_[frameId].sendIndex = 0u;
1024 }
1025 }
1026
RetrySendTaskIfNeed(const std::string & target,uint64_t sendSequenceId)1027 void CommunicatorAggregator::RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId)
1028 {
1029 if (IsRetryOutOfLimit(target)) {
1030 LOGD("[CommAggr] Retry send task is out of limit! target is %s{private}", target.c_str());
1031 scheduler_.InvalidSendTask(target);
1032 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1033 retryCount_[target] = 0;
1034 } else {
1035 if (sendSequenceId != GetSendSequenceId(target)) {
1036 LOGD("[CommAggr] %.3s Send sequence id has changed", target.c_str());
1037 return;
1038 }
1039 scheduler_.DelayTaskByTarget(target);
1040 RetrySendTask(target, sendSequenceId);
1041 }
1042 }
1043
RetrySendTask(const std::string & target,uint64_t sendSequenceId)1044 void CommunicatorAggregator::RetrySendTask(const std::string &target, uint64_t sendSequenceId)
1045 {
1046 int32_t currentRetryCount = 0;
1047 {
1048 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1049 retryCount_[target]++;
1050 currentRetryCount = retryCount_[target];
1051 LOGD("[CommAggr] Target %s{private} retry count is %" PRId32, target.c_str(), currentRetryCount);
1052 }
1053 TimerId timerId = 0u;
1054 RefObject::IncObjRef(this);
1055 (void)RuntimeContext::GetInstance()->SetTimer(GetNextRetryInterval(target, currentRetryCount),
1056 [this, target, sendSequenceId](TimerId id) {
1057 if (sendSequenceId == GetSendSequenceId(target)) {
1058 OnSendable(target);
1059 } else {
1060 LOGD("[CommAggr] %.3s Send sequence id has changed in timer", target.c_str());
1061 }
1062 RefObject::DecObjRef(this);
1063 return -E_END_TIMER;
1064 }, nullptr, timerId);
1065 }
1066
IsRetryOutOfLimit(const std::string & target)1067 bool CommunicatorAggregator::IsRetryOutOfLimit(const std::string &target)
1068 {
1069 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1070 return retryCount_[target] >= MAX_SEND_RETRY;
1071 }
1072
GetNextRetryInterval(const std::string & target,int32_t currentRetryCount)1073 int32_t CommunicatorAggregator::GetNextRetryInterval(const std::string &target, int32_t currentRetryCount)
1074 {
1075 uint32_t timeout = DBConstant::MIN_TIMEOUT;
1076 if (adapterHandle_ != nullptr) {
1077 timeout = adapterHandle_->GetTimeout(target);
1078 }
1079 return static_cast<int32_t>(timeout) * currentRetryCount / RETRY_TIME_SPLIT;
1080 }
1081
GetSendSequenceId(const std::string & target)1082 uint64_t CommunicatorAggregator::GetSendSequenceId(const std::string &target)
1083 {
1084 std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1085 return sendSequence_[target];
1086 }
1087
IncreaseSendSequenceId(const std::string & target)1088 uint64_t CommunicatorAggregator::IncreaseSendSequenceId(const std::string &target)
1089 {
1090 std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1091 return ++sendSequence_[target];
1092 }
1093 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
1094 } // namespace DistributedDB
1095