1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_linker.h"
17 
18 #include <utility>
19 #include "communicator_aggregator.h"
20 #include "db_common.h"
21 #include "db_errno.h"
22 #include "hash.h"
23 #include "log_print.h"
24 #include "platform_specific.h"
25 #include "protocol_proto.h"
26 
27 namespace DistributedDB {
28 namespace {
29 constexpr uint32_t TIME_LAPSE_FOR_WAITING_ACK = 5000; // 5s
30 constexpr uint32_t TIME_LAPSE_FOR_RETRY_SEND = 1000; // 1s
31 constexpr uint32_t RETRANSMIT_LIMIT = 20; // Currently we do at most 20 retransmission if no ack received
32 constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5; // First 5 retransmission will be equal interval
33 }
34 
CommunicatorLinker(CommunicatorAggregator * inAggregator,std::shared_ptr<DBStatusAdapter> statusAdapter)35 CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator,
36     std::shared_ptr<DBStatusAdapter> statusAdapter)
37     : incSequenceId_(0), incAckTriggerId_(0)
38 {
39     aggregator_ = inAggregator;
40     statusAdapter_ = std::move(statusAdapter);
41     RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator
42 }
43 
~CommunicatorLinker()44 CommunicatorLinker::~CommunicatorLinker()
45 {
46     RefObject::DecObjRef(aggregator_); // The linker no longer rely on CommunicatorAggregator
47     aggregator_ = nullptr;
48     statusAdapter_ = nullptr;
49 }
50 
Initialize()51 void CommunicatorLinker::Initialize()
52 {
53     uint64_t curTime = 0;
54     int errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
55     if (errCode != E_OK) {
56         LOGW("[Linker][Init] Get systime fail, use default, errCode=%d.", errCode);
57     }
58     std::string curTimeStr = std::to_string(curTime);
59     localDistinctValue_ = Hash::HashFunc(curTimeStr);
60     LOGI("[Linker][Init] curTime=%" PRIu64 ", distinct=%" PRIu64 ".", ULL(curTime), ULL(localDistinctValue_));
61 }
62 
63 // Create async task to send out label_exchange and waiting for label_exchange_ack.
64 // If waiting timeout, pass the send&wait task to overrall timing retry task.
TargetOnline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)65 int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
66 {
67     {
68         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
69         // if inTarget is offline before, use the remembered previous online labels to decide which communicator to be
70         // notified online. Such handling is in case for abnormal unilateral offline, which A and B is notified online
71         // mutually, then B is notified A offline and for a while B is notified A online again, but A feels no notify.
72         if (remoteOnlineTarget_.count(inTarget) == 0) {
73             outRelatedLabels = targetMapOnlineLabels_[inTarget];
74             remoteOnlineTarget_.insert(inTarget);
75         }
76     }
77     return TriggerLabelExchangeEvent(inTarget);
78 }
79 
80 // Clear all labels related to this target. Let no longer waiting for ack of this target.
81 // The caller should notify all related communicator about this target offline.
TargetOffline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)82 void CommunicatorLinker::TargetOffline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
83 {
84     if (statusAdapter_ != nullptr) {
85         statusAdapter_->TargetOffline(inTarget);
86     }
87     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
88     outRelatedLabels = targetMapOnlineLabels_[inTarget];
89     // Do not erase the Labels of inTarget from targetMapOnlineLabels_, remember it for using when TargetOnline
90     remoteOnlineTarget_.erase(inTarget);
91     // Note: The process of remote target may quit, when remote target restart,
92     // the distinctValue of this remote target may be changed, and the sequenceId may start from zero
93     targetDistinctValue_.erase(inTarget);
94     topRecvLabelSeq_.erase(inTarget);
95     if (statusAdapter_ != nullptr && statusAdapter_->IsSupport(inTarget)) {
96         targetMapOnlineLabels_.erase(inTarget);
97     }
98 }
99 
100 // Add local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
101 // If waiting timeout, pass the send&wait task to overrall timing retry task.
102 // Find out targets for this label that is already online.
103 // The caller should notify communicator of this label about already online target.
IncreaseLocalLabel(const LabelType & inLabel,std::set<std::string> & outOnlineTarget)104 int CommunicatorLinker::IncreaseLocalLabel(const LabelType &inLabel, std::set<std::string> &outOnlineTarget)
105 {
106     {
107         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
108         localOnlineLabels_.insert(inLabel);
109         for (auto &entry : targetMapOnlineLabels_) {
110             if (remoteOnlineTarget_.count(entry.first) == 0) { // Ignore offline target
111                 continue;
112             }
113             if (entry.second.count(inLabel) != 0) { // This online target had opened then same Label
114                 outOnlineTarget.insert(entry.first);
115             }
116         }
117     }
118     return TriggerLabelExchangeEvent() ? -E_INTERNAL_ERROR : E_OK;
119 }
120 
121 // Del local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
122 // If waiting timeout, pass the send&wait task to overrall timing retry task.
DecreaseLocalLabel(const LabelType & inLabel)123 int CommunicatorLinker::DecreaseLocalLabel(const LabelType &inLabel)
124 {
125     {
126         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
127         localOnlineLabels_.erase(inLabel);
128     }
129     return TriggerLabelExchangeEvent() ? -E_INTERNAL_ERROR : E_OK;
130 }
131 
132 // Compare the latest labels with previous Label, find out label changes.
133 // The caller should notify the target changes according to label changes.
134 // Update the online labels of this target. Send out label_exchange_ack.
ReceiveLabelExchange(const std::string & inTarget,const std::set<LabelType> & inLatestLabels,uint64_t inDistinctValue,uint64_t inSequenceId,std::map<LabelType,bool> & outChangeLabels)135 int CommunicatorLinker::ReceiveLabelExchange(const std::string &inTarget, const std::set<LabelType> &inLatestLabels,
136     uint64_t inDistinctValue, uint64_t inSequenceId, std::map<LabelType, bool> &outChangeLabels)
137 {
138     {
139         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
140         DetectDistinctValueChange(inTarget, inDistinctValue);
141         if (topRecvLabelSeq_.count(inTarget) == 0) {
142             // Firstly receive LabelExchange from this target
143             topRecvLabelSeq_[inTarget] = inSequenceId;
144         } else if (inSequenceId < topRecvLabelSeq_[inTarget]) {
145             // inSequenceId can be equal to topRecvLabelSeq, in this case, the ack of this sequence send to this target
146             // may be lost, this target resend LabelExchange, and we should resend ack to this target
147             LOGW("[Linker][RecvLabel] inSequenceId=%" PRIu64 " smaller than topRecvLabelSeq=%" PRIu64
148                 ". Frame Ignored.", ULL(inSequenceId), ULL(topRecvLabelSeq_[inTarget]));
149             return -E_OUT_OF_DATE;
150         } else {
151             // Update top sequenceId of received LabelExchange
152             topRecvLabelSeq_[inTarget] = inSequenceId;
153         }
154         // Find out online labels by check difference
155         for (auto &entry : inLatestLabels) {
156             if (targetMapOnlineLabels_[inTarget].count(entry) == 0) {
157                 outChangeLabels[entry] = true;
158             }
159         }
160         // Find out offline labels by check difference
161         for (const auto &entry : targetMapOnlineLabels_[inTarget]) {
162             if (inLatestLabels.count(entry) == 0) {
163                 outChangeLabels[entry] = false;
164             }
165         }
166         // Update target online labels
167         targetMapOnlineLabels_[inTarget] = inLatestLabels;
168     }
169     // Trigger sending ack
170     int errCode = TriggerLabelExchangeAckEvent(inTarget, inSequenceId);
171     if (errCode != E_OK) {
172         LOGE("[Linker][RecvLabel] TriggerAckEvent Fail, Just Log, errCode=%d.", errCode);
173         // Do not return error here
174     }
175     return E_OK;
176 }
177 
178 // Waiting finish if the ack is what linker wait by check inSequenceId
179 // Similarly, stop the retry task of this Target.
ReceiveLabelExchangeAck(const std::string & inTarget,uint64_t inDistinctValue,uint64_t inSequenceId)180 int CommunicatorLinker::ReceiveLabelExchangeAck(const std::string &inTarget, uint64_t inDistinctValue,
181     uint64_t inSequenceId)
182 {
183     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
184     DetectDistinctValueChange(inTarget, inDistinctValue);
185     // This two judge is for detecting case that local device process restart so incSequenceId_ restart from 0
186     // The remote device may send an ack cause by previous process, which may destroy the functionality of this process
187     if (waitAckSeq_.count(inTarget) == 0) {
188         LOGW("[Linker][RecvAck] Not waiting any ack now, inSequenceId=%" PRIu64 "", ULL(inSequenceId));
189         return -E_NOT_FOUND;
190     }
191     if (waitAckSeq_[inTarget] < inSequenceId) {
192         LOGW("[Linker][RecvAck] Not waiting this ack now, inSequenceId=%" PRIu64 ", waitAckSeq_=%" PRIu64,
193             ULL(inSequenceId), ULL(waitAckSeq_[inTarget]));
194         return -E_NOT_FOUND;
195     }
196     // An valid ack received
197     if (recvAckSeq_.count(inTarget) == 0) {
198         // Firstly receive LabelExchangeAck from this target
199         recvAckSeq_[inTarget] = inSequenceId;
200     } else if (inSequenceId <= recvAckSeq_[inTarget]) {
201         LOGW("[Linker][RecvAck] inSequenceId=%" PRIu64 " not greater than recvAckSeq_=%" PRIu64 ". Frame Ignored.",
202             ULL(inSequenceId), ULL(recvAckSeq_[inTarget]));
203         return -E_OUT_OF_DATE;
204     } else {
205         // Update top sequenceId of received LabelExchangeAck
206         recvAckSeq_[inTarget] = inSequenceId;
207     }
208     return E_OK;
209 }
210 
GetOnlineRemoteTarget() const211 std::set<std::string> CommunicatorLinker::GetOnlineRemoteTarget() const
212 {
213     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
214     return remoteOnlineTarget_;
215 }
216 
IsRemoteTargetOnline(const std::string & inTarget) const217 bool CommunicatorLinker::IsRemoteTargetOnline(const std::string &inTarget) const
218 {
219     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
220     if (remoteOnlineTarget_.count(inTarget) != 0) {
221         return true;
222     }
223     return false;
224 }
225 
226 // inCountDown is in millisecond
SuspendByOnceTimer(const std::function<void (void)> & inAction,uint32_t inCountDown)227 void CommunicatorLinker::SuspendByOnceTimer(const std::function<void(void)> &inAction, uint32_t inCountDown)
228 {
229     TimerId thisTimerId = 0;
230     RuntimeContext *context = RuntimeContext::GetInstance();
231     int errCode = context->SetTimer(static_cast<int>(inCountDown), [inAction](TimerId inTimerId)->int{
232         // Note: inAction should be captured by value (must not by reference)
233         inAction();
234         return -E_END_TIMER;
235     }, nullptr, thisTimerId);
236     if (errCode == E_OK) {
237         return;
238     }
239     std::thread timerThread([inAction, inCountDown]() {
240         // Note: inAction and inCountDown should be captured by value (must not by reference)
241         std::this_thread::sleep_for(std::chrono::milliseconds(inCountDown));
242         inAction();
243     });
244     timerThread.detach();
245 }
246 
247 // This function should be called under protection of entireInfoMutex_
DetectDistinctValueChange(const std::string & inTarget,uint64_t inDistinctValue)248 void CommunicatorLinker::DetectDistinctValueChange(const std::string &inTarget, uint64_t inDistinctValue)
249 {
250     // Firstly received distinctValue from this target ever or after offline
251     if (targetDistinctValue_.count(inTarget) == 0) {
252         targetDistinctValue_.try_emplace(inTarget, inDistinctValue);
253         return;
254     }
255 
256     // DistinctValue is the same as before
257     if (targetDistinctValue_[inTarget] == inDistinctValue) {
258         return;
259     }
260 
261     // DistinctValue change detected !!! This must be caused by malfunctioning of underlayer communication component.
262     LOGE("[Linker][Detect] DISTINCT VALUE CHANGE DETECTED : %" PRIu64 " VS %" PRIu64 "",
263         ULL(inDistinctValue), ULL(targetDistinctValue_[inTarget]));
264     targetDistinctValue_[inTarget] = inDistinctValue;
265     // The process of remote target must have undergone a quit and restart, the remote sequenceId will start from zero.
266     topRecvLabelSeq_.erase(inTarget);
267     RefObject::IncObjRef(this);
268     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, inTarget]() {
269         LOGD("ReTrigger label exchange because remote process restarted!");
270         this->TriggerLabelExchangeEvent(inTarget);
271         RefObject::DecObjRef(this);
272     });
273     if (errCode != E_OK) {
274         LOGD("ReTrigger label exchange failed! errCode = %d", errCode);
275         RefObject::DecObjRef(this);
276     }
277 }
278 
TriggerLabelExchangeEvent(const std::string & toTarget)279 int CommunicatorLinker::TriggerLabelExchangeEvent(const std::string &toTarget)
280 {
281     if (statusAdapter_ != nullptr && statusAdapter_->IsSupport(toTarget)) {
282         return E_OK;
283     }
284     // Apply for a latest sequenceId
285     uint64_t sequenceId = incSequenceId_.fetch_add(1, std::memory_order_seq_cst);
286     // Get a snapshot of current online labels
287     std::set<LabelType> onlineLabels;
288     std::vector<DBInfo> dbInfos;
289     if (statusAdapter_ != nullptr && statusAdapter_->GetLocalDBInfos(dbInfos) == E_OK) {
290         for (const auto &dbInfo: dbInfos) {
291             std::string label = DBCommon::GenerateHashLabel(dbInfo);
292             LabelType labelType(label.begin(), label.end());
293             onlineLabels.insert(labelType);
294         }
295     } else {
296         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
297         onlineLabels = localOnlineLabels_;
298     }
299     // Build LabelExchange Frame
300     int error = E_OK;
301     SerialBuffer *buffer = ProtocolProto::BuildLabelExchange(localDistinctValue_, sequenceId, onlineLabels, error);
302     if (error != E_OK) {
303         LOGE("[Linker][TriggerLabel] BuildLabel fail, error=%d", error);
304         return error;
305     }
306     // Update waitAckSeq, Check whether new event be triggered in other thread
307     {
308         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
309         if (waitAckSeq_.count(toTarget) == 0) {
310             // Firstly send LabelExchange to this target
311             waitAckSeq_[toTarget] = sequenceId;
312         } else if (waitAckSeq_[toTarget] > sequenceId) {
313             // New LabelExchangeEvent had been trigger for this target, so this event can be abort
314             LOGI("[Linker][TriggerLabel] Detect newSeqId=%" PRIu64 " than thisSeqId=%" PRIu64
315                 " be triggered for target=%s{private}", ULL(waitAckSeq_[toTarget]), ULL(sequenceId), toTarget.c_str());
316             delete buffer;
317             buffer = nullptr;
318             return E_OK;
319         } else {
320             waitAckSeq_[toTarget] = sequenceId;
321         }
322     }
323     // Synchronously call SendLabelExchange and hand over buffer to it
324     RefObject::IncObjRef(this); // SendLabelExchange will only DecRef when total done if no need to send
325     SendLabelExchange(toTarget, buffer, sequenceId, 0); // Initially retransmitCount is 0
326     return E_OK;
327 }
328 
TriggerLabelExchangeAckEvent(const std::string & toTarget,uint64_t inSequenceId)329 int CommunicatorLinker::TriggerLabelExchangeAckEvent(const std::string &toTarget, uint64_t inSequenceId)
330 {
331     // Build LabelExchangeAck Frame
332     int errCode = E_OK;
333     SerialBuffer *buffer = ProtocolProto::BuildLabelExchangeAck(localDistinctValue_, inSequenceId, errCode);
334     if (errCode != E_OK) {
335         LOGE("[Linker][TriggerAck] BuildAck fail, error=%d", errCode);
336         return errCode;
337     }
338     // Apply for a latest ackId and update ackTriggerId_
339     uint64_t ackId;
340     {
341         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
342         ackId = incAckTriggerId_.fetch_add(1, std::memory_order_seq_cst);
343         ackTriggerId_[toTarget] = ackId;
344     }
345     // Synchronously call SendLabelExchangeAck and hand over buffer to it
346     RefObject::IncObjRef(this); // SendLabelExchangeAck will only DecRef when total done if no need to send
347     SendLabelExchangeAck(toTarget, buffer, inSequenceId, ackId);
348     return E_OK;
349 }
350 
351 namespace {
GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)352 inline uint32_t GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)
353 {
354     if (inRetransmitCount <= RETRANSMIT_LIMIT_EQUAL_INTERVAL) {
355         return TIME_LAPSE_FOR_WAITING_ACK;
356     }
357     uint32_t subsequentRetransmit = inRetransmitCount - RETRANSMIT_LIMIT_EQUAL_INTERVAL;
358     return subsequentRetransmit * subsequentRetransmit * TIME_LAPSE_FOR_WAITING_ACK;
359 }
360 }
361 
SendLabelExchange(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint32_t inRetransmitCount)362 void CommunicatorLinker::SendLabelExchange(const std::string &toTarget, SerialBuffer *inBuff, uint64_t inSequenceId,
363     uint32_t inRetransmitCount)
364 {
365     // Check whether have the need to send
366     bool noNeedToSend = inRetransmitCount > RETRANSMIT_LIMIT;
367     {
368         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
369         if (remoteOnlineTarget_.count(toTarget) == 0) {
370             // Target offline
371             noNeedToSend = true;
372         }
373         if (waitAckSeq_[toTarget] > inSequenceId) {
374             // New LabelExchangeEvent had been trigger for this target, so this event can be abort
375             noNeedToSend = true;
376         }
377         if (recvAckSeq_.count(toTarget) != 0 && recvAckSeq_[toTarget] >= inSequenceId) {
378             // Ack of this sequenceId had been received or even later ack had been received
379             noNeedToSend = true;
380         }
381         if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
382             LOGI("[Linker][SendLabel] NoNeedSend:target=%s{private}, thisSeqId=%" PRIu64 ", waitAckSeq=%" PRIu64
383                 ", recvAckSeq=%" PRIu64 ",retrans=%u.", toTarget.c_str(), ULL(inSequenceId),
384                 ULL(waitAckSeq_[toTarget]), ULL((recvAckSeq_.count(toTarget) != 0) ? recvAckSeq_[toTarget] : ~ULL(0)),
385                 inRetransmitCount);
386         } // ~0 indicate no ack ever recv
387     }
388     if (noNeedToSend) {
389         delete inBuff;
390         inBuff = nullptr;
391         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
392         return;
393     }
394 
395     int error = E_OK;
396     SerialBuffer *cloneBuffer = inBuff->Clone(error);
397     TaskConfig config{true, 0, Priority::HIGH};
398     int errCode = aggregator_->ScheduleSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE, config);
399     if (errCode == E_OK) {
400         // Send ok, go on to wait ack, and maybe resend
401         if (error == E_OK) {
402             SuspendByOnceTimer([this, toTarget, cloneBuffer, inSequenceId, inRetransmitCount]() {
403                 // Note: toTarget and cloneBuffer and inSequenceId should be captured by value (must not by reference)
404                 SendLabelExchange(toTarget, cloneBuffer, inSequenceId, inRetransmitCount + 1); // Do retransmission
405             }, GetDynamicTimeLapseForWaitingAck(inRetransmitCount));
406         } else {
407             LOGE("[Linker][SendLabel] CloneFail: target=%s{private}, SeqId=%" PRIu64 ".",
408             toTarget.c_str(), ULL(inSequenceId));
409         }
410     } else {
411         // Send fail, go on to retry send
412         SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inRetransmitCount]() {
413             // Note: toTarget and inBuff and inSequenceId should be captured by value (must not by reference)
414             SendLabelExchange(toTarget, inBuff, inSequenceId, inRetransmitCount); // Just do retry send
415             }, TIME_LAPSE_FOR_RETRY_SEND);
416         if (error == E_OK) {
417             delete cloneBuffer;
418             cloneBuffer = nullptr;
419         }
420     }
421 }
422 
SendLabelExchangeAck(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint64_t inAckTriggerId)423 void CommunicatorLinker::SendLabelExchangeAck(const std::string &toTarget, SerialBuffer *inBuff,
424     uint64_t inSequenceId, uint64_t inAckTriggerId)
425 {
426     // Check whether have the need to send
427     bool noNeedToSend = false;
428     {
429         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
430         // Now that LabelExchange is received, LabelExchangeAck should be send no matter target online or not
431         if (topRecvLabelSeq_.count(toTarget) != 0 && topRecvLabelSeq_[toTarget] > inSequenceId) {
432             // topRecvLabelSeq for this target may have been erased, detect it for avoid creating an entry
433             // New LabelExchange had been received for this target, so this event can be abort
434             noNeedToSend = true;
435         }
436         if (ackTriggerId_[toTarget] > inAckTriggerId) {
437             // New LabelExchangeAck had been trigger for this target, so this event can be abort
438             noNeedToSend = true;
439         }
440         if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
441             LOGI("[Linker][SendAck] NoNeedSend:target=%s{private}, thisSeqId=%" PRIu64 ", topRecLabelSeq=%" PRIu64
442                 ", thisAckId=%" PRIu64 ",ackTriggerId=%" PRIu64 ".",
443                 toTarget.c_str(), ULL(inSequenceId), // ~0 indacate no label ever recv
444                 ULL((topRecvLabelSeq_.count(toTarget) != 0) ? topRecvLabelSeq_[toTarget] : ~ULL(0)),
445                 ULL(inAckTriggerId), ULL(ackTriggerId_[toTarget]));
446         }
447     }
448     if (noNeedToSend) {
449         delete inBuff;
450         inBuff = nullptr;
451         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
452         return;
453     }
454 
455     TaskConfig config{true, 0, Priority::HIGH};
456     int errCode = aggregator_->ScheduleSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK, config);
457     if (errCode == E_OK) {
458         // Send ok, finish event
459         RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
460     } else {
461         // Send fail, go on to retry send
462         SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inAckTriggerId]() {
463             // Note: toTarget, inBuff, inSequenceId, inAckTriggerId should be captured by value (must not by reference)
464             SendLabelExchangeAck(toTarget, inBuff, inSequenceId, inAckTriggerId);
465             }, TIME_LAPSE_FOR_RETRY_SEND);
466     }
467 }
468 
UpdateOnlineLabels(const std::string & device,const std::map<LabelType,bool> & labels)469 void CommunicatorLinker::UpdateOnlineLabels(const std::string &device, const std::map<LabelType, bool> &labels)
470 {
471     std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
472     for (const auto &[label, isOnline]: labels) {
473         if (isOnline) {
474             targetMapOnlineLabels_[device].insert(label);
475         } else {
476             targetMapOnlineLabels_[device].erase(label);
477         }
478     }
479 }
480 
TriggerLabelExchangeEvent(bool checkAdapter)481 bool CommunicatorLinker::TriggerLabelExchangeEvent(bool checkAdapter)
482 {
483     if (checkAdapter && statusAdapter_ != nullptr && !statusAdapter_->IsSendLabelExchange()) {
484         return false;
485     }
486     std::set<std::string> totalOnlineTargets;
487     {
488         std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
489         totalOnlineTargets = remoteOnlineTarget_;
490     }
491     bool everFail = false;
492     for (auto &entry : totalOnlineTargets) {
493         if (TriggerLabelExchangeEvent(entry) != E_OK) {
494             everFail = true;
495         }
496     }
497     return everFail;
498 }
499 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorLinker)
500 } // namespace DistributedDB
501