1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_sync_state_machine.h"
18 
19 #include <cmath>
20 #include <climits>
21 #include <algorithm>
22 
23 #include "message_transform.h"
24 #include "log_print.h"
25 #include "sync_types.h"
26 #include "db_common.h"
27 #include "ref_object.h"
28 #include "performance_analysis.h"
29 
30 namespace DistributedDB {
31 namespace {
ChangeEntriesTimestamp(std::vector<MultiVerKvEntry * > & entries,TimeOffset outOffset,TimeOffset timefixOffset)32 void ChangeEntriesTimestamp(std::vector<MultiVerKvEntry *> &entries, TimeOffset outOffset, TimeOffset timefixOffset)
33 {
34     for (MultiVerKvEntry *entry : entries) {
35         if (entry == nullptr) {
36             continue;
37         }
38         Timestamp timestamp;
39         entry->GetTimestamp(timestamp);
40         timestamp = timestamp - static_cast<Timestamp>(outOffset + timefixOffset);
41         entry->SetTimestamp(timestamp);
42     }
43 }
44 }
45 std::vector<StateSwitchTable> MultiVerSyncStateMachine::stateSwitchTables_;
MultiVerSyncStateMachine()46 MultiVerSyncStateMachine::MultiVerSyncStateMachine()
47     : context_(nullptr),
48       multiVerStorage_(nullptr),
49       timeSync_(nullptr),
50       commitHistorySync_(nullptr),
51       multiVerDataSync_(nullptr),
52       valueSliceSync_(nullptr)
53 {
54 }
55 
~MultiVerSyncStateMachine()56 MultiVerSyncStateMachine::~MultiVerSyncStateMachine()
57 {
58     Clear();
59 }
60 
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)61 int MultiVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
62     const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
63 {
64     if (context == nullptr || syncInterface == nullptr || metadata == nullptr || communicator == nullptr) {
65         return -E_INVALID_ARGS;
66     }
67     int errCode = SyncStateMachine::Initialize(context, syncInterface, metadata, communicator);
68     if (errCode != E_OK) {
69         return errCode;
70     }
71 
72     timeSync_ = std::make_unique<TimeSync>();
73     commitHistorySync_ = std::make_unique<CommitHistorySync>();
74     multiVerDataSync_ = std::make_unique<MultiVerDataSync>();
75     valueSliceSync_ = std::make_unique<ValueSliceSync>();
76 
77     errCode = timeSync_->Initialize(communicator, metadata, syncInterface, context->GetDeviceId());
78     if (errCode != E_OK) {
79         LOGE("timeSync_->Initialize failed err %d", errCode);
80         goto ERROR_OUT;
81     }
82     LOGD("timeSync_->Initialize OK");
83 
84     // init functions below will never fail
85     multiVerStorage_ = static_cast<MultiVerKvDBSyncInterface *>(syncInterface);
86     commitHistorySync_->Initialize(multiVerStorage_, communicator);
87     multiVerDataSync_->Initialize(multiVerStorage_, communicator);
88     valueSliceSync_->Initialize(multiVerStorage_, communicator);
89 
90     context_ = static_cast<MultiVerSyncTaskContext *>(context);
91     currentState_ = IDLE;
92     (void)timeSync_->SyncStart();
93     return E_OK;
94 
95 ERROR_OUT:
96     Clear();
97     return errCode;
98 }
99 
SyncStep()100 void MultiVerSyncStateMachine::SyncStep()
101 {
102     RefObject::IncObjRef(context_);
103     RefObject::IncObjRef(communicator_);
104     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); });
105     if (errCode != E_OK) {
106         LOGE("[MultiVerSyncStateMachine] Schedule SyncStep failed");
107         RefObject::DecObjRef(communicator_);
108         RefObject::DecObjRef(context_);
109     }
110 }
111 
StepToIdle()112 void MultiVerSyncStateMachine::StepToIdle()
113 {
114     currentState_ = IDLE;
115     StopWatchDog();
116     context_->Clear();
117     PerformanceAnalysis::GetInstance()->TimeRecordEnd();
118     LOGD("[MultiVerSyncStateMachine][%s] step to idle", STR_MASK(context_->GetDeviceId()));
119 }
120 
MessageCallbackCheck(const Message * inMsg)121 int MultiVerSyncStateMachine::MessageCallbackCheck(const Message *inMsg)
122 {
123     RefObject::AutoLock lock(context_);
124     if (context_->IsKilled()) {
125         return -E_OBJ_IS_KILLED;
126     }
127     if (!IsPacketValid(inMsg)) {
128         return -E_INVALID_ARGS;
129     }
130     if ((inMsg->GetMessageType() == TYPE_RESPONSE) && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
131         context_->IncSequenceId();
132         int errCode = ResetWatchDog();
133         if (errCode != E_OK) {
134             LOGW("[MultiVerSyncStateMachine][MessageCallback] ResetWatchDog failed , err %d", errCode);
135         }
136     }
137     return E_OK;
138 }
139 
ReceiveMessageCallback(Message * inMsg)140 int MultiVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
141 {
142     if (inMsg == nullptr) {
143         return -E_INVALID_ARGS;
144     }
145     if (inMsg->IsFeedbackError()) {
146         LOGE("[MultiVerSyncStateMachine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
147         return -static_cast<int>(inMsg->GetErrorNo());
148     }
149     if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
150         return TimeSyncPacketRecvCallback(context_, inMsg);
151     }
152     std::lock_guard<std::mutex> lock(stateMachineLock_);
153     int errCode = MessageCallbackCheck(inMsg);
154     if (errCode != E_OK) {
155         return errCode;
156     }
157     switch (inMsg->GetMessageId()) {
158         case COMMIT_HISTORY_SYNC_MESSAGE:
159             errCode = CommitHistorySyncPktRecvCallback(context_, inMsg);
160             if ((errCode != -E_NOT_FOUND) && (inMsg->GetMessageType() == TYPE_REQUEST) && (errCode != -E_NOT_PERMIT)) {
161                 SyncResponseBegin(inMsg->GetSessionId());
162             }
163             break;
164         case MULTI_VER_DATA_SYNC_MESSAGE:
165             errCode = MultiVerDataPktRecvCallback(context_, inMsg);
166             break;
167         case VALUE_SLICE_SYNC_MESSAGE:
168             errCode = ValueSlicePktRecvCallback(context_, inMsg);
169             break;
170         default:
171             errCode = -E_NOT_SUPPORT;
172             break;
173     }
174     if (errCode == -E_LAST_SYNC_FRAME) {
175         SyncResponseEnd(inMsg->GetSessionId());
176         return errCode;
177     }
178     if (errCode != E_OK && inMsg->GetMessageType() == TYPE_RESPONSE) {
179         Abort();
180     }
181     return errCode;
182 }
183 
StepToTimeout(TimerId timerId)184 void MultiVerSyncStateMachine::StepToTimeout(TimerId timerId)
185 {
186     {
187         std::lock_guard<std::mutex> lock(stateMachineLock_);
188         TimerId timer = syncContext_->GetTimerId();
189         if (timer != timerId) {
190             return;
191         }
192         currentState_ = SYNC_TIME_OUT;
193     }
194     Abort();
195 }
196 
CommitHistorySyncStepInner(void)197 int MultiVerSyncStateMachine::CommitHistorySyncStepInner(void)
198 {
199     int errCode = commitHistorySync_->SyncStart(context_);
200     if (errCode != E_OK) {
201         LOGE("[MultiVerSyncStateMachine][CommitHistorySyncStep] failed, errCode %d", errCode);
202     }
203     return errCode;
204 }
205 
MultiVerDataSyncStepInner(void)206 int MultiVerSyncStateMachine::MultiVerDataSyncStepInner(void)
207 {
208     return multiVerDataSync_->SyncStart(context_);
209 }
210 
ValueSliceSyncStepInner(void)211 int MultiVerSyncStateMachine::ValueSliceSyncStepInner(void)
212 {
213     return valueSliceSync_->SyncStart(context_);
214 }
215 
SyncStepInnerLocked()216 void MultiVerSyncStateMachine::SyncStepInnerLocked()
217 {
218     if (context_->IncUsedCount() != E_OK) {
219         goto SYNC_STEP_OUT;
220     }
221 
222     LOGD("[MultiVerSyncStateMachine] SyncStep dst=%s, state = %d", STR_MASK(context_->GetDeviceId()), currentState_);
223     int errCode;
224     {
225         std::lock_guard<std::mutex> lock(stateMachineLock_);
226         switch (currentState_) {
227             case COMMIT_HISTORY_SYNC:
228                 errCode = CommitHistorySyncStepInner();
229                 if (errCode != E_OK) {
230                     Abort();
231                 }
232                 break;
233             case MULTI_VER_DATA_ENTRY_SYNC:
234                 errCode = MultiVerDataSyncStepInner();
235                 if (errCode == -E_NOT_FOUND) {
236                     Finish();
237                     goto SYNC_STEP_SAFE_OUT;
238                 }
239                 break;
240             case MULTI_VER_VALUE_SLICE_SYNC:
241                 errCode = ValueSliceSyncStepInner();
242                 if (errCode == -E_NOT_FOUND) {
243                     int err = OneCommitSyncFinish();
244                     if (err != E_OK) {
245                         valueSliceSync_->SendFinishedRequest(context_);
246                         Abort();
247                         goto SYNC_STEP_SAFE_OUT;
248                     }
249                     currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
250                     SyncStep();
251                     goto SYNC_STEP_SAFE_OUT;
252                 }
253                 break;
254             default:
255                 break;
256         }
257     }
258 
259 SYNC_STEP_SAFE_OUT:
260     context_->SafeExit();
261 
262 SYNC_STEP_OUT:
263     RefObject::DecObjRef(communicator_);
264     RefObject::DecObjRef(context_);
265 }
266 
SyncStepInner()267 void MultiVerSyncStateMachine::SyncStepInner()
268 {
269 }
270 
StartSyncInner()271 int MultiVerSyncStateMachine::StartSyncInner()
272 {
273     LOGI("[MultiVerSyncStateMachine] StartSync");
274     currentState_ = COMMIT_HISTORY_SYNC;
275     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
276     if (performance != nullptr) {
277         performance->TimeRecordStart();
278     }
279     int errCode = StartWatchDog();
280     if (errCode != E_OK) {
281         LOGE("[MultiVerSyncStateMachine][StartSync] WatchDog start failed! err:%d", errCode);
282         return errCode;
283     }
284     SyncStep();
285     return E_OK;
286 }
287 
AbortInner()288 void MultiVerSyncStateMachine::AbortInner()
289 {
290     context_->Clear();
291     StepToIdle();
292     ExecNextTask();
293 }
294 
GetStateSwitchTables() const295 const std::vector<StateSwitchTable> &MultiVerSyncStateMachine::GetStateSwitchTables() const
296 {
297     return stateSwitchTables_;
298 }
299 
PrepareNextSyncTask()300 int MultiVerSyncStateMachine::PrepareNextSyncTask()
301 {
302     return StartSyncInner();
303 }
304 
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)305 void MultiVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
306 {
307     (void)sessionId;
308     (void)sequenceId;
309     (void)inMsgId;
310 }
311 
CommErrAbort(uint32_t sessionId)312 void MultiVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
313 {
314     (void)sessionId;
315     std::lock_guard<std::mutex> lock(stateMachineLock_);
316     Abort();
317     RefObject::DecObjRef(context_);
318 }
319 
TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext * context,const Message * inMsg)320 int MultiVerSyncStateMachine::TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext *context, const Message *inMsg)
321 {
322     int errCode;
323     if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
324         return -E_INVALID_ARGS;
325     }
326     switch (inMsg->GetMessageType()) {
327         case TYPE_REQUEST:
328             errCode = timeSync_->RequestRecv(inMsg);
329             return errCode;
330         case TYPE_RESPONSE:
331             errCode = timeSync_->AckRecv(inMsg);
332             if (errCode != E_OK) {
333                 LOGE("[MultiVerSyncStateMachine] TimeSyncPacketRecvCallback AckRecv failed err %d", errCode);
334             }
335             return errCode;
336         default:
337             return -E_INVALID_ARGS;
338     }
339 }
340 
CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)341 int MultiVerSyncStateMachine::CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
342 {
343     if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
344         return -E_INVALID_ARGS;
345     }
346     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
347     int errCode;
348     switch (inMsg->GetMessageType()) {
349         case TYPE_REQUEST:
350             if (performance != nullptr) {
351                 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SEND_LOCAL_DATA_CHANGED_TO_COMMIT_REQUEST_RECV);
352             }
353             return commitHistorySync_->RequestRecvCallback(context, inMsg);
354         case TYPE_RESPONSE:
355             if (performance != nullptr) {
356                 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
357             }
358             errCode = commitHistorySync_->AckRecvCallback(context, inMsg);
359             if (errCode != E_OK) {
360                 return errCode;
361             }
362             currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
363             SyncStep();
364             return errCode;
365         default:
366             return -E_INVALID_ARGS;
367     }
368 }
369 
MultiVerDataPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)370 int MultiVerSyncStateMachine::MultiVerDataPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
371 {
372     if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
373         return -E_INVALID_ARGS;
374     }
375     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
376     int errCode;
377     switch (inMsg->GetMessageType()) {
378         case TYPE_REQUEST:
379             return multiVerDataSync_->RequestRecvCallback(context, inMsg);
380         case TYPE_RESPONSE:
381             if (performance != nullptr) {
382                 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
383             }
384             errCode = multiVerDataSync_->AckRecvCallback(context, inMsg);
385             if (errCode != E_OK) {
386                 multiVerDataSync_->SendFinishedRequest(context);
387                 return errCode;
388             }
389             currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
390             SyncStep();
391             return errCode;
392         default:
393             return -E_INVALID_ARGS;
394     }
395 }
396 
ValueSlicePktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)397 int MultiVerSyncStateMachine::ValueSlicePktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
398 {
399     if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
400         return -E_INVALID_ARGS;
401     }
402     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
403     int errCode;
404     switch (inMsg->GetMessageType()) {
405         case TYPE_REQUEST:
406             return valueSliceSync_->RequestRecvCallback(context, inMsg);
407         case TYPE_RESPONSE:
408             if (performance != nullptr) {
409                 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
410             }
411             errCode = valueSliceSync_->AckRecvCallback(context, inMsg);
412             if (errCode != E_OK) {
413                 valueSliceSync_->SendFinishedRequest(context);
414                 return errCode;
415             }
416             currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
417             SyncStep();
418             return errCode;
419         default:
420             return -E_INVALID_ARGS;
421     }
422 }
423 
Finish()424 void MultiVerSyncStateMachine::Finish()
425 {
426     MultiVerCommitNode commit;
427     int commitsSize = context_->GetCommitsSize();
428     if (commitsSize > 0) {
429         context_->GetCommit(commitsSize - 1, commit);
430         std::vector<MultiVerCommitNode> commits;
431         context_->GetCommits(commits);
432         LOGD("MultiVerSyncStateMachine::Finish merge src=%s", STR_MASK(context_->GetDeviceId()));
433         PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
434         if (performance != nullptr) {
435             performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_MERGE);
436         }
437         int errCode = multiVerDataSync_->MergeSyncCommit(commit, commits);
438         LOGD("MultiVerSyncStateMachine::Finish merge src=%s, MergeSyncCommit errCode:%d",
439             STR_MASK(context_->GetDeviceId()), errCode);
440         if (performance != nullptr) {
441             performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_MERGE);
442         }
443     }
444     RefObject::AutoLock lock(context_);
445     context_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
446     StepToIdle();
447     ExecNextTask();
448 }
449 
OneCommitSyncFinish()450 int MultiVerSyncStateMachine::OneCommitSyncFinish()
451 {
452     MultiVerCommitNode commit;
453     TimeOffset outOffset = 0;
454     int errCode = E_OK;
455     int commitIndex = context_->GetCommitIndex();
456 
457     LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish  src=%s, commitIndex = %d,", STR_MASK(context_->GetDeviceId()),
458         commitIndex);
459     if (commitIndex > 0) {
460         context_->GetCommit(commitIndex - 1, commit);
461         std::string deviceName = context_->GetDeviceId();
462         std::vector<MultiVerKvEntry *> entries;
463         context_->GetEntries(entries);
464         LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, entries size = %lu",
465             STR_MASK(context_->GetDeviceId()), entries.size());
466         errCode = timeSync_->GetTimeOffset(outOffset, TIME_SYNC_WAIT_TIME);
467         if (errCode != E_OK) {
468             LOGI("MultiVerSyncStateMachine::OneCommitSyncFinish GetTimeOffset fail errCode:%d", errCode);
469             return errCode;
470         }
471         Timestamp currentLocalTime = context_->GetCurrentLocalTime();
472         commit.timestamp -= static_cast<Timestamp>(outOffset);
473 
474         // Due to time sync error, commit timestamp may bigger than currentLocalTime, we need to fix the timestamp
475         TimeOffset timefixOffset = (commit.timestamp < currentLocalTime) ? 0 : (commit.timestamp -
476             static_cast<Timestamp>(currentLocalTime));
477         LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, timefixOffset = %" PRId64,
478             STR_MASK(context_->GetDeviceId()), timefixOffset);
479         commit.timestamp -= static_cast<Timestamp>(timefixOffset);
480         ChangeEntriesTimestamp(entries, outOffset, timefixOffset);
481         PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
482         if (performance != nullptr) {
483             performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
484         }
485         errCode = multiVerDataSync_->PutCommitData(commit, entries, deviceName);
486         LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish PutCommitData src=%s, errCode = %d",
487             STR_MASK(context_->GetDeviceId()), errCode);
488         if (performance != nullptr) {
489             performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
490         }
491         if (errCode == E_OK) {
492             context_->ReleaseEntries();
493         }
494     }
495     DBCommon::PrintHexVector(commit.commitId, __LINE__);
496     return errCode;
497 }
498 
IsPacketValid(const Message * inMsg) const499 bool MultiVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
500 {
501     if (inMsg == nullptr) {
502         return false;
503     }
504 
505     if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() > VALUE_SLICE_SYNC_MESSAGE) ||
506         (inMsg->GetMessageId() == DATA_SYNC_MESSAGE)) {
507         LOGE("[MultiVerSyncStateMachine] Message is invalid, id = %d", inMsg->GetMessageId());
508         return false;
509     }
510     if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
511         return true;
512     }
513     if (inMsg->GetMessageType() == TYPE_RESPONSE) {
514         if ((inMsg->GetSequenceId() != context_->GetSequenceId()) ||
515             (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
516             LOGE("[MultiVerSyncStateMachine] Message is invalid, inMsg SequenceId = %d, context seq = %d,"
517                 "msg session id = %d, context session = %d", inMsg->GetSequenceId(), context_->GetSequenceId(),
518                 inMsg->GetSessionId(), context_->GetRequestSessionId());
519             return false;
520         }
521     }
522     return true;
523 }
524 
Clear()525 void MultiVerSyncStateMachine::Clear()
526 {
527     commitHistorySync_ = nullptr;
528     multiVerDataSync_ = nullptr;
529     timeSync_ = nullptr;
530     valueSliceSync_ = nullptr;
531     multiVerStorage_ = nullptr;
532     context_ = nullptr;
533 }
534 
SyncResponseBegin(uint32_t sessionId)535 void MultiVerSyncStateMachine::SyncResponseBegin(uint32_t sessionId)
536 {
537     {
538         std::lock_guard<std::mutex> lock(responseInfosLock_);
539         auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
540             return info.sessionId == sessionId;
541         });
542         if (iter != responseInfos_.end()) {
543             LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] sessionId existed! exit.");
544             return;
545         }
546         TimerAction timeOutCallback = [this](TimerId timerId) -> int { return SyncResponseTimeout(timerId); };
547         // To make sure context_ alive in timeout callback, we should IncObjRef for the context_.
548         RefObject::IncObjRef(context_);
549         TimerId timerId = 0;
550         int errCode = RuntimeContext::GetInstance()->SetTimer(
551             RESPONSE_TIME_OUT, timeOutCallback,
552             [this]() {
553                 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(context_); });
554                 if (ret != E_OK) {
555                     LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] timer finalizer ScheduleTask, errCode %d", ret);
556                 }
557             },
558             timerId);
559         if (errCode != E_OK) {
560             LOGE("[MultiVerSyncStateMachine][ResponseSessionBegin] SetTimer failed err %d", errCode);
561             RefObject::DecObjRef(context_);
562             return;
563         }
564         ResponseInfo info{sessionId, timerId};
565         responseInfos_.push_back(info);
566         LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] begin");
567     }
568     multiVerStorage_->NotifyStartSyncOperation();
569 }
570 
SyncResponseEnd(uint32_t sessionId)571 void MultiVerSyncStateMachine::SyncResponseEnd(uint32_t sessionId)
572 {
573     {
574         std::lock_guard<std::mutex> lock(responseInfosLock_);
575         auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
576             return info.sessionId == sessionId;
577         });
578         if (iter == responseInfos_.end()) {
579             LOGW("[MultiVerSyncStateMachine][SyncResponseEnd] Can't find sync response %d", sessionId);
580             return;
581         }
582         RuntimeContext::GetInstance()->RemoveTimer(iter->timerId);
583         responseInfos_.erase(iter);
584         LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] end response");
585     }
586     multiVerStorage_->NotifyFinishSyncOperation();
587 }
588 
SyncResponseTimeout(TimerId timerId)589 int MultiVerSyncStateMachine::SyncResponseTimeout(TimerId timerId)
590 {
591     uint32_t sessionId;
592     {
593         std::lock_guard<std::mutex> lock(responseInfosLock_);
594         auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [timerId](const ResponseInfo &info) {
595             return info.timerId == timerId;
596         });
597         if (iter == responseInfos_.end()) {
598             LOGW("[MultiVerSyncStateMachine][SyncResponseTimeout] Can't find sync response timerId %" PRIu64, timerId);
599             return E_OK;
600         }
601         sessionId = iter->sessionId;
602     }
603     SyncResponseEnd(sessionId);
604     return E_OK;
605 }
606 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)607 bool MultiVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
608 {
609     (void) inMsg;
610     (void) query;
611     return false;
612 }
613 }
614 #endif