1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_linker.h"
17
18 #include <utility>
19 #include "communicator_aggregator.h"
20 #include "db_common.h"
21 #include "db_errno.h"
22 #include "hash.h"
23 #include "log_print.h"
24 #include "platform_specific.h"
25 #include "protocol_proto.h"
26
27 namespace DistributedDB {
28 namespace {
29 constexpr uint32_t TIME_LAPSE_FOR_WAITING_ACK = 5000; // 5s
30 constexpr uint32_t TIME_LAPSE_FOR_RETRY_SEND = 1000; // 1s
31 constexpr uint32_t RETRANSMIT_LIMIT = 20; // Currently we do at most 20 retransmission if no ack received
32 constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5; // First 5 retransmission will be equal interval
33 }
34
CommunicatorLinker(CommunicatorAggregator * inAggregator,std::shared_ptr<DBStatusAdapter> statusAdapter)35 CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator,
36 std::shared_ptr<DBStatusAdapter> statusAdapter)
37 : incSequenceId_(0), incAckTriggerId_(0)
38 {
39 aggregator_ = inAggregator;
40 statusAdapter_ = std::move(statusAdapter);
41 RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator
42 }
43
~CommunicatorLinker()44 CommunicatorLinker::~CommunicatorLinker()
45 {
46 RefObject::DecObjRef(aggregator_); // The linker no longer rely on CommunicatorAggregator
47 aggregator_ = nullptr;
48 statusAdapter_ = nullptr;
49 }
50
Initialize()51 void CommunicatorLinker::Initialize()
52 {
53 uint64_t curTime = 0;
54 int errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
55 if (errCode != E_OK) {
56 LOGW("[Linker][Init] Get systime fail, use default, errCode=%d.", errCode);
57 }
58 std::string curTimeStr = std::to_string(curTime);
59 localDistinctValue_ = Hash::HashFunc(curTimeStr);
60 LOGI("[Linker][Init] curTime=%" PRIu64 ", distinct=%" PRIu64 ".", ULL(curTime), ULL(localDistinctValue_));
61 }
62
63 // Create async task to send out label_exchange and waiting for label_exchange_ack.
64 // If waiting timeout, pass the send&wait task to overrall timing retry task.
TargetOnline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)65 int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
66 {
67 {
68 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
69 // if inTarget is offline before, use the remembered previous online labels to decide which communicator to be
70 // notified online. Such handling is in case for abnormal unilateral offline, which A and B is notified online
71 // mutually, then B is notified A offline and for a while B is notified A online again, but A feels no notify.
72 if (remoteOnlineTarget_.count(inTarget) == 0) {
73 outRelatedLabels = targetMapOnlineLabels_[inTarget];
74 remoteOnlineTarget_.insert(inTarget);
75 }
76 }
77 return TriggerLabelExchangeEvent(inTarget);
78 }
79
80 // Clear all labels related to this target. Let no longer waiting for ack of this target.
81 // The caller should notify all related communicator about this target offline.
TargetOffline(const std::string & inTarget,std::set<LabelType> & outRelatedLabels)82 void CommunicatorLinker::TargetOffline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels)
83 {
84 if (statusAdapter_ != nullptr) {
85 statusAdapter_->TargetOffline(inTarget);
86 }
87 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
88 outRelatedLabels = targetMapOnlineLabels_[inTarget];
89 // Do not erase the Labels of inTarget from targetMapOnlineLabels_, remember it for using when TargetOnline
90 remoteOnlineTarget_.erase(inTarget);
91 // Note: The process of remote target may quit, when remote target restart,
92 // the distinctValue of this remote target may be changed, and the sequenceId may start from zero
93 targetDistinctValue_.erase(inTarget);
94 topRecvLabelSeq_.erase(inTarget);
95 if (statusAdapter_ != nullptr && statusAdapter_->IsSupport(inTarget)) {
96 targetMapOnlineLabels_.erase(inTarget);
97 }
98 }
99
100 // Add local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
101 // If waiting timeout, pass the send&wait task to overrall timing retry task.
102 // Find out targets for this label that is already online.
103 // The caller should notify communicator of this label about already online target.
IncreaseLocalLabel(const LabelType & inLabel,std::set<std::string> & outOnlineTarget)104 int CommunicatorLinker::IncreaseLocalLabel(const LabelType &inLabel, std::set<std::string> &outOnlineTarget)
105 {
106 {
107 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
108 localOnlineLabels_.insert(inLabel);
109 for (auto &entry : targetMapOnlineLabels_) {
110 if (remoteOnlineTarget_.count(entry.first) == 0) { // Ignore offline target
111 continue;
112 }
113 if (entry.second.count(inLabel) != 0) { // This online target had opened then same Label
114 outOnlineTarget.insert(entry.first);
115 }
116 }
117 }
118 return TriggerLabelExchangeEvent() ? -E_INTERNAL_ERROR : E_OK;
119 }
120
121 // Del local label. Create async task to send out label_exchange and waiting for label_exchange_ack.
122 // If waiting timeout, pass the send&wait task to overrall timing retry task.
DecreaseLocalLabel(const LabelType & inLabel)123 int CommunicatorLinker::DecreaseLocalLabel(const LabelType &inLabel)
124 {
125 {
126 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
127 localOnlineLabels_.erase(inLabel);
128 }
129 return TriggerLabelExchangeEvent() ? -E_INTERNAL_ERROR : E_OK;
130 }
131
132 // Compare the latest labels with previous Label, find out label changes.
133 // The caller should notify the target changes according to label changes.
134 // Update the online labels of this target. Send out label_exchange_ack.
ReceiveLabelExchange(const std::string & inTarget,const std::set<LabelType> & inLatestLabels,uint64_t inDistinctValue,uint64_t inSequenceId,std::map<LabelType,bool> & outChangeLabels)135 int CommunicatorLinker::ReceiveLabelExchange(const std::string &inTarget, const std::set<LabelType> &inLatestLabels,
136 uint64_t inDistinctValue, uint64_t inSequenceId, std::map<LabelType, bool> &outChangeLabels)
137 {
138 {
139 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
140 DetectDistinctValueChange(inTarget, inDistinctValue);
141 if (topRecvLabelSeq_.count(inTarget) == 0) {
142 // Firstly receive LabelExchange from this target
143 topRecvLabelSeq_[inTarget] = inSequenceId;
144 } else if (inSequenceId < topRecvLabelSeq_[inTarget]) {
145 // inSequenceId can be equal to topRecvLabelSeq, in this case, the ack of this sequence send to this target
146 // may be lost, this target resend LabelExchange, and we should resend ack to this target
147 LOGW("[Linker][RecvLabel] inSequenceId=%" PRIu64 " smaller than topRecvLabelSeq=%" PRIu64
148 ". Frame Ignored.", ULL(inSequenceId), ULL(topRecvLabelSeq_[inTarget]));
149 return -E_OUT_OF_DATE;
150 } else {
151 // Update top sequenceId of received LabelExchange
152 topRecvLabelSeq_[inTarget] = inSequenceId;
153 }
154 // Find out online labels by check difference
155 for (auto &entry : inLatestLabels) {
156 if (targetMapOnlineLabels_[inTarget].count(entry) == 0) {
157 outChangeLabels[entry] = true;
158 }
159 }
160 // Find out offline labels by check difference
161 for (const auto &entry : targetMapOnlineLabels_[inTarget]) {
162 if (inLatestLabels.count(entry) == 0) {
163 outChangeLabels[entry] = false;
164 }
165 }
166 // Update target online labels
167 targetMapOnlineLabels_[inTarget] = inLatestLabels;
168 }
169 // Trigger sending ack
170 int errCode = TriggerLabelExchangeAckEvent(inTarget, inSequenceId);
171 if (errCode != E_OK) {
172 LOGE("[Linker][RecvLabel] TriggerAckEvent Fail, Just Log, errCode=%d.", errCode);
173 // Do not return error here
174 }
175 return E_OK;
176 }
177
178 // Waiting finish if the ack is what linker wait by check inSequenceId
179 // Similarly, stop the retry task of this Target.
ReceiveLabelExchangeAck(const std::string & inTarget,uint64_t inDistinctValue,uint64_t inSequenceId)180 int CommunicatorLinker::ReceiveLabelExchangeAck(const std::string &inTarget, uint64_t inDistinctValue,
181 uint64_t inSequenceId)
182 {
183 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
184 DetectDistinctValueChange(inTarget, inDistinctValue);
185 // This two judge is for detecting case that local device process restart so incSequenceId_ restart from 0
186 // The remote device may send an ack cause by previous process, which may destroy the functionality of this process
187 if (waitAckSeq_.count(inTarget) == 0) {
188 LOGW("[Linker][RecvAck] Not waiting any ack now, inSequenceId=%" PRIu64 "", ULL(inSequenceId));
189 return -E_NOT_FOUND;
190 }
191 if (waitAckSeq_[inTarget] < inSequenceId) {
192 LOGW("[Linker][RecvAck] Not waiting this ack now, inSequenceId=%" PRIu64 ", waitAckSeq_=%" PRIu64,
193 ULL(inSequenceId), ULL(waitAckSeq_[inTarget]));
194 return -E_NOT_FOUND;
195 }
196 // An valid ack received
197 if (recvAckSeq_.count(inTarget) == 0) {
198 // Firstly receive LabelExchangeAck from this target
199 recvAckSeq_[inTarget] = inSequenceId;
200 } else if (inSequenceId <= recvAckSeq_[inTarget]) {
201 LOGW("[Linker][RecvAck] inSequenceId=%" PRIu64 " not greater than recvAckSeq_=%" PRIu64 ". Frame Ignored.",
202 ULL(inSequenceId), ULL(recvAckSeq_[inTarget]));
203 return -E_OUT_OF_DATE;
204 } else {
205 // Update top sequenceId of received LabelExchangeAck
206 recvAckSeq_[inTarget] = inSequenceId;
207 }
208 return E_OK;
209 }
210
GetOnlineRemoteTarget() const211 std::set<std::string> CommunicatorLinker::GetOnlineRemoteTarget() const
212 {
213 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
214 return remoteOnlineTarget_;
215 }
216
IsRemoteTargetOnline(const std::string & inTarget) const217 bool CommunicatorLinker::IsRemoteTargetOnline(const std::string &inTarget) const
218 {
219 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
220 if (remoteOnlineTarget_.count(inTarget) != 0) {
221 return true;
222 }
223 return false;
224 }
225
226 // inCountDown is in millisecond
SuspendByOnceTimer(const std::function<void (void)> & inAction,uint32_t inCountDown)227 void CommunicatorLinker::SuspendByOnceTimer(const std::function<void(void)> &inAction, uint32_t inCountDown)
228 {
229 TimerId thisTimerId = 0;
230 RuntimeContext *context = RuntimeContext::GetInstance();
231 int errCode = context->SetTimer(static_cast<int>(inCountDown), [inAction](TimerId inTimerId)->int{
232 // Note: inAction should be captured by value (must not by reference)
233 inAction();
234 return -E_END_TIMER;
235 }, nullptr, thisTimerId);
236 if (errCode == E_OK) {
237 return;
238 }
239 std::thread timerThread([inAction, inCountDown]() {
240 // Note: inAction and inCountDown should be captured by value (must not by reference)
241 std::this_thread::sleep_for(std::chrono::milliseconds(inCountDown));
242 inAction();
243 });
244 timerThread.detach();
245 }
246
247 // This function should be called under protection of entireInfoMutex_
DetectDistinctValueChange(const std::string & inTarget,uint64_t inDistinctValue)248 void CommunicatorLinker::DetectDistinctValueChange(const std::string &inTarget, uint64_t inDistinctValue)
249 {
250 // Firstly received distinctValue from this target ever or after offline
251 if (targetDistinctValue_.count(inTarget) == 0) {
252 targetDistinctValue_.try_emplace(inTarget, inDistinctValue);
253 return;
254 }
255
256 // DistinctValue is the same as before
257 if (targetDistinctValue_[inTarget] == inDistinctValue) {
258 return;
259 }
260
261 // DistinctValue change detected !!! This must be caused by malfunctioning of underlayer communication component.
262 LOGE("[Linker][Detect] DISTINCT VALUE CHANGE DETECTED : %" PRIu64 " VS %" PRIu64 "",
263 ULL(inDistinctValue), ULL(targetDistinctValue_[inTarget]));
264 targetDistinctValue_[inTarget] = inDistinctValue;
265 // The process of remote target must have undergone a quit and restart, the remote sequenceId will start from zero.
266 topRecvLabelSeq_.erase(inTarget);
267 RefObject::IncObjRef(this);
268 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, inTarget]() {
269 LOGD("ReTrigger label exchange because remote process restarted!");
270 this->TriggerLabelExchangeEvent(inTarget);
271 RefObject::DecObjRef(this);
272 });
273 if (errCode != E_OK) {
274 LOGD("ReTrigger label exchange failed! errCode = %d", errCode);
275 RefObject::DecObjRef(this);
276 }
277 }
278
TriggerLabelExchangeEvent(const std::string & toTarget)279 int CommunicatorLinker::TriggerLabelExchangeEvent(const std::string &toTarget)
280 {
281 if (statusAdapter_ != nullptr && statusAdapter_->IsSupport(toTarget)) {
282 return E_OK;
283 }
284 // Apply for a latest sequenceId
285 uint64_t sequenceId = incSequenceId_.fetch_add(1, std::memory_order_seq_cst);
286 // Get a snapshot of current online labels
287 std::set<LabelType> onlineLabels;
288 std::vector<DBInfo> dbInfos;
289 if (statusAdapter_ != nullptr && statusAdapter_->GetLocalDBInfos(dbInfos) == E_OK) {
290 for (const auto &dbInfo: dbInfos) {
291 std::string label = DBCommon::GenerateHashLabel(dbInfo);
292 LabelType labelType(label.begin(), label.end());
293 onlineLabels.insert(labelType);
294 }
295 } else {
296 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
297 onlineLabels = localOnlineLabels_;
298 }
299 // Build LabelExchange Frame
300 int error = E_OK;
301 SerialBuffer *buffer = ProtocolProto::BuildLabelExchange(localDistinctValue_, sequenceId, onlineLabels, error);
302 if (error != E_OK) {
303 LOGE("[Linker][TriggerLabel] BuildLabel fail, error=%d", error);
304 return error;
305 }
306 // Update waitAckSeq, Check whether new event be triggered in other thread
307 {
308 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
309 if (waitAckSeq_.count(toTarget) == 0) {
310 // Firstly send LabelExchange to this target
311 waitAckSeq_[toTarget] = sequenceId;
312 } else if (waitAckSeq_[toTarget] > sequenceId) {
313 // New LabelExchangeEvent had been trigger for this target, so this event can be abort
314 LOGI("[Linker][TriggerLabel] Detect newSeqId=%" PRIu64 " than thisSeqId=%" PRIu64
315 " be triggered for target=%s{private}", ULL(waitAckSeq_[toTarget]), ULL(sequenceId), toTarget.c_str());
316 delete buffer;
317 buffer = nullptr;
318 return E_OK;
319 } else {
320 waitAckSeq_[toTarget] = sequenceId;
321 }
322 }
323 // Synchronously call SendLabelExchange and hand over buffer to it
324 RefObject::IncObjRef(this); // SendLabelExchange will only DecRef when total done if no need to send
325 SendLabelExchange(toTarget, buffer, sequenceId, 0); // Initially retransmitCount is 0
326 return E_OK;
327 }
328
TriggerLabelExchangeAckEvent(const std::string & toTarget,uint64_t inSequenceId)329 int CommunicatorLinker::TriggerLabelExchangeAckEvent(const std::string &toTarget, uint64_t inSequenceId)
330 {
331 // Build LabelExchangeAck Frame
332 int errCode = E_OK;
333 SerialBuffer *buffer = ProtocolProto::BuildLabelExchangeAck(localDistinctValue_, inSequenceId, errCode);
334 if (errCode != E_OK) {
335 LOGE("[Linker][TriggerAck] BuildAck fail, error=%d", errCode);
336 return errCode;
337 }
338 // Apply for a latest ackId and update ackTriggerId_
339 uint64_t ackId;
340 {
341 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
342 ackId = incAckTriggerId_.fetch_add(1, std::memory_order_seq_cst);
343 ackTriggerId_[toTarget] = ackId;
344 }
345 // Synchronously call SendLabelExchangeAck and hand over buffer to it
346 RefObject::IncObjRef(this); // SendLabelExchangeAck will only DecRef when total done if no need to send
347 SendLabelExchangeAck(toTarget, buffer, inSequenceId, ackId);
348 return E_OK;
349 }
350
351 namespace {
GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)352 inline uint32_t GetDynamicTimeLapseForWaitingAck(uint32_t inRetransmitCount)
353 {
354 if (inRetransmitCount <= RETRANSMIT_LIMIT_EQUAL_INTERVAL) {
355 return TIME_LAPSE_FOR_WAITING_ACK;
356 }
357 uint32_t subsequentRetransmit = inRetransmitCount - RETRANSMIT_LIMIT_EQUAL_INTERVAL;
358 return subsequentRetransmit * subsequentRetransmit * TIME_LAPSE_FOR_WAITING_ACK;
359 }
360 }
361
SendLabelExchange(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint32_t inRetransmitCount)362 void CommunicatorLinker::SendLabelExchange(const std::string &toTarget, SerialBuffer *inBuff, uint64_t inSequenceId,
363 uint32_t inRetransmitCount)
364 {
365 // Check whether have the need to send
366 bool noNeedToSend = inRetransmitCount > RETRANSMIT_LIMIT;
367 {
368 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
369 if (remoteOnlineTarget_.count(toTarget) == 0) {
370 // Target offline
371 noNeedToSend = true;
372 }
373 if (waitAckSeq_[toTarget] > inSequenceId) {
374 // New LabelExchangeEvent had been trigger for this target, so this event can be abort
375 noNeedToSend = true;
376 }
377 if (recvAckSeq_.count(toTarget) != 0 && recvAckSeq_[toTarget] >= inSequenceId) {
378 // Ack of this sequenceId had been received or even later ack had been received
379 noNeedToSend = true;
380 }
381 if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
382 LOGI("[Linker][SendLabel] NoNeedSend:target=%s{private}, thisSeqId=%" PRIu64 ", waitAckSeq=%" PRIu64
383 ", recvAckSeq=%" PRIu64 ",retrans=%u.", toTarget.c_str(), ULL(inSequenceId),
384 ULL(waitAckSeq_[toTarget]), ULL((recvAckSeq_.count(toTarget) != 0) ? recvAckSeq_[toTarget] : ~ULL(0)),
385 inRetransmitCount);
386 } // ~0 indicate no ack ever recv
387 }
388 if (noNeedToSend) {
389 delete inBuff;
390 inBuff = nullptr;
391 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
392 return;
393 }
394
395 int error = E_OK;
396 SerialBuffer *cloneBuffer = inBuff->Clone(error);
397 TaskConfig config{true, 0, Priority::HIGH};
398 int errCode = aggregator_->ScheduleSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE, config);
399 if (errCode == E_OK) {
400 // Send ok, go on to wait ack, and maybe resend
401 if (error == E_OK) {
402 SuspendByOnceTimer([this, toTarget, cloneBuffer, inSequenceId, inRetransmitCount]() {
403 // Note: toTarget and cloneBuffer and inSequenceId should be captured by value (must not by reference)
404 SendLabelExchange(toTarget, cloneBuffer, inSequenceId, inRetransmitCount + 1); // Do retransmission
405 }, GetDynamicTimeLapseForWaitingAck(inRetransmitCount));
406 } else {
407 LOGE("[Linker][SendLabel] CloneFail: target=%s{private}, SeqId=%" PRIu64 ".",
408 toTarget.c_str(), ULL(inSequenceId));
409 }
410 } else {
411 // Send fail, go on to retry send
412 SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inRetransmitCount]() {
413 // Note: toTarget and inBuff and inSequenceId should be captured by value (must not by reference)
414 SendLabelExchange(toTarget, inBuff, inSequenceId, inRetransmitCount); // Just do retry send
415 }, TIME_LAPSE_FOR_RETRY_SEND);
416 if (error == E_OK) {
417 delete cloneBuffer;
418 cloneBuffer = nullptr;
419 }
420 }
421 }
422
SendLabelExchangeAck(const std::string & toTarget,SerialBuffer * inBuff,uint64_t inSequenceId,uint64_t inAckTriggerId)423 void CommunicatorLinker::SendLabelExchangeAck(const std::string &toTarget, SerialBuffer *inBuff,
424 uint64_t inSequenceId, uint64_t inAckTriggerId)
425 {
426 // Check whether have the need to send
427 bool noNeedToSend = false;
428 {
429 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
430 // Now that LabelExchange is received, LabelExchangeAck should be send no matter target online or not
431 if (topRecvLabelSeq_.count(toTarget) != 0 && topRecvLabelSeq_[toTarget] > inSequenceId) {
432 // topRecvLabelSeq for this target may have been erased, detect it for avoid creating an entry
433 // New LabelExchange had been received for this target, so this event can be abort
434 noNeedToSend = true;
435 }
436 if (ackTriggerId_[toTarget] > inAckTriggerId) {
437 // New LabelExchangeAck had been trigger for this target, so this event can be abort
438 noNeedToSend = true;
439 }
440 if (noNeedToSend) { // ATTENTION: This Log should be inside the protection of entireInfoLockGuard!!!
441 LOGI("[Linker][SendAck] NoNeedSend:target=%s{private}, thisSeqId=%" PRIu64 ", topRecLabelSeq=%" PRIu64
442 ", thisAckId=%" PRIu64 ",ackTriggerId=%" PRIu64 ".",
443 toTarget.c_str(), ULL(inSequenceId), // ~0 indacate no label ever recv
444 ULL((topRecvLabelSeq_.count(toTarget) != 0) ? topRecvLabelSeq_[toTarget] : ~ULL(0)),
445 ULL(inAckTriggerId), ULL(ackTriggerId_[toTarget]));
446 }
447 }
448 if (noNeedToSend) {
449 delete inBuff;
450 inBuff = nullptr;
451 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
452 return;
453 }
454
455 TaskConfig config{true, 0, Priority::HIGH};
456 int errCode = aggregator_->ScheduleSendTask(toTarget, inBuff, FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK, config);
457 if (errCode == E_OK) {
458 // Send ok, finish event
459 RefObject::DecObjRef(this); // ATTENTION: The DecObjRef should be outside entireInfoLockGuard!!!
460 } else {
461 // Send fail, go on to retry send
462 SuspendByOnceTimer([this, toTarget, inBuff, inSequenceId, inAckTriggerId]() {
463 // Note: toTarget, inBuff, inSequenceId, inAckTriggerId should be captured by value (must not by reference)
464 SendLabelExchangeAck(toTarget, inBuff, inSequenceId, inAckTriggerId);
465 }, TIME_LAPSE_FOR_RETRY_SEND);
466 }
467 }
468
UpdateOnlineLabels(const std::string & device,const std::map<LabelType,bool> & labels)469 void CommunicatorLinker::UpdateOnlineLabels(const std::string &device, const std::map<LabelType, bool> &labels)
470 {
471 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
472 for (const auto &[label, isOnline]: labels) {
473 if (isOnline) {
474 targetMapOnlineLabels_[device].insert(label);
475 } else {
476 targetMapOnlineLabels_[device].erase(label);
477 }
478 }
479 }
480
TriggerLabelExchangeEvent(bool checkAdapter)481 bool CommunicatorLinker::TriggerLabelExchangeEvent(bool checkAdapter)
482 {
483 if (checkAdapter && statusAdapter_ != nullptr && !statusAdapter_->IsSendLabelExchange()) {
484 return false;
485 }
486 std::set<std::string> totalOnlineTargets;
487 {
488 std::lock_guard<std::mutex> entireInfoLockGuard(entireInfoMutex_);
489 totalOnlineTargets = remoteOnlineTarget_;
490 }
491 bool everFail = false;
492 for (auto &entry : totalOnlineTargets) {
493 if (TriggerLabelExchangeEvent(entry) != E_OK) {
494 everFail = true;
495 }
496 }
497 return everFail;
498 }
499 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorLinker)
500 } // namespace DistributedDB
501