1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OMIT_MULTI_VER 17 #include "multi_ver_sync_state_machine.h" 18 19 #include <cmath> 20 #include <climits> 21 #include <algorithm> 22 23 #include "message_transform.h" 24 #include "log_print.h" 25 #include "sync_types.h" 26 #include "db_common.h" 27 #include "ref_object.h" 28 #include "performance_analysis.h" 29 30 namespace DistributedDB { 31 namespace { ChangeEntriesTimestamp(std::vector<MultiVerKvEntry * > & entries,TimeOffset outOffset,TimeOffset timefixOffset)32 void ChangeEntriesTimestamp(std::vector<MultiVerKvEntry *> &entries, TimeOffset outOffset, TimeOffset timefixOffset) 33 { 34 for (MultiVerKvEntry *entry : entries) { 35 if (entry == nullptr) { 36 continue; 37 } 38 Timestamp timestamp; 39 entry->GetTimestamp(timestamp); 40 timestamp = timestamp - static_cast<Timestamp>(outOffset + timefixOffset); 41 entry->SetTimestamp(timestamp); 42 } 43 } 44 } 45 std::vector<StateSwitchTable> MultiVerSyncStateMachine::stateSwitchTables_; MultiVerSyncStateMachine()46 MultiVerSyncStateMachine::MultiVerSyncStateMachine() 47 : context_(nullptr), 48 multiVerStorage_(nullptr), 49 timeSync_(nullptr), 50 commitHistorySync_(nullptr), 51 multiVerDataSync_(nullptr), 52 valueSliceSync_(nullptr) 53 { 54 } 55 ~MultiVerSyncStateMachine()56 MultiVerSyncStateMachine::~MultiVerSyncStateMachine() 57 { 58 Clear(); 59 } 60 Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)61 int MultiVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, 62 const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator) 63 { 64 if (context == nullptr || syncInterface == nullptr || metadata == nullptr || communicator == nullptr) { 65 return -E_INVALID_ARGS; 66 } 67 int errCode = SyncStateMachine::Initialize(context, syncInterface, metadata, communicator); 68 if (errCode != E_OK) { 69 return errCode; 70 } 71 72 timeSync_ = std::make_unique<TimeSync>(); 73 commitHistorySync_ = std::make_unique<CommitHistorySync>(); 74 multiVerDataSync_ = std::make_unique<MultiVerDataSync>(); 75 valueSliceSync_ = std::make_unique<ValueSliceSync>(); 76 77 errCode = timeSync_->Initialize(communicator, metadata, syncInterface, context->GetDeviceId()); 78 if (errCode != E_OK) { 79 LOGE("timeSync_->Initialize failed err %d", errCode); 80 goto ERROR_OUT; 81 } 82 LOGD("timeSync_->Initialize OK"); 83 84 // init functions below will never fail 85 multiVerStorage_ = static_cast<MultiVerKvDBSyncInterface *>(syncInterface); 86 commitHistorySync_->Initialize(multiVerStorage_, communicator); 87 multiVerDataSync_->Initialize(multiVerStorage_, communicator); 88 valueSliceSync_->Initialize(multiVerStorage_, communicator); 89 90 context_ = static_cast<MultiVerSyncTaskContext *>(context); 91 currentState_ = IDLE; 92 (void)timeSync_->SyncStart(); 93 return E_OK; 94 95 ERROR_OUT: 96 Clear(); 97 return errCode; 98 } 99 SyncStep()100 void MultiVerSyncStateMachine::SyncStep() 101 { 102 RefObject::IncObjRef(context_); 103 RefObject::IncObjRef(communicator_); 104 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); }); 105 if (errCode != E_OK) { 106 LOGE("[MultiVerSyncStateMachine] Schedule SyncStep failed"); 107 RefObject::DecObjRef(communicator_); 108 RefObject::DecObjRef(context_); 109 } 110 } 111 StepToIdle()112 void MultiVerSyncStateMachine::StepToIdle() 113 { 114 currentState_ = IDLE; 115 StopWatchDog(); 116 context_->Clear(); 117 PerformanceAnalysis::GetInstance()->TimeRecordEnd(); 118 LOGD("[MultiVerSyncStateMachine][%s] step to idle", STR_MASK(context_->GetDeviceId())); 119 } 120 MessageCallbackCheck(const Message * inMsg)121 int MultiVerSyncStateMachine::MessageCallbackCheck(const Message *inMsg) 122 { 123 RefObject::AutoLock lock(context_); 124 if (context_->IsKilled()) { 125 return -E_OBJ_IS_KILLED; 126 } 127 if (!IsPacketValid(inMsg)) { 128 return -E_INVALID_ARGS; 129 } 130 if ((inMsg->GetMessageType() == TYPE_RESPONSE) && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) { 131 context_->IncSequenceId(); 132 int errCode = ResetWatchDog(); 133 if (errCode != E_OK) { 134 LOGW("[MultiVerSyncStateMachine][MessageCallback] ResetWatchDog failed , err %d", errCode); 135 } 136 } 137 return E_OK; 138 } 139 ReceiveMessageCallback(Message * inMsg)140 int MultiVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg) 141 { 142 if (inMsg == nullptr) { 143 return -E_INVALID_ARGS; 144 } 145 if (inMsg->IsFeedbackError()) { 146 LOGE("[MultiVerSyncStateMachine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo()); 147 return -static_cast<int>(inMsg->GetErrorNo()); 148 } 149 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) { 150 return TimeSyncPacketRecvCallback(context_, inMsg); 151 } 152 std::lock_guard<std::mutex> lock(stateMachineLock_); 153 int errCode = MessageCallbackCheck(inMsg); 154 if (errCode != E_OK) { 155 return errCode; 156 } 157 switch (inMsg->GetMessageId()) { 158 case COMMIT_HISTORY_SYNC_MESSAGE: 159 errCode = CommitHistorySyncPktRecvCallback(context_, inMsg); 160 if ((errCode != -E_NOT_FOUND) && (inMsg->GetMessageType() == TYPE_REQUEST) && (errCode != -E_NOT_PERMIT)) { 161 SyncResponseBegin(inMsg->GetSessionId()); 162 } 163 break; 164 case MULTI_VER_DATA_SYNC_MESSAGE: 165 errCode = MultiVerDataPktRecvCallback(context_, inMsg); 166 break; 167 case VALUE_SLICE_SYNC_MESSAGE: 168 errCode = ValueSlicePktRecvCallback(context_, inMsg); 169 break; 170 default: 171 errCode = -E_NOT_SUPPORT; 172 break; 173 } 174 if (errCode == -E_LAST_SYNC_FRAME) { 175 SyncResponseEnd(inMsg->GetSessionId()); 176 return errCode; 177 } 178 if (errCode != E_OK && inMsg->GetMessageType() == TYPE_RESPONSE) { 179 Abort(); 180 } 181 return errCode; 182 } 183 StepToTimeout(TimerId timerId)184 void MultiVerSyncStateMachine::StepToTimeout(TimerId timerId) 185 { 186 { 187 std::lock_guard<std::mutex> lock(stateMachineLock_); 188 TimerId timer = syncContext_->GetTimerId(); 189 if (timer != timerId) { 190 return; 191 } 192 currentState_ = SYNC_TIME_OUT; 193 } 194 Abort(); 195 } 196 CommitHistorySyncStepInner(void)197 int MultiVerSyncStateMachine::CommitHistorySyncStepInner(void) 198 { 199 int errCode = commitHistorySync_->SyncStart(context_); 200 if (errCode != E_OK) { 201 LOGE("[MultiVerSyncStateMachine][CommitHistorySyncStep] failed, errCode %d", errCode); 202 } 203 return errCode; 204 } 205 MultiVerDataSyncStepInner(void)206 int MultiVerSyncStateMachine::MultiVerDataSyncStepInner(void) 207 { 208 return multiVerDataSync_->SyncStart(context_); 209 } 210 ValueSliceSyncStepInner(void)211 int MultiVerSyncStateMachine::ValueSliceSyncStepInner(void) 212 { 213 return valueSliceSync_->SyncStart(context_); 214 } 215 SyncStepInnerLocked()216 void MultiVerSyncStateMachine::SyncStepInnerLocked() 217 { 218 if (context_->IncUsedCount() != E_OK) { 219 goto SYNC_STEP_OUT; 220 } 221 222 LOGD("[MultiVerSyncStateMachine] SyncStep dst=%s, state = %d", STR_MASK(context_->GetDeviceId()), currentState_); 223 int errCode; 224 { 225 std::lock_guard<std::mutex> lock(stateMachineLock_); 226 switch (currentState_) { 227 case COMMIT_HISTORY_SYNC: 228 errCode = CommitHistorySyncStepInner(); 229 if (errCode != E_OK) { 230 Abort(); 231 } 232 break; 233 case MULTI_VER_DATA_ENTRY_SYNC: 234 errCode = MultiVerDataSyncStepInner(); 235 if (errCode == -E_NOT_FOUND) { 236 Finish(); 237 goto SYNC_STEP_SAFE_OUT; 238 } 239 break; 240 case MULTI_VER_VALUE_SLICE_SYNC: 241 errCode = ValueSliceSyncStepInner(); 242 if (errCode == -E_NOT_FOUND) { 243 int err = OneCommitSyncFinish(); 244 if (err != E_OK) { 245 valueSliceSync_->SendFinishedRequest(context_); 246 Abort(); 247 goto SYNC_STEP_SAFE_OUT; 248 } 249 currentState_ = MULTI_VER_DATA_ENTRY_SYNC; 250 SyncStep(); 251 goto SYNC_STEP_SAFE_OUT; 252 } 253 break; 254 default: 255 break; 256 } 257 } 258 259 SYNC_STEP_SAFE_OUT: 260 context_->SafeExit(); 261 262 SYNC_STEP_OUT: 263 RefObject::DecObjRef(communicator_); 264 RefObject::DecObjRef(context_); 265 } 266 SyncStepInner()267 void MultiVerSyncStateMachine::SyncStepInner() 268 { 269 } 270 StartSyncInner()271 int MultiVerSyncStateMachine::StartSyncInner() 272 { 273 LOGI("[MultiVerSyncStateMachine] StartSync"); 274 currentState_ = COMMIT_HISTORY_SYNC; 275 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance(); 276 if (performance != nullptr) { 277 performance->TimeRecordStart(); 278 } 279 int errCode = StartWatchDog(); 280 if (errCode != E_OK) { 281 LOGE("[MultiVerSyncStateMachine][StartSync] WatchDog start failed! err:%d", errCode); 282 return errCode; 283 } 284 SyncStep(); 285 return E_OK; 286 } 287 AbortInner()288 void MultiVerSyncStateMachine::AbortInner() 289 { 290 context_->Clear(); 291 StepToIdle(); 292 ExecNextTask(); 293 } 294 GetStateSwitchTables() const295 const std::vector<StateSwitchTable> &MultiVerSyncStateMachine::GetStateSwitchTables() const 296 { 297 return stateSwitchTables_; 298 } 299 PrepareNextSyncTask()300 int MultiVerSyncStateMachine::PrepareNextSyncTask() 301 { 302 return StartSyncInner(); 303 } 304 SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)305 void MultiVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) 306 { 307 (void)sessionId; 308 (void)sequenceId; 309 (void)inMsgId; 310 } 311 CommErrAbort(uint32_t sessionId)312 void MultiVerSyncStateMachine::CommErrAbort(uint32_t sessionId) 313 { 314 (void)sessionId; 315 std::lock_guard<std::mutex> lock(stateMachineLock_); 316 Abort(); 317 RefObject::DecObjRef(context_); 318 } 319 TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext * context,const Message * inMsg)320 int MultiVerSyncStateMachine::TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext *context, const Message *inMsg) 321 { 322 int errCode; 323 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) { 324 return -E_INVALID_ARGS; 325 } 326 switch (inMsg->GetMessageType()) { 327 case TYPE_REQUEST: 328 errCode = timeSync_->RequestRecv(inMsg); 329 return errCode; 330 case TYPE_RESPONSE: 331 errCode = timeSync_->AckRecv(inMsg); 332 if (errCode != E_OK) { 333 LOGE("[MultiVerSyncStateMachine] TimeSyncPacketRecvCallback AckRecv failed err %d", errCode); 334 } 335 return errCode; 336 default: 337 return -E_INVALID_ARGS; 338 } 339 } 340 CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)341 int MultiVerSyncStateMachine::CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg) 342 { 343 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) { 344 return -E_INVALID_ARGS; 345 } 346 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance(); 347 int errCode; 348 switch (inMsg->GetMessageType()) { 349 case TYPE_REQUEST: 350 if (performance != nullptr) { 351 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SEND_LOCAL_DATA_CHANGED_TO_COMMIT_REQUEST_RECV); 352 } 353 return commitHistorySync_->RequestRecvCallback(context, inMsg); 354 case TYPE_RESPONSE: 355 if (performance != nullptr) { 356 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV); 357 } 358 errCode = commitHistorySync_->AckRecvCallback(context, inMsg); 359 if (errCode != E_OK) { 360 return errCode; 361 } 362 currentState_ = MULTI_VER_DATA_ENTRY_SYNC; 363 SyncStep(); 364 return errCode; 365 default: 366 return -E_INVALID_ARGS; 367 } 368 } 369 MultiVerDataPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)370 int MultiVerSyncStateMachine::MultiVerDataPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg) 371 { 372 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) { 373 return -E_INVALID_ARGS; 374 } 375 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance(); 376 int errCode; 377 switch (inMsg->GetMessageType()) { 378 case TYPE_REQUEST: 379 return multiVerDataSync_->RequestRecvCallback(context, inMsg); 380 case TYPE_RESPONSE: 381 if (performance != nullptr) { 382 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV); 383 } 384 errCode = multiVerDataSync_->AckRecvCallback(context, inMsg); 385 if (errCode != E_OK) { 386 multiVerDataSync_->SendFinishedRequest(context); 387 return errCode; 388 } 389 currentState_ = MULTI_VER_VALUE_SLICE_SYNC; 390 SyncStep(); 391 return errCode; 392 default: 393 return -E_INVALID_ARGS; 394 } 395 } 396 ValueSlicePktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)397 int MultiVerSyncStateMachine::ValueSlicePktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg) 398 { 399 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) { 400 return -E_INVALID_ARGS; 401 } 402 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance(); 403 int errCode; 404 switch (inMsg->GetMessageType()) { 405 case TYPE_REQUEST: 406 return valueSliceSync_->RequestRecvCallback(context, inMsg); 407 case TYPE_RESPONSE: 408 if (performance != nullptr) { 409 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV); 410 } 411 errCode = valueSliceSync_->AckRecvCallback(context, inMsg); 412 if (errCode != E_OK) { 413 valueSliceSync_->SendFinishedRequest(context); 414 return errCode; 415 } 416 currentState_ = MULTI_VER_VALUE_SLICE_SYNC; 417 SyncStep(); 418 return errCode; 419 default: 420 return -E_INVALID_ARGS; 421 } 422 } 423 Finish()424 void MultiVerSyncStateMachine::Finish() 425 { 426 MultiVerCommitNode commit; 427 int commitsSize = context_->GetCommitsSize(); 428 if (commitsSize > 0) { 429 context_->GetCommit(commitsSize - 1, commit); 430 std::vector<MultiVerCommitNode> commits; 431 context_->GetCommits(commits); 432 LOGD("MultiVerSyncStateMachine::Finish merge src=%s", STR_MASK(context_->GetDeviceId())); 433 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance(); 434 if (performance != nullptr) { 435 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_MERGE); 436 } 437 int errCode = multiVerDataSync_->MergeSyncCommit(commit, commits); 438 LOGD("MultiVerSyncStateMachine::Finish merge src=%s, MergeSyncCommit errCode:%d", 439 STR_MASK(context_->GetDeviceId()), errCode); 440 if (performance != nullptr) { 441 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_MERGE); 442 } 443 } 444 RefObject::AutoLock lock(context_); 445 context_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL); 446 StepToIdle(); 447 ExecNextTask(); 448 } 449 OneCommitSyncFinish()450 int MultiVerSyncStateMachine::OneCommitSyncFinish() 451 { 452 MultiVerCommitNode commit; 453 TimeOffset outOffset = 0; 454 int errCode = E_OK; 455 int commitIndex = context_->GetCommitIndex(); 456 457 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, commitIndex = %d,", STR_MASK(context_->GetDeviceId()), 458 commitIndex); 459 if (commitIndex > 0) { 460 context_->GetCommit(commitIndex - 1, commit); 461 std::string deviceName = context_->GetDeviceId(); 462 std::vector<MultiVerKvEntry *> entries; 463 context_->GetEntries(entries); 464 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, entries size = %lu", 465 STR_MASK(context_->GetDeviceId()), entries.size()); 466 errCode = timeSync_->GetTimeOffset(outOffset, TIME_SYNC_WAIT_TIME); 467 if (errCode != E_OK) { 468 LOGI("MultiVerSyncStateMachine::OneCommitSyncFinish GetTimeOffset fail errCode:%d", errCode); 469 return errCode; 470 } 471 Timestamp currentLocalTime = context_->GetCurrentLocalTime(); 472 commit.timestamp -= static_cast<Timestamp>(outOffset); 473 474 // Due to time sync error, commit timestamp may bigger than currentLocalTime, we need to fix the timestamp 475 TimeOffset timefixOffset = (commit.timestamp < currentLocalTime) ? 0 : (commit.timestamp - 476 static_cast<Timestamp>(currentLocalTime)); 477 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, timefixOffset = %" PRId64, 478 STR_MASK(context_->GetDeviceId()), timefixOffset); 479 commit.timestamp -= static_cast<Timestamp>(timefixOffset); 480 ChangeEntriesTimestamp(entries, outOffset, timefixOffset); 481 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance(); 482 if (performance != nullptr) { 483 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA); 484 } 485 errCode = multiVerDataSync_->PutCommitData(commit, entries, deviceName); 486 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish PutCommitData src=%s, errCode = %d", 487 STR_MASK(context_->GetDeviceId()), errCode); 488 if (performance != nullptr) { 489 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA); 490 } 491 if (errCode == E_OK) { 492 context_->ReleaseEntries(); 493 } 494 } 495 DBCommon::PrintHexVector(commit.commitId, __LINE__); 496 return errCode; 497 } 498 IsPacketValid(const Message * inMsg) const499 bool MultiVerSyncStateMachine::IsPacketValid(const Message *inMsg) const 500 { 501 if (inMsg == nullptr) { 502 return false; 503 } 504 505 if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() > VALUE_SLICE_SYNC_MESSAGE) || 506 (inMsg->GetMessageId() == DATA_SYNC_MESSAGE)) { 507 LOGE("[MultiVerSyncStateMachine] Message is invalid, id = %d", inMsg->GetMessageId()); 508 return false; 509 } 510 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) { 511 return true; 512 } 513 if (inMsg->GetMessageType() == TYPE_RESPONSE) { 514 if ((inMsg->GetSequenceId() != context_->GetSequenceId()) || 515 (inMsg->GetSessionId() != context_->GetRequestSessionId())) { 516 LOGE("[MultiVerSyncStateMachine] Message is invalid, inMsg SequenceId = %d, context seq = %d," 517 "msg session id = %d, context session = %d", inMsg->GetSequenceId(), context_->GetSequenceId(), 518 inMsg->GetSessionId(), context_->GetRequestSessionId()); 519 return false; 520 } 521 } 522 return true; 523 } 524 Clear()525 void MultiVerSyncStateMachine::Clear() 526 { 527 commitHistorySync_ = nullptr; 528 multiVerDataSync_ = nullptr; 529 timeSync_ = nullptr; 530 valueSliceSync_ = nullptr; 531 multiVerStorage_ = nullptr; 532 context_ = nullptr; 533 } 534 SyncResponseBegin(uint32_t sessionId)535 void MultiVerSyncStateMachine::SyncResponseBegin(uint32_t sessionId) 536 { 537 { 538 std::lock_guard<std::mutex> lock(responseInfosLock_); 539 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) { 540 return info.sessionId == sessionId; 541 }); 542 if (iter != responseInfos_.end()) { 543 LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] sessionId existed! exit."); 544 return; 545 } 546 TimerAction timeOutCallback = [this](TimerId timerId) -> int { return SyncResponseTimeout(timerId); }; 547 // To make sure context_ alive in timeout callback, we should IncObjRef for the context_. 548 RefObject::IncObjRef(context_); 549 TimerId timerId = 0; 550 int errCode = RuntimeContext::GetInstance()->SetTimer( 551 RESPONSE_TIME_OUT, timeOutCallback, 552 [this]() { 553 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(context_); }); 554 if (ret != E_OK) { 555 LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] timer finalizer ScheduleTask, errCode %d", ret); 556 } 557 }, 558 timerId); 559 if (errCode != E_OK) { 560 LOGE("[MultiVerSyncStateMachine][ResponseSessionBegin] SetTimer failed err %d", errCode); 561 RefObject::DecObjRef(context_); 562 return; 563 } 564 ResponseInfo info{sessionId, timerId}; 565 responseInfos_.push_back(info); 566 LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] begin"); 567 } 568 multiVerStorage_->NotifyStartSyncOperation(); 569 } 570 SyncResponseEnd(uint32_t sessionId)571 void MultiVerSyncStateMachine::SyncResponseEnd(uint32_t sessionId) 572 { 573 { 574 std::lock_guard<std::mutex> lock(responseInfosLock_); 575 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) { 576 return info.sessionId == sessionId; 577 }); 578 if (iter == responseInfos_.end()) { 579 LOGW("[MultiVerSyncStateMachine][SyncResponseEnd] Can't find sync response %d", sessionId); 580 return; 581 } 582 RuntimeContext::GetInstance()->RemoveTimer(iter->timerId); 583 responseInfos_.erase(iter); 584 LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] end response"); 585 } 586 multiVerStorage_->NotifyFinishSyncOperation(); 587 } 588 SyncResponseTimeout(TimerId timerId)589 int MultiVerSyncStateMachine::SyncResponseTimeout(TimerId timerId) 590 { 591 uint32_t sessionId; 592 { 593 std::lock_guard<std::mutex> lock(responseInfosLock_); 594 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [timerId](const ResponseInfo &info) { 595 return info.timerId == timerId; 596 }); 597 if (iter == responseInfos_.end()) { 598 LOGW("[MultiVerSyncStateMachine][SyncResponseTimeout] Can't find sync response timerId %" PRIu64, timerId); 599 return E_OK; 600 } 601 sessionId = iter->sessionId; 602 } 603 SyncResponseEnd(sessionId); 604 return E_OK; 605 } 606 IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)607 bool MultiVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) 608 { 609 (void) inMsg; 610 (void) query; 611 return false; 612 } 613 } 614 #endif