1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_state_machine.h"
17
18 #include <algorithm>
19
20 #include "log_print.h"
21 #include "version.h"
22
23 namespace DistributedDB {
SyncStateMachine()24 SyncStateMachine::SyncStateMachine()
25 : syncContext_(nullptr),
26 storageInterface_(nullptr),
27 communicator_(nullptr),
28 metadata_(nullptr),
29 currentState_(0),
30 watchDogStarted_(false),
31 currentSyncProctolVersion_(SINGLE_VER_SYNC_PROCTOL_V3),
32 saveDataNotifyTimerId_(0),
33 saveDataNotifyCount_(0),
34 waitingResetLockBySaveData_(false),
35 saveDataNotifyRefCount_(0),
36 getDataNotifyTimerId_(0),
37 getDataNotifyCount_(0)
38 {
39 }
40
~SyncStateMachine()41 SyncStateMachine::~SyncStateMachine()
42 {
43 syncContext_ = nullptr;
44 storageInterface_ = nullptr;
45 watchDogStarted_ = false;
46 metadata_ = nullptr;
47 if (communicator_ != nullptr) {
48 RefObject::DecObjRef(communicator_);
49 communicator_ = nullptr;
50 }
51 }
52
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)53 int SyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
54 const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
55 {
56 if ((context == nullptr) || (syncInterface == nullptr) || (metadata == nullptr) || (communicator == nullptr)) {
57 return -E_INVALID_ARGS;
58 }
59 syncContext_ = context;
60 storageInterface_ = syncInterface;
61 metadata_ = metadata;
62 RefObject::IncObjRef(communicator);
63 communicator_ = communicator;
64 return E_OK;
65 }
66
StartSync()67 int SyncStateMachine::StartSync()
68 {
69 int errCode = syncContext_->IncUsedCount();
70 if (errCode != E_OK) {
71 return errCode;
72 }
73 std::lock_guard<std::mutex> lock(stateMachineLock_);
74 errCode = StartSyncInner();
75 syncContext_->SafeExit();
76 return errCode;
77 }
78
TimeoutCallback(TimerId timerId)79 int SyncStateMachine::TimeoutCallback(TimerId timerId)
80 {
81 RefObject::AutoLock lock(syncContext_);
82 if (syncContext_->IsKilled()) {
83 return -E_OBJ_IS_KILLED;
84 }
85 TimerId timer = syncContext_->GetTimerId();
86 if (timer != timerId) {
87 return -E_UNEXPECTED_DATA;
88 }
89
90 int retryTime = syncContext_->GetRetryTime();
91 if (retryTime >= syncContext_->GetSyncRetryTimes() || !syncContext_->IsSyncTaskNeedRetry()) {
92 LOGI("[SyncStateMachine][Timeout] TimeoutCallback retryTime:%d", retryTime);
93 syncContext_->UnlockObj();
94 StepToTimeout(timerId);
95 syncContext_->LockObj();
96 return E_OK;
97 }
98 retryTime++;
99 syncContext_->SetRetryTime(retryTime);
100 // the sequenceid will be managed by dataSync slide windows.
101 syncContext_->SetRetryStatus(SyncTaskContext::NEED_RETRY);
102 int timeoutTime = syncContext_->GetSyncRetryTimeout(retryTime);
103 syncContext_->ModifyTimer(timeoutTime);
104 LOGI("[SyncStateMachine][Timeout] Schedule task, timeoutTime = %d, retryTime = %d", timeoutTime, retryTime);
105 SyncStep();
106 return E_OK;
107 }
108
Abort()109 void SyncStateMachine::Abort()
110 {
111 RefObject::IncObjRef(syncContext_);
112 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
113 this->AbortImmediately();
114 RefObject::DecObjRef(this->syncContext_);
115 });
116 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
117 LOGE("[SyncStateMachine][Abort] Abort failed, errCode %d", errCode);
118 RefObject::DecObjRef(syncContext_);
119 }
120 }
121
AbortImmediately()122 void SyncStateMachine::AbortImmediately()
123 {
124 std::lock_guard<std::mutex> lock(stateMachineLock_);
125 AbortInner();
126 StopWatchDog();
127 currentState_ = 0;
128 }
129
SwitchMachineState(uint8_t event)130 int SyncStateMachine::SwitchMachineState(uint8_t event)
131 {
132 const std::vector<StateSwitchTable> &tables = GetStateSwitchTables();
133 auto tableIter = std::find_if(tables.begin(), tables.end(),
134 [this](const StateSwitchTable &table) {
135 return table.version <= currentSyncProctolVersion_;
136 });
137 if (tableIter == tables.end()) {
138 LOGE("[SyncStateMachine][SwitchState] Can't find a compatible version by version %u",
139 currentSyncProctolVersion_);
140 return -E_NOT_FOUND;
141 }
142
143 const std::map<uint8_t, EventToState> &table = (*tableIter).switchTable;
144 auto eventToStateIter = table.find(currentState_);
145 if (eventToStateIter == table.end()) {
146 LOGW("[SyncStateMachine][SwitchState] Can't find EventToState with currentSate %u",
147 currentState_);
148 SetCurStateErrStatus();
149 return E_OK;
150 }
151
152 const EventToState &eventToState = eventToStateIter->second;
153 auto stateIter = eventToState.find(event);
154 if (stateIter == eventToState.end()) {
155 LOGD("[SyncStateMachine][SwitchState] Can't find event %u int currentSate %u ignore",
156 event, currentState_);
157 return -E_NOT_FOUND;
158 }
159
160 currentState_ = stateIter->second;
161 LOGD("[SyncStateMachine][SwitchState] from state %u move to state %u with event %u dev %s{private}",
162 eventToStateIter->first, currentState_, event, syncContext_->GetDeviceId().c_str());
163 return E_OK;
164 }
165
SwitchStateAndStep(uint8_t event)166 void SyncStateMachine::SwitchStateAndStep(uint8_t event)
167 {
168 if (SwitchMachineState(event) == E_OK) {
169 SyncStepInner();
170 }
171 }
172
ExecNextTask()173 int SyncStateMachine::ExecNextTask()
174 {
175 syncContext_->Clear();
176 while (!syncContext_->IsTargetQueueEmpty()) {
177 int errCode = syncContext_->GetNextTarget();
178 if (errCode != E_OK) {
179 continue;
180 }
181 if (syncContext_->IsCurrentSyncTaskCanBeSkipped()) {
182 syncContext_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
183 syncContext_->Clear();
184 continue;
185 }
186 errCode = PrepareNextSyncTask();
187 if (errCode != E_OK) {
188 LOGE("[SyncStateMachine] PrepareSync failed");
189 syncContext_->SetOperationStatus(SyncOperation::OP_FAILED);
190 syncContext_->Clear();
191 continue; // try to execute next task
192 }
193 return errCode;
194 }
195 syncContext_->SetTaskExecStatus(ISyncTaskContext::FINISHED);
196 // no task left
197 LOGD("[SyncStateMachine] All sync task finished!");
198 return -E_NO_SYNC_TASK;
199 }
200
StartWatchDog()201 int SyncStateMachine::StartWatchDog()
202 {
203 int errCode = syncContext_->StartTimer();
204 if (errCode == E_OK) {
205 watchDogStarted_ = true;
206 }
207 return errCode;
208 }
209
ResetWatchDog()210 int SyncStateMachine::ResetWatchDog()
211 {
212 if (!watchDogStarted_) {
213 return E_OK;
214 }
215 LOGD("[SyncStateMachine][WatchDog] ResetWatchDog.");
216 syncContext_->StopTimer();
217 syncContext_->SetRetryTime(0);
218 return syncContext_->StartTimer();
219 }
220
StopWatchDog()221 void SyncStateMachine::StopWatchDog()
222 {
223 watchDogStarted_ = false;
224 LOGD("[SyncStateMachine][WatchDog] StopWatchDog.");
225 syncContext_->StopTimer();
226 }
227
StartSaveDataNotify(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)228 bool SyncStateMachine::StartSaveDataNotify(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
229 {
230 std::lock_guard<std::mutex> lockGuard(saveDataNotifyLock_);
231 saveDataNotifyRefCount_++;
232 if (saveDataNotifyTimerId_ > 0) {
233 saveDataNotifyCount_ = 0;
234 LOGW("[SyncStateMachine][SaveDataNotify] timer has been started!");
235 return true;
236 }
237
238 // Incref to make sure context still alive before timer stopped.
239 RefObject::IncObjRef(syncContext_);
240 int errCode = RuntimeContext::GetInstance()->SetTimer(
241 DATA_NOTIFY_INTERVAL,
242 [this, sessionId, sequenceId, inMsgId](TimerId timerId) {
243 RefObject::IncObjRef(syncContext_);
244 int ret = RuntimeContext::GetInstance()->ScheduleTask([this, sessionId, sequenceId, inMsgId]() {
245 DoSaveDataNotify(sessionId, sequenceId, inMsgId);
246 RefObject::DecObjRef(syncContext_);
247 });
248 if (ret != E_OK) {
249 LOGE("[SyncStateMachine] [DoSaveDataNotify] ScheduleTask failed errCode %d", ret);
250 RefObject::DecObjRef(syncContext_);
251 }
252 return ret;
253 },
254 [this]() { RefObject::DecObjRef(syncContext_); },
255 saveDataNotifyTimerId_);
256 if (errCode != E_OK) {
257 LOGW("[SyncStateMachine][SaveDataNotify] start timer failed err %d !", errCode);
258 saveDataNotifyRefCount_--;
259 return false;
260 }
261 return true;
262 }
263
StopSaveDataNotify()264 void SyncStateMachine::StopSaveDataNotify()
265 {
266 std::lock_guard<std::mutex> lockGuard(saveDataNotifyLock_);
267 StopSaveDataNotifyNoLock();
268 }
269
StopSaveDataNotifyNoLock()270 void SyncStateMachine::StopSaveDataNotifyNoLock()
271 {
272 if (saveDataNotifyTimerId_ == 0) {
273 LOGI("[SyncStateMachine][SaveDataNotify] timer is not started!");
274 return;
275 }
276 saveDataNotifyRefCount_--;
277 if (saveDataNotifyRefCount_ > 0) {
278 return;
279 }
280 RuntimeContext::GetInstance()->RemoveTimer(saveDataNotifyTimerId_);
281 saveDataNotifyTimerId_ = 0;
282 saveDataNotifyCount_ = 0;
283 saveDataNotifyRefCount_ = 0;
284 }
285
StartFeedDogForSync(uint32_t time,SyncDirectionFlag flag)286 bool SyncStateMachine::StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag)
287 {
288 if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
289 LOGE("[SyncStateMachine][feedDog] start wrong flag:%d", flag);
290 return false;
291 }
292
293 uint8_t cnt = GetFeedDogTimeout(time / DATA_NOTIFY_INTERVAL);
294 LOGI("[SyncStateMachine][feedDog] start cnt:%d, flag:%d", cnt, flag);
295
296 std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
297 watchDogController_[flag].refCount++;
298 LOGD("af incr refCount = %d", watchDogController_[flag].refCount);
299
300 if (watchDogController_[flag].feedDogTimerId > 0) {
301 // update the upperLimit, if the new cnt is bigger then last upperLimit
302 if (cnt > watchDogController_[flag].feedDogUpperLimit) {
303 LOGD("update feedDogUpperLimit = %d", cnt);
304 watchDogController_[flag].feedDogUpperLimit = cnt;
305 }
306 watchDogController_[flag].feedDogCnt = 0u;
307 LOGW("[SyncStateMachine][feedDog] timer has been started!, flag:%d", flag);
308 return false;
309 }
310
311 // Incref to make sure context still alive before timer stopped.
312 RefObject::IncObjRef(syncContext_);
313 watchDogController_[flag].feedDogUpperLimit = cnt;
314 int errCode = RuntimeContext::GetInstance()->SetTimer(
315 DATA_NOTIFY_INTERVAL,
316 [this, flag](TimerId timerId) {
317 RefObject::IncObjRef(syncContext_);
318 int ret = RuntimeContext::GetInstance()->ScheduleTask([this, flag]() {
319 DoFeedDogForSync(flag);
320 RefObject::DecObjRef(syncContext_);
321 });
322 if (ret != E_OK) {
323 LOGE("[SyncStateMachine] [DoFeedDogForSync] ScheduleTask failed errCode %d", ret);
324 RefObject::DecObjRef(syncContext_);
325 }
326 return ret;
327 },
328 [this]() { RefObject::DecObjRef(syncContext_); },
329 watchDogController_[flag].feedDogTimerId);
330 if (errCode != E_OK) {
331 LOGW("[SyncStateMachine][feedDog] start timer failed err %d !", errCode);
332 return false;
333 }
334 return true;
335 }
336
GetFeedDogTimeout(int timeoutCount) const337 uint8_t SyncStateMachine::GetFeedDogTimeout(int timeoutCount) const
338 {
339 if (timeoutCount > UINT8_MAX) {
340 return UINT8_MAX;
341 }
342 return timeoutCount;
343 }
344
StopFeedDogForSync(SyncDirectionFlag flag)345 void SyncStateMachine::StopFeedDogForSync(SyncDirectionFlag flag)
346 {
347 if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
348 LOGE("[SyncStateMachine][feedDog] stop wrong flag:%d", flag);
349 return;
350 }
351 std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
352 StopFeedDogForSyncNoLock(flag);
353 }
354
StopFeedDogForSyncNoLock(SyncDirectionFlag flag)355 void SyncStateMachine::StopFeedDogForSyncNoLock(SyncDirectionFlag flag)
356 {
357 if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
358 LOGE("[SyncStateMachine][feedDog] stop wrong flag:%d", flag);
359 return;
360 }
361 if (watchDogController_[flag].feedDogTimerId == 0) {
362 return;
363 }
364 LOGI("[SyncStateMachine][feedDog] stop flag:%d", flag);
365 RuntimeContext::GetInstance()->RemoveTimer(watchDogController_[flag].feedDogTimerId);
366 watchDogController_[flag].feedDogTimerId = 0;
367 watchDogController_[flag].feedDogCnt = 0;
368 watchDogController_[flag].refCount = 0;
369 }
370
SetCurStateErrStatus()371 void SyncStateMachine::SetCurStateErrStatus()
372 {
373 }
374
DecRefCountOfFeedDogTimer(SyncDirectionFlag flag)375 void SyncStateMachine::DecRefCountOfFeedDogTimer(SyncDirectionFlag flag)
376 {
377 std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
378 if (watchDogController_[flag].feedDogTimerId == 0) {
379 return;
380 }
381 if (--watchDogController_[flag].refCount <= 0) {
382 LOGD("stop feed dog timer, refcount = %d", watchDogController_[flag].refCount);
383 StopFeedDogForSyncNoLock(flag);
384 }
385 LOGD("af dec refcount = %d", watchDogController_[flag].refCount);
386 }
387
DoSaveDataNotify(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)388 void SyncStateMachine::DoSaveDataNotify(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
389 {
390 // we send notify packet at first, because it will cost a lot of time to get machine lock
391 {
392 std::lock_guard<std::mutex> innerLock(saveDataNotifyLock_);
393 if (saveDataNotifyCount_ >= MAX_DATA_NOTIFY_COUNT) {
394 StopSaveDataNotifyNoLock();
395 return;
396 }
397 SendNotifyPacket(sessionId, sequenceId, inMsgId);
398 saveDataNotifyCount_++;
399 if (waitingResetLockBySaveData_) {
400 return;
401 }
402 waitingResetLockBySaveData_ = true;
403 }
404 std::lock_guard<std::mutex> lock(stateMachineLock_);
405 {
406 std::lock_guard<std::mutex> innerLock(saveDataNotifyLock_);
407 waitingResetLockBySaveData_ = false;
408 }
409 (void)ResetWatchDog();
410 }
411
DoFeedDogForSync(SyncDirectionFlag flag)412 void SyncStateMachine::DoFeedDogForSync(SyncDirectionFlag flag)
413 {
414 {
415 std::lock_guard<std::mutex> lock(stateMachineLock_);
416 (void)ResetWatchDog();
417 }
418 std::lock_guard<std::mutex> innerLock(feedDogLock_[flag]);
419 if (watchDogController_[flag].feedDogCnt >= watchDogController_[flag].feedDogUpperLimit) { // LCOV_EXCL_BR_LINE
420 StopFeedDogForSyncNoLock(flag);
421 return;
422 }
423 watchDogController_[flag].feedDogCnt++;
424 }
425
InnerErrorAbort(uint32_t sessionId)426 void SyncStateMachine::InnerErrorAbort(uint32_t sessionId)
427 {
428 // do nothing
429 (void) sessionId;
430 }
431
NotifyClosing()432 void SyncStateMachine::NotifyClosing()
433 {
434 // do nothing
435 }
436
StartFeedDogForGetData(uint32_t sessionId)437 void SyncStateMachine::StartFeedDogForGetData(uint32_t sessionId)
438 {
439 std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
440 if (getDataNotifyTimerId_ > 0) {
441 getDataNotifyCount_ = 0;
442 LOGW("[SyncStateMachine][StartFeedDogForGetData] timer has been started!");
443 }
444
445 // Incref to make sure context still alive before timer stopped.
446 RefObject::IncObjRef(syncContext_);
447 int errCode = RuntimeContext::GetInstance()->SetTimer(
448 DATA_NOTIFY_INTERVAL,
449 [this, sessionId](TimerId timerId) {
450 RefObject::IncObjRef(syncContext_);
451 int ret = RuntimeContext::GetInstance()->ScheduleTask([this, sessionId, timerId]() {
452 DoGetAndSendDataNotify(sessionId);
453 int getDataNotifyCount = 0;
454 {
455 std::lock_guard<std::mutex> autoLock(getDataNotifyLock_);
456 getDataNotifyCount = getDataNotifyCount_;
457 }
458 if (getDataNotifyCount >= MAX_DATA_NOTIFY_COUNT) {
459 StopFeedDogForGetDataInner(timerId);
460 }
461 RefObject::DecObjRef(syncContext_);
462 });
463 if (ret != E_OK) {
464 LOGE("[SyncStateMachine] [StartFeedDogForGetData] ScheduleTask failed errCode %d", ret);
465 RefObject::DecObjRef(syncContext_);
466 }
467 return ret;
468 },
469 [this]() { RefObject::DecObjRef(syncContext_); },
470 getDataNotifyTimerId_);
471 if (errCode != E_OK) {
472 LOGW("[SyncStateMachine][StartFeedDogForGetData] start timer failed err %d !", errCode);
473 }
474 }
475
StopFeedDogForGetData()476 void SyncStateMachine::StopFeedDogForGetData()
477 {
478 TimerId timerId = 0;
479 {
480 std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
481 timerId = getDataNotifyTimerId_;
482 }
483 if (timerId == 0) {
484 return;
485 }
486 StopFeedDogForGetDataInner(timerId);
487 }
488
DoGetAndSendDataNotify(uint32_t sessionId)489 void SyncStateMachine::DoGetAndSendDataNotify(uint32_t sessionId)
490 {
491 (void)ResetWatchDog();
492 std::lock_guard<std::mutex> autoLock(getDataNotifyLock_);
493 if (getDataNotifyCount_ >= MAX_DATA_NOTIFY_COUNT) {
494 return;
495 }
496 if (sessionId != 0) {
497 SendNotifyPacket(sessionId, 0, DATA_SYNC_MESSAGE);
498 }
499 getDataNotifyCount_++;
500 }
501
StopFeedDogForGetDataInner(TimerId timerId)502 void SyncStateMachine::StopFeedDogForGetDataInner(TimerId timerId)
503 {
504 std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
505 if (getDataNotifyTimerId_ == 0 || getDataNotifyTimerId_ != timerId) {
506 return;
507 }
508 RuntimeContext::GetInstance()->RemoveTimer(timerId);
509 getDataNotifyTimerId_ = 0;
510 getDataNotifyCount_ = 0;
511 }
512
SchemaChange()513 void SyncStateMachine::SchemaChange()
514 {
515 }
516
TimeChange()517 void SyncStateMachine::TimeChange()
518 {
519 }
520 } // namespace DistributedDB
521