1  /*
2   * Copyright (c) 2021 Huawei Device Co., Ltd.
3   * Licensed under the Apache License, Version 2.0 (the "License");
4   * you may not use this file except in compliance with the License.
5   * You may obtain a copy of the License at
6   *
7   *     http://www.apache.org/licenses/LICENSE-2.0
8   *
9   * Unless required by applicable law or agreed to in writing, software
10   * distributed under the License is distributed on an "AS IS" BASIS,
11   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   * See the License for the specific language governing permissions and
13   * limitations under the License.
14   */
15  
16  #ifndef OMIT_MULTI_VER
17  #include "multi_ver_sync_state_machine.h"
18  
19  #include <cmath>
20  #include <climits>
21  #include <algorithm>
22  
23  #include "message_transform.h"
24  #include "log_print.h"
25  #include "sync_types.h"
26  #include "db_common.h"
27  #include "ref_object.h"
28  #include "performance_analysis.h"
29  
30  namespace DistributedDB {
31  namespace {
ChangeEntriesTimestamp(std::vector<MultiVerKvEntry * > & entries,TimeOffset outOffset,TimeOffset timefixOffset)32  void ChangeEntriesTimestamp(std::vector<MultiVerKvEntry *> &entries, TimeOffset outOffset, TimeOffset timefixOffset)
33  {
34      for (MultiVerKvEntry *entry : entries) {
35          if (entry == nullptr) {
36              continue;
37          }
38          Timestamp timestamp;
39          entry->GetTimestamp(timestamp);
40          timestamp = timestamp - static_cast<Timestamp>(outOffset + timefixOffset);
41          entry->SetTimestamp(timestamp);
42      }
43  }
44  }
45  std::vector<StateSwitchTable> MultiVerSyncStateMachine::stateSwitchTables_;
MultiVerSyncStateMachine()46  MultiVerSyncStateMachine::MultiVerSyncStateMachine()
47      : context_(nullptr),
48        multiVerStorage_(nullptr),
49        timeSync_(nullptr),
50        commitHistorySync_(nullptr),
51        multiVerDataSync_(nullptr),
52        valueSliceSync_(nullptr)
53  {
54  }
55  
~MultiVerSyncStateMachine()56  MultiVerSyncStateMachine::~MultiVerSyncStateMachine()
57  {
58      Clear();
59  }
60  
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)61  int MultiVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
62      const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
63  {
64      if (context == nullptr || syncInterface == nullptr || metadata == nullptr || communicator == nullptr) {
65          return -E_INVALID_ARGS;
66      }
67      int errCode = SyncStateMachine::Initialize(context, syncInterface, metadata, communicator);
68      if (errCode != E_OK) {
69          return errCode;
70      }
71  
72      timeSync_ = std::make_unique<TimeSync>();
73      commitHistorySync_ = std::make_unique<CommitHistorySync>();
74      multiVerDataSync_ = std::make_unique<MultiVerDataSync>();
75      valueSliceSync_ = std::make_unique<ValueSliceSync>();
76  
77      errCode = timeSync_->Initialize(communicator, metadata, syncInterface, context->GetDeviceId());
78      if (errCode != E_OK) {
79          LOGE("timeSync_->Initialize failed err %d", errCode);
80          goto ERROR_OUT;
81      }
82      LOGD("timeSync_->Initialize OK");
83  
84      // init functions below will never fail
85      multiVerStorage_ = static_cast<MultiVerKvDBSyncInterface *>(syncInterface);
86      commitHistorySync_->Initialize(multiVerStorage_, communicator);
87      multiVerDataSync_->Initialize(multiVerStorage_, communicator);
88      valueSliceSync_->Initialize(multiVerStorage_, communicator);
89  
90      context_ = static_cast<MultiVerSyncTaskContext *>(context);
91      currentState_ = IDLE;
92      (void)timeSync_->SyncStart();
93      return E_OK;
94  
95  ERROR_OUT:
96      Clear();
97      return errCode;
98  }
99  
SyncStep()100  void MultiVerSyncStateMachine::SyncStep()
101  {
102      RefObject::IncObjRef(context_);
103      RefObject::IncObjRef(communicator_);
104      int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); });
105      if (errCode != E_OK) {
106          LOGE("[MultiVerSyncStateMachine] Schedule SyncStep failed");
107          RefObject::DecObjRef(communicator_);
108          RefObject::DecObjRef(context_);
109      }
110  }
111  
StepToIdle()112  void MultiVerSyncStateMachine::StepToIdle()
113  {
114      currentState_ = IDLE;
115      StopWatchDog();
116      context_->Clear();
117      PerformanceAnalysis::GetInstance()->TimeRecordEnd();
118      LOGD("[MultiVerSyncStateMachine][%s] step to idle", STR_MASK(context_->GetDeviceId()));
119  }
120  
MessageCallbackCheck(const Message * inMsg)121  int MultiVerSyncStateMachine::MessageCallbackCheck(const Message *inMsg)
122  {
123      RefObject::AutoLock lock(context_);
124      if (context_->IsKilled()) {
125          return -E_OBJ_IS_KILLED;
126      }
127      if (!IsPacketValid(inMsg)) {
128          return -E_INVALID_ARGS;
129      }
130      if ((inMsg->GetMessageType() == TYPE_RESPONSE) && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
131          context_->IncSequenceId();
132          int errCode = ResetWatchDog();
133          if (errCode != E_OK) {
134              LOGW("[MultiVerSyncStateMachine][MessageCallback] ResetWatchDog failed , err %d", errCode);
135          }
136      }
137      return E_OK;
138  }
139  
ReceiveMessageCallback(Message * inMsg)140  int MultiVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
141  {
142      if (inMsg == nullptr) {
143          return -E_INVALID_ARGS;
144      }
145      if (inMsg->IsFeedbackError()) {
146          LOGE("[MultiVerSyncStateMachine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
147          return -static_cast<int>(inMsg->GetErrorNo());
148      }
149      if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
150          return TimeSyncPacketRecvCallback(context_, inMsg);
151      }
152      std::lock_guard<std::mutex> lock(stateMachineLock_);
153      int errCode = MessageCallbackCheck(inMsg);
154      if (errCode != E_OK) {
155          return errCode;
156      }
157      switch (inMsg->GetMessageId()) {
158          case COMMIT_HISTORY_SYNC_MESSAGE:
159              errCode = CommitHistorySyncPktRecvCallback(context_, inMsg);
160              if ((errCode != -E_NOT_FOUND) && (inMsg->GetMessageType() == TYPE_REQUEST) && (errCode != -E_NOT_PERMIT)) {
161                  SyncResponseBegin(inMsg->GetSessionId());
162              }
163              break;
164          case MULTI_VER_DATA_SYNC_MESSAGE:
165              errCode = MultiVerDataPktRecvCallback(context_, inMsg);
166              break;
167          case VALUE_SLICE_SYNC_MESSAGE:
168              errCode = ValueSlicePktRecvCallback(context_, inMsg);
169              break;
170          default:
171              errCode = -E_NOT_SUPPORT;
172              break;
173      }
174      if (errCode == -E_LAST_SYNC_FRAME) {
175          SyncResponseEnd(inMsg->GetSessionId());
176          return errCode;
177      }
178      if (errCode != E_OK && inMsg->GetMessageType() == TYPE_RESPONSE) {
179          Abort();
180      }
181      return errCode;
182  }
183  
StepToTimeout(TimerId timerId)184  void MultiVerSyncStateMachine::StepToTimeout(TimerId timerId)
185  {
186      {
187          std::lock_guard<std::mutex> lock(stateMachineLock_);
188          TimerId timer = syncContext_->GetTimerId();
189          if (timer != timerId) {
190              return;
191          }
192          currentState_ = SYNC_TIME_OUT;
193      }
194      Abort();
195  }
196  
CommitHistorySyncStepInner(void)197  int MultiVerSyncStateMachine::CommitHistorySyncStepInner(void)
198  {
199      int errCode = commitHistorySync_->SyncStart(context_);
200      if (errCode != E_OK) {
201          LOGE("[MultiVerSyncStateMachine][CommitHistorySyncStep] failed, errCode %d", errCode);
202      }
203      return errCode;
204  }
205  
MultiVerDataSyncStepInner(void)206  int MultiVerSyncStateMachine::MultiVerDataSyncStepInner(void)
207  {
208      return multiVerDataSync_->SyncStart(context_);
209  }
210  
ValueSliceSyncStepInner(void)211  int MultiVerSyncStateMachine::ValueSliceSyncStepInner(void)
212  {
213      return valueSliceSync_->SyncStart(context_);
214  }
215  
SyncStepInnerLocked()216  void MultiVerSyncStateMachine::SyncStepInnerLocked()
217  {
218      if (context_->IncUsedCount() != E_OK) {
219          goto SYNC_STEP_OUT;
220      }
221  
222      LOGD("[MultiVerSyncStateMachine] SyncStep dst=%s, state = %d", STR_MASK(context_->GetDeviceId()), currentState_);
223      int errCode;
224      {
225          std::lock_guard<std::mutex> lock(stateMachineLock_);
226          switch (currentState_) {
227              case COMMIT_HISTORY_SYNC:
228                  errCode = CommitHistorySyncStepInner();
229                  if (errCode != E_OK) {
230                      Abort();
231                  }
232                  break;
233              case MULTI_VER_DATA_ENTRY_SYNC:
234                  errCode = MultiVerDataSyncStepInner();
235                  if (errCode == -E_NOT_FOUND) {
236                      Finish();
237                      goto SYNC_STEP_SAFE_OUT;
238                  }
239                  break;
240              case MULTI_VER_VALUE_SLICE_SYNC:
241                  errCode = ValueSliceSyncStepInner();
242                  if (errCode == -E_NOT_FOUND) {
243                      int err = OneCommitSyncFinish();
244                      if (err != E_OK) {
245                          valueSliceSync_->SendFinishedRequest(context_);
246                          Abort();
247                          goto SYNC_STEP_SAFE_OUT;
248                      }
249                      currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
250                      SyncStep();
251                      goto SYNC_STEP_SAFE_OUT;
252                  }
253                  break;
254              default:
255                  break;
256          }
257      }
258  
259  SYNC_STEP_SAFE_OUT:
260      context_->SafeExit();
261  
262  SYNC_STEP_OUT:
263      RefObject::DecObjRef(communicator_);
264      RefObject::DecObjRef(context_);
265  }
266  
SyncStepInner()267  void MultiVerSyncStateMachine::SyncStepInner()
268  {
269  }
270  
StartSyncInner()271  int MultiVerSyncStateMachine::StartSyncInner()
272  {
273      LOGI("[MultiVerSyncStateMachine] StartSync");
274      currentState_ = COMMIT_HISTORY_SYNC;
275      PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
276      if (performance != nullptr) {
277          performance->TimeRecordStart();
278      }
279      int errCode = StartWatchDog();
280      if (errCode != E_OK) {
281          LOGE("[MultiVerSyncStateMachine][StartSync] WatchDog start failed! err:%d", errCode);
282          return errCode;
283      }
284      SyncStep();
285      return E_OK;
286  }
287  
AbortInner()288  void MultiVerSyncStateMachine::AbortInner()
289  {
290      context_->Clear();
291      StepToIdle();
292      ExecNextTask();
293  }
294  
GetStateSwitchTables() const295  const std::vector<StateSwitchTable> &MultiVerSyncStateMachine::GetStateSwitchTables() const
296  {
297      return stateSwitchTables_;
298  }
299  
PrepareNextSyncTask()300  int MultiVerSyncStateMachine::PrepareNextSyncTask()
301  {
302      return StartSyncInner();
303  }
304  
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)305  void MultiVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
306  {
307      (void)sessionId;
308      (void)sequenceId;
309      (void)inMsgId;
310  }
311  
CommErrAbort(uint32_t sessionId)312  void MultiVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
313  {
314      (void)sessionId;
315      std::lock_guard<std::mutex> lock(stateMachineLock_);
316      Abort();
317      RefObject::DecObjRef(context_);
318  }
319  
TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext * context,const Message * inMsg)320  int MultiVerSyncStateMachine::TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext *context, const Message *inMsg)
321  {
322      int errCode;
323      if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
324          return -E_INVALID_ARGS;
325      }
326      switch (inMsg->GetMessageType()) {
327          case TYPE_REQUEST:
328              errCode = timeSync_->RequestRecv(inMsg);
329              return errCode;
330          case TYPE_RESPONSE:
331              errCode = timeSync_->AckRecv(inMsg);
332              if (errCode != E_OK) {
333                  LOGE("[MultiVerSyncStateMachine] TimeSyncPacketRecvCallback AckRecv failed err %d", errCode);
334              }
335              return errCode;
336          default:
337              return -E_INVALID_ARGS;
338      }
339  }
340  
CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)341  int MultiVerSyncStateMachine::CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
342  {
343      if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
344          return -E_INVALID_ARGS;
345      }
346      PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
347      int errCode;
348      switch (inMsg->GetMessageType()) {
349          case TYPE_REQUEST:
350              if (performance != nullptr) {
351                  performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SEND_LOCAL_DATA_CHANGED_TO_COMMIT_REQUEST_RECV);
352              }
353              return commitHistorySync_->RequestRecvCallback(context, inMsg);
354          case TYPE_RESPONSE:
355              if (performance != nullptr) {
356                  performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
357              }
358              errCode = commitHistorySync_->AckRecvCallback(context, inMsg);
359              if (errCode != E_OK) {
360                  return errCode;
361              }
362              currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
363              SyncStep();
364              return errCode;
365          default:
366              return -E_INVALID_ARGS;
367      }
368  }
369  
MultiVerDataPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)370  int MultiVerSyncStateMachine::MultiVerDataPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
371  {
372      if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
373          return -E_INVALID_ARGS;
374      }
375      PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
376      int errCode;
377      switch (inMsg->GetMessageType()) {
378          case TYPE_REQUEST:
379              return multiVerDataSync_->RequestRecvCallback(context, inMsg);
380          case TYPE_RESPONSE:
381              if (performance != nullptr) {
382                  performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
383              }
384              errCode = multiVerDataSync_->AckRecvCallback(context, inMsg);
385              if (errCode != E_OK) {
386                  multiVerDataSync_->SendFinishedRequest(context);
387                  return errCode;
388              }
389              currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
390              SyncStep();
391              return errCode;
392          default:
393              return -E_INVALID_ARGS;
394      }
395  }
396  
ValueSlicePktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)397  int MultiVerSyncStateMachine::ValueSlicePktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
398  {
399      if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
400          return -E_INVALID_ARGS;
401      }
402      PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
403      int errCode;
404      switch (inMsg->GetMessageType()) {
405          case TYPE_REQUEST:
406              return valueSliceSync_->RequestRecvCallback(context, inMsg);
407          case TYPE_RESPONSE:
408              if (performance != nullptr) {
409                  performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
410              }
411              errCode = valueSliceSync_->AckRecvCallback(context, inMsg);
412              if (errCode != E_OK) {
413                  valueSliceSync_->SendFinishedRequest(context);
414                  return errCode;
415              }
416              currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
417              SyncStep();
418              return errCode;
419          default:
420              return -E_INVALID_ARGS;
421      }
422  }
423  
Finish()424  void MultiVerSyncStateMachine::Finish()
425  {
426      MultiVerCommitNode commit;
427      int commitsSize = context_->GetCommitsSize();
428      if (commitsSize > 0) {
429          context_->GetCommit(commitsSize - 1, commit);
430          std::vector<MultiVerCommitNode> commits;
431          context_->GetCommits(commits);
432          LOGD("MultiVerSyncStateMachine::Finish merge src=%s", STR_MASK(context_->GetDeviceId()));
433          PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
434          if (performance != nullptr) {
435              performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_MERGE);
436          }
437          int errCode = multiVerDataSync_->MergeSyncCommit(commit, commits);
438          LOGD("MultiVerSyncStateMachine::Finish merge src=%s, MergeSyncCommit errCode:%d",
439              STR_MASK(context_->GetDeviceId()), errCode);
440          if (performance != nullptr) {
441              performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_MERGE);
442          }
443      }
444      RefObject::AutoLock lock(context_);
445      context_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
446      StepToIdle();
447      ExecNextTask();
448  }
449  
OneCommitSyncFinish()450  int MultiVerSyncStateMachine::OneCommitSyncFinish()
451  {
452      MultiVerCommitNode commit;
453      TimeOffset outOffset = 0;
454      int errCode = E_OK;
455      int commitIndex = context_->GetCommitIndex();
456  
457      LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish  src=%s, commitIndex = %d,", STR_MASK(context_->GetDeviceId()),
458          commitIndex);
459      if (commitIndex > 0) {
460          context_->GetCommit(commitIndex - 1, commit);
461          std::string deviceName = context_->GetDeviceId();
462          std::vector<MultiVerKvEntry *> entries;
463          context_->GetEntries(entries);
464          LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, entries size = %lu",
465              STR_MASK(context_->GetDeviceId()), entries.size());
466          errCode = timeSync_->GetTimeOffset(outOffset, TIME_SYNC_WAIT_TIME);
467          if (errCode != E_OK) {
468              LOGI("MultiVerSyncStateMachine::OneCommitSyncFinish GetTimeOffset fail errCode:%d", errCode);
469              return errCode;
470          }
471          Timestamp currentLocalTime = context_->GetCurrentLocalTime();
472          commit.timestamp -= static_cast<Timestamp>(outOffset);
473  
474          // Due to time sync error, commit timestamp may bigger than currentLocalTime, we need to fix the timestamp
475          TimeOffset timefixOffset = (commit.timestamp < currentLocalTime) ? 0 : (commit.timestamp -
476              static_cast<Timestamp>(currentLocalTime));
477          LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, timefixOffset = %" PRId64,
478              STR_MASK(context_->GetDeviceId()), timefixOffset);
479          commit.timestamp -= static_cast<Timestamp>(timefixOffset);
480          ChangeEntriesTimestamp(entries, outOffset, timefixOffset);
481          PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
482          if (performance != nullptr) {
483              performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
484          }
485          errCode = multiVerDataSync_->PutCommitData(commit, entries, deviceName);
486          LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish PutCommitData src=%s, errCode = %d",
487              STR_MASK(context_->GetDeviceId()), errCode);
488          if (performance != nullptr) {
489              performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
490          }
491          if (errCode == E_OK) {
492              context_->ReleaseEntries();
493          }
494      }
495      DBCommon::PrintHexVector(commit.commitId, __LINE__);
496      return errCode;
497  }
498  
IsPacketValid(const Message * inMsg) const499  bool MultiVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
500  {
501      if (inMsg == nullptr) {
502          return false;
503      }
504  
505      if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() > VALUE_SLICE_SYNC_MESSAGE) ||
506          (inMsg->GetMessageId() == DATA_SYNC_MESSAGE)) {
507          LOGE("[MultiVerSyncStateMachine] Message is invalid, id = %d", inMsg->GetMessageId());
508          return false;
509      }
510      if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
511          return true;
512      }
513      if (inMsg->GetMessageType() == TYPE_RESPONSE) {
514          if ((inMsg->GetSequenceId() != context_->GetSequenceId()) ||
515              (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
516              LOGE("[MultiVerSyncStateMachine] Message is invalid, inMsg SequenceId = %d, context seq = %d,"
517                  "msg session id = %d, context session = %d", inMsg->GetSequenceId(), context_->GetSequenceId(),
518                  inMsg->GetSessionId(), context_->GetRequestSessionId());
519              return false;
520          }
521      }
522      return true;
523  }
524  
Clear()525  void MultiVerSyncStateMachine::Clear()
526  {
527      commitHistorySync_ = nullptr;
528      multiVerDataSync_ = nullptr;
529      timeSync_ = nullptr;
530      valueSliceSync_ = nullptr;
531      multiVerStorage_ = nullptr;
532      context_ = nullptr;
533  }
534  
SyncResponseBegin(uint32_t sessionId)535  void MultiVerSyncStateMachine::SyncResponseBegin(uint32_t sessionId)
536  {
537      {
538          std::lock_guard<std::mutex> lock(responseInfosLock_);
539          auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
540              return info.sessionId == sessionId;
541          });
542          if (iter != responseInfos_.end()) {
543              LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] sessionId existed! exit.");
544              return;
545          }
546          TimerAction timeOutCallback = [this](TimerId timerId) -> int { return SyncResponseTimeout(timerId); };
547          // To make sure context_ alive in timeout callback, we should IncObjRef for the context_.
548          RefObject::IncObjRef(context_);
549          TimerId timerId = 0;
550          int errCode = RuntimeContext::GetInstance()->SetTimer(
551              RESPONSE_TIME_OUT, timeOutCallback,
552              [this]() {
553                  int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(context_); });
554                  if (ret != E_OK) {
555                      LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] timer finalizer ScheduleTask, errCode %d", ret);
556                  }
557              },
558              timerId);
559          if (errCode != E_OK) {
560              LOGE("[MultiVerSyncStateMachine][ResponseSessionBegin] SetTimer failed err %d", errCode);
561              RefObject::DecObjRef(context_);
562              return;
563          }
564          ResponseInfo info{sessionId, timerId};
565          responseInfos_.push_back(info);
566          LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] begin");
567      }
568      multiVerStorage_->NotifyStartSyncOperation();
569  }
570  
SyncResponseEnd(uint32_t sessionId)571  void MultiVerSyncStateMachine::SyncResponseEnd(uint32_t sessionId)
572  {
573      {
574          std::lock_guard<std::mutex> lock(responseInfosLock_);
575          auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
576              return info.sessionId == sessionId;
577          });
578          if (iter == responseInfos_.end()) {
579              LOGW("[MultiVerSyncStateMachine][SyncResponseEnd] Can't find sync response %d", sessionId);
580              return;
581          }
582          RuntimeContext::GetInstance()->RemoveTimer(iter->timerId);
583          responseInfos_.erase(iter);
584          LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] end response");
585      }
586      multiVerStorage_->NotifyFinishSyncOperation();
587  }
588  
SyncResponseTimeout(TimerId timerId)589  int MultiVerSyncStateMachine::SyncResponseTimeout(TimerId timerId)
590  {
591      uint32_t sessionId;
592      {
593          std::lock_guard<std::mutex> lock(responseInfosLock_);
594          auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [timerId](const ResponseInfo &info) {
595              return info.timerId == timerId;
596          });
597          if (iter == responseInfos_.end()) {
598              LOGW("[MultiVerSyncStateMachine][SyncResponseTimeout] Can't find sync response timerId %" PRIu64, timerId);
599              return E_OK;
600          }
601          sessionId = iter->sessionId;
602      }
603      SyncResponseEnd(sessionId);
604      return E_OK;
605  }
606  
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)607  bool MultiVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
608  {
609      (void) inMsg;
610      (void) query;
611      return false;
612  }
613  }
614  #endif