1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_SYNC_NEW_H 17 #define SINGLE_VER_DATA_SYNC_NEW_H 18 19 #include "icommunicator.h" 20 #include "isync_interface.h" 21 #include "meta_data.h" 22 #include "parcel.h" 23 #include "single_ver_data_message_schedule.h" 24 #include "single_ver_data_packet.h" 25 #include "single_ver_kvdb_sync_interface.h" 26 #include "single_ver_sync_task_context.h" 27 #include "sync_generic_interface.h" 28 #include "sync_types.h" 29 #include "version.h" 30 31 namespace DistributedDB { 32 using SendDataItem = SingleVerKvEntry *; 33 struct ReSendInfo { 34 Timestamp start = 0; 35 Timestamp end = 0; 36 Timestamp deleteBeginTime = 0; 37 Timestamp deleteEndTime = 0; 38 // packetId is used for matched ackpacket packetId which saved in ackPacket.reserve 39 // if equaled, means need to handle the ack, or drop. it is always increased 40 uint64_t packetId = 0; 41 }; 42 43 struct DataSyncReSendInfo { 44 uint32_t sessionId = 0; 45 uint32_t sequenceId = 0; 46 Timestamp start = 0; // means normal or sync data localwatermark 47 Timestamp end = 0; 48 Timestamp deleteDataStart = 0; // means delete data localwatermark 49 Timestamp deleteDataEnd = 0; 50 uint64_t packetId = 0; 51 }; 52 53 struct SyncEntry { 54 std::vector<SendDataItem> entries; 55 std::vector<uint8_t> compressedEntries; 56 }; 57 58 class SingleVerDataSync { 59 public: 60 SingleVerDataSync(); 61 virtual ~SingleVerDataSync(); 62 63 DISABLE_COPY_ASSIGN_MOVE(SingleVerDataSync); 64 65 int Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle, 66 const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId); 67 68 int SyncStart(int mode, SingleVerSyncTaskContext *context); 69 70 int TryContinueSync(SingleVerSyncTaskContext *context, const Message *message); 71 72 void ClearSyncStatus(); 73 74 int PushStart(SingleVerSyncTaskContext *context); 75 76 int PushPullStart(SingleVerSyncTaskContext *context); 77 78 int PullRequestStart(SingleVerSyncTaskContext *context); 79 80 int PullResponseStart(SingleVerSyncTaskContext *context); 81 82 int DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message, WaterMark &pullEndWatermark); 83 84 bool AckPacketIdCheck(const Message *message); 85 86 int AckRecv(SingleVerSyncTaskContext *context, const Message *message); 87 88 void SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion, uint32_t sessionId, 89 uint32_t sequenceId, uint32_t inMsgId); 90 91 virtual int SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode, 92 WaterMark maxSendDataTime); 93 94 int CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context); 95 96 std::string GetLabel() const; 97 98 std::string GetDeviceId() const; 99 100 bool WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message); 101 102 int ControlCmdStart(SingleVerSyncTaskContext *context); 103 104 int ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 105 106 int ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message); 107 108 void PutDataMsg(Message *message); 109 110 Message *MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle, bool &isNeedContinue); 111 112 bool IsNeedReloadQueue(); 113 114 void SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message); 115 116 void ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message); 117 118 void ClearDataMsg(); 119 120 protected: 121 static const int SEND_FINISHED = 0xff; 122 static const int LOCAL_WATER_MARK_NOT_INIT = 0xaa; 123 static const int PEER_WATER_MARK_NOT_INIT = 0x55; 124 static const int WATER_MARK_INVALID = 0xbb; 125 static const int MTU_SIZE = 28311552; // 27MB 126 127 void ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context); 128 129 int InnerSyncStart(SingleVerSyncTaskContext *context); 130 131 void InnerClearSyncStatus(); 132 133 int ReSendData(SingleVerSyncTaskContext *context); 134 135 int32_t ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo); 136 137 void SetSessionEndTimestamp(Timestamp end); 138 139 Timestamp GetSessionEndTimestamp() const; 140 141 void FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context, 142 SyncEntry &syncData, int sendCode, int mode); 143 144 int RequestStart(SingleVerSyncTaskContext *context, int mode); 145 146 SyncTimeRange GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context, 147 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate); 148 149 int GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, 150 size_t packetSize); 151 152 int GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData); 153 154 int GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData); 155 156 int Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler, 157 uint32_t packetLen); 158 159 int GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize); 160 int GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, 161 DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo); 162 163 int GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize); 164 165 int SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData, SyncType curType, 166 const QuerySyncObject &query); 167 168 int SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context, 169 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate = false) const; 170 171 void GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify, const SingleVerSyncTaskContext *context, 172 WaterMark &waterMark) const; 173 174 void GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify, const DeviceID &deviceId, 175 WaterMark &waterMark) const; 176 177 void GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark); 178 179 void GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context, WaterMark &waterMark) const; 180 181 int RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message, WaterMark maxSendDataTime); 182 183 int DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark, 184 const std::vector<uint64_t> &reserved); 185 186 int SendDataPacket(SyncType syncType, DataRequestPacket *packet, SingleVerSyncTaskContext *context); 187 188 void UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId, const SyncTimeRange &dataTime, 189 const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark); 190 191 void UpdatePeerWaterMark(SyncType syncType, const std::string &queryId, const SingleVerSyncTaskContext *context, 192 WaterMark peerWatermark, WaterMark peerDeletedWatermark); 193 194 std::string GetLocalDeviceName(); 195 196 int DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg = false); 197 198 int DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message); 199 200 void GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet, 201 WaterMark &pullEndWatermark) const; 202 203 int DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark, 204 const std::vector<uint64_t> &reserved); 205 206 int RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message, 207 const DataRequestPacket *packet); 208 209 void SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen); 210 211 int SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context, 212 uint32_t sessionId, uint32_t sequenceId); 213 214 int SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData, SingleVerSyncTaskContext *context); 215 216 int CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message); 217 218 void RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId); 219 220 void SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context, const DataRequestPacket *packet, 221 int32_t recvCode, WaterMark maxSendDataTime); 222 223 int GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context, 224 DataSyncReSendInfo reSendInfo); 225 226 virtual int RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context); 227 228 virtual void UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context); 229 230 void FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet, 231 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode); 232 233 void UpdateMtuSize(); 234 235 DataSizeSpecInfo GetDataSizeSpecInfo(size_t packetSize); 236 237 int InterceptData(SyncEntry &syncEntry); 238 239 int ControlCmdStartCheck(SingleVerSyncTaskContext *context); 240 241 int SendControlPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context); 242 243 int ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message); 244 int SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet, 245 const Message *message); 246 int SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 247 int UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message); 248 int SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode, 249 uint32_t controlCmdType, const CommErrHandler &handler = nullptr); 250 int QuerySyncCheck(SingleVerSyncTaskContext *context); 251 252 void RemoveSubscribeIfNeed(const std::string &queryId, const std::shared_ptr<SubscribeManager> &subscribeManager); 253 254 uint32_t mtuSize_; 255 SyncGenericInterface* storage_; 256 ICommunicator* communicateHandle_; 257 std::shared_ptr<Metadata> metadata_; 258 std::string label_; 259 std::string deviceId_; 260 261 SingleVerDataMessageSchedule msgSchedule_; 262 263 static const int HIGH_VERSION_WINDOW_SIZE = 3; 264 static const int LOW_VERSION_WINDOW_SIZE = 1; 265 // below param is about sliding sync info, is different from every sync task 266 std::mutex lock_; 267 int mode_ = 0; // sync mode, may diff from context mode if trigger pull_response while push finish 268 uint32_t sessionId_ = 0; 269 // sequenceId as key 270 std::map<uint32_t, ReSendInfo> reSendMap_; 271 // remaining sending window 272 int32_t windowSize_ = 0; 273 // max sequenceId has been sent 274 uint32_t maxSequenceIdHasSent_ = 0; 275 bool isAllDataHasSent_ = false; 276 // in a sync session, the last data timestamp 277 Timestamp sessionEndTimestamp_ = 0; 278 279 std::mutex removeDeviceDataLock_; 280 std::mutex unsubscribeLock_; 281 }; 282 } // namespace DistributedDB 283 284 #endif // SINGLE_VER_DATA_SYNC_NEW_H 285