1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_SYNC_STATE_MACHINE_H 17 #define SINGLE_VER_SYNC_STATE_MACHINE_H 18 19 #include <condition_variable> 20 #include <memory> 21 22 #include "ability_sync.h" 23 #include "message.h" 24 #include "meta_data.h" 25 #include "semaphore_utils.h" 26 #include "single_ver_data_sync.h" 27 #include "single_ver_sync_task_context.h" 28 #include "sync_state_machine.h" 29 #include "sync_target.h" 30 31 #include "time_sync.h" 32 #include "time_helper.h" 33 34 namespace DistributedDB { 35 using StateMappingHandler = std::function<uint8_t(void)>; 36 class SingleVerSyncStateMachine : public SyncStateMachine { 37 public: 38 enum State { 39 IDLE = 0, 40 TIME_SYNC, 41 ABILITY_SYNC, 42 WAIT_FOR_RECEIVE_DATA_FINISH, // all data send finished, wait for data revice if has pull request 43 SYNC_TASK_FINISHED, // current sync task finihsed, try to schedule next sync task 44 SYNC_TIME_OUT, 45 INNER_ERR, 46 START_INITIACTIVE_DATA_SYNC, // used to do sync started by local device, use sliding window 47 START_PASSIVE_DATA_SYNC, // used to do pull response, use sliding window 48 SYNC_CONTROL_CMD // used to send control cmd. 49 }; 50 51 enum Event { 52 START_SYNC_EVENT = 1, 53 TIME_SYNC_FINISHED_EVENT, 54 ABILITY_SYNC_FINISHED_EVENT, 55 VERSION_NOT_SUPPOR_EVENT, 56 SEND_DATA_EVENT, 57 SEND_FINISHED_EVENT, 58 RECV_FINISHED_EVENT, 59 NEED_ABILITY_SYNC_EVENT, 60 RESPONSE_TASK_FINISHED_EVENT, 61 START_PULL_RESPONSE_EVENT, 62 WAIT_ACK_EVENT, 63 ALL_TASK_FINISHED_EVENT, 64 TIME_OUT_EVENT, 65 INNER_ERR_EVENT, 66 WAIT_TIME_OUT_EVENT, 67 RE_SEND_DATA_EVENT, 68 CONTROL_CMD_EVENT, 69 NEED_TIME_SYNC_EVENT, 70 ANY_EVENT 71 }; 72 SingleVerSyncStateMachine(); 73 ~SingleVerSyncStateMachine() override; 74 75 // Init the SingleVerSyncStateMachine 76 int Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metaData, 77 ICommunicator *communicator) override; 78 79 // send Message to the StateMachine 80 int ReceiveMessageCallback(Message *inMsg) override; 81 82 // Called by CommErrHandler, used to abort sync when handle err 83 void CommErrAbort(uint32_t sessionId = 0) override; 84 85 int HandleDataRequestRecv(const Message *inMsg); 86 87 bool IsNeedErrCodeHandle(uint32_t sessionId) const; 88 89 void PushPullDataRequestEvokeErrHandle(); 90 91 void DataRecvErrCodeHandle(uint32_t sessionId, int errCode); 92 93 bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override; 94 95 void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue); 96 97 int GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId, bool isAutoLift, 98 uint64_t &outValue); 99 100 void InnerErrorAbort(uint32_t sessionId) override; 101 102 void NotifyClosing() override; 103 104 void SchemaChange() override; 105 106 void TimeChange() override; 107 protected: 108 // Step the SingleVerSyncStateMachine 109 void SyncStep() override; 110 111 // SyncOperation is timeout, step to timeout state 112 void StepToTimeout(TimerId timerId) override; 113 114 void SyncStepInnerLocked() override; 115 116 // Do state machine step with no lock, for inner use 117 void SyncStepInner() override; 118 119 int StartSyncInner() override; 120 121 void AbortInner() override; 122 123 void SetCurStateErrStatus() override; 124 125 // Used to get instance class' stateSwitchTables 126 const std::vector<StateSwitchTable> &GetStateSwitchTables() const override; 127 128 // Do some init for run a next sync task 129 int PrepareNextSyncTask() override; 130 131 // Called by StartSaveDataNotifyTimer, used to send a save data notify packet 132 void SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) override; 133 134 int TimeMarkSyncRecv(const Message *inMsg); 135 136 void DataAckRecvErrCodeHandle(int errCode, bool handleError); 137 138 void ResponsePullError(int errCode, bool ignoreInnerErr); 139 140 private: 141 // Used to init sync state machine switchbables 142 static void InitStateSwitchTables(); 143 144 // To generate the statemachine switchtable with the given version 145 static void InitStateSwitchTable(uint32_t version, const std::vector<std::vector<uint8_t>> &switchTable); 146 147 void InitStateMapping(); 148 149 // Do TimeSync, for first sync 150 Event DoTimeSync() const; 151 152 // Do AbilitySync, for first sync 153 Event DoAbilitySync() const; 154 155 // Waiting for pull data revice finish, if coming a pull request, should goto START_PASSIVE_DATA_SYNC state 156 Event DoWaitForDataRecv() const; 157 158 // Sync task finihsed, should do some data clear and exec next task. 159 Event DoSyncTaskFinished(); 160 161 // Do something when sync timeout. 162 Event DoTimeout(); 163 164 // Do something when sync get some err. 165 Event DoInnerErr(); 166 167 Event DoInitiactiveDataSyncWithSlidingWindow() const; 168 169 Event DoPassiveDataSyncWithSlidingWindow(); 170 171 Event DoInitiactiveControlSync() const; 172 173 Event GetEventAfterTimeSync(int mode) const; 174 175 int HandleControlAckRecv(const Message *inMsg); 176 177 int GetSyncOperationStatus(int errCode) const; 178 179 int AbilitySyncRecv(const Message *inMsg); 180 181 int DataPktRecv(Message *inMsg); 182 183 void ScheduleMsgAndHandle(Message *inMsg); 184 185 int ControlPktRecv(Message *inMsg); 186 187 void NeedAbilitySyncHandle(); 188 189 int HandleDataAckRecv(const Message *inMsg); 190 191 void HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg, bool ignoreInnerErr); 192 193 void Clear(); 194 195 bool IsPacketValid(const Message *inMsg) const; 196 197 void PreStartPullResponse(); 198 199 bool CheckIsStartPullResponse() const; 200 201 int MessageCallbackPre(const Message *inMsg); 202 203 void AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark); 204 205 Event TransformErrCodeToEvent(int errCode) const; 206 207 bool IsNeedResetWatchdog(const Message *inMsg) const; 208 209 Event TransforTimeOutErrCodeToEvent() const; 210 211 bool AbilityMsgSessionIdCheck(const Message *inMsg); 212 213 SyncType GetSyncType(uint32_t messageId) const; 214 215 void JumpStatusAfterAbilitySync(int mode); 216 217 void ControlAckRecvErrCodeHandle(int errCode); 218 219 int AbilitySyncResponseRecv(const Message *inMsg); 220 221 int AbilitySyncNotifyRecv(const Message *inMsg); 222 223 DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncStateMachine); 224 225 static std::mutex stateSwitchTableLock_; 226 static bool isStateSwitchTableInited_; 227 static std::vector<StateSwitchTable> stateSwitchTables_; 228 SingleVerSyncTaskContext *context_; 229 SyncGenericInterface *syncInterface_; 230 std::shared_ptr<TimeSync> timeSync_; 231 std::unique_ptr<AbilitySync> abilitySync_; 232 std::shared_ptr<SingleVerDataSync> dataSync_; 233 uint64_t currentRemoteVersionId_; 234 std::map<uint8_t, StateMappingHandler> stateMapping_; 235 }; 236 } // namespace DistributedDB 237 238 #endif // SINGLE_VER_SYNC_STATE_MACHINE_H 239