1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_SYNC_STATE_MACHINE_H
17 #define SINGLE_VER_SYNC_STATE_MACHINE_H
18 
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "ability_sync.h"
#include "message.h"
#include "meta_data.h"
#include "semaphore_utils.h"
#include "single_ver_data_sync.h"
#include "single_ver_sync_task_context.h"
#include "sync_state_machine.h"
#include "sync_target.h"

#include "time_sync.h"
#include "time_helper.h"
33 
34 namespace DistributedDB {
// Callable stored per state in SingleVerSyncStateMachine::stateMapping_: takes no arguments, returns a uint8_t.
// NOTE(review): the returned value is presumably an Event code used to pick the next state - confirm in the .cpp.
using StateMappingHandler = std::function<uint8_t()>;
36 class SingleVerSyncStateMachine : public SyncStateMachine {
37 public:
38     enum State {
39         IDLE = 0,
40         TIME_SYNC,
41         ABILITY_SYNC,
42         WAIT_FOR_RECEIVE_DATA_FINISH, // all data send finished, wait for data revice if has pull request
43         SYNC_TASK_FINISHED, // current sync task finihsed, try to schedule next sync task
44         SYNC_TIME_OUT,
45         INNER_ERR,
46         START_INITIACTIVE_DATA_SYNC, // used to do sync started by local device, use sliding window
47         START_PASSIVE_DATA_SYNC, // used to do pull response, use sliding window
48         SYNC_CONTROL_CMD // used to send control cmd.
49     };
50 
51     enum Event {
52         START_SYNC_EVENT = 1,
53         TIME_SYNC_FINISHED_EVENT,
54         ABILITY_SYNC_FINISHED_EVENT,
55         VERSION_NOT_SUPPOR_EVENT,
56         SEND_DATA_EVENT,
57         SEND_FINISHED_EVENT,
58         RECV_FINISHED_EVENT,
59         NEED_ABILITY_SYNC_EVENT,
60         RESPONSE_TASK_FINISHED_EVENT,
61         START_PULL_RESPONSE_EVENT,
62         WAIT_ACK_EVENT,
63         ALL_TASK_FINISHED_EVENT,
64         TIME_OUT_EVENT,
65         INNER_ERR_EVENT,
66         WAIT_TIME_OUT_EVENT,
67         RE_SEND_DATA_EVENT,
68         CONTROL_CMD_EVENT,
69         NEED_TIME_SYNC_EVENT,
70         ANY_EVENT
71     };
72     SingleVerSyncStateMachine();
73     ~SingleVerSyncStateMachine() override;
74 
75     // Init the SingleVerSyncStateMachine
76     int Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metaData,
77         ICommunicator *communicator) override;
78 
79     // send Message to the StateMachine
80     int ReceiveMessageCallback(Message *inMsg) override;
81 
82     // Called by CommErrHandler, used to abort sync when handle err
83     void CommErrAbort(uint32_t sessionId = 0) override;
84 
85     int HandleDataRequestRecv(const Message *inMsg);
86 
87     bool IsNeedErrCodeHandle(uint32_t sessionId) const;
88 
89     void PushPullDataRequestEvokeErrHandle();
90 
91     void DataRecvErrCodeHandle(uint32_t sessionId, int errCode);
92 
93     bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;
94 
95     void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue);
96 
97     int GetSendQueryWaterMark(const std::string &queryId, const DeviceID &deviceId, bool isAutoLift,
98         uint64_t &outValue);
99 
100     void InnerErrorAbort(uint32_t sessionId) override;
101 
102     void NotifyClosing() override;
103 
104     void SchemaChange() override;
105 
106     void TimeChange() override;
107 protected:
108     // Step the SingleVerSyncStateMachine
109     void SyncStep() override;
110 
111     // SyncOperation is timeout, step to timeout state
112     void StepToTimeout(TimerId timerId) override;
113 
114     void SyncStepInnerLocked() override;
115 
116     // Do state machine step with no lock, for inner use
117     void SyncStepInner() override;
118 
119     int StartSyncInner() override;
120 
121     void AbortInner() override;
122 
123     void SetCurStateErrStatus() override;
124 
125     // Used to get instance class' stateSwitchTables
126     const std::vector<StateSwitchTable> &GetStateSwitchTables() const override;
127 
128     // Do some init for run a next sync task
129     int PrepareNextSyncTask() override;
130 
131     // Called by StartSaveDataNotifyTimer, used to send a save data notify packet
132     void SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) override;
133 
134     int TimeMarkSyncRecv(const Message *inMsg);
135 
136     void DataAckRecvErrCodeHandle(int errCode, bool handleError);
137 
138     void ResponsePullError(int errCode, bool ignoreInnerErr);
139 
140 private:
141     // Used to init sync state machine switchbables
142     static void InitStateSwitchTables();
143 
144     // To generate the statemachine switchtable with the given version
145     static void InitStateSwitchTable(uint32_t version, const std::vector<std::vector<uint8_t>> &switchTable);
146 
147     void InitStateMapping();
148 
149     // Do TimeSync, for first sync
150     Event DoTimeSync() const;
151 
152     // Do AbilitySync, for first sync
153     Event DoAbilitySync() const;
154 
155     // Waiting for pull data revice finish, if coming a pull request, should goto START_PASSIVE_DATA_SYNC state
156     Event DoWaitForDataRecv() const;
157 
158     // Sync task finihsed, should do some data clear and exec next task.
159     Event DoSyncTaskFinished();
160 
161     // Do something when sync timeout.
162     Event DoTimeout();
163 
164     // Do something when sync get some err.
165     Event DoInnerErr();
166 
167     Event DoInitiactiveDataSyncWithSlidingWindow() const;
168 
169     Event DoPassiveDataSyncWithSlidingWindow();
170 
171     Event DoInitiactiveControlSync() const;
172 
173     Event GetEventAfterTimeSync(int mode) const;
174 
175     int HandleControlAckRecv(const Message *inMsg);
176 
177     int GetSyncOperationStatus(int errCode) const;
178 
179     int AbilitySyncRecv(const Message *inMsg);
180 
181     int DataPktRecv(Message *inMsg);
182 
183     void ScheduleMsgAndHandle(Message *inMsg);
184 
185     int ControlPktRecv(Message *inMsg);
186 
187     void NeedAbilitySyncHandle();
188 
189     int HandleDataAckRecv(const Message *inMsg);
190 
191     void HandleDataAckRecvWithSlidingWindow(int errCode, const Message *inMsg, bool ignoreInnerErr);
192 
193     void Clear();
194 
195     bool IsPacketValid(const Message *inMsg) const;
196 
197     void PreStartPullResponse();
198 
199     bool CheckIsStartPullResponse() const;
200 
201     int MessageCallbackPre(const Message *inMsg);
202 
203     void AddPullResponseTarget(const Message *inMsg, WaterMark pullEndWatermark);
204 
205     Event TransformErrCodeToEvent(int errCode) const;
206 
207     bool IsNeedResetWatchdog(const Message *inMsg) const;
208 
209     Event TransforTimeOutErrCodeToEvent() const;
210 
211     bool AbilityMsgSessionIdCheck(const Message *inMsg);
212 
213     SyncType GetSyncType(uint32_t messageId) const;
214 
215     void JumpStatusAfterAbilitySync(int mode);
216 
217     void ControlAckRecvErrCodeHandle(int errCode);
218 
219     int AbilitySyncResponseRecv(const Message *inMsg);
220 
221     int AbilitySyncNotifyRecv(const Message *inMsg);
222 
223     DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncStateMachine);
224 
225     static std::mutex stateSwitchTableLock_;
226     static bool isStateSwitchTableInited_;
227     static std::vector<StateSwitchTable> stateSwitchTables_;
228     SingleVerSyncTaskContext *context_;
229     SyncGenericInterface *syncInterface_;
230     std::shared_ptr<TimeSync> timeSync_;
231     std::unique_ptr<AbilitySync> abilitySync_;
232     std::shared_ptr<SingleVerDataSync> dataSync_;
233     uint64_t currentRemoteVersionId_;
234     std::map<uint8_t, StateMappingHandler> stateMapping_;
235 };
236 } // namespace DistributedDB
237 
238 #endif // SINGLE_VER_SYNC_STATE_MACHINE_H
239