/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_sync_state_machine.h"
18
19 #include <cmath>
20 #include <climits>
21 #include <algorithm>
22
23 #include "message_transform.h"
24 #include "log_print.h"
25 #include "sync_types.h"
26 #include "db_common.h"
27 #include "ref_object.h"
28 #include "performance_analysis.h"
29
30 namespace DistributedDB {
31 namespace {
ChangeEntriesTimestamp(std::vector<MultiVerKvEntry * > & entries,TimeOffset outOffset,TimeOffset timefixOffset)32 void ChangeEntriesTimestamp(std::vector<MultiVerKvEntry *> &entries, TimeOffset outOffset, TimeOffset timefixOffset)
33 {
34 for (MultiVerKvEntry *entry : entries) {
35 if (entry == nullptr) {
36 continue;
37 }
38 Timestamp timestamp;
39 entry->GetTimestamp(timestamp);
40 timestamp = timestamp - static_cast<Timestamp>(outOffset + timefixOffset);
41 entry->SetTimestamp(timestamp);
42 }
43 }
44 }
45 std::vector<StateSwitchTable> MultiVerSyncStateMachine::stateSwitchTables_;
MultiVerSyncStateMachine()46 MultiVerSyncStateMachine::MultiVerSyncStateMachine()
47 : context_(nullptr),
48 multiVerStorage_(nullptr),
49 timeSync_(nullptr),
50 commitHistorySync_(nullptr),
51 multiVerDataSync_(nullptr),
52 valueSliceSync_(nullptr)
53 {
54 }
55
~MultiVerSyncStateMachine()56 MultiVerSyncStateMachine::~MultiVerSyncStateMachine()
57 {
58 Clear();
59 }
60
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)61 int MultiVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
62 const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
63 {
64 if (context == nullptr || syncInterface == nullptr || metadata == nullptr || communicator == nullptr) {
65 return -E_INVALID_ARGS;
66 }
67 int errCode = SyncStateMachine::Initialize(context, syncInterface, metadata, communicator);
68 if (errCode != E_OK) {
69 return errCode;
70 }
71
72 timeSync_ = std::make_unique<TimeSync>();
73 commitHistorySync_ = std::make_unique<CommitHistorySync>();
74 multiVerDataSync_ = std::make_unique<MultiVerDataSync>();
75 valueSliceSync_ = std::make_unique<ValueSliceSync>();
76
77 errCode = timeSync_->Initialize(communicator, metadata, syncInterface, context->GetDeviceId());
78 if (errCode != E_OK) {
79 LOGE("timeSync_->Initialize failed err %d", errCode);
80 goto ERROR_OUT;
81 }
82 LOGD("timeSync_->Initialize OK");
83
84 // init functions below will never fail
85 multiVerStorage_ = static_cast<MultiVerKvDBSyncInterface *>(syncInterface);
86 commitHistorySync_->Initialize(multiVerStorage_, communicator);
87 multiVerDataSync_->Initialize(multiVerStorage_, communicator);
88 valueSliceSync_->Initialize(multiVerStorage_, communicator);
89
90 context_ = static_cast<MultiVerSyncTaskContext *>(context);
91 currentState_ = IDLE;
92 (void)timeSync_->SyncStart();
93 return E_OK;
94
95 ERROR_OUT:
96 Clear();
97 return errCode;
98 }
99
SyncStep()100 void MultiVerSyncStateMachine::SyncStep()
101 {
102 RefObject::IncObjRef(context_);
103 RefObject::IncObjRef(communicator_);
104 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); });
105 if (errCode != E_OK) {
106 LOGE("[MultiVerSyncStateMachine] Schedule SyncStep failed");
107 RefObject::DecObjRef(communicator_);
108 RefObject::DecObjRef(context_);
109 }
110 }
111
StepToIdle()112 void MultiVerSyncStateMachine::StepToIdle()
113 {
114 currentState_ = IDLE;
115 StopWatchDog();
116 context_->Clear();
117 PerformanceAnalysis::GetInstance()->TimeRecordEnd();
118 LOGD("[MultiVerSyncStateMachine][%s] step to idle", STR_MASK(context_->GetDeviceId()));
119 }
120
MessageCallbackCheck(const Message * inMsg)121 int MultiVerSyncStateMachine::MessageCallbackCheck(const Message *inMsg)
122 {
123 RefObject::AutoLock lock(context_);
124 if (context_->IsKilled()) {
125 return -E_OBJ_IS_KILLED;
126 }
127 if (!IsPacketValid(inMsg)) {
128 return -E_INVALID_ARGS;
129 }
130 if ((inMsg->GetMessageType() == TYPE_RESPONSE) && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
131 context_->IncSequenceId();
132 int errCode = ResetWatchDog();
133 if (errCode != E_OK) {
134 LOGW("[MultiVerSyncStateMachine][MessageCallback] ResetWatchDog failed , err %d", errCode);
135 }
136 }
137 return E_OK;
138 }
139
ReceiveMessageCallback(Message * inMsg)140 int MultiVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
141 {
142 if (inMsg == nullptr) {
143 return -E_INVALID_ARGS;
144 }
145 if (inMsg->IsFeedbackError()) {
146 LOGE("[MultiVerSyncStateMachine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
147 return -static_cast<int>(inMsg->GetErrorNo());
148 }
149 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
150 return TimeSyncPacketRecvCallback(context_, inMsg);
151 }
152 std::lock_guard<std::mutex> lock(stateMachineLock_);
153 int errCode = MessageCallbackCheck(inMsg);
154 if (errCode != E_OK) {
155 return errCode;
156 }
157 switch (inMsg->GetMessageId()) {
158 case COMMIT_HISTORY_SYNC_MESSAGE:
159 errCode = CommitHistorySyncPktRecvCallback(context_, inMsg);
160 if ((errCode != -E_NOT_FOUND) && (inMsg->GetMessageType() == TYPE_REQUEST) && (errCode != -E_NOT_PERMIT)) {
161 SyncResponseBegin(inMsg->GetSessionId());
162 }
163 break;
164 case MULTI_VER_DATA_SYNC_MESSAGE:
165 errCode = MultiVerDataPktRecvCallback(context_, inMsg);
166 break;
167 case VALUE_SLICE_SYNC_MESSAGE:
168 errCode = ValueSlicePktRecvCallback(context_, inMsg);
169 break;
170 default:
171 errCode = -E_NOT_SUPPORT;
172 break;
173 }
174 if (errCode == -E_LAST_SYNC_FRAME) {
175 SyncResponseEnd(inMsg->GetSessionId());
176 return errCode;
177 }
178 if (errCode != E_OK && inMsg->GetMessageType() == TYPE_RESPONSE) {
179 Abort();
180 }
181 return errCode;
182 }
183
StepToTimeout(TimerId timerId)184 void MultiVerSyncStateMachine::StepToTimeout(TimerId timerId)
185 {
186 {
187 std::lock_guard<std::mutex> lock(stateMachineLock_);
188 TimerId timer = syncContext_->GetTimerId();
189 if (timer != timerId) {
190 return;
191 }
192 currentState_ = SYNC_TIME_OUT;
193 }
194 Abort();
195 }
196
CommitHistorySyncStepInner(void)197 int MultiVerSyncStateMachine::CommitHistorySyncStepInner(void)
198 {
199 int errCode = commitHistorySync_->SyncStart(context_);
200 if (errCode != E_OK) {
201 LOGE("[MultiVerSyncStateMachine][CommitHistorySyncStep] failed, errCode %d", errCode);
202 }
203 return errCode;
204 }
205
MultiVerDataSyncStepInner(void)206 int MultiVerSyncStateMachine::MultiVerDataSyncStepInner(void)
207 {
208 return multiVerDataSync_->SyncStart(context_);
209 }
210
ValueSliceSyncStepInner(void)211 int MultiVerSyncStateMachine::ValueSliceSyncStepInner(void)
212 {
213 return valueSliceSync_->SyncStart(context_);
214 }
215
SyncStepInnerLocked()216 void MultiVerSyncStateMachine::SyncStepInnerLocked()
217 {
218 if (context_->IncUsedCount() != E_OK) {
219 goto SYNC_STEP_OUT;
220 }
221
222 LOGD("[MultiVerSyncStateMachine] SyncStep dst=%s, state = %d", STR_MASK(context_->GetDeviceId()), currentState_);
223 int errCode;
224 {
225 std::lock_guard<std::mutex> lock(stateMachineLock_);
226 switch (currentState_) {
227 case COMMIT_HISTORY_SYNC:
228 errCode = CommitHistorySyncStepInner();
229 if (errCode != E_OK) {
230 Abort();
231 }
232 break;
233 case MULTI_VER_DATA_ENTRY_SYNC:
234 errCode = MultiVerDataSyncStepInner();
235 if (errCode == -E_NOT_FOUND) {
236 Finish();
237 goto SYNC_STEP_SAFE_OUT;
238 }
239 break;
240 case MULTI_VER_VALUE_SLICE_SYNC:
241 errCode = ValueSliceSyncStepInner();
242 if (errCode == -E_NOT_FOUND) {
243 int err = OneCommitSyncFinish();
244 if (err != E_OK) {
245 valueSliceSync_->SendFinishedRequest(context_);
246 Abort();
247 goto SYNC_STEP_SAFE_OUT;
248 }
249 currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
250 SyncStep();
251 goto SYNC_STEP_SAFE_OUT;
252 }
253 break;
254 default:
255 break;
256 }
257 }
258
259 SYNC_STEP_SAFE_OUT:
260 context_->SafeExit();
261
262 SYNC_STEP_OUT:
263 RefObject::DecObjRef(communicator_);
264 RefObject::DecObjRef(context_);
265 }
266
SyncStepInner()267 void MultiVerSyncStateMachine::SyncStepInner()
268 {
269 }
270
StartSyncInner()271 int MultiVerSyncStateMachine::StartSyncInner()
272 {
273 LOGI("[MultiVerSyncStateMachine] StartSync");
274 currentState_ = COMMIT_HISTORY_SYNC;
275 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
276 if (performance != nullptr) {
277 performance->TimeRecordStart();
278 }
279 int errCode = StartWatchDog();
280 if (errCode != E_OK) {
281 LOGE("[MultiVerSyncStateMachine][StartSync] WatchDog start failed! err:%d", errCode);
282 return errCode;
283 }
284 SyncStep();
285 return E_OK;
286 }
287
AbortInner()288 void MultiVerSyncStateMachine::AbortInner()
289 {
290 context_->Clear();
291 StepToIdle();
292 ExecNextTask();
293 }
294
GetStateSwitchTables() const295 const std::vector<StateSwitchTable> &MultiVerSyncStateMachine::GetStateSwitchTables() const
296 {
297 return stateSwitchTables_;
298 }
299
PrepareNextSyncTask()300 int MultiVerSyncStateMachine::PrepareNextSyncTask()
301 {
302 return StartSyncInner();
303 }
304
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)305 void MultiVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
306 {
307 (void)sessionId;
308 (void)sequenceId;
309 (void)inMsgId;
310 }
311
CommErrAbort(uint32_t sessionId)312 void MultiVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
313 {
314 (void)sessionId;
315 std::lock_guard<std::mutex> lock(stateMachineLock_);
316 Abort();
317 RefObject::DecObjRef(context_);
318 }
319
TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext * context,const Message * inMsg)320 int MultiVerSyncStateMachine::TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext *context, const Message *inMsg)
321 {
322 int errCode;
323 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
324 return -E_INVALID_ARGS;
325 }
326 switch (inMsg->GetMessageType()) {
327 case TYPE_REQUEST:
328 errCode = timeSync_->RequestRecv(inMsg);
329 return errCode;
330 case TYPE_RESPONSE:
331 errCode = timeSync_->AckRecv(inMsg);
332 if (errCode != E_OK) {
333 LOGE("[MultiVerSyncStateMachine] TimeSyncPacketRecvCallback AckRecv failed err %d", errCode);
334 }
335 return errCode;
336 default:
337 return -E_INVALID_ARGS;
338 }
339 }
340
CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)341 int MultiVerSyncStateMachine::CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
342 {
343 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
344 return -E_INVALID_ARGS;
345 }
346 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
347 int errCode;
348 switch (inMsg->GetMessageType()) {
349 case TYPE_REQUEST:
350 if (performance != nullptr) {
351 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SEND_LOCAL_DATA_CHANGED_TO_COMMIT_REQUEST_RECV);
352 }
353 return commitHistorySync_->RequestRecvCallback(context, inMsg);
354 case TYPE_RESPONSE:
355 if (performance != nullptr) {
356 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
357 }
358 errCode = commitHistorySync_->AckRecvCallback(context, inMsg);
359 if (errCode != E_OK) {
360 return errCode;
361 }
362 currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
363 SyncStep();
364 return errCode;
365 default:
366 return -E_INVALID_ARGS;
367 }
368 }
369
MultiVerDataPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)370 int MultiVerSyncStateMachine::MultiVerDataPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
371 {
372 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
373 return -E_INVALID_ARGS;
374 }
375 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
376 int errCode;
377 switch (inMsg->GetMessageType()) {
378 case TYPE_REQUEST:
379 return multiVerDataSync_->RequestRecvCallback(context, inMsg);
380 case TYPE_RESPONSE:
381 if (performance != nullptr) {
382 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
383 }
384 errCode = multiVerDataSync_->AckRecvCallback(context, inMsg);
385 if (errCode != E_OK) {
386 multiVerDataSync_->SendFinishedRequest(context);
387 return errCode;
388 }
389 currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
390 SyncStep();
391 return errCode;
392 default:
393 return -E_INVALID_ARGS;
394 }
395 }
396
ValueSlicePktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)397 int MultiVerSyncStateMachine::ValueSlicePktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
398 {
399 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
400 return -E_INVALID_ARGS;
401 }
402 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
403 int errCode;
404 switch (inMsg->GetMessageType()) {
405 case TYPE_REQUEST:
406 return valueSliceSync_->RequestRecvCallback(context, inMsg);
407 case TYPE_RESPONSE:
408 if (performance != nullptr) {
409 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
410 }
411 errCode = valueSliceSync_->AckRecvCallback(context, inMsg);
412 if (errCode != E_OK) {
413 valueSliceSync_->SendFinishedRequest(context);
414 return errCode;
415 }
416 currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
417 SyncStep();
418 return errCode;
419 default:
420 return -E_INVALID_ARGS;
421 }
422 }
423
Finish()424 void MultiVerSyncStateMachine::Finish()
425 {
426 MultiVerCommitNode commit;
427 int commitsSize = context_->GetCommitsSize();
428 if (commitsSize > 0) {
429 context_->GetCommit(commitsSize - 1, commit);
430 std::vector<MultiVerCommitNode> commits;
431 context_->GetCommits(commits);
432 LOGD("MultiVerSyncStateMachine::Finish merge src=%s", STR_MASK(context_->GetDeviceId()));
433 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
434 if (performance != nullptr) {
435 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_MERGE);
436 }
437 int errCode = multiVerDataSync_->MergeSyncCommit(commit, commits);
438 LOGD("MultiVerSyncStateMachine::Finish merge src=%s, MergeSyncCommit errCode:%d",
439 STR_MASK(context_->GetDeviceId()), errCode);
440 if (performance != nullptr) {
441 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_MERGE);
442 }
443 }
444 RefObject::AutoLock lock(context_);
445 context_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
446 StepToIdle();
447 ExecNextTask();
448 }
449
OneCommitSyncFinish()450 int MultiVerSyncStateMachine::OneCommitSyncFinish()
451 {
452 MultiVerCommitNode commit;
453 TimeOffset outOffset = 0;
454 int errCode = E_OK;
455 int commitIndex = context_->GetCommitIndex();
456
457 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, commitIndex = %d,", STR_MASK(context_->GetDeviceId()),
458 commitIndex);
459 if (commitIndex > 0) {
460 context_->GetCommit(commitIndex - 1, commit);
461 std::string deviceName = context_->GetDeviceId();
462 std::vector<MultiVerKvEntry *> entries;
463 context_->GetEntries(entries);
464 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, entries size = %lu",
465 STR_MASK(context_->GetDeviceId()), entries.size());
466 errCode = timeSync_->GetTimeOffset(outOffset, TIME_SYNC_WAIT_TIME);
467 if (errCode != E_OK) {
468 LOGI("MultiVerSyncStateMachine::OneCommitSyncFinish GetTimeOffset fail errCode:%d", errCode);
469 return errCode;
470 }
471 Timestamp currentLocalTime = context_->GetCurrentLocalTime();
472 commit.timestamp -= static_cast<Timestamp>(outOffset);
473
474 // Due to time sync error, commit timestamp may bigger than currentLocalTime, we need to fix the timestamp
475 TimeOffset timefixOffset = (commit.timestamp < currentLocalTime) ? 0 : (commit.timestamp -
476 static_cast<Timestamp>(currentLocalTime));
477 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, timefixOffset = %" PRId64,
478 STR_MASK(context_->GetDeviceId()), timefixOffset);
479 commit.timestamp -= static_cast<Timestamp>(timefixOffset);
480 ChangeEntriesTimestamp(entries, outOffset, timefixOffset);
481 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
482 if (performance != nullptr) {
483 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
484 }
485 errCode = multiVerDataSync_->PutCommitData(commit, entries, deviceName);
486 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish PutCommitData src=%s, errCode = %d",
487 STR_MASK(context_->GetDeviceId()), errCode);
488 if (performance != nullptr) {
489 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
490 }
491 if (errCode == E_OK) {
492 context_->ReleaseEntries();
493 }
494 }
495 DBCommon::PrintHexVector(commit.commitId, __LINE__);
496 return errCode;
497 }
498
IsPacketValid(const Message * inMsg) const499 bool MultiVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
500 {
501 if (inMsg == nullptr) {
502 return false;
503 }
504
505 if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() > VALUE_SLICE_SYNC_MESSAGE) ||
506 (inMsg->GetMessageId() == DATA_SYNC_MESSAGE)) {
507 LOGE("[MultiVerSyncStateMachine] Message is invalid, id = %d", inMsg->GetMessageId());
508 return false;
509 }
510 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
511 return true;
512 }
513 if (inMsg->GetMessageType() == TYPE_RESPONSE) {
514 if ((inMsg->GetSequenceId() != context_->GetSequenceId()) ||
515 (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
516 LOGE("[MultiVerSyncStateMachine] Message is invalid, inMsg SequenceId = %d, context seq = %d,"
517 "msg session id = %d, context session = %d", inMsg->GetSequenceId(), context_->GetSequenceId(),
518 inMsg->GetSessionId(), context_->GetRequestSessionId());
519 return false;
520 }
521 }
522 return true;
523 }
524
Clear()525 void MultiVerSyncStateMachine::Clear()
526 {
527 commitHistorySync_ = nullptr;
528 multiVerDataSync_ = nullptr;
529 timeSync_ = nullptr;
530 valueSliceSync_ = nullptr;
531 multiVerStorage_ = nullptr;
532 context_ = nullptr;
533 }
534
SyncResponseBegin(uint32_t sessionId)535 void MultiVerSyncStateMachine::SyncResponseBegin(uint32_t sessionId)
536 {
537 {
538 std::lock_guard<std::mutex> lock(responseInfosLock_);
539 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
540 return info.sessionId == sessionId;
541 });
542 if (iter != responseInfos_.end()) {
543 LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] sessionId existed! exit.");
544 return;
545 }
546 TimerAction timeOutCallback = [this](TimerId timerId) -> int { return SyncResponseTimeout(timerId); };
547 // To make sure context_ alive in timeout callback, we should IncObjRef for the context_.
548 RefObject::IncObjRef(context_);
549 TimerId timerId = 0;
550 int errCode = RuntimeContext::GetInstance()->SetTimer(
551 RESPONSE_TIME_OUT, timeOutCallback,
552 [this]() {
553 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(context_); });
554 if (ret != E_OK) {
555 LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] timer finalizer ScheduleTask, errCode %d", ret);
556 }
557 },
558 timerId);
559 if (errCode != E_OK) {
560 LOGE("[MultiVerSyncStateMachine][ResponseSessionBegin] SetTimer failed err %d", errCode);
561 RefObject::DecObjRef(context_);
562 return;
563 }
564 ResponseInfo info{sessionId, timerId};
565 responseInfos_.push_back(info);
566 LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] begin");
567 }
568 multiVerStorage_->NotifyStartSyncOperation();
569 }
570
SyncResponseEnd(uint32_t sessionId)571 void MultiVerSyncStateMachine::SyncResponseEnd(uint32_t sessionId)
572 {
573 {
574 std::lock_guard<std::mutex> lock(responseInfosLock_);
575 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
576 return info.sessionId == sessionId;
577 });
578 if (iter == responseInfos_.end()) {
579 LOGW("[MultiVerSyncStateMachine][SyncResponseEnd] Can't find sync response %d", sessionId);
580 return;
581 }
582 RuntimeContext::GetInstance()->RemoveTimer(iter->timerId);
583 responseInfos_.erase(iter);
584 LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] end response");
585 }
586 multiVerStorage_->NotifyFinishSyncOperation();
587 }
588
SyncResponseTimeout(TimerId timerId)589 int MultiVerSyncStateMachine::SyncResponseTimeout(TimerId timerId)
590 {
591 uint32_t sessionId;
592 {
593 std::lock_guard<std::mutex> lock(responseInfosLock_);
594 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [timerId](const ResponseInfo &info) {
595 return info.timerId == timerId;
596 });
597 if (iter == responseInfos_.end()) {
598 LOGW("[MultiVerSyncStateMachine][SyncResponseTimeout] Can't find sync response timerId %" PRIu64, timerId);
599 return E_OK;
600 }
601 sessionId = iter->sessionId;
602 }
603 SyncResponseEnd(sessionId);
604 return E_OK;
605 }
606
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)607 bool MultiVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
608 {
609 (void) inMsg;
610 (void) query;
611 return false;
612 }
613 }
614 #endif