1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "value_slice_sync.h"
18 
19 #include "db_constant.h"
20 #include "log_print.h"
21 #include "message_transform.h"
22 #include "parcel.h"
23 #include "performance_analysis.h"
24 #include "sync_types.h"
25 
26 namespace DistributedDB {
27 const int ValueSliceSync::MAX_VALUE_NODE_SIZE = 100000;
28 
29 // Class ValueSliceHashPacket
CalculateLen() const30 uint32_t ValueSliceHashPacket::CalculateLen() const
31 {
32     uint64_t len = Parcel::GetIntLen();
33     len = Parcel::GetEightByteAlign(len);
34     len += Parcel::GetVectorCharLen(valueSliceHash_);
35     if (len > INT32_MAX) {
36         return 0;
37     }
38     return len;
39 }
40 
SetValueSliceHash(ValueSliceHash & hash)41 void ValueSliceHashPacket::SetValueSliceHash(ValueSliceHash &hash)
42 {
43     valueSliceHash_ = std::move(hash);
44 }
45 
GetValueSliceHash(ValueSliceHash & hash) const46 void ValueSliceHashPacket::GetValueSliceHash(ValueSliceHash &hash) const
47 {
48     hash = valueSliceHash_;
49 }
50 
SetErrCode(int32_t errCode)51 void ValueSliceHashPacket::SetErrCode(int32_t errCode)
52 {
53     errCode_ = errCode;
54 }
55 
GetErrCode() const56 int32_t ValueSliceHashPacket::GetErrCode() const
57 {
58     return errCode_;
59 }
60 
61 // Class ValueSlicePacket
CalculateLen() const62 uint32_t ValueSlicePacket::CalculateLen() const
63 {
64     uint64_t len = Parcel::GetIntLen();
65     len = Parcel::GetEightByteAlign(len);
66     len += Parcel::GetVectorCharLen(valueSlice_);
67     if (len > INT32_MAX) {
68         return 0;
69     }
70     return len;
71 }
72 
SetData(const ValueSlice & data)73 void ValueSlicePacket::SetData(const ValueSlice &data)
74 {
75     valueSlice_ = std::move(data);
76 }
77 
GetData(ValueSlice & data) const78 void ValueSlicePacket::GetData(ValueSlice &data) const
79 {
80     data = valueSlice_;
81 }
82 
SetErrorCode(int32_t errCode)83 void ValueSlicePacket::SetErrorCode(int32_t errCode)
84 {
85     errorCode_ = errCode;
86 }
87 
GetErrorCode(int32_t & errCode) const88 void ValueSlicePacket::GetErrorCode(int32_t &errCode) const
89 {
90     errCode = errorCode_;
91 }
92 
93 // Class ValueSliceSync
~ValueSliceSync()94 ValueSliceSync::~ValueSliceSync()
95 {
96     storagePtr_ = nullptr;
97     communicateHandle_ = nullptr;
98 }
99 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int ValueSliceSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103         return -E_INVALID_ARGS;
104     }
105 
106     switch (inMsg->GetMessageType()) {
107         case TYPE_REQUEST:
108             return RequestPacketSerialization(buffer, length, inMsg);
109         case TYPE_RESPONSE:
110             return AckPacketSerialization(buffer, length, inMsg);
111         default:
112             return -E_MESSAGE_TYPE_ERROR;
113     }
114 }
115 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int ValueSliceSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119         return -E_INVALID_ARGS;
120     }
121 
122     switch (inMsg->GetMessageType()) {
123         case TYPE_REQUEST:
124             return RequestPacketDeSerialization(buffer, length, inMsg);
125         case TYPE_RESPONSE:
126             return AckPacketDeSerialization(buffer, length, inMsg);
127         default:
128             return -E_MESSAGE_TYPE_ERROR;
129     }
130 }
131 
CalculateLen(const Message * inMsg)132 uint32_t ValueSliceSync::CalculateLen(const Message *inMsg)
133 {
134     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135         return 0;
136     }
137 
138     uint32_t len = 0;
139     int errCode = E_OK;
140     switch (inMsg->GetMessageType()) {
141         case TYPE_REQUEST:
142             errCode = RequestPacketCalculateLen(inMsg, len);
143             break;
144         case TYPE_RESPONSE:
145             errCode = AckPacketCalculateLen(inMsg, len);
146             break;
147         default:
148             return 0;
149     }
150     if (errCode != E_OK) {
151         return 0;
152     }
153     return len;
154 }
155 
RegisterTransformFunc()156 int ValueSliceSync::RegisterTransformFunc()
157 {
158     TransformFunc func;
159     func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
160     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
161         return Serialization(buffer, length, inMsg);
162     };
163     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
164         return DeSerialization(buffer, length, inMsg);
165     };
166     return MessageTransform::RegTransformFunction(VALUE_SLICE_SYNC_MESSAGE, func);
167 }
168 
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)169 int ValueSliceSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
170 {
171     if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
172         return -E_INVALID_ARGS;
173     }
174     storagePtr_ = storagePtr;
175     communicateHandle_ = communicateHandle;
176     return E_OK;
177 }
178 
SyncStart(MultiVerSyncTaskContext * context)179 int ValueSliceSync::SyncStart(MultiVerSyncTaskContext *context)
180 {
181     if (context == nullptr) {
182         return -E_INVALID_ARGS;
183     }
184     int entriesIndex = context->GetEntriesIndex();
185     int entriesSize = context->GetEntriesSize();
186     if (entriesSize > DBConstant::MAX_ENTRIES_SIZE) {
187         LOGE("ValueSliceSync::entriesSize too large %d", entriesSize);
188         return -E_INVALID_ARGS;
189     }
190     while (entriesIndex < entriesSize) {
191         PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
192         if (performance != nullptr) {
193             performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
194         }
195         ValueSliceHash valueSliceHashNode;
196         int errCode = GetValidValueSliceHashNode(context, valueSliceHashNode);
197         if (performance != nullptr) {
198             performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_VALUE_SLICE_NODE);
199         }
200         LOGD("ValueSliceSync::SyncStart begin errCode = %d", errCode);
201         if (errCode == E_OK) {
202             errCode = SendRequestPacket(context, valueSliceHashNode);
203             LOGD("ValueSliceSync::SyncStart send request packet dst=%s{private}, errCode = %d",
204                 context->GetDeviceId().c_str(), errCode);
205             return errCode;
206         }
207         // move to next entry
208         MultiVerKvEntry *entry = nullptr;
209         entriesIndex++;
210         if (entriesIndex < entriesSize) {
211             LOGD("ValueSliceSync::SyncStart begin entriesIndex = %d, entriesSize = %d", entriesIndex, entriesSize);
212             context->SetEntriesIndex(entriesIndex);
213             context->GetEntry(entriesIndex, entry);
214             std::vector<ValueSliceHash> valueHashes;
215             errCode = entry->GetValueHash(valueHashes);
216             if (errCode != E_OK) {
217                 LOGE("ValueSliceSync::entry->GetValueHash %d", errCode);
218                 return errCode;
219             }
220             context->SetValueSliceHashNodes(valueHashes);
221             context->SetValueSlicesIndex(0);
222             context->SetValueSlicesSize(static_cast<int>(valueHashes.size()));
223         } else {
224             // all entries are received, move to next commit
225             return -E_NOT_FOUND;
226         }
227     }
228     return -E_NOT_FOUND;
229 }
230 
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)231 int ValueSliceSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
232 {
233     if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
234         return -E_INVALID_ARGS;
235     }
236 
237     const ValueSliceHashPacket *packet = message->GetObject<ValueSliceHashPacket>();
238     if (packet == nullptr) {
239         return -E_INVALID_ARGS;
240     }
241     ValueSliceHash valueSliceHashNode;
242     packet->GetValueSliceHash(valueSliceHashNode);
243     if ((packet->GetErrCode() == -E_LAST_SYNC_FRAME) && valueSliceHashNode.empty()) {
244         return -E_LAST_SYNC_FRAME;
245     }
246     ValueSlice valueSlice;
247     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
248     if (performance != nullptr) {
249         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
250     }
251     int errCode = GetValueSlice(valueSliceHashNode, valueSlice);
252     if (performance != nullptr) {
253         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_READ_VALUE_SLICE);
254     }
255     if (errCode != E_OK) {
256         LOGE("ValueSliceSync::RequestRecvCallback : GetValueSlice ERR, errno = %d", errCode);
257     }
258     errCode = SendAckPacket(context, valueSlice, errCode, message);
259     LOGD("ValueSliceSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}", errCode,
260          context->GetDeviceId().c_str());
261     if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
262         return -E_LAST_SYNC_FRAME;
263     }
264     return errCode;
265 }
266 
AckRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)267 int ValueSliceSync::AckRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
268 {
269     if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
270         return -E_INVALID_ARGS;
271     }
272 
273     const ValueSlicePacket *packet = message->GetObject<ValueSlicePacket>();
274     if (packet == nullptr) {
275         return -E_INVALID_ARGS;
276     }
277     int errCode = E_OK;
278     packet->GetErrorCode(errCode);
279     ValueSlice valueSlice;
280     packet->GetData(valueSlice);
281     if (errCode != E_OK) {
282         return errCode;
283     }
284     int index = context->GetValueSlicesIndex();
285     ValueSliceHash hashValue;
286     context->GetValueSliceHashNode(index, hashValue);
287     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
288     if (performance != nullptr) {
289         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
290     }
291     errCode = PutValueSlice(hashValue, valueSlice);
292     if (performance != nullptr) {
293         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SAVE_VALUE_SLICE);
294     }
295     LOGD("ValueSliceSync::AckRecvCallback PutValueSlice finished, src=%s{private}, errCode = %d",
296         context->GetDeviceId().c_str(), errCode);
297     return errCode;
298 }
299 
SendFinishedRequest(const MultiVerSyncTaskContext * context)300 void ValueSliceSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
301 {
302     if (context == nullptr) {
303         return;
304     }
305 
306     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
307     if (packet == nullptr) {
308         return;
309     }
310 
311     packet->SetErrCode(-E_LAST_SYNC_FRAME);
312     Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
313     if (message == nullptr) {
314         delete packet;
315         packet = nullptr;
316         return;
317     }
318 
319     int errCode = message->SetExternalObject(packet);
320     if (errCode != E_OK) {
321         delete packet;
322         packet = nullptr;
323         delete message;
324         message = nullptr;
325         return;
326     }
327 
328     message->SetMessageType(TYPE_REQUEST);
329     message->SetTarget(context->GetDeviceId());
330     message->SetSessionId(context->GetRequestSessionId());
331     message->SetSequenceId(context->GetSequenceId());
332     errCode = Send(message->GetTarget(), message);
333     if (errCode != E_OK) {
334         delete message;
335         message = nullptr;
336         LOGE("[ValueSliceSync][SendRequestPacket] SendRequestPacket failed, err %d", errCode);
337     }
338     LOGI("[ValueSliceSync][SendRequestPacket] SendRequestPacket dst=%s{private}", context->GetDeviceId().c_str());
339 }
340 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)341 int ValueSliceSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
342 {
343     const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
344     if (packet == nullptr) {
345         return -E_INVALID_ARGS;
346     }
347 
348     len = packet->CalculateLen();
349     return E_OK;
350 }
351 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)352 int ValueSliceSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
353 {
354     const ValueSliceHashPacket *packet = inMsg->GetObject<ValueSliceHashPacket>();
355     if ((packet == nullptr) || (length != packet->CalculateLen())) {
356         return -E_INVALID_ARGS;
357     }
358 
359     Parcel parcel(buffer, length);
360     ValueSliceHash valueSliceHash;
361     packet->GetValueSliceHash(valueSliceHash);
362     int32_t ackCode = packet->GetErrCode();
363     // errCode Serialization
364     int32_t errCode = parcel.WriteInt(ackCode);
365     if (errCode != E_OK) {
366         return -E_SECUREC_ERROR;
367     }
368     parcel.EightByteAlign();
369     // commitMap Serialization
370     errCode = parcel.WriteVectorChar(valueSliceHash);
371     if (errCode != E_OK) {
372         return -E_SECUREC_ERROR;
373     }
374 
375     return errCode;
376 }
377 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)378 int ValueSliceSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
379 {
380     Parcel parcel(const_cast<uint8_t *>(buffer), length);
381 
382     int ackCode = 0;
383     // errCode DeSerialization
384     uint32_t packLen = parcel.ReadInt(ackCode);
385     parcel.EightByteAlign();
386     if (parcel.IsError()) {
387         return -E_PARSE_FAIL;
388     }
389     packLen = Parcel::GetEightByteAlign(packLen);
390 
391     ValueSliceHash valueSliceHash;
392     // commit DeSerialization
393     packLen += parcel.ReadVectorChar(valueSliceHash);
394     if (packLen != length || parcel.IsError()) {
395         return -E_INVALID_ARGS;
396     }
397     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
398     if (packet == nullptr) {
399         LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
400         return -E_OUT_OF_MEMORY;
401     }
402 
403     packet->SetValueSliceHash(valueSliceHash);
404     int errCode = inMsg->SetExternalObject<>(packet);
405     if (errCode != E_OK) {
406         delete packet;
407         packet = nullptr;
408     }
409     return errCode;
410 }
411 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)412 int ValueSliceSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
413 {
414     const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
415     if (packet == nullptr) {
416         return -E_INVALID_ARGS;
417     }
418     len = packet->CalculateLen();
419     return E_OK;
420 }
421 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)422 int ValueSliceSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
423 {
424     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
425         return -E_INVALID_ARGS;
426     }
427     const ValueSlicePacket *packet = inMsg->GetObject<ValueSlicePacket>();
428     if ((packet == nullptr) || (length != packet->CalculateLen())) {
429         return -E_INVALID_ARGS;
430     }
431 
432     Parcel parcel(buffer, length);
433     ValueSlice valueSlice;
434     packet->GetData(valueSlice);
435     int32_t ackCode = 0;
436     packet->GetErrorCode(ackCode);
437     // errCode Serialization
438     int32_t errCode = parcel.WriteInt(ackCode);
439     if (errCode != E_OK) {
440         return -E_SECUREC_ERROR;
441     }
442     parcel.EightByteAlign();
443 
444     // commits vector Serialization
445     errCode = parcel.WriteVectorChar(valueSlice);
446     if (errCode != E_OK) {
447         return -E_SECUREC_ERROR;
448     }
449 
450     return errCode;
451 }
452 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)453 int ValueSliceSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
454 {
455     Parcel parcel(const_cast<uint8_t *>(buffer), length);
456     int32_t ackCode = 0;
457     uint32_t packLen = 0;
458     ValueSlice valueSlice;
459 
460     // errCode DeSerialization
461     packLen += parcel.ReadInt(ackCode);
462     parcel.EightByteAlign();
463     if (parcel.IsError()) {
464         return -E_PARSE_FAIL;
465     }
466     packLen = Parcel::GetEightByteAlign(packLen);
467     // valueSlice DeSerialization
468     packLen += parcel.ReadVectorChar(valueSlice);
469     if (packLen != length || parcel.IsError()) {
470         LOGE("ValueSliceSync::AckPacketSerialization data error, packLen = %" PRIu32 ", length = %" PRIu32,
471             packLen, length);
472         return -E_INVALID_ARGS;
473     }
474     ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
475     if (packet == nullptr) {
476         LOGE("ValueSliceSync::AckPacketDeSerialization : new packet error");
477         return -E_OUT_OF_MEMORY;
478     }
479     packet->SetData(valueSlice);
480     packet->SetErrorCode(ackCode);
481     int errCode = inMsg->SetExternalObject<>(packet);
482     if (errCode != E_OK) {
483         delete packet;
484         packet = nullptr;
485     }
486     return errCode;
487 }
488 
IsPacketValid(const Message * inMsg,uint16_t messageType)489 bool ValueSliceSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
490 {
491     if ((inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
492         return false;
493     }
494     if (messageType != inMsg->GetMessageType()) {
495         return false;
496     }
497     return true;
498 }
499 
GetValidValueSliceHashNode(MultiVerSyncTaskContext * context,ValueSliceHash & valueHashNode)500 int ValueSliceSync::GetValidValueSliceHashNode(MultiVerSyncTaskContext *context, ValueSliceHash &valueHashNode)
501 {
502     int index = context->GetValueSlicesIndex();
503     int valueNodesSize = context->GetValueSlicesSize();
504     if (valueNodesSize > MAX_VALUE_NODE_SIZE) {
505         LOGD("ValueSliceSync::GetValidValueSliceHashNode failed, too large!");
506         return -E_LENGTH_ERROR;
507     }
508     LOGD("ValueSliceSync::GetValidValueSliceHashNode ValueSlicesSize = %d", valueNodesSize);
509     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
510         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
511         index--;
512     }
513     std::vector<ValueSliceHash> valueSliceHashNodes;
514     context->GetValueSliceHashNodes(valueSliceHashNodes);
515     index = (index < 0) ? 0 : index;
516     while (index < valueNodesSize) {
517         if (IsValueSliceExisted(valueSliceHashNodes[index])) {
518             index++;
519             context->SetValueSlicesIndex(index);
520             continue;
521         }
522         valueHashNode = valueSliceHashNodes[index];
523         return E_OK;
524     }
525     return -E_NOT_FOUND;
526 }
527 
Send(const DeviceID & deviceId,const Message * inMsg)528 int ValueSliceSync::Send(const DeviceID &deviceId, const Message *inMsg)
529 {
530     SendConfig conf = {false, false, SEND_TIME_OUT, {}};
531     int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
532     if (errCode != E_OK) {
533         LOGE("ValueSliceSync::Send ERR! err = %d", errCode);
534     }
535     return errCode;
536 }
537 
SendRequestPacket(const MultiVerSyncTaskContext * context,ValueSliceHash & valueSliceHash)538 int ValueSliceSync::SendRequestPacket(const MultiVerSyncTaskContext *context, ValueSliceHash &valueSliceHash)
539 {
540     ValueSliceHashPacket *packet = new (std::nothrow) ValueSliceHashPacket();
541     if (packet == nullptr) {
542         LOGE("ValueSliceSync::SendRequestPacket : new packet error");
543         return -E_OUT_OF_MEMORY;
544     }
545 
546     packet->SetValueSliceHash(valueSliceHash);
547     Message *message = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
548     if (message == nullptr) {
549         delete packet;
550         packet = nullptr;
551         LOGE("ValueSliceSync::SendRequestPacket : new message error");
552         return -E_OUT_OF_MEMORY;
553     }
554 
555     int errCode = message->SetExternalObject<>(packet);
556     if (errCode != E_OK) {
557         delete packet;
558         packet = nullptr;
559         delete message;
560         message = nullptr;
561         return errCode;
562     }
563 
564     message->SetMessageType(TYPE_REQUEST);
565     message->SetTarget(context->GetDeviceId());
566     message->SetSessionId(context->GetRequestSessionId());
567     message->SetSequenceId(context->GetSequenceId());
568     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
569     if (performance != nullptr) {
570         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
571     }
572     errCode = Send(message->GetTarget(), message);
573     if (errCode != E_OK) {
574         delete message;
575         message = nullptr;
576     }
577     return errCode;
578 }
579 
SendAckPacket(const MultiVerSyncTaskContext * context,const ValueSlice & value,int ackCode,const Message * message)580 int ValueSliceSync::SendAckPacket(const MultiVerSyncTaskContext *context, const ValueSlice &value,
581     int ackCode, const Message *message)
582 {
583     ValueSlicePacket *packet = new (std::nothrow) ValueSlicePacket();
584     if (packet == nullptr) {
585         LOGE("ValueSliceSync::SendAckPacket : packet is nullptr");
586         return -E_OUT_OF_MEMORY;
587     }
588 
589     Message *ackMessage = new (std::nothrow) Message(VALUE_SLICE_SYNC_MESSAGE);
590     if (ackMessage == nullptr) {
591         delete packet;
592         packet = nullptr;
593         LOGE("ValueSliceSync::SendAckPacket : new message error");
594         return -E_OUT_OF_MEMORY;
595     }
596 
597     packet->SetData(value);
598     packet->SetErrorCode(static_cast<int32_t>(ackCode));
599     int errCode = ackMessage->SetExternalObject<>(packet);
600     if (errCode != E_OK) {
601         delete packet;
602         packet = nullptr;
603         delete ackMessage;
604         ackMessage = nullptr;
605         return errCode;
606     }
607 
608     ackMessage->SetMessageType(TYPE_RESPONSE);
609     ackMessage->SetTarget(context->GetDeviceId());
610     ackMessage->SetSequenceId(message->GetSequenceId());
611     ackMessage->SetSessionId(message->GetSessionId());
612     errCode = Send(ackMessage->GetTarget(), ackMessage);
613     if (errCode != E_OK) {
614         delete ackMessage;
615         ackMessage = nullptr;
616     }
617 
618     return errCode;
619 }
620 
IsValueSliceExisted(const ValueSliceHash & value)621 bool ValueSliceSync::IsValueSliceExisted(const ValueSliceHash &value)
622 {
623     return storagePtr_->IsValueSliceExisted(value);
624 }
625 
GetValueSlice(const ValueSliceHash & hashValue,ValueSlice & sliceValue)626 int ValueSliceSync::GetValueSlice(const ValueSliceHash &hashValue, ValueSlice &sliceValue)
627 {
628     return storagePtr_->GetValueSlice(hashValue, sliceValue);
629 }
630 
PutValueSlice(const ValueSliceHash & hashValue,const ValueSlice & sliceValue)631 int ValueSliceSync::PutValueSlice(const ValueSliceHash &hashValue, const ValueSlice &sliceValue)
632 {
633     return storagePtr_->PutValueSlice(hashValue, sliceValue);
634 }
635 } // namespace DistributedDB
636 #endif
637