1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_DATA_PACKET_NEW_H
17 #define SINGLE_VER_DATA_PACKET_NEW_H
18 
19 #include "icommunicator.h"
20 #include "parcel.h"
21 #include "query_sync_object.h"
22 #include "single_ver_kvdb_sync_interface.h"
23 #include "sync_types.h"
24 #include "version.h"
25 
26 namespace DistributedDB {
27 using SendDataItem = SingleVerKvEntry *;
28 
29 class DataRequestPacket {
30 public:
DataRequestPacket()31     DataRequestPacket() {};
32     virtual ~DataRequestPacket();
33 
34     void SetData(std::vector<SendDataItem> &data);
35 
36     const std::vector<SendDataItem> &GetData() const;
37     const std::vector<uint8_t> &GetCompressedData() const;
38 
39     void SetCompressData(std::vector<uint8_t> &compressData);
40 
41     const std::vector<uint8_t> &GetCompressData() const;
42 
43     void SetEndWaterMark(WaterMark waterMark);
44 
45     WaterMark GetEndWaterMark() const;
46 
47     void SetLocalWaterMark(WaterMark waterMark);
48 
49     WaterMark GetLocalWaterMark() const;
50 
51     void SetPeerWaterMark(WaterMark waterMark);
52 
53     WaterMark GetPeerWaterMark() const;
54 
55     void SetSendCode(int32_t errCode);
56 
57     int32_t GetSendCode() const;
58 
59     void SetMode(int32_t mode);
60 
61     int32_t GetMode() const;
62 
63     void SetSessionId(uint32_t sessionId);
64 
65     uint32_t GetSessionId() const;
66 
67     void SetVersion(uint32_t version);
68 
69     uint32_t GetVersion() const;
70 
71     uint32_t CalculateLen(uint32_t messageId) const;
72 
73     void SetReserved(std::vector<uint64_t> &reserved);
74     void SetReserved(std::vector<uint64_t> &&reserved);
75 
76     std::vector<uint64_t> GetReserved() const;
77 
78     uint64_t GetPacketId() const;
79 
80     void SetFlag(uint32_t flag);
81 
82     uint32_t GetFlag() const;
83 
84     bool IsLastSequence() const;
85 
86     void SetLastSequence();
87 
88     bool IsNeedUpdateWaterMark() const;
89 
90     void SetUpdateWaterMark();
91 
92     void SetBasicInfo(int sendCode, uint32_t version, int32_t mode);
93 
94     void SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark);
95 
96     void SetQuery(const QuerySyncObject &query);
97     QuerySyncObject GetQuery() const;
98 
99     void SetQueryId(const std::string &queryId);
100     std::string GetQueryId() const;
101 
102     void SetDeletedWaterMark(WaterMark watermark);
103     WaterMark GetDeletedWaterMark() const;
104 
105     void SetCompressDataMark();
106     bool IsCompressData() const;
107 
108     void SetCompressAlgo(CompressAlgorithm algo);
109     CompressAlgorithm GetCompressAlgo() const;
110 
111     void SetExtraConditions(const std::map<std::string, std::string> &extraConditions);
112     std::map<std::string, std::string> GetExtraConditions() const;
113     bool IsExtraConditionData() const;
114 
115     void SetSchemaVersion(uint64_t schemaVersion);
116     uint64_t GetSchemaVersion() const;
117 
118     void SetSystemTimeOffset(int64_t systemTimeOffset);
119     int64_t GetSystemTimeOffset() const;
120 
121     void SetSenderTimeOffset(int64_t senderTimeOffset);
122     int64_t GetSenderTimeOffset() const;
123 
124     void SetSecurityOption(const SecurityOption &option);
125     SecurityOption GetSecurityOption() const;
126 protected:
127     std::vector<SendDataItem> data_;
128     WaterMark endWaterMark_ = 0;
129     WaterMark localWaterMark_ = 0;
130     WaterMark peerWaterMark_ = 0;
131     int32_t sendCode_ = 0;
132     int32_t mode_ = SyncModeType::INVALID_MODE;
133     uint32_t sessionId_ = 0;
134     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
135     std::vector<uint64_t> reserved_;
136     uint32_t flag_ = 0; // bit 0 used for isLastSequence
137     // add for query sync mode
138     QuerySyncObject query_;
139     std::string queryId_;
140     WaterMark deletedWatermark_ = 0;
141     std::vector<uint8_t> compressData_; // if compressData size is above 0, means use compressData and ignore data_
142     CompressAlgorithm algo_ = CompressAlgorithm::NONE; // used for param while serialize compress data
143     std::map<std::string, std::string> extraConditions_; // use for checkpermission in annother device
144     uint64_t schemaVersion_ = 0; // sender schema version, add in 109
145     int64_t systemTimeOffset_ = 0; // sender device time offset with receiver, add in 109
146     int64_t senderTimeOffset_ = 0; // sender local time offset, add in 109
147     SecurityOption securityOption_;
148     static const uint32_t IS_LAST_SEQUENCE = 0x1; // bit 0 used for isLastSequence, 1: is last, 0: not last
149     static const uint32_t IS_UPDATE_WATER = 0x2; // bit 1 used for update watermark, 0: update, 1: not update
150     static const uint32_t IS_COMPRESS_DATA = 0x4; // bit 3 used for compress data, 0: raw data, 1: compress data
151     static const uint32_t IS_CONDITION_DATA = 0x8; // bit 4 used for extra condition data, 0: raw data
152 };
153 
154 class DataAckPacket {
155 public:
DataAckPacket()156     DataAckPacket() {};
~DataAckPacket()157     virtual ~DataAckPacket() {};
158 
159     void SetData(uint64_t data);
160 
161     uint64_t GetData() const;
162 
163     void SetRecvCode(int32_t errorCode);
164 
165     int32_t GetRecvCode() const;
166 
167     void SetVersion(uint32_t version);
168 
169     uint32_t GetVersion() const;
170 
171     void SetReserved(std::vector<uint64_t> &reserved);
172 
173     std::vector<uint64_t> GetReserved() const;
174 
175     uint64_t GetPacketId() const;
176 
177     static bool IsPacketIdValid(uint64_t packetId);
178 
179     uint32_t CalculateLen() const;
180 
181 private:
182     /*
183      * data_ is waterMark when revCode_ == LOCAL_WATER_MARK_NOT_INIT || revCode_ == E_OK;
184      * data_ is timer in milliSeconds when revCode_ == -E_SAVE_DATA_NOTIFY && data_ != 0.
185      */
186     uint64_t data_ = 0;
187     int32_t recvCode_ = 0;
188     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
189     std::vector<uint64_t> reserved_;
190 };
191 
192 class ControlRequestPacket {
193 public:
ControlRequestPacket()194     ControlRequestPacket() {};
~ControlRequestPacket()195     virtual ~ControlRequestPacket() {};
196     void SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag);
197 
198     int32_t GetSendCode() const;
199     uint32_t GetVersion() const;
200     uint32_t GetcontrolCmdType() const;
201     uint32_t GetFlag() const;
202     virtual void SetQuery(const QuerySyncObject &query);
203     virtual uint32_t CalculateLen() const;
204 private:
205     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
206     int32_t sendCode_ = 0;
207     uint32_t controlCmdType_ = 0;
208     uint32_t flag_ = 0;
209 };
210 
211 class SubscribeRequest : public ControlRequestPacket {
212 public:
SubscribeRequest()213     SubscribeRequest() {};
~SubscribeRequest()214     ~SubscribeRequest() override {};
215     QuerySyncObject GetQuery() const;
216     bool IsAutoSubscribe() const;
217     void SetQuery(const QuerySyncObject &query) override;
218     uint32_t CalculateLen() const override;
219     static const uint32_t IS_AUTO_SUBSCRIBE = 0x1;
220 private:
221     QuerySyncObject query_;
222 };
223 
224 class ControlAckPacket {
225 public:
ControlAckPacket()226     ControlAckPacket() {};
~ControlAckPacket()227     virtual ~ControlAckPacket() {};
228     void SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag);
229     int32_t GetRecvCode() const;
230     uint32_t GetVersion() const;
231     uint32_t GetcontrolCmdType() const;
232     uint32_t GetFlag() const;
233     uint32_t CalculateLen() const;
234 
235 private:
236     uint32_t version_ = SOFTWARE_VERSION_CURRENT;
237     int32_t recvCode_ = 0;
238     uint32_t controlCmdType_ = 0;
239     uint32_t flag_ = 0;
240 };
241 }  // namespace DistributedDB
242 
243 #endif // SINGLE_VER_DATA_SYNC_NEW_H