1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef MULTI_VER_SYNC_STATE_MACHINE_H 17 #define MULTI_VER_SYNC_STATE_MACHINE_H 18 19 #ifndef OMIT_MULTI_VER 20 #include <memory> 21 22 #include "commit_history_sync.h" 23 #include "db_types.h" 24 #include "meta_data.h" 25 #include "multi_ver_data_sync.h" 26 #include "multi_ver_sync_task_context.h" 27 #include "sync_state_machine.h" 28 #include "time_sync.h" 29 #include "value_slice_sync.h" 30 31 namespace DistributedDB { 32 class MultiVerSyncStateMachine final : public SyncStateMachine { 33 public: 34 struct ResponseInfo { 35 uint32_t sessionId = 0; 36 TimerId timerId = 0; 37 }; 38 39 MultiVerSyncStateMachine(); 40 ~MultiVerSyncStateMachine() override; 41 42 // Init the MultiVerSyncStateMachine 43 int Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata, 44 ICommunicator *communicator) override; 45 46 // send Message to the StateMachine 47 int ReceiveMessageCallback(Message *inMsg) override; 48 49 // Called by CommErrHandler, used to abort sync when handle err 50 void CommErrAbort(uint32_t sessionId = 0) override; 51 52 DISABLE_COPY_ASSIGN_MOVE(MultiVerSyncStateMachine); 53 54 protected: 55 // Step the MultiVerSyncStateMachine 56 void SyncStep() override; 57 58 // SyncOperation is timeout, step to timeout state 59 void StepToTimeout(TimerId timerId) override; 60 61 void SyncStepInnerLocked() override; 62 63 void SyncStepInner() override; 64 65 void AbortInner() override; 66 67 int StartSyncInner() override; 68 69 const std::vector<StateSwitchTable> &GetStateSwitchTables() const override; 70 71 // Do some init for run a next sync task 72 int PrepareNextSyncTask() override; 73 74 // Called by StartSaveDataNotifyTimer, used to send a save data notify packet 75 void SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) override; 76 77 bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override; 78 79 private: 80 enum State { 81 IDLE, 82 TIME_SYNC, 83 COMMIT_HISTORY_SYNC, 84 MULTI_VER_DATA_ENTRY_SYNC, 85 MULTI_VER_VALUE_SLICE_SYNC, 86 SYNC_TIME_OUT, 87 INNER_ERR 88 }; 89 90 void StepToIdle(); 91 92 int MessageCallbackCheck(const Message *inMsg); 93 94 int CommitHistorySyncStepInner(void); 95 96 int MultiVerDataSyncStepInner(void); 97 98 int ValueSliceSyncStepInner(void); 99 100 int TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext *context, const Message *inMsg); 101 102 int CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg); 103 104 int MultiVerDataPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg); 105 106 int ValueSlicePktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg); 107 108 void Finish(); 109 110 int OneCommitSyncFinish(); 111 112 bool IsPacketValid(const Message *inMsg) const; 113 114 void Clear(); 115 116 // Mark sync response is begin now, we should disable real delete 117 void SyncResponseBegin(uint32_t sessionId); 118 119 // Mark sync response is finished, we should enable real delete 120 void SyncResponseEnd(uint32_t sessionId); 121 122 // Mark sync response may has an err, has not received finish ack, we should enable real delete 123 int SyncResponseTimeout(TimerId timerId); 124 125 static const int RESPONSE_TIME_OUT = 30 * 1000; // 30s 126 127 static std::vector<StateSwitchTable> stateSwitchTables_; 128 MultiVerSyncTaskContext *context_; 129 MultiVerKvDBSyncInterface *multiVerStorage_; 130 std::mutex responseInfosLock_; 131 std::list<ResponseInfo> responseInfos_; 132 std::unique_ptr<TimeSync> timeSync_; 133 std::unique_ptr<CommitHistorySync> commitHistorySync_; 134 std::unique_ptr<MultiVerDataSync> multiVerDataSync_; 135 std::unique_ptr<ValueSliceSync> valueSliceSync_; 136 }; 137 } // namespace DistributedDB 138 139 #endif // MULTI_VER_SYNC_STATE_MACHINE_H 140 #endif 141