1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_state_machine.h"
17 
18 #include <algorithm>
19 
20 #include "log_print.h"
21 #include "version.h"
22 
23 namespace DistributedDB {
SyncStateMachine()24 SyncStateMachine::SyncStateMachine()
25     : syncContext_(nullptr),
26       storageInterface_(nullptr),
27       communicator_(nullptr),
28       metadata_(nullptr),
29       currentState_(0),
30       watchDogStarted_(false),
31       currentSyncProctolVersion_(SINGLE_VER_SYNC_PROCTOL_V3),
32       saveDataNotifyTimerId_(0),
33       saveDataNotifyCount_(0),
34       waitingResetLockBySaveData_(false),
35       saveDataNotifyRefCount_(0),
36       getDataNotifyTimerId_(0),
37       getDataNotifyCount_(0)
38 {
39 }
40 
~SyncStateMachine()41 SyncStateMachine::~SyncStateMachine()
42 {
43     syncContext_ = nullptr;
44     storageInterface_ = nullptr;
45     watchDogStarted_ = false;
46     metadata_ = nullptr;
47     if (communicator_ != nullptr) {
48         RefObject::DecObjRef(communicator_);
49         communicator_ = nullptr;
50     }
51 }
52 
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)53 int SyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
54     const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
55 {
56     if ((context == nullptr) || (syncInterface == nullptr) || (metadata == nullptr) || (communicator == nullptr)) {
57         return -E_INVALID_ARGS;
58     }
59     syncContext_ = context;
60     storageInterface_ = syncInterface;
61     metadata_ = metadata;
62     RefObject::IncObjRef(communicator);
63     communicator_ = communicator;
64     return E_OK;
65 }
66 
StartSync()67 int SyncStateMachine::StartSync()
68 {
69     int errCode = syncContext_->IncUsedCount();
70     if (errCode != E_OK) {
71         return errCode;
72     }
73     std::lock_guard<std::mutex> lock(stateMachineLock_);
74     errCode = StartSyncInner();
75     syncContext_->SafeExit();
76     return errCode;
77 }
78 
TimeoutCallback(TimerId timerId)79 int SyncStateMachine::TimeoutCallback(TimerId timerId)
80 {
81     RefObject::AutoLock lock(syncContext_);
82     if (syncContext_->IsKilled()) {
83         return -E_OBJ_IS_KILLED;
84     }
85     TimerId timer = syncContext_->GetTimerId();
86     if (timer != timerId) {
87         return -E_UNEXPECTED_DATA;
88     }
89 
90     int retryTime = syncContext_->GetRetryTime();
91     if (retryTime >= syncContext_->GetSyncRetryTimes() || !syncContext_->IsSyncTaskNeedRetry()) {
92         LOGI("[SyncStateMachine][Timeout] TimeoutCallback retryTime:%d", retryTime);
93         syncContext_->UnlockObj();
94         StepToTimeout(timerId);
95         syncContext_->LockObj();
96         return E_OK;
97     }
98     retryTime++;
99     syncContext_->SetRetryTime(retryTime);
100     // the sequenceid will be managed by dataSync slide windows.
101     syncContext_->SetRetryStatus(SyncTaskContext::NEED_RETRY);
102     int timeoutTime = syncContext_->GetSyncRetryTimeout(retryTime);
103     syncContext_->ModifyTimer(timeoutTime);
104     LOGI("[SyncStateMachine][Timeout] Schedule task, timeoutTime = %d, retryTime = %d", timeoutTime, retryTime);
105     SyncStep();
106     return E_OK;
107 }
108 
Abort()109 void SyncStateMachine::Abort()
110 {
111     RefObject::IncObjRef(syncContext_);
112     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
113         this->AbortImmediately();
114         RefObject::DecObjRef(this->syncContext_);
115     });
116     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
117         LOGE("[SyncStateMachine][Abort] Abort failed, errCode %d", errCode);
118         RefObject::DecObjRef(syncContext_);
119     }
120 }
121 
AbortImmediately()122 void SyncStateMachine::AbortImmediately()
123 {
124     std::lock_guard<std::mutex> lock(stateMachineLock_);
125     AbortInner();
126     StopWatchDog();
127     currentState_ = 0;
128 }
129 
SwitchMachineState(uint8_t event)130 int SyncStateMachine::SwitchMachineState(uint8_t event)
131 {
132     const std::vector<StateSwitchTable> &tables = GetStateSwitchTables();
133     auto tableIter = std::find_if(tables.begin(), tables.end(),
134         [this](const StateSwitchTable &table) {
135             return table.version <= currentSyncProctolVersion_;
136         });
137     if (tableIter == tables.end()) {
138         LOGE("[SyncStateMachine][SwitchState] Can't find a compatible version by version %u",
139             currentSyncProctolVersion_);
140         return -E_NOT_FOUND;
141     }
142 
143     const std::map<uint8_t, EventToState> &table = (*tableIter).switchTable;
144     auto eventToStateIter = table.find(currentState_);
145     if (eventToStateIter == table.end()) {
146         LOGW("[SyncStateMachine][SwitchState] Can't find EventToState with currentSate %u",
147             currentState_);
148         SetCurStateErrStatus();
149         return E_OK;
150     }
151 
152     const EventToState &eventToState = eventToStateIter->second;
153     auto stateIter = eventToState.find(event);
154     if (stateIter == eventToState.end()) {
155         LOGD("[SyncStateMachine][SwitchState] Can't find event %u int currentSate %u ignore",
156             event, currentState_);
157         return -E_NOT_FOUND;
158     }
159 
160     currentState_ = stateIter->second;
161     LOGD("[SyncStateMachine][SwitchState] from state %u move to state %u with event %u dev %s{private}",
162         eventToStateIter->first, currentState_, event, syncContext_->GetDeviceId().c_str());
163     return E_OK;
164 }
165 
SwitchStateAndStep(uint8_t event)166 void SyncStateMachine::SwitchStateAndStep(uint8_t event)
167 {
168     if (SwitchMachineState(event) == E_OK) {
169         SyncStepInner();
170     }
171 }
172 
ExecNextTask()173 int SyncStateMachine::ExecNextTask()
174 {
175     syncContext_->Clear();
176     while (!syncContext_->IsTargetQueueEmpty()) {
177         int errCode = syncContext_->GetNextTarget();
178         if (errCode != E_OK) {
179             continue;
180         }
181         if (syncContext_->IsCurrentSyncTaskCanBeSkipped()) {
182             syncContext_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
183             syncContext_->Clear();
184             continue;
185         }
186         errCode = PrepareNextSyncTask();
187         if (errCode != E_OK) {
188             LOGE("[SyncStateMachine] PrepareSync failed");
189             syncContext_->SetOperationStatus(SyncOperation::OP_FAILED);
190             syncContext_->Clear();
191             continue; // try to execute next task
192         }
193         return errCode;
194     }
195     syncContext_->SetTaskExecStatus(ISyncTaskContext::FINISHED);
196     // no task left
197     LOGD("[SyncStateMachine] All sync task finished!");
198     return -E_NO_SYNC_TASK;
199 }
200 
StartWatchDog()201 int SyncStateMachine::StartWatchDog()
202 {
203     int errCode = syncContext_->StartTimer();
204     if (errCode == E_OK) {
205         watchDogStarted_ = true;
206     }
207     return errCode;
208 }
209 
ResetWatchDog()210 int SyncStateMachine::ResetWatchDog()
211 {
212     if (!watchDogStarted_) {
213         return E_OK;
214     }
215     LOGD("[SyncStateMachine][WatchDog] ResetWatchDog.");
216     syncContext_->StopTimer();
217     syncContext_->SetRetryTime(0);
218     return syncContext_->StartTimer();
219 }
220 
StopWatchDog()221 void SyncStateMachine::StopWatchDog()
222 {
223     watchDogStarted_ = false;
224     LOGD("[SyncStateMachine][WatchDog] StopWatchDog.");
225     syncContext_->StopTimer();
226 }
227 
StartSaveDataNotify(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)228 bool SyncStateMachine::StartSaveDataNotify(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
229 {
230     std::lock_guard<std::mutex> lockGuard(saveDataNotifyLock_);
231     saveDataNotifyRefCount_++;
232     if (saveDataNotifyTimerId_ > 0) {
233         saveDataNotifyCount_ = 0;
234         LOGW("[SyncStateMachine][SaveDataNotify] timer has been started!");
235         return true;
236     }
237 
238     // Incref to make sure context still alive before timer stopped.
239     RefObject::IncObjRef(syncContext_);
240     int errCode = RuntimeContext::GetInstance()->SetTimer(
241         DATA_NOTIFY_INTERVAL,
242         [this, sessionId, sequenceId, inMsgId](TimerId timerId) {
243             RefObject::IncObjRef(syncContext_);
244             int ret = RuntimeContext::GetInstance()->ScheduleTask([this, sessionId, sequenceId, inMsgId]() {
245                 DoSaveDataNotify(sessionId, sequenceId, inMsgId);
246                 RefObject::DecObjRef(syncContext_);
247             });
248             if (ret != E_OK) {
249                 LOGE("[SyncStateMachine] [DoSaveDataNotify] ScheduleTask failed errCode %d", ret);
250                 RefObject::DecObjRef(syncContext_);
251             }
252             return ret;
253         },
254         [this]() { RefObject::DecObjRef(syncContext_); },
255         saveDataNotifyTimerId_);
256     if (errCode != E_OK) {
257         LOGW("[SyncStateMachine][SaveDataNotify] start timer failed err %d !", errCode);
258         saveDataNotifyRefCount_--;
259         return false;
260     }
261     return true;
262 }
263 
StopSaveDataNotify()264 void SyncStateMachine::StopSaveDataNotify()
265 {
266     std::lock_guard<std::mutex> lockGuard(saveDataNotifyLock_);
267     StopSaveDataNotifyNoLock();
268 }
269 
StopSaveDataNotifyNoLock()270 void SyncStateMachine::StopSaveDataNotifyNoLock()
271 {
272     if (saveDataNotifyTimerId_ == 0) {
273         LOGI("[SyncStateMachine][SaveDataNotify] timer is not started!");
274         return;
275     }
276     saveDataNotifyRefCount_--;
277     if (saveDataNotifyRefCount_ > 0) {
278         return;
279     }
280     RuntimeContext::GetInstance()->RemoveTimer(saveDataNotifyTimerId_);
281     saveDataNotifyTimerId_ = 0;
282     saveDataNotifyCount_ = 0;
283     saveDataNotifyRefCount_ = 0;
284 }
285 
StartFeedDogForSync(uint32_t time,SyncDirectionFlag flag)286 bool SyncStateMachine::StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag)
287 {
288     if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
289         LOGE("[SyncStateMachine][feedDog] start wrong flag:%d", flag);
290         return false;
291     }
292 
293     uint8_t cnt = GetFeedDogTimeout(time / DATA_NOTIFY_INTERVAL);
294     LOGI("[SyncStateMachine][feedDog] start cnt:%d, flag:%d", cnt, flag);
295 
296     std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
297     watchDogController_[flag].refCount++;
298     LOGD("af incr refCount = %d", watchDogController_[flag].refCount);
299 
300     if (watchDogController_[flag].feedDogTimerId > 0) {
301         // update the upperLimit, if the new cnt is bigger then last upperLimit
302         if (cnt > watchDogController_[flag].feedDogUpperLimit) {
303             LOGD("update feedDogUpperLimit = %d", cnt);
304             watchDogController_[flag].feedDogUpperLimit = cnt;
305         }
306         watchDogController_[flag].feedDogCnt = 0u;
307         LOGW("[SyncStateMachine][feedDog] timer has been started!, flag:%d", flag);
308         return false;
309     }
310 
311     // Incref to make sure context still alive before timer stopped.
312     RefObject::IncObjRef(syncContext_);
313     watchDogController_[flag].feedDogUpperLimit = cnt;
314     int errCode = RuntimeContext::GetInstance()->SetTimer(
315         DATA_NOTIFY_INTERVAL,
316         [this, flag](TimerId timerId) {
317             RefObject::IncObjRef(syncContext_);
318             int ret = RuntimeContext::GetInstance()->ScheduleTask([this, flag]() {
319                 DoFeedDogForSync(flag);
320                 RefObject::DecObjRef(syncContext_);
321             });
322             if (ret != E_OK) {
323                 LOGE("[SyncStateMachine] [DoFeedDogForSync] ScheduleTask failed errCode %d", ret);
324                 RefObject::DecObjRef(syncContext_);
325             }
326             return ret;
327         },
328         [this]() { RefObject::DecObjRef(syncContext_); },
329         watchDogController_[flag].feedDogTimerId);
330     if (errCode != E_OK) {
331         LOGW("[SyncStateMachine][feedDog] start timer failed err %d !", errCode);
332         return false;
333     }
334     return true;
335 }
336 
GetFeedDogTimeout(int timeoutCount) const337 uint8_t SyncStateMachine::GetFeedDogTimeout(int timeoutCount) const
338 {
339     if (timeoutCount > UINT8_MAX) {
340         return UINT8_MAX;
341     }
342     return timeoutCount;
343 }
344 
StopFeedDogForSync(SyncDirectionFlag flag)345 void SyncStateMachine::StopFeedDogForSync(SyncDirectionFlag flag)
346 {
347     if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
348         LOGE("[SyncStateMachine][feedDog] stop wrong flag:%d", flag);
349         return;
350     }
351     std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
352     StopFeedDogForSyncNoLock(flag);
353 }
354 
StopFeedDogForSyncNoLock(SyncDirectionFlag flag)355 void SyncStateMachine::StopFeedDogForSyncNoLock(SyncDirectionFlag flag)
356 {
357     if (flag != SyncDirectionFlag::SEND && flag != SyncDirectionFlag::RECEIVE) {
358         LOGE("[SyncStateMachine][feedDog] stop wrong flag:%d", flag);
359         return;
360     }
361     if (watchDogController_[flag].feedDogTimerId == 0) {
362         return;
363     }
364     LOGI("[SyncStateMachine][feedDog] stop flag:%d", flag);
365     RuntimeContext::GetInstance()->RemoveTimer(watchDogController_[flag].feedDogTimerId);
366     watchDogController_[flag].feedDogTimerId = 0;
367     watchDogController_[flag].feedDogCnt = 0;
368     watchDogController_[flag].refCount = 0;
369 }
370 
// Called by SwitchMachineState() when the current state has no entry in the
// selected switch table. This implementation is intentionally empty.
// NOTE(review): presumably meant to be overridden by concrete state machines -
// confirm against the class declaration.
void SyncStateMachine::SetCurStateErrStatus()
{
}
374 
DecRefCountOfFeedDogTimer(SyncDirectionFlag flag)375 void SyncStateMachine::DecRefCountOfFeedDogTimer(SyncDirectionFlag flag)
376 {
377     std::lock_guard<std::mutex> lockGuard(feedDogLock_[flag]);
378     if (watchDogController_[flag].feedDogTimerId == 0) {
379         return;
380     }
381     if (--watchDogController_[flag].refCount <= 0) {
382         LOGD("stop feed dog timer, refcount = %d", watchDogController_[flag].refCount);
383         StopFeedDogForSyncNoLock(flag);
384     }
385     LOGD("af dec refcount = %d", watchDogController_[flag].refCount);
386 }
387 
DoSaveDataNotify(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)388 void SyncStateMachine::DoSaveDataNotify(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
389 {
390     // we send notify packet at first, because it will cost a lot of time to get machine lock
391     {
392         std::lock_guard<std::mutex> innerLock(saveDataNotifyLock_);
393         if (saveDataNotifyCount_ >= MAX_DATA_NOTIFY_COUNT) {
394             StopSaveDataNotifyNoLock();
395             return;
396         }
397         SendNotifyPacket(sessionId, sequenceId, inMsgId);
398         saveDataNotifyCount_++;
399         if (waitingResetLockBySaveData_) {
400             return;
401         }
402         waitingResetLockBySaveData_ = true;
403     }
404     std::lock_guard<std::mutex> lock(stateMachineLock_);
405     {
406         std::lock_guard<std::mutex> innerLock(saveDataNotifyLock_);
407         waitingResetLockBySaveData_ = false;
408     }
409     (void)ResetWatchDog();
410 }
411 
DoFeedDogForSync(SyncDirectionFlag flag)412 void SyncStateMachine::DoFeedDogForSync(SyncDirectionFlag flag)
413 {
414     {
415         std::lock_guard<std::mutex> lock(stateMachineLock_);
416         (void)ResetWatchDog();
417     }
418     std::lock_guard<std::mutex> innerLock(feedDogLock_[flag]);
419     if (watchDogController_[flag].feedDogCnt >= watchDogController_[flag].feedDogUpperLimit) { // LCOV_EXCL_BR_LINE
420         StopFeedDogForSyncNoLock(flag);
421         return;
422     }
423     watchDogController_[flag].feedDogCnt++;
424 }
425 
InnerErrorAbort(uint32_t sessionId)426 void SyncStateMachine::InnerErrorAbort(uint32_t sessionId)
427 {
428     // do nothing
429     (void) sessionId;
430 }
431 
// No-op in this class.
// NOTE(review): presumably a notification that derived state machines may
// override - confirm against the class declaration.
void SyncStateMachine::NotifyClosing()
{
    // do nothing
}
436 
StartFeedDogForGetData(uint32_t sessionId)437 void SyncStateMachine::StartFeedDogForGetData(uint32_t sessionId)
438 {
439     std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
440     if (getDataNotifyTimerId_ > 0) {
441         getDataNotifyCount_ = 0;
442         LOGW("[SyncStateMachine][StartFeedDogForGetData] timer has been started!");
443     }
444 
445     // Incref to make sure context still alive before timer stopped.
446     RefObject::IncObjRef(syncContext_);
447     int errCode = RuntimeContext::GetInstance()->SetTimer(
448         DATA_NOTIFY_INTERVAL,
449         [this, sessionId](TimerId timerId) {
450             RefObject::IncObjRef(syncContext_);
451             int ret = RuntimeContext::GetInstance()->ScheduleTask([this, sessionId, timerId]() {
452                 DoGetAndSendDataNotify(sessionId);
453                 int getDataNotifyCount = 0;
454                 {
455                     std::lock_guard<std::mutex> autoLock(getDataNotifyLock_);
456                     getDataNotifyCount = getDataNotifyCount_;
457                 }
458                 if (getDataNotifyCount >= MAX_DATA_NOTIFY_COUNT) {
459                     StopFeedDogForGetDataInner(timerId);
460                 }
461                 RefObject::DecObjRef(syncContext_);
462             });
463             if (ret != E_OK) {
464                 LOGE("[SyncStateMachine] [StartFeedDogForGetData] ScheduleTask failed errCode %d", ret);
465                 RefObject::DecObjRef(syncContext_);
466             }
467             return ret;
468         },
469         [this]() { RefObject::DecObjRef(syncContext_); },
470         getDataNotifyTimerId_);
471     if (errCode != E_OK) {
472         LOGW("[SyncStateMachine][StartFeedDogForGetData] start timer failed err %d !", errCode);
473     }
474 }
475 
StopFeedDogForGetData()476 void SyncStateMachine::StopFeedDogForGetData()
477 {
478     TimerId timerId = 0;
479     {
480         std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
481         timerId = getDataNotifyTimerId_;
482     }
483     if (timerId == 0) {
484         return;
485     }
486     StopFeedDogForGetDataInner(timerId);
487 }
488 
DoGetAndSendDataNotify(uint32_t sessionId)489 void SyncStateMachine::DoGetAndSendDataNotify(uint32_t sessionId)
490 {
491     (void)ResetWatchDog();
492     std::lock_guard<std::mutex> autoLock(getDataNotifyLock_);
493     if (getDataNotifyCount_ >= MAX_DATA_NOTIFY_COUNT) {
494         return;
495     }
496     if (sessionId != 0) {
497         SendNotifyPacket(sessionId, 0, DATA_SYNC_MESSAGE);
498     }
499     getDataNotifyCount_++;
500 }
501 
StopFeedDogForGetDataInner(TimerId timerId)502 void SyncStateMachine::StopFeedDogForGetDataInner(TimerId timerId)
503 {
504     std::lock_guard<std::mutex> lockGuard(getDataNotifyLock_);
505     if (getDataNotifyTimerId_ == 0 || getDataNotifyTimerId_ != timerId) {
506         return;
507     }
508     RuntimeContext::GetInstance()->RemoveTimer(timerId);
509     getDataNotifyTimerId_ = 0;
510     getDataNotifyCount_ = 0;
511 }
512 
// No-op in this class.
// NOTE(review): presumably a schema-change notification that derived state
// machines may override - confirm against the class declaration.
void SyncStateMachine::SchemaChange()
{
}
516 
// No-op in this class.
// NOTE(review): presumably a time-change notification that derived state
// machines may override - confirm against the class declaration.
void SyncStateMachine::TimeChange()
{
}
520 } // namespace DistributedDB
521