1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_packet.h"
17 #include "icommunicator.h"
18 #include "single_ver_kvdb_sync_interface.h"
19 #include "query_sync_object.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "sync_types.h"
22 #include "version.h"
23 #include "parcel.h"
24 
25 namespace DistributedDB {
~DataRequestPacket()26 DataRequestPacket::~DataRequestPacket()
27 {
28     for (auto &entry : data_) {
29         delete entry;
30         entry = nullptr;
31     }
32 }
33 
SetData(std::vector<SendDataItem> & data)34 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
35 {
36     data_ = std::move(data);
37 }
38 
GetData() const39 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
40 {
41     return data_;
42 }
43 
SetCompressData(std::vector<uint8_t> & compressData)44 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
45 {
46     compressData_ = std::move(compressData);
47 }
48 
GetCompressData() const49 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
50 {
51     return compressData_;
52 }
53 
SetEndWaterMark(WaterMark waterMark)54 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
55 {
56     endWaterMark_ = waterMark;
57 }
58 
GetEndWaterMark() const59 WaterMark DataRequestPacket::GetEndWaterMark() const
60 {
61     return endWaterMark_;
62 }
63 
SetLocalWaterMark(WaterMark waterMark)64 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
65 {
66     localWaterMark_ = waterMark;
67 }
68 
GetLocalWaterMark() const69 WaterMark DataRequestPacket::GetLocalWaterMark() const
70 {
71     return localWaterMark_;
72 }
73 
SetPeerWaterMark(WaterMark waterMark)74 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
75 {
76     peerWaterMark_ = waterMark;
77 }
78 
GetPeerWaterMark() const79 WaterMark DataRequestPacket::GetPeerWaterMark() const
80 {
81     return peerWaterMark_;
82 }
83 
SetSendCode(int32_t errCode)84 void DataRequestPacket::SetSendCode(int32_t errCode)
85 {
86     sendCode_ = errCode;
87 }
88 
GetSendCode() const89 int32_t DataRequestPacket::GetSendCode() const
90 {
91     return sendCode_;
92 }
93 
SetMode(int32_t mode)94 void DataRequestPacket::SetMode(int32_t mode)
95 {
96     mode_ = mode;
97 }
98 
GetMode() const99 int32_t DataRequestPacket::GetMode() const
100 {
101     return mode_;
102 }
103 
SetSessionId(uint32_t sessionId)104 void DataRequestPacket::SetSessionId(uint32_t sessionId)
105 {
106     sessionId_ = sessionId;
107 }
108 
GetSessionId() const109 uint32_t DataRequestPacket::GetSessionId() const
110 {
111     return sessionId_;
112 }
113 
SetVersion(uint32_t version)114 void DataRequestPacket::SetVersion(uint32_t version)
115 {
116     version_ = version;
117 }
118 
GetVersion() const119 uint32_t DataRequestPacket::GetVersion() const
120 {
121     return version_;
122 }
123 
SetReserved(std::vector<uint64_t> & reserved)124 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126     reserved_ = std::move(reserved);
127 }
128 
SetReserved(std::vector<uint64_t> && reserved)129 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
130 {
131     reserved_ = reserved;
132 }
133 
GetReserved() const134 std::vector<uint64_t> DataRequestPacket::GetReserved() const
135 {
136     return reserved_;
137 }
138 
GetPacketId() const139 uint64_t DataRequestPacket::GetPacketId() const
140 {
141     uint64_t packetId = 0;
142     std::vector<uint64_t> DataRequestReserve = GetReserved();
143     if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
144         return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
145     } else {
146         return packetId;
147     }
148 }
149 
CalculateLen(uint32_t messageId) const150 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
151 {
152     uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
153         IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
154     if (totalLen == 0) {
155         return 0;
156     }
157     totalLen += Parcel::GetUInt64Len(); // endWaterMark
158     totalLen += Parcel::GetUInt64Len(); // localWaterMark
159     totalLen += Parcel::GetUInt64Len(); // peerWaterMark
160     totalLen += Parcel::GetIntLen(); // sendCode
161     totalLen += Parcel::GetIntLen(); // mode
162     totalLen += Parcel::GetUInt32Len(); // sessionId
163     totalLen += Parcel::GetUInt32Len(); // version
164     totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
165 
166     if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
167         totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
168     }
169     totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
170     if (totalLen > INT32_MAX) {
171         return 0;
172     }
173     if (messageId == QUERY_SYNC_MESSAGE) {
174         // deleted watermark
175         totalLen += Parcel::GetUInt64Len();
176         // query id
177         totalLen += Parcel::GetStringLen(queryId_);
178         // add for queryObject
179         totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
180     }
181     if (IsCompressData()) {
182         totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
183     }
184 
185     if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
186         totalLen += Parcel::GetUInt32Len(); // extraCondition size
187         for (const auto &entry : extraConditions_) {
188             totalLen += Parcel::GetStringLen(entry.first);
189             totalLen += Parcel::GetStringLen(entry.second);
190         }
191         totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
192     }
193     if (version_ >= SOFTWARE_VERSION_RELEASE_9_0) {
194         totalLen += Parcel::GetUInt64Len(); // schemaVersion
195         totalLen += Parcel::GetInt64Len(); // systemTimeOffset
196         totalLen += Parcel::GetInt64Len(); // senderTimeOffset
197         totalLen += Parcel::GetIntLen();   // security label
198         totalLen += Parcel::GetIntLen();   // security flag
199     }
200     if (totalLen > INT32_MAX) {
201         return 0;
202     }
203     return totalLen;
204 }
205 
SetFlag(uint32_t flag)206 void DataRequestPacket::SetFlag(uint32_t flag)
207 {
208     flag_ = flag;
209 }
210 
GetFlag() const211 uint32_t DataRequestPacket::GetFlag() const
212 {
213     return flag_;
214 }
215 
IsLastSequence() const216 bool DataRequestPacket::IsLastSequence() const
217 {
218     return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
219 }
220 
SetLastSequence()221 void DataRequestPacket::SetLastSequence()
222 {
223     flag_ = flag_ | IS_LAST_SEQUENCE;
224 }
225 
IsNeedUpdateWaterMark() const226 bool DataRequestPacket::IsNeedUpdateWaterMark() const
227 {
228     return (flag_ & IS_UPDATE_WATER) != IS_UPDATE_WATER;
229 }
230 
SetUpdateWaterMark()231 void DataRequestPacket::SetUpdateWaterMark()
232 {
233     flag_ = flag_ | IS_UPDATE_WATER;
234 }
235 
SetCompressDataMark()236 void DataRequestPacket::SetCompressDataMark()
237 {
238     flag_ = flag_ | IS_COMPRESS_DATA;
239 }
240 
IsCompressData() const241 bool DataRequestPacket::IsCompressData() const
242 {
243     return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
244 }
245 
SetCompressAlgo(CompressAlgorithm algo)246 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
247 {
248     algo_ = algo;
249 }
250 
GetCompressAlgo() const251 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
252 {
253     return algo_;
254 }
255 
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)256 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
257 {
258     SetSendCode(sendCode);
259     SetVersion(version);
260     SetMode(mode);
261 }
262 
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)263 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
264 {
265     localWaterMark_ = localMark;
266     peerWaterMark_ = peerMark;
267     deletedWatermark_ = deletedWatermark;
268 }
269 
SetQuery(const QuerySyncObject & query)270 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
271 {
272     query_ = query;
273 }
274 
GetQuery() const275 QuerySyncObject DataRequestPacket::GetQuery() const
276 {
277     return query_;
278 }
279 
SetQueryId(const std::string & queryId)280 void DataRequestPacket::SetQueryId(const std::string &queryId)
281 {
282     queryId_ = queryId;
283 }
284 
GetQueryId() const285 std::string DataRequestPacket::GetQueryId() const
286 {
287     return queryId_;
288 }
289 
SetDeletedWaterMark(WaterMark watermark)290 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
291 {
292     deletedWatermark_ = watermark;
293 }
294 
GetDeletedWaterMark() const295 WaterMark DataRequestPacket::GetDeletedWaterMark() const
296 {
297     return deletedWatermark_;
298 }
299 
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)300 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
301 {
302     extraConditions_ = extraConditions;
303     flag_ |= IS_CONDITION_DATA;
304 }
305 
GetExtraConditions() const306 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
307 {
308     return extraConditions_;
309 }
310 
IsExtraConditionData() const311 bool DataRequestPacket::IsExtraConditionData() const
312 {
313     return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
314 }
315 
SetSchemaVersion(uint64_t schemaVersion)316 void DataRequestPacket::SetSchemaVersion(uint64_t schemaVersion)
317 {
318     schemaVersion_ = schemaVersion;
319 }
320 
GetSchemaVersion() const321 uint64_t DataRequestPacket::GetSchemaVersion() const
322 {
323     return schemaVersion_;
324 }
325 
SetSystemTimeOffset(int64_t systemTimeOffset)326 void DataRequestPacket::SetSystemTimeOffset(int64_t systemTimeOffset)
327 {
328     systemTimeOffset_ = systemTimeOffset;
329 }
330 
GetSystemTimeOffset() const331 int64_t DataRequestPacket::GetSystemTimeOffset() const
332 {
333     return systemTimeOffset_;
334 }
335 
SetSenderTimeOffset(int64_t senderTimeOffset)336 void DataRequestPacket::SetSenderTimeOffset(int64_t senderTimeOffset)
337 {
338     senderTimeOffset_ = senderTimeOffset;
339 }
340 
GetSenderTimeOffset() const341 int64_t DataRequestPacket::GetSenderTimeOffset() const
342 {
343     return senderTimeOffset_;
344 }
345 
SetSecurityOption(const SecurityOption & option)346 void DataRequestPacket::SetSecurityOption(const SecurityOption &option)
347 {
348     securityOption_ = option;
349 }
350 
GetSecurityOption() const351 SecurityOption DataRequestPacket::GetSecurityOption() const
352 {
353     return securityOption_;
354 }
355 
SetData(uint64_t data)356 void DataAckPacket::SetData(uint64_t data)
357 {
358     data_ = data;
359 }
360 
GetData() const361 uint64_t DataAckPacket::GetData() const
362 {
363     return data_;
364 }
365 
SetRecvCode(int32_t errorCode)366 void DataAckPacket::SetRecvCode(int32_t errorCode)
367 {
368     recvCode_ = errorCode;
369 }
370 
GetRecvCode() const371 int32_t DataAckPacket::GetRecvCode() const
372 {
373     return recvCode_;
374 }
375 
SetVersion(uint32_t version)376 void DataAckPacket::SetVersion(uint32_t version)
377 {
378     version_ = version;
379 }
380 
GetVersion() const381 uint32_t DataAckPacket::GetVersion() const
382 {
383     return version_;
384 }
385 
SetReserved(std::vector<uint64_t> & reserved)386 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
387 {
388     reserved_ = std::move(reserved);
389 }
390 
GetReserved() const391 std::vector<uint64_t> DataAckPacket::GetReserved() const
392 {
393     return reserved_;
394 }
395 
GetPacketId() const396 uint64_t DataAckPacket::GetPacketId() const
397 {
398     uint64_t packetId = 0;
399     std::vector<uint64_t> DataAckReserve = GetReserved();
400     if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
401         packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
402     }
403     // while remote db is close and open again, it may not carry packetId
404     // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
405     if (packetId > MAX_PACKETID) {
406         return 0;
407     }
408     return packetId;
409 }
410 
IsPacketIdValid(uint64_t packetId)411 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
412 {
413     return (packetId > 0);
414 }
415 
CalculateLen() const416 uint32_t DataAckPacket::CalculateLen() const
417 {
418     uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
419     len += Parcel::GetIntLen(); // recvCode
420     len += Parcel::GetUInt32Len(); // version
421     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
422 
423     len = Parcel::GetEightByteAlign(len);
424     if (len > INT32_MAX) {
425         return 0;
426     }
427     return len;
428 }
429 
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)430 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
431 {
432     sendCode_ = sendCode;
433     version_ = version;
434     controlCmdType_ = static_cast<uint32_t>(controlCmd);
435     flag_ = flag;
436 }
437 
GetSendCode() const438 int32_t ControlRequestPacket::GetSendCode() const
439 {
440     return sendCode_;
441 }
442 
GetVersion() const443 uint32_t ControlRequestPacket::GetVersion() const
444 {
445     return version_;
446 }
447 
GetcontrolCmdType() const448 uint32_t ControlRequestPacket::GetcontrolCmdType() const
449 {
450     return controlCmdType_;
451 }
452 
GetFlag() const453 uint32_t ControlRequestPacket::GetFlag() const
454 {
455     return flag_;
456 }
457 
SetQuery(const QuerySyncObject & query)458 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
459 {
460     (void)query;
461 }
462 
CalculateLen() const463 uint32_t ControlRequestPacket::CalculateLen() const
464 {
465     uint64_t len = Parcel::GetUInt32Len(); // version_
466     len += Parcel::GetIntLen(); // sendCode_
467     len += Parcel::GetUInt32Len(); // controlCmdType_
468     len += Parcel::GetUInt32Len(); // flag
469 
470     len = Parcel::GetEightByteAlign(len);
471     if (len > INT32_MAX) {
472         return 0;
473     }
474     return len;
475 }
476 
SetQuery(const QuerySyncObject & query)477 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
478 {
479     query_ = query;
480 }
481 
GetQuery() const482 QuerySyncObject SubscribeRequest::GetQuery() const
483 {
484     return query_;
485 }
486 
CalculateLen() const487 uint32_t SubscribeRequest::CalculateLen() const
488 {
489     uint64_t totalLen = ControlRequestPacket::CalculateLen();
490     if (totalLen == 0) {
491         LOGE("[SubscribeRequest] cal packet len failed");
492         return 0;
493     }
494     // add for queryObject
495     totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
496     if (totalLen > INT32_MAX) {
497         return 0;
498     }
499     return totalLen;
500 }
501 
IsAutoSubscribe() const502 bool SubscribeRequest::IsAutoSubscribe() const
503 {
504     return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
505 }
506 
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)507 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
508 {
509     recvCode_ = recvCode;
510     version_ = version;
511     controlCmdType_ = static_cast<uint32_t>(controlCmd);
512     flag_ = flag;
513 }
514 
GetRecvCode() const515 int32_t ControlAckPacket::GetRecvCode() const
516 {
517     return recvCode_;
518 }
519 
GetVersion() const520 uint32_t ControlAckPacket::GetVersion() const
521 {
522     return version_;
523 }
524 
GetcontrolCmdType() const525 uint32_t ControlAckPacket::GetcontrolCmdType() const
526 {
527     return controlCmdType_;
528 }
529 
GetFlag() const530 uint32_t ControlAckPacket::GetFlag() const
531 {
532     return flag_;
533 }
534 
CalculateLen() const535 uint32_t ControlAckPacket::CalculateLen() const
536 {
537     uint64_t len = Parcel::GetUInt32Len(); // version_
538     len += Parcel::GetIntLen(); // recvCode_
539     len += Parcel::GetUInt32Len(); // controlCmdType_
540     len += Parcel::GetUInt32Len(); // flag
541     len = Parcel::GetEightByteAlign(len);
542     if (len > INT32_MAX) {
543         return 0;
544     }
545     return len;
546 }
547 } // namespace DistributedDB
548