1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_packet.h"
17 #include "icommunicator.h"
18 #include "single_ver_kvdb_sync_interface.h"
19 #include "query_sync_object.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "sync_types.h"
22 #include "version.h"
23 #include "parcel.h"
24
25 namespace DistributedDB {
~DataRequestPacket()26 DataRequestPacket::~DataRequestPacket()
27 {
28 for (auto &entry : data_) {
29 delete entry;
30 entry = nullptr;
31 }
32 }
33
SetData(std::vector<SendDataItem> & data)34 void DataRequestPacket::SetData(std::vector<SendDataItem> &data)
35 {
36 data_ = std::move(data);
37 }
38
GetData() const39 const std::vector<SendDataItem> &DataRequestPacket::GetData() const
40 {
41 return data_;
42 }
43
SetCompressData(std::vector<uint8_t> & compressData)44 void DataRequestPacket::SetCompressData(std::vector<uint8_t> &compressData)
45 {
46 compressData_ = std::move(compressData);
47 }
48
GetCompressData() const49 const std::vector<uint8_t> &DataRequestPacket::GetCompressData() const
50 {
51 return compressData_;
52 }
53
SetEndWaterMark(WaterMark waterMark)54 void DataRequestPacket::SetEndWaterMark(WaterMark waterMark)
55 {
56 endWaterMark_ = waterMark;
57 }
58
GetEndWaterMark() const59 WaterMark DataRequestPacket::GetEndWaterMark() const
60 {
61 return endWaterMark_;
62 }
63
SetLocalWaterMark(WaterMark waterMark)64 void DataRequestPacket::SetLocalWaterMark(WaterMark waterMark)
65 {
66 localWaterMark_ = waterMark;
67 }
68
GetLocalWaterMark() const69 WaterMark DataRequestPacket::GetLocalWaterMark() const
70 {
71 return localWaterMark_;
72 }
73
SetPeerWaterMark(WaterMark waterMark)74 void DataRequestPacket::SetPeerWaterMark(WaterMark waterMark)
75 {
76 peerWaterMark_ = waterMark;
77 }
78
GetPeerWaterMark() const79 WaterMark DataRequestPacket::GetPeerWaterMark() const
80 {
81 return peerWaterMark_;
82 }
83
SetSendCode(int32_t errCode)84 void DataRequestPacket::SetSendCode(int32_t errCode)
85 {
86 sendCode_ = errCode;
87 }
88
GetSendCode() const89 int32_t DataRequestPacket::GetSendCode() const
90 {
91 return sendCode_;
92 }
93
SetMode(int32_t mode)94 void DataRequestPacket::SetMode(int32_t mode)
95 {
96 mode_ = mode;
97 }
98
GetMode() const99 int32_t DataRequestPacket::GetMode() const
100 {
101 return mode_;
102 }
103
SetSessionId(uint32_t sessionId)104 void DataRequestPacket::SetSessionId(uint32_t sessionId)
105 {
106 sessionId_ = sessionId;
107 }
108
GetSessionId() const109 uint32_t DataRequestPacket::GetSessionId() const
110 {
111 return sessionId_;
112 }
113
SetVersion(uint32_t version)114 void DataRequestPacket::SetVersion(uint32_t version)
115 {
116 version_ = version;
117 }
118
GetVersion() const119 uint32_t DataRequestPacket::GetVersion() const
120 {
121 return version_;
122 }
123
SetReserved(std::vector<uint64_t> & reserved)124 void DataRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126 reserved_ = std::move(reserved);
127 }
128
SetReserved(std::vector<uint64_t> && reserved)129 void DataRequestPacket::SetReserved(std::vector<uint64_t> &&reserved)
130 {
131 reserved_ = reserved;
132 }
133
GetReserved() const134 std::vector<uint64_t> DataRequestPacket::GetReserved() const
135 {
136 return reserved_;
137 }
138
GetPacketId() const139 uint64_t DataRequestPacket::GetPacketId() const
140 {
141 uint64_t packetId = 0;
142 std::vector<uint64_t> DataRequestReserve = GetReserved();
143 if (DataRequestReserve.size() > REQUEST_PACKET_RESERVED_INDEX_PACKETID) {
144 return DataRequestReserve[REQUEST_PACKET_RESERVED_INDEX_PACKETID];
145 } else {
146 return packetId;
147 }
148 }
149
CalculateLen(uint32_t messageId) const150 uint32_t DataRequestPacket::CalculateLen(uint32_t messageId) const
151 {
152 uint64_t totalLen = GenericSingleVerKvEntry::CalculateLens(
153 IsCompressData() ? std::vector<SendDataItem> {} : data_, version_); // for data
154 if (totalLen == 0) {
155 return 0;
156 }
157 totalLen += Parcel::GetUInt64Len(); // endWaterMark
158 totalLen += Parcel::GetUInt64Len(); // localWaterMark
159 totalLen += Parcel::GetUInt64Len(); // peerWaterMark
160 totalLen += Parcel::GetIntLen(); // sendCode
161 totalLen += Parcel::GetIntLen(); // mode
162 totalLen += Parcel::GetUInt32Len(); // sessionId
163 totalLen += Parcel::GetUInt32Len(); // version
164 totalLen += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
165
166 if (version_ > SOFTWARE_VERSION_RELEASE_2_0) {
167 totalLen += Parcel::GetUInt32Len(); // flag bit0 used for isLastSequence
168 }
169 totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
170 if (totalLen > INT32_MAX) {
171 return 0;
172 }
173 if (messageId == QUERY_SYNC_MESSAGE) {
174 // deleted watermark
175 totalLen += Parcel::GetUInt64Len();
176 // query id
177 totalLen += Parcel::GetStringLen(queryId_);
178 // add for queryObject
179 totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
180 }
181 if (IsCompressData()) {
182 totalLen += GenericSingleVerKvEntry::CalculateCompressedLens(compressData_); // add forcompressData_
183 }
184
185 if (version_ > SOFTWARE_VERSION_RELEASE_2_0 && IsExtraConditionData()) {
186 totalLen += Parcel::GetUInt32Len(); // extraCondition size
187 for (const auto &entry : extraConditions_) {
188 totalLen += Parcel::GetStringLen(entry.first);
189 totalLen += Parcel::GetStringLen(entry.second);
190 }
191 totalLen = Parcel::GetEightByteAlign(totalLen); // 8-byte align
192 }
193 if (version_ >= SOFTWARE_VERSION_RELEASE_9_0) {
194 totalLen += Parcel::GetUInt64Len(); // schemaVersion
195 totalLen += Parcel::GetInt64Len(); // systemTimeOffset
196 totalLen += Parcel::GetInt64Len(); // senderTimeOffset
197 totalLen += Parcel::GetIntLen(); // security label
198 totalLen += Parcel::GetIntLen(); // security flag
199 }
200 if (totalLen > INT32_MAX) {
201 return 0;
202 }
203 return totalLen;
204 }
205
SetFlag(uint32_t flag)206 void DataRequestPacket::SetFlag(uint32_t flag)
207 {
208 flag_ = flag;
209 }
210
GetFlag() const211 uint32_t DataRequestPacket::GetFlag() const
212 {
213 return flag_;
214 }
215
IsLastSequence() const216 bool DataRequestPacket::IsLastSequence() const
217 {
218 return ((flag_ & IS_LAST_SEQUENCE) == IS_LAST_SEQUENCE);
219 }
220
SetLastSequence()221 void DataRequestPacket::SetLastSequence()
222 {
223 flag_ = flag_ | IS_LAST_SEQUENCE;
224 }
225
IsNeedUpdateWaterMark() const226 bool DataRequestPacket::IsNeedUpdateWaterMark() const
227 {
228 return (flag_ & IS_UPDATE_WATER) != IS_UPDATE_WATER;
229 }
230
SetUpdateWaterMark()231 void DataRequestPacket::SetUpdateWaterMark()
232 {
233 flag_ = flag_ | IS_UPDATE_WATER;
234 }
235
SetCompressDataMark()236 void DataRequestPacket::SetCompressDataMark()
237 {
238 flag_ = flag_ | IS_COMPRESS_DATA;
239 }
240
IsCompressData() const241 bool DataRequestPacket::IsCompressData() const
242 {
243 return ((flag_ & IS_COMPRESS_DATA) == IS_COMPRESS_DATA);
244 }
245
SetCompressAlgo(CompressAlgorithm algo)246 void DataRequestPacket::SetCompressAlgo(CompressAlgorithm algo)
247 {
248 algo_ = algo;
249 }
250
GetCompressAlgo() const251 CompressAlgorithm DataRequestPacket::GetCompressAlgo() const
252 {
253 return algo_;
254 }
255
SetBasicInfo(int sendCode,uint32_t version,int32_t mode)256 void DataRequestPacket::SetBasicInfo(int sendCode, uint32_t version, int32_t mode)
257 {
258 SetSendCode(sendCode);
259 SetVersion(version);
260 SetMode(mode);
261 }
262
SetWaterMark(WaterMark localMark,WaterMark peerMark,WaterMark deletedWatermark)263 void DataRequestPacket::SetWaterMark(WaterMark localMark, WaterMark peerMark, WaterMark deletedWatermark)
264 {
265 localWaterMark_ = localMark;
266 peerWaterMark_ = peerMark;
267 deletedWatermark_ = deletedWatermark;
268 }
269
SetQuery(const QuerySyncObject & query)270 void DataRequestPacket::SetQuery(const QuerySyncObject &query)
271 {
272 query_ = query;
273 }
274
GetQuery() const275 QuerySyncObject DataRequestPacket::GetQuery() const
276 {
277 return query_;
278 }
279
SetQueryId(const std::string & queryId)280 void DataRequestPacket::SetQueryId(const std::string &queryId)
281 {
282 queryId_ = queryId;
283 }
284
GetQueryId() const285 std::string DataRequestPacket::GetQueryId() const
286 {
287 return queryId_;
288 }
289
SetDeletedWaterMark(WaterMark watermark)290 void DataRequestPacket::SetDeletedWaterMark(WaterMark watermark)
291 {
292 deletedWatermark_ = watermark;
293 }
294
GetDeletedWaterMark() const295 WaterMark DataRequestPacket::GetDeletedWaterMark() const
296 {
297 return deletedWatermark_;
298 }
299
SetExtraConditions(const std::map<std::string,std::string> & extraConditions)300 void DataRequestPacket::SetExtraConditions(const std::map<std::string, std::string> &extraConditions)
301 {
302 extraConditions_ = extraConditions;
303 flag_ |= IS_CONDITION_DATA;
304 }
305
GetExtraConditions() const306 std::map<std::string, std::string> DataRequestPacket::GetExtraConditions() const
307 {
308 return extraConditions_;
309 }
310
IsExtraConditionData() const311 bool DataRequestPacket::IsExtraConditionData() const
312 {
313 return ((flag_ & IS_CONDITION_DATA) == IS_CONDITION_DATA);
314 }
315
SetSchemaVersion(uint64_t schemaVersion)316 void DataRequestPacket::SetSchemaVersion(uint64_t schemaVersion)
317 {
318 schemaVersion_ = schemaVersion;
319 }
320
GetSchemaVersion() const321 uint64_t DataRequestPacket::GetSchemaVersion() const
322 {
323 return schemaVersion_;
324 }
325
SetSystemTimeOffset(int64_t systemTimeOffset)326 void DataRequestPacket::SetSystemTimeOffset(int64_t systemTimeOffset)
327 {
328 systemTimeOffset_ = systemTimeOffset;
329 }
330
GetSystemTimeOffset() const331 int64_t DataRequestPacket::GetSystemTimeOffset() const
332 {
333 return systemTimeOffset_;
334 }
335
SetSenderTimeOffset(int64_t senderTimeOffset)336 void DataRequestPacket::SetSenderTimeOffset(int64_t senderTimeOffset)
337 {
338 senderTimeOffset_ = senderTimeOffset;
339 }
340
GetSenderTimeOffset() const341 int64_t DataRequestPacket::GetSenderTimeOffset() const
342 {
343 return senderTimeOffset_;
344 }
345
SetSecurityOption(const SecurityOption & option)346 void DataRequestPacket::SetSecurityOption(const SecurityOption &option)
347 {
348 securityOption_ = option;
349 }
350
GetSecurityOption() const351 SecurityOption DataRequestPacket::GetSecurityOption() const
352 {
353 return securityOption_;
354 }
355
SetData(uint64_t data)356 void DataAckPacket::SetData(uint64_t data)
357 {
358 data_ = data;
359 }
360
GetData() const361 uint64_t DataAckPacket::GetData() const
362 {
363 return data_;
364 }
365
SetRecvCode(int32_t errorCode)366 void DataAckPacket::SetRecvCode(int32_t errorCode)
367 {
368 recvCode_ = errorCode;
369 }
370
GetRecvCode() const371 int32_t DataAckPacket::GetRecvCode() const
372 {
373 return recvCode_;
374 }
375
SetVersion(uint32_t version)376 void DataAckPacket::SetVersion(uint32_t version)
377 {
378 version_ = version;
379 }
380
GetVersion() const381 uint32_t DataAckPacket::GetVersion() const
382 {
383 return version_;
384 }
385
SetReserved(std::vector<uint64_t> & reserved)386 void DataAckPacket::SetReserved(std::vector<uint64_t> &reserved)
387 {
388 reserved_ = std::move(reserved);
389 }
390
GetReserved() const391 std::vector<uint64_t> DataAckPacket::GetReserved() const
392 {
393 return reserved_;
394 }
395
GetPacketId() const396 uint64_t DataAckPacket::GetPacketId() const
397 {
398 uint64_t packetId = 0;
399 std::vector<uint64_t> DataAckReserve = GetReserved();
400 if (DataAckReserve.size() > ACK_PACKET_RESERVED_INDEX_PACKETID) {
401 packetId = DataAckReserve[ACK_PACKET_RESERVED_INDEX_PACKETID];
402 }
403 // while remote db is close and open again, it may not carry packetId
404 // so the second index is deletewatermark if it is the query Sync, should drop the deletewatermark here
405 if (packetId > MAX_PACKETID) {
406 return 0;
407 }
408 return packetId;
409 }
410
IsPacketIdValid(uint64_t packetId)411 bool DataAckPacket::IsPacketIdValid(uint64_t packetId)
412 {
413 return (packetId > 0);
414 }
415
CalculateLen() const416 uint32_t DataAckPacket::CalculateLen() const
417 {
418 uint64_t len = Parcel::GetUInt64Len(); // ackWaterMark
419 len += Parcel::GetIntLen(); // recvCode
420 len += Parcel::GetUInt32Len(); // version
421 len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
422
423 len = Parcel::GetEightByteAlign(len);
424 if (len > INT32_MAX) {
425 return 0;
426 }
427 return len;
428 }
429
SetPacketHead(int sendCode,uint32_t version,int32_t controlCmd,uint32_t flag)430 void ControlRequestPacket::SetPacketHead(int sendCode, uint32_t version, int32_t controlCmd, uint32_t flag)
431 {
432 sendCode_ = sendCode;
433 version_ = version;
434 controlCmdType_ = static_cast<uint32_t>(controlCmd);
435 flag_ = flag;
436 }
437
GetSendCode() const438 int32_t ControlRequestPacket::GetSendCode() const
439 {
440 return sendCode_;
441 }
442
GetVersion() const443 uint32_t ControlRequestPacket::GetVersion() const
444 {
445 return version_;
446 }
447
GetcontrolCmdType() const448 uint32_t ControlRequestPacket::GetcontrolCmdType() const
449 {
450 return controlCmdType_;
451 }
452
GetFlag() const453 uint32_t ControlRequestPacket::GetFlag() const
454 {
455 return flag_;
456 }
457
SetQuery(const QuerySyncObject & query)458 void ControlRequestPacket::SetQuery(const QuerySyncObject &query)
459 {
460 (void)query;
461 }
462
CalculateLen() const463 uint32_t ControlRequestPacket::CalculateLen() const
464 {
465 uint64_t len = Parcel::GetUInt32Len(); // version_
466 len += Parcel::GetIntLen(); // sendCode_
467 len += Parcel::GetUInt32Len(); // controlCmdType_
468 len += Parcel::GetUInt32Len(); // flag
469
470 len = Parcel::GetEightByteAlign(len);
471 if (len > INT32_MAX) {
472 return 0;
473 }
474 return len;
475 }
476
SetQuery(const QuerySyncObject & query)477 void SubscribeRequest::SetQuery(const QuerySyncObject &query)
478 {
479 query_ = query;
480 }
481
GetQuery() const482 QuerySyncObject SubscribeRequest::GetQuery() const
483 {
484 return query_;
485 }
486
CalculateLen() const487 uint32_t SubscribeRequest::CalculateLen() const
488 {
489 uint64_t totalLen = ControlRequestPacket::CalculateLen();
490 if (totalLen == 0) {
491 LOGE("[SubscribeRequest] cal packet len failed");
492 return 0;
493 }
494 // add for queryObject
495 totalLen += query_.CalculateParcelLen(SOFTWARE_VERSION_CURRENT);
496 if (totalLen > INT32_MAX) {
497 return 0;
498 }
499 return totalLen;
500 }
501
IsAutoSubscribe() const502 bool SubscribeRequest::IsAutoSubscribe() const
503 {
504 return ((GetFlag() & IS_AUTO_SUBSCRIBE) == IS_AUTO_SUBSCRIBE);
505 }
506
SetPacketHead(int recvCode,uint32_t version,int32_t controlCmd,uint32_t flag)507 void ControlAckPacket::SetPacketHead(int recvCode, uint32_t version, int32_t controlCmd, uint32_t flag)
508 {
509 recvCode_ = recvCode;
510 version_ = version;
511 controlCmdType_ = static_cast<uint32_t>(controlCmd);
512 flag_ = flag;
513 }
514
GetRecvCode() const515 int32_t ControlAckPacket::GetRecvCode() const
516 {
517 return recvCode_;
518 }
519
GetVersion() const520 uint32_t ControlAckPacket::GetVersion() const
521 {
522 return version_;
523 }
524
GetcontrolCmdType() const525 uint32_t ControlAckPacket::GetcontrolCmdType() const
526 {
527 return controlCmdType_;
528 }
529
GetFlag() const530 uint32_t ControlAckPacket::GetFlag() const
531 {
532 return flag_;
533 }
534
CalculateLen() const535 uint32_t ControlAckPacket::CalculateLen() const
536 {
537 uint64_t len = Parcel::GetUInt32Len(); // version_
538 len += Parcel::GetIntLen(); // recvCode_
539 len += Parcel::GetUInt32Len(); // controlCmdType_
540 len += Parcel::GetUInt32Len(); // flag
541 len = Parcel::GetEightByteAlign(len);
542 if (len > INT32_MAX) {
543 return 0;
544 }
545 return len;
546 }
547 } // namespace DistributedDB
548