1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "value_slice_sync.h"
18
19 #include "db_constant.h"
20 #include "log_print.h"
21 #include "message_transform.h"
22 #include "parcel.h"
23 #include "performance_analysis.h"
24 #include "sync_types.h"
25
26 namespace DistributedDB {
27 const int ValueSliceSync::MAX_VALUE_NODE_SIZE = 100000;
28
29 // Class ValueSliceHashPacket
CalculateLen() const30 uint32_t ValueSliceHashPacket::CalculateLen() const
31 {
32 uint64_t len = Parcel::GetIntLen();
33 len = Parcel::GetEightByteAlign(len);
34 len += Parcel::GetVectorCharLen(valueSliceHash_);
35 if (len > INT32_MAX) {
36 return 0;
37 }
38 return len;
39 }
40
SetValueSliceHash(ValueSliceHash & hash)41 void ValueSliceHashPacket::SetValueSliceHash(ValueSliceHash &hash)
42 {
43 valueSliceHash_ = std::move(hash);
44 }
45
GetValueSliceHash(ValueSliceHash & hash) const46 void ValueSliceHashPacket::GetValueSliceHash(ValueSliceHash &hash) const
47 {
48 hash = valueSliceHash_;
49 }
50
SetErrCode(int32_t errCode)51 void ValueSliceHashPacket::SetErrCode(int32_t errCode)
52 {
53 errCode_ = errCode;
54 }
55
GetErrCode() const56 int32_t ValueSliceHashPacket::GetErrCode() const
57 {
58 return errCode_;
59 }
60
61 // Class ValueSlicePacket
CalculateLen() const62 uint32_t ValueSlicePacket::CalculateLen() const
63 {
64 uint64_t len = Parcel::GetIntLen();
65 len = Parcel::GetEightByteAlign(len);
66 len += Parcel::GetVectorCharLen(valueSlice_);
67 if (len > INT32_MAX) {
68 return 0;
69 }
70 return len;
71 }
72
SetData(const ValueSlice & data)73 void ValueSlicePacket::SetData(const ValueSlice &data)
74 {
75 valueSlice_ = std::move(data);
76 }
77
GetData(ValueSlice & data) const78 void ValueSlicePacket::GetData(ValueSlice &data) const
79 {
80 data = valueSlice_;
81 }
82
SetErrorCode(int32_t errCode)83 void ValueSlicePacket::SetErrorCode(int32_t errCode)
84 {
85 errorCode_ = errCode;
86 }
87
GetErrorCode(int32_t & errCode) const88 void ValueSlicePacket::GetErrorCode(int32_t &errCode) const
89 {
90 errCode = errorCode_;
91 }
92
93 // Class ValueSliceSync
~ValueSliceSync()94 ValueSliceSync::~ValueSliceSync()
95 {
96 storagePtr_ = nullptr;
97 communicateHandle_ = nullptr;
98 }
99
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int ValueSliceSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103 return -E_INVALID_ARGS;
104 }
105
106 switch (inMsg->GetMessageType()) {
107 case TYPE_REQUEST:
108 return RequestPacketSerialization(buffer, length, inMsg);
109 case TYPE_RESPONSE:
110 return AckPacketSerialization(buffer, length, inMsg);
111 default:
112 return -E_MESSAGE_TYPE_ERROR;
113 }
114 }
115
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int ValueSliceSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119 return -E_INVALID_ARGS;
120 }
121
122 switch (inMsg->GetMessageType()) {
123 case TYPE_REQUEST:
124 return RequestPacketDeSerialization(buffer, length, inMsg);
125 case TYPE_RESPONSE:
126 return AckPacketDeSerialization(buffer, length, inMsg);
127 default:
128 return -E_MESSAGE_TYPE_ERROR;
129 }
130 }
131
CalculateLen(const Message * inMsg)132 uint32_t ValueSliceSync::CalculateLen(const Message *inMsg)
133 {
134 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135 return 0;
136 }
137
138 uint32_t len = 0;
139 int errCode = E_OK;
140 switch (inMsg->GetMessageType()) {
141 case TYPE_REQUEST:
142 errCode = RequestPacketCalculateLen(inMsg, len);
143 break;
144 case TYPE_RESPONSE:
145 errCode = AckPacketCalculateLen(inMsg, len);
146 break;
147 default:
148 return 0;
149 }
150 if (errCode != E_OK) {
151 return 0;
152 }
153 return len;
154 }
155
RegisterTransformFunc()156 int ValueSliceSync::RegisterTransformFunc()
157 {
158 TransformFunc func;
159 func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
160 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
161 return Serialization(buffer, length, inMsg);
162 };
163 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
164 return DeSerialization(buffer, length, inMsg);
165 };
166 return MessageTransform::RegTransformFunction(VALUE_SLICE_SYNC_MESSAGE, func);
167 }
168
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)169 int ValueSliceSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
170 {
171 if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
172 return -E_INVALID_ARGS;
173 }
174 storagePtr_ = storagePtr;
175 communicateHandle_ = communicateHandle;
176 return E_OK;
177 }
178
SyncStart(MultiVerSyncTaskContext * context)179 int ValueSliceSync::SyncStart(MultiVerSyncTaskContext *context)
180 {
181 if (context == nullptr) {
182 return -E_INVALID_ARGS;
183 }
184 int entriesIndex = context->GetEntriesIndex();
185 int entriesSize = context->GetEntriesSize();
186 if (entriesSize > DBConstant::MAX_ENTRIES_SIZE) {
187 LOGE("ValueSliceSync::entriesSize too large %d", entriesSize);
188 return -E_INVALID_ARGS;
189 }
190 while (entriesIndex < entriesSize) {
191 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
192 if (performance != nullptr) {
193 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
194 }
195 ValueSliceHash valueSliceHashNode;
196 int errCode = GetValidValueSliceHashNode(context, valueSliceHashNode);
197 if (performance != nullptr) {
198 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
199 }
200 LOGD("ValueSliceSync::SyncStart begin errCode = %d", errCode);
201 if (errCode == E_OK) {
202 errCode = SendRequestPacket(context, valueSliceHashNode);
203 LOGD("ValueSliceSync::SyncStart send request packet dst=%s{private}, errCode = %d",
204 context->GetDeviceId().c_str(), errCode);
205 return errCode;
206 }
207 // move to next entry
208 MultiVerKvEntry *entry = nullptr;
209 entriesIndex++;
210 if (entriesIndex < entriesSize) {
211 LOGD("ValueSliceSync::SyncStart begin entriesIndex = %d, entriesSize = %d", entriesIndex, entriesSize);
212 context->SetEntriesIndex(entriesIndex);
213 context->GetEntry(entriesIndex, entry);
214 std::vector<ValueSliceHash> valueHashes;
215 errCode = entry->GetValueHash(valueHashes);
216 if (errCode != E_OK) {
217 LOGE("ValueSliceSync::entry->GetValueHash %d", errCode);
218 return errCode;
219 }
220 context->SetValueSliceHashNodes(valueHashes);
221 context->SetValueSlicesIndex(0);
222 context->SetValueSlicesSize(static_cast<int>(valueHashes.size()));
223 } else {
224 // all entries are received, move to next commit
225 return -E_NOT_FOUND;
226 }
227 }
228 return -E_NOT_FOUND;
229 }
230
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)231 int ValueSliceSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
232 {
233 if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
234 return -E_INVALID_ARGS;
235 }
236
237 const ValueSliceHashPacket *packet = message->GetObject<ValueSliceHashPacket>();
238 if (packet == nullptr) {
239 return -E_INVALID_ARGS;
240 }
241 ValueSliceHash valueSliceHashNode;
242 packet->GetValueSliceHash(valueSliceHashNode);
243 if ((packet->GetErrCode() == -E_LAST_SYNC_FRAME) && valueSliceHashNode.empty()) {
244 return -E_LAST_SYNC_FRAME;
245 }
246 ValueSlice valueSlice;
247 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
248 if (performance != nullptr) {
249 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
250 }
251 int errCode = GetValueSlice(valueSliceHashNode, valueSlice);
252 if (performance != nullptr) {
253 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
254 }
255 if (errCode != E_OK) {
256 LOGE("ValueSliceSync::RequestRecvCallback : GetValueSlice ERR, errno = %d", errCode);
257 }
258 errCode = SendAckPacket(context, valueSlice, errCode, message);
259 LOGD("ValueSliceSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}", errCode,
260 context->GetDeviceId().c_str());
261 if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
262 return -E_LAST_SYNC_FRAME;
263 }
264 return errCode;
265 }
266
AckRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)267 int ValueSliceSync::AckRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
268 {
269 if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
270 return -E_INVALID_ARGS;
271 }
272
273 const ValueSlicePacket *packet = message->GetObject<ValueSlicePacket>();
274 if (packet == nullptr) {
275 return -E_INVALID_ARGS;
276 }
277 int errCode = E_OK;
278 packet->GetErrorCode(errCode);
279 ValueSlice valueSlice;
280 packet->GetData(valueSlice);
281 if (errCode != E_OK) {
282 return errCode;
283 }
284 int index = context->GetValueSlicesIndex();
285 ValueSliceHash hashValue;
286 context->GetValueSliceHashNode(index, hashValue);
287 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
288 if (performance != nullptr) {
289 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
290 }
291 errCode = PutValueSlice(hashValue, valueSlice);
292 if (performance != nullptr) {
293 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
294 }
295 LOGD("ValueSliceSync::AckRecvCallback PutValueSlice finished, src=%s{private}, errCode = %d",
296 context->GetDeviceId().c_str(), errCode);
297 return errCode;
298 }
299
SendFinishedRequest(const MultiVerSyncTaskContext * context)300 void ValueSliceSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
301 {
302 if (context == nullptr) {
303 return;
304 }
305
306 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
307 if (packet == nullptr) {
308 return;
309 }
310
311 packet->SetErrCode(-E_LAST_SYNC_FRAME);
312 Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
313 if (message == nullptr) {
314 delete packet;
315 packet = nullptr;
316 return;
317 }
318
319 int errCode = message->SetExternalObject(packet);
320 if (errCode != E_OK) {
321 delete packet;
322 packet = nullptr;
323 delete message;
324 message = nullptr;
325 return;
326 }
327
328 message->SetMessageType(TYPE_REQUEST);
329 message->SetTarget(context->GetDeviceId());
330 message->SetSessionId(context->GetRequestSessionId());
331 message->SetSequenceId(context->GetSequenceId());
332 errCode = Send(message->GetTarget(), message);
333 if (errCode != E_OK) {
334 delete message;
335 message = nullptr;
336 LOGE("[ValueSliceSync][SendRequestPacket] SendRequestPacket failed, err %d", errCode);
337 }
338 LOGI("[ValueSliceSync][SendRequestPacket] SendRequestPacket dst=%s{private}", context->GetDeviceId().c_str());
339 }
340
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)341 int ValueSliceSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
342 {
343 const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
344 if (packet == nullptr) {
345 return -E_INVALID_ARGS;
346 }
347
348 len = packet->CalculateLen();
349 return E_OK;
350 }
351
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)352 int ValueSliceSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
353 {
354 const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
355 if ((packet == nullptr) || (length != packet->CalculateLen())) {
356 return -E_INVALID_ARGS;
357 }
358
359 Parcel parcel(buffer, length);
360 ValueSliceHash valueSliceHash;
361 packet->GetValueSliceHash(valueSliceHash);
362 int32_t ackCode = packet->GetErrCode();
363 // errCode Serialization
364 int32_t errCode = parcel.WriteInt(ackCode);
365 if (errCode != E_OK) {
366 return -E_SECUREC_ERROR;
367 }
368 parcel.EightByteAlign();
369 // commitMap Serialization
370 errCode = parcel.WriteVectorChar(valueSliceHash);
371 if (errCode != E_OK) {
372 return -E_SECUREC_ERROR;
373 }
374
375 return errCode;
376 }
377
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)378 int ValueSliceSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
379 {
380 Parcel parcel(const_cast<uint8_t *>(buffer), length);
381
382 int ackCode = 0;
383 // errCode DeSerialization
384 uint32_t packLen = parcel.ReadInt(ackCode);
385 parcel.EightByteAlign();
386 if (parcel.IsError()) {
387 return -E_PARSE_FAIL;
388 }
389 packLen = Parcel::GetEightByteAlign(packLen);
390
391 ValueSliceHash valueSliceHash;
392 // commit DeSerialization
393 packLen += parcel.ReadVectorChar(valueSliceHash);
394 if (packLen != length || parcel.IsError()) {
395 return -E_INVALID_ARGS;
396 }
397 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
398 if (packet == nullptr) {
399 LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
400 return -E_OUT_OF_MEMORY;
401 }
402
403 packet->SetValueSliceHash(valueSliceHash);
404 int errCode = inMsg->SetExternalObject<>(packet);
405 if (errCode != E_OK) {
406 delete packet;
407 packet = nullptr;
408 }
409 return errCode;
410 }
411
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)412 int ValueSliceSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
413 {
414 const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
415 if (packet == nullptr) {
416 return -E_INVALID_ARGS;
417 }
418 len = packet->CalculateLen();
419 return E_OK;
420 }
421
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)422 int ValueSliceSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
423 {
424 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
425 return -E_INVALID_ARGS;
426 }
427 const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
428 if ((packet == nullptr) || (length != packet->CalculateLen())) {
429 return -E_INVALID_ARGS;
430 }
431
432 Parcel parcel(buffer, length);
433 ValueSlice valueSlice;
434 packet->GetData(valueSlice);
435 int32_t ackCode = 0;
436 packet->GetErrorCode(ackCode);
437 // errCode Serialization
438 int32_t errCode = parcel.WriteInt(ackCode);
439 if (errCode != E_OK) {
440 return -E_SECUREC_ERROR;
441 }
442 parcel.EightByteAlign();
443
444 // commits vector Serialization
445 errCode = parcel.WriteVectorChar(valueSlice);
446 if (errCode != E_OK) {
447 return -E_SECUREC_ERROR;
448 }
449
450 return errCode;
451 }
452
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)453 int ValueSliceSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
454 {
455 Parcel parcel(const_cast<uint8_t *>(buffer), length);
456 int32_t ackCode = 0;
457 uint32_t packLen = 0;
458 ValueSlice valueSlice;
459
460 // errCode DeSerialization
461 packLen += parcel.ReadInt(ackCode);
462 parcel.EightByteAlign();
463 if (parcel.IsError()) {
464 return -E_PARSE_FAIL;
465 }
466 packLen = Parcel::GetEightByteAlign(packLen);
467 // valueSlice DeSerialization
468 packLen += parcel.ReadVectorChar(valueSlice);
469 if (packLen != length || parcel.IsError()) {
470 LOGE("ValueSliceSync::AckPacketSerialization data error, packLen = %" PRIu32 ", length = %" PRIu32,
471 packLen, length);
472 return -E_INVALID_ARGS;
473 }
474 ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
475 if (packet == nullptr) {
476 LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
477 return -E_OUT_OF_MEMORY;
478 }
479 packet->SetData(valueSlice);
480 packet->SetErrorCode(ackCode);
481 int errCode = inMsg->SetExternalObject<>(packet);
482 if (errCode != E_OK) {
483 delete packet;
484 packet = nullptr;
485 }
486 return errCode;
487 }
488
IsPacketValid(const Message * inMsg,uint16_t messageType)489 bool ValueSliceSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
490 {
491 if ((inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
492 return false;
493 }
494 if (messageType != inMsg->GetMessageType()) {
495 return false;
496 }
497 return true;
498 }
499
GetValidValueSliceHashNode(MultiVerSyncTaskContext * context,ValueSliceHash & valueHashNode)500 int ValueSliceSync::GetValidValueSliceHashNode(MultiVerSyncTaskContext *context, ValueSliceHash &valueHashNode)
501 {
502 int index = context->GetValueSlicesIndex();
503 int valueNodesSize = context->GetValueSlicesSize();
504 if (valueNodesSize > MAX_VALUE_NODE_SIZE) {
505 LOGD("ValueSliceSync::GetValidValueSliceHashNode failed, too large!");
506 return -E_LENGTH_ERROR;
507 }
508 LOGD("ValueSliceSync::GetValidValueSliceHashNode ValueSlicesSize = %d", valueNodesSize);
509 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
510 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
511 index--;
512 }
513 std::vector<ValueSliceHash> valueSliceHashNodes;
514 context->GetValueSliceHashNodes(valueSliceHashNodes);
515 index = (index < 0) ? 0 : index;
516 while (index < valueNodesSize) {
517 if (IsValueSliceExisted(valueSliceHashNodes[index])) {
518 index++;
519 context->SetValueSlicesIndex(index);
520 continue;
521 }
522 valueHashNode = valueSliceHashNodes[index];
523 return E_OK;
524 }
525 return -E_NOT_FOUND;
526 }
527
Send(const DeviceID & deviceId,const Message * inMsg)528 int ValueSliceSync::Send(const DeviceID &deviceId, const Message *inMsg)
529 {
530 SendConfig conf = {false, false, SEND_TIME_OUT, {}};
531 int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
532 if (errCode != E_OK) {
533 LOGE("ValueSliceSync::Send ERR! err = %d", errCode);
534 }
535 return errCode;
536 }
537
SendRequestPacket(const MultiVerSyncTaskContext * context,ValueSliceHash & valueSliceHash)538 int ValueSliceSync::SendRequestPacket(const MultiVerSyncTaskContext *context, ValueSliceHash &valueSliceHash)
539 {
540 ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
541 if (packet == nullptr) {
542 LOGE("ValueSliceSync::SendRequestPacket : new packet error");
543 return -E_OUT_OF_MEMORY;
544 }
545
546 packet->SetValueSliceHash(valueSliceHash);
547 Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
548 if (message == nullptr) {
549 delete packet;
550 packet = nullptr;
551 LOGE("ValueSliceSync::SendRequestPacket : new message error");
552 return -E_OUT_OF_MEMORY;
553 }
554
555 int errCode = message->SetExternalObject<>(packet);
556 if (errCode != E_OK) {
557 delete packet;
558 packet = nullptr;
559 delete message;
560 message = nullptr;
561 return errCode;
562 }
563
564 message->SetMessageType(TYPE_REQUEST);
565 message->SetTarget(context->GetDeviceId());
566 message->SetSessionId(context->GetRequestSessionId());
567 message->SetSequenceId(context->GetSequenceId());
568 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
569 if (performance != nullptr) {
570 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
571 }
572 errCode = Send(message->GetTarget(), message);
573 if (errCode != E_OK) {
574 delete message;
575 message = nullptr;
576 }
577 return errCode;
578 }
579
SendAckPacket(const MultiVerSyncTaskContext * context,const ValueSlice & value,int ackCode,const Message * message)580 int ValueSliceSync::SendAckPacket(const MultiVerSyncTaskContext *context, const ValueSlice &value,
581 int ackCode, const Message *message)
582 {
583 ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
584 if (packet == nullptr) {
585 LOGE("ValueSliceSync::SendAckPacket : packet is nullptr");
586 return -E_OUT_OF_MEMORY;
587 }
588
589 Message *ackMessage = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
590 if (ackMessage == nullptr) {
591 delete packet;
592 packet = nullptr;
593 LOGE("ValueSliceSync::SendAckPacket : new message error");
594 return -E_OUT_OF_MEMORY;
595 }
596
597 packet->SetData(value);
598 packet->SetErrorCode(static_cast<int32_t>(ackCode));
599 int errCode = ackMessage->SetExternalObject<>(packet);
600 if (errCode != E_OK) {
601 delete packet;
602 packet = nullptr;
603 delete ackMessage;
604 ackMessage = nullptr;
605 return errCode;
606 }
607
608 ackMessage->SetMessageType(TYPE_RESPONSE);
609 ackMessage->SetTarget(context->GetDeviceId());
610 ackMessage->SetSequenceId(message->GetSequenceId());
611 ackMessage->SetSessionId(message->GetSessionId());
612 errCode = Send(ackMessage->GetTarget(), ackMessage);
613 if (errCode != E_OK) {
614 delete ackMessage;
615 ackMessage = nullptr;
616 }
617
618 return errCode;
619 }
620
IsValueSliceExisted(const ValueSliceHash & value)621 bool ValueSliceSync::IsValueSliceExisted(const ValueSliceHash &value)
622 {
623 return storagePtr_->IsValueSliceExisted(value);
624 }
625
GetValueSlice(const ValueSliceHash & hashValue,ValueSlice & sliceValue)626 int ValueSliceSync::GetValueSlice(const ValueSliceHash &hashValue, ValueSlice &sliceValue)
627 {
628 return storagePtr_->GetValueSlice(hashValue, sliceValue);
629 }
630
PutValueSlice(const ValueSliceHash & hashValue,const ValueSlice & sliceValue)631 int ValueSliceSync::PutValueSlice(const ValueSliceHash &hashValue, const ValueSlice &sliceValue)
632 {
633 return storagePtr_->PutValueSlice(hashValue, sliceValue);
634 }
635 } // namespace DistributedDB
636 #endif
637