1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H 17 #define SINGLE_VER_DATA_PACKET_NEW_H 18 19 #include "icommunicator.h" 20 #include "parcel.h" 21 #include "query_sync_object.h" 22 #include "single_ver_kvdb_sync_interface.h" 23 #include "sync_types.h" 24 #include "version.h" 25 26 namespace DistributedDB { 27 using SendDataItem = SingleVerKvEntry *; 28 29 class DataRequestPacket { 30 public: DataRequestPacket()31 DataRequestPacket() {}; 32 virtual ~DataRequestPacket(); 33 34 void SetData(std::vector<SendDataItem> &data); 35 36 const std::vector<SendDataItem> &GetData() const; 37 const std::vector<uint8_t> &GetCompressedData() const; 38 39 void SetCompressData(std::vector<uint8_t> &compressData); 40 41 const std::vector<uint8_t> &GetCompressData() const; 42 43 void SetEndWaterMark(WaterMark waterMark); 44 45 WaterMark GetEndWaterMark() const; 46 47 void SetLocalWaterMark(WaterMark waterMark); 48 49 WaterMark GetLocalWaterMark() const; 50 51 void SetPeerWaterMark(WaterMark waterMark); 52 53 WaterMark GetPeerWaterMark() const; 54 55 void SetSendCode(int32_t errCode); 56 57 int32_t GetSendCode() const; 58 59 void SetMode(int32_t mode); 60 61 int32_t GetMode() const; 62 63 void SetSessionId(uint32_t sessionId); 64 65 uint32_t GetSessionId() const; 66 67 void SetVersion(uint32_t version); 68 69 uint32_t GetVersion() const; 70 71 uint32_t CalculateLen(uint32_t messageId) const; 72 73 void SetReserved(std::vector<uint64_t> &reserved); 74 void SetReserved(std::vector<uint64_t> &&reserved); 75 76 std::vector<uint64_t> GetReserved() const; 77 78 uint64_t GetPacketId() const; 79 80 void SetFlag(uint32_t flag); 81 82 uint32_t GetFlag() const; 83 84 bool IsLastSequence() const; 85 86 void SetLastSequence(); 87 88 bool IsNeedUpdateWaterMark() const; 89 90 void SetUpdateWaterMark(); 91 92 void SetBasicInfo(int sendCode, uint32_t version, int32_t mode); 93 94 void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark); 95 96 void SetQuery(const QuerySyncObject &query); 97 QuerySyncObject GetQuery() const; 98 99 void SetQueryId(const std::string &queryId); 100 std::string GetQueryId() const; 101 102 void SetDeletedWaterMark(WaterMark watermark); 103 WaterMark GetDeletedWaterMark() const; 104 105 void SetCompressDataMark(); 106 bool IsCompressData() const; 107 108 void SetCompressAlgo(CompressAlgorithm algo); 109 CompressAlgorithm GetCompressAlgo() const; 110 111 void SetExtraConditions(const std::map<std::string, std::string> &extraConditions); 112 std::map<std::string, std::string> GetExtraConditions() const; 113 bool IsExtraConditionData() const; 114 115 void SetSchemaVersion(uint64_t schemaVersion); 116 uint64_t GetSchemaVersion() const; 117 118 void SetSystemTimeOffset(int64_t systemTimeOffset); 119 int64_t GetSystemTimeOffset() const; 120 121 void SetSenderTimeOffset(int64_t senderTimeOffset); 122 int64_t GetSenderTimeOffset() const; 123 124 void SetSecurityOption(const SecurityOption &option); 125 SecurityOption GetSecurityOption() const; 126 protected: 127 std::vector<SendDataItem> data_; 128 WaterMark endWaterMark_ = 0; 129 WaterMark localWaterMark_ = 0; 130 WaterMark peerWaterMark_ = 0; 131 int32_t sendCode_ = 0; 132 int32_t mode_ = SyncModeType::INVALID_MODE; 133 uint32_t sessionId_ = 0; 134 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 135 std::vector<uint64_t> reserved_; 136 uint32_t flag_ = 0; // bit 0 used for isLastSequence 137 // add for query sync mode 138 QuerySyncObject query_; 139 std::string queryId_; 140 WaterMark deletedWatermark_ = 0; 141 std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_ 142 CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data 143 std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device 144 uint64_t schemaVersion_ = 0; // sender schema version, add in 109 145 int64_t systemTimeOffset_ = 0; // sender device time offset with receiver, add in 109 146 int64_t senderTimeOffset_ = 0; // sender local time offset, add in 109 147 SecurityOption securityOption_; 148 static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last 149 static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update 150 static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data 151 static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data 152 }; 153 154 class DataAckPacket { 155 public: DataAckPacket()156 DataAckPacket() {}; ~DataAckPacket()157 virtual ~DataAckPacket() {}; 158 159 void SetData(uint64_t data); 160 161 uint64_t GetData() const; 162 163 void SetRecvCode(int32_t errorCode); 164 165 int32_t GetRecvCode() const; 166 167 void SetVersion(uint32_t version); 168 169 uint32_t GetVersion() const; 170 171 void SetReserved(std::vector<uint64_t> &reserved); 172 173 std::vector<uint64_t> GetReserved() const; 174 175 uint64_t GetPacketId() const; 176 177 static bool IsPacketIdValid(uint64_t packetId); 178 179 uint32_t CalculateLen() const; 180 181 private: 182 /* 183 * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK; 184 * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0. 185 */ 186 uint64_t data_ = 0; 187 int32_t recvCode_ = 0; 188 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 189 std::vector<uint64_t> reserved_; 190 }; 191 192 class ControlRequestPacket { 193 public: ControlRequestPacket()194 ControlRequestPacket() {}; ~ControlRequestPacket()195 virtual ~ControlRequestPacket() {}; 196 void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag); 197 198 int32_t GetSendCode() const; 199 uint32_t GetVersion() const; 200 uint32_t GetcontrolCmdType() const; 201 uint32_t GetFlag() const; 202 virtual void SetQuery(const QuerySyncObject &query); 203 virtual uint32_t CalculateLen() const; 204 private: 205 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 206 int32_t sendCode_ = 0; 207 uint32_t controlCmdType_ = 0; 208 uint32_t flag_ = 0; 209 }; 210 211 class SubscribeRequest : public ControlRequestPacket { 212 public: SubscribeRequest()213 SubscribeRequest() {}; ~SubscribeRequest()214 ~SubscribeRequest() override {}; 215 QuerySyncObject GetQuery() const; 216 bool IsAutoSubscribe() const; 217 void SetQuery(const QuerySyncObject &query) override; 218 uint32_t CalculateLen() const override; 219 static const uint32_t IS_AUTO_SUBSCRIBE = 0x1; 220 private: 221 QuerySyncObject query_; 222 }; 223 224 class ControlAckPacket { 225 public: ControlAckPacket()226 ControlAckPacket() {}; ~ControlAckPacket()227 virtual ~ControlAckPacket() {}; 228 void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag); 229 int32_t GetRecvCode() const; 230 uint32_t GetVersion() const; 231 uint32_t GetcontrolCmdType() const; 232 uint32_t GetFlag() const; 233 uint32_t CalculateLen() const; 234 235 private: 236 uint32_t version_ = SOFTWARE_VERSION_CURRENT; 237 int32_t recvCode_ = 0; 238 uint32_t controlCmdType_ = 0; 239 uint32_t flag_ = 0; 240 }; 241 } // namespace DistributedDB 242 243 #endif // SINGLE_VER_DATA_SYNC_NEW_H