1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef MULTI_VER_SYNC_STATE_MACHINE_H
17 #define MULTI_VER_SYNC_STATE_MACHINE_H
18 
19 #ifndef OMIT_MULTI_VER
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <vector>

#include "commit_history_sync.h"
#include "db_types.h"
#include "meta_data.h"
#include "multi_ver_data_sync.h"
#include "multi_ver_sync_task_context.h"
#include "sync_state_machine.h"
#include "time_sync.h"
#include "value_slice_sync.h"
30 
31 namespace DistributedDB {
32 class MultiVerSyncStateMachine final : public SyncStateMachine {
33 public:
34     struct ResponseInfo {
35         uint32_t sessionId = 0;
36         TimerId timerId = 0;
37     };
38 
39     MultiVerSyncStateMachine();
40     ~MultiVerSyncStateMachine() override;
41 
42     // Init the MultiVerSyncStateMachine
43     int Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
44         ICommunicator *communicator) override;
45 
46     // send Message to the StateMachine
47     int ReceiveMessageCallback(Message *inMsg) override;
48 
49     // Called by CommErrHandler, used to abort sync when handle err
50     void CommErrAbort(uint32_t sessionId = 0) override;
51 
52     DISABLE_COPY_ASSIGN_MOVE(MultiVerSyncStateMachine);
53 
54 protected:
55     // Step the MultiVerSyncStateMachine
56     void SyncStep() override;
57 
58     // SyncOperation is timeout, step to timeout state
59     void StepToTimeout(TimerId timerId) override;
60 
61     void SyncStepInnerLocked() override;
62 
63     void SyncStepInner() override;
64 
65     void AbortInner() override;
66 
67     int StartSyncInner() override;
68 
69     const std::vector<StateSwitchTable> &GetStateSwitchTables() const override;
70 
71     // Do some init for run a next sync task
72     int PrepareNextSyncTask() override;
73 
74     // Called by StartSaveDataNotifyTimer, used to send a save data notify packet
75     void SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) override;
76 
77     bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;
78 
79 private:
80     enum State {
81         IDLE,
82         TIME_SYNC,
83         COMMIT_HISTORY_SYNC,
84         MULTI_VER_DATA_ENTRY_SYNC,
85         MULTI_VER_VALUE_SLICE_SYNC,
86         SYNC_TIME_OUT,
87         INNER_ERR
88     };
89 
90     void StepToIdle();
91 
92     int MessageCallbackCheck(const Message *inMsg);
93 
94     int CommitHistorySyncStepInner(void);
95 
96     int MultiVerDataSyncStepInner(void);
97 
98     int ValueSliceSyncStepInner(void);
99 
100     int TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext *context, const Message *inMsg);
101 
102     int CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg);
103 
104     int MultiVerDataPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg);
105 
106     int ValueSlicePktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg);
107 
108     void Finish();
109 
110     int OneCommitSyncFinish();
111 
112     bool IsPacketValid(const Message *inMsg) const;
113 
114     void Clear();
115 
116     // Mark sync response is begin now, we should disable real delete
117     void SyncResponseBegin(uint32_t sessionId);
118 
119     // Mark sync response is finished, we should enable real delete
120     void SyncResponseEnd(uint32_t sessionId);
121 
122     // Mark sync response may has an err, has not received finish ack, we should enable real delete
123     int SyncResponseTimeout(TimerId timerId);
124 
125     static const int RESPONSE_TIME_OUT = 30 * 1000; // 30s
126 
127     static std::vector<StateSwitchTable> stateSwitchTables_;
128     MultiVerSyncTaskContext *context_;
129     MultiVerKvDBSyncInterface *multiVerStorage_;
130     std::mutex responseInfosLock_;
131     std::list<ResponseInfo> responseInfos_;
132     std::unique_ptr<TimeSync> timeSync_;
133     std::unique_ptr<CommitHistorySync> commitHistorySync_;
134     std::unique_ptr<MultiVerDataSync> multiVerDataSync_;
135     std::unique_ptr<ValueSliceSync> valueSliceSync_;
136 };
137 } // namespace DistributedDB
138 
#endif // OMIT_MULTI_VER
#endif // MULTI_VER_SYNC_STATE_MACHINE_H
141