1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "commit_history_sync.h"
18 
19 #include "sync_engine.h"
20 #include "parcel.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25 
26 namespace DistributedDB {
27 // Class CommitHistorySyncRequestPacket
CalculateLen() const28 uint32_t CommitHistorySyncRequestPacket::CalculateLen() const
29 {
30     uint64_t len = Parcel::GetUInt64Len();
31     // commitMap len
32     for (const auto &iter : commitMap_) {
33         len += Parcel::GetStringLen(iter.first);
34         len += Parcel::GetMultiVerCommitLen(iter.second);
35         if (len > INT32_MAX) {
36             return 0;
37         }
38     }
39     len += Parcel::GetUInt32Len(); // version
40     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
41     len = Parcel::GetEightByteAlign(len);
42     if (len > INT32_MAX) {
43         return 0;
44     }
45     return len;
46 }
47 
SetCommitMap(std::map<std::string,MultiVerCommitNode> & inMap)48 void CommitHistorySyncRequestPacket::SetCommitMap(std::map<std::string, MultiVerCommitNode> &inMap)
49 {
50     commitMap_ = std::move(inMap);
51 }
52 
GetCommitMap(std::map<std::string,MultiVerCommitNode> & outMap) const53 void CommitHistorySyncRequestPacket::GetCommitMap(std::map<std::string, MultiVerCommitNode> &outMap) const
54 {
55     outMap = commitMap_;
56 }
57 
SetVersion(uint32_t version)58 void CommitHistorySyncRequestPacket::SetVersion(uint32_t version)
59 {
60     version_ = version;
61 }
62 
GetVersion() const63 uint32_t CommitHistorySyncRequestPacket::GetVersion() const
64 {
65     return version_;
66 }
67 
SetReserved(std::vector<uint64_t> & reserved)68 void CommitHistorySyncRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
69 {
70     reserved_ = std::move(reserved);
71 }
72 
GetReserved() const73 std::vector<uint64_t> CommitHistorySyncRequestPacket::GetReserved() const
74 {
75     return reserved_;
76 }
77 
CalculateLen() const78 uint32_t CommitHistorySyncAckPacket::CalculateLen() const
79 {
80     uint64_t len = Parcel::GetIntLen(); // errCode
81     len += Parcel::GetUInt32Len(); // version
82     len = Parcel::GetEightByteAlign(len);
83 
84     // commits vector len
85     len += Parcel::GetMultiVerCommitsLen(commits_);
86     len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
87     len = Parcel::GetEightByteAlign(len);
88     if (len > INT32_MAX) {
89         return 0;
90     }
91     return len;
92 }
93 
SetData(std::vector<MultiVerCommitNode> & inData)94 void CommitHistorySyncAckPacket::SetData(std::vector<MultiVerCommitNode> &inData)
95 {
96     commits_ = std::move(inData);
97 }
98 
GetData(std::vector<MultiVerCommitNode> & outData) const99 void CommitHistorySyncAckPacket::GetData(std::vector<MultiVerCommitNode> &outData) const
100 {
101     outData = commits_;
102 }
103 
SetErrorCode(int32_t errCode)104 void CommitHistorySyncAckPacket::SetErrorCode(int32_t errCode)
105 {
106     errorCode_ = errCode;
107 }
108 
GetErrorCode(int32_t & errCode) const109 void CommitHistorySyncAckPacket::GetErrorCode(int32_t &errCode) const
110 {
111     errCode = errorCode_;
112 }
113 
SetVersion(uint32_t version)114 void CommitHistorySyncAckPacket::SetVersion(uint32_t version)
115 {
116     version_ = version;
117 }
118 
GetVersion() const119 uint32_t CommitHistorySyncAckPacket::GetVersion() const
120 {
121     return version_;
122 }
123 
SetReserved(std::vector<uint64_t> & reserved)124 void CommitHistorySyncAckPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126     reserved_ = std::move(reserved);
127 }
128 
GetReserved() const129 std::vector<uint64_t> CommitHistorySyncAckPacket::GetReserved() const
130 {
131     return reserved_;
132 }
133 
134 // Class CommitHistorySync
~CommitHistorySync()135 CommitHistorySync::~CommitHistorySync()
136 {
137     storagePtr_ = nullptr;
138     communicateHandle_ = nullptr;
139 }
140 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)141 int CommitHistorySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
142 {
143     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
144         return -E_MESSAGE_ID_ERROR;
145     }
146 
147     switch (inMsg->GetMessageType()) {
148         case TYPE_REQUEST:
149             return RequestPacketSerialization(buffer, length, inMsg);
150         case TYPE_RESPONSE:
151             return AckPacketSerialization(buffer, length, inMsg);
152         default:
153             return -E_MESSAGE_TYPE_ERROR;
154     }
155 }
156 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)157 int CommitHistorySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
158 {
159     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
160         return -E_MESSAGE_ID_ERROR;
161     }
162 
163     switch (inMsg->GetMessageType()) {
164         case TYPE_REQUEST:
165             return RequestPacketDeSerialization(buffer, length, inMsg);
166         case TYPE_RESPONSE:
167             return AckPacketDeSerialization(buffer, length, inMsg);
168         default:
169             return -E_MESSAGE_TYPE_ERROR;
170     }
171 }
172 
CalculateLen(const Message * inMsg)173 uint32_t CommitHistorySync::CalculateLen(const Message *inMsg)
174 {
175     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
176         return 0;
177     }
178 
179     uint32_t len = 0;
180     int errCode = E_OK;
181     switch (inMsg->GetMessageType()) {
182         case TYPE_REQUEST:
183             errCode = RequestPacketCalculateLen(inMsg, len);
184             if (errCode != E_OK) {
185                 return 0;
186             }
187             return len;
188         case TYPE_RESPONSE:
189             errCode = AckPacketCalculateLen(inMsg, len);
190             if (errCode != E_OK) {
191                 return 0;
192             }
193             return len;
194         default:
195             return 0;
196     }
197 }
198 
RegisterTransformFunc()199 int CommitHistorySync::RegisterTransformFunc()
200 {
201     TransformFunc func;
202     func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
203     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
204         return Serialization(buffer, length, inMsg);
205     };
206     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
207         return DeSerialization(buffer, length, inMsg);
208     };
209     return MessageTransform::RegTransformFunction(COMMIT_HISTORY_SYNC_MESSAGE, func);
210 }
211 
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)212 int CommitHistorySync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
213 {
214     if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
215         return -E_INVALID_ARGS;
216     }
217     storagePtr_ = storagePtr;
218     communicateHandle_ = communicateHandle;
219     return E_OK;
220 }
221 
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const222 void CommitHistorySync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
223 {
224     (void)context;
225     (void)message;
226     return;
227 }
228 
SyncStart(MultiVerSyncTaskContext * context)229 int CommitHistorySync::SyncStart(MultiVerSyncTaskContext *context)
230 {
231     if (context == nullptr) {
232         return -E_INVALID_ARGS;
233     }
234     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
235     if (performance != nullptr) {
236         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
237     }
238     std::map<std::string, MultiVerCommitNode> commitMap;
239     int errCode = GetDeviceLatestCommit(commitMap);
240     if (performance != nullptr) {
241         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
242     }
243     if ((errCode != E_OK) && (errCode != -E_NOT_FOUND)) {
244         return errCode;
245     }
246 
247     LOGD("CommitHistorySync::commitMap size = %zu, dst=%s{private}", commitMap.size(), context->GetDeviceId().c_str());
248     return SendRequestPacket(context, commitMap);
249 }
250 
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)251 int CommitHistorySync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
252 {
253     if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
254         return -E_INVALID_ARGS;
255     }
256     const CommitHistorySyncRequestPacket *packet = message->GetObject<CommitHistorySyncRequestPacket>();
257     if (packet == nullptr) {
258         return -E_INVALID_ARGS;
259     }
260     std::vector<MultiVerCommitNode> commits;
261     int errCode = RunPermissionCheck(context->GetDeviceId());
262     if (errCode == -E_NOT_PERMIT) {
263         LOGE("CommitHistorySync::RequestRecvCallback RunPermissionCheck not pass");
264         SendAckPacket(context, commits, errCode, message);
265         return errCode;
266     }
267     std::map<std::string, MultiVerCommitNode> commitMap;
268     packet->GetCommitMap(commitMap);
269     uint32_t ver = packet->GetVersion();
270     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
271     if (performance != nullptr) {
272         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
273     }
274     errCode = GetCommitTree(commitMap, commits);
275     if (performance != nullptr) {
276         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
277     }
278     if (errCode != E_OK) {
279         LOGE("CommitHistorySync::RequestRecvCallback : GetCommitTree ERR, errno = %d", errCode);
280     }
281 
282     errCode = SendAckPacket(context, commits, errCode, message);
283     LOGD("CommitHistorySync::RequestRecvCallback:SendAckPacket, errno = %d, dst=%s{private}, ver = %" PRIu32
284         ", myversion = %" PRIu32, errCode, context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
285     if (errCode == E_OK) {
286         if (commitMap.empty()) {
287             LOGD("[CommitHistorySync][RequestRecvCallback] no need to start SyncResponse");
288             return -E_NOT_FOUND;
289         }
290     }
291     return errCode;
292 }
293 
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)294 int CommitHistorySync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
295 {
296     if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
297         return -E_INVALID_ARGS;
298     }
299 
300     std::vector<MultiVerCommitNode> commits;
301     int32_t errCode;
302 
303     const CommitHistorySyncAckPacket *packet = message->GetObject<CommitHistorySyncAckPacket>();
304     if (packet == nullptr) {
305         return -E_INVALID_ARGS;
306     }
307     packet->GetErrorCode(errCode);
308     if (errCode == -E_NOT_PERMIT) {
309         LOGE("CommitHistorySync::AckRecvCallback RunPermissionCheck not pass");
310         return errCode;
311     }
312     packet->GetData(commits);
313     uint32_t ver = packet->GetVersion();
314     context->SetCommits(commits);
315     context->SetCommitIndex(0);
316     context->SetCommitsSize(static_cast<int>(commits.size()));
317     LOGD("CommitHistorySync::AckRecvCallback end, CommitsSize = %zu, dst = %s{private}, ver = %d, myversion = %u",
318         commits.size(), context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
319     return E_OK;
320 }
321 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)322 int CommitHistorySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
323 {
324     if (inMsg == nullptr) {
325         return -E_INVALID_ARGS;
326     }
327     const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
328     if (packet == nullptr) {
329         return -E_INVALID_ARGS;
330     }
331 
332     if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_REQUEST)) {
333         return -E_INVALID_ARGS;
334     }
335     len = packet->CalculateLen();
336     return E_OK;
337 }
338 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)339 int CommitHistorySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
340 {
341     if ((buffer == nullptr) || (inMsg == nullptr)) {
342         return -E_INVALID_ARGS;
343     }
344     const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
345     if ((packet == nullptr) || (length != packet->CalculateLen())) {
346         return -E_INVALID_ARGS;
347     }
348 
349     Parcel parcel(buffer, length);
350     std::map<std::string, MultiVerCommitNode> commitMap;
351     packet->GetCommitMap(commitMap);
352 
353     int errCode = parcel.WriteUInt64(commitMap.size());
354     if (errCode != E_OK) {
355         return -E_SECUREC_ERROR;
356     }
357     // commitMap Serialization
358     for (auto &iter : commitMap) {
359         errCode = parcel.WriteString(iter.first);
360         if (errCode != E_OK) {
361             return -E_SECUREC_ERROR;
362         }
363         errCode = parcel.WriteMultiVerCommit(iter.second);
364         if (errCode != E_OK) {
365             return -E_SECUREC_ERROR;
366         }
367     }
368     errCode = parcel.WriteUInt32(packet->GetVersion());
369     if (errCode != E_OK) {
370         return -E_SECUREC_ERROR;
371     }
372     errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
373     if (errCode != E_OK) {
374         return -E_SECUREC_ERROR;
375     }
376     parcel.EightByteAlign();
377     if (parcel.IsError()) { // almost success
378         return -E_INVALID_ARGS;
379     }
380     return errCode;
381 }
382 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)383 int CommitHistorySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
384 {
385     if ((buffer == nullptr) || (inMsg == nullptr)) {
386         return -E_INVALID_ARGS;
387     }
388 
389     uint64_t packLen = 0;
390     uint64_t len = 0;
391     Parcel parcel(const_cast<uint8_t *>(buffer), length);
392     packLen += parcel.ReadUInt64(len);
393     if (len > DBConstant::MAX_DEVICES_SIZE) {
394         LOGE("CommitHistorySync::RequestPacketDeSerialization : commitMap size too large = %" PRIu64, len);
395         return -E_INVALID_ARGS;
396     }
397     // commitMap DeSerialization
398     std::map<std::string, MultiVerCommitNode> commitMap;
399     while (len > 0) {
400         std::string key;
401         MultiVerCommitNode val;
402         packLen += parcel.ReadString(key);
403         packLen += parcel.ReadMultiVerCommit(val);
404         commitMap[key] = val;
405         len--;
406         if (parcel.IsError()) {
407             return -E_INVALID_ARGS;
408         }
409     }
410     uint32_t version;
411     std::vector<uint64_t> reserved;
412     packLen += parcel.ReadUInt32(version);
413     packLen += parcel.ReadVector<uint64_t>(reserved);
414     packLen = Parcel::GetEightByteAlign(packLen);
415     if (packLen != length || parcel.IsError()) {
416         LOGE("CommitHistorySync::RequestPacketDeSerialization : length error, input len = %" PRIu32
417             ", cac len = %" PRIu64, length, packLen);
418         return -E_INVALID_ARGS;
419     }
420     CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
421     if (packet == nullptr) {
422         LOGE("CommitHistorySync::RequestPacketDeSerialization : new packet error");
423         return -E_OUT_OF_MEMORY;
424     }
425     packet->SetCommitMap(commitMap);
426     packet->SetVersion(version);
427     packet->SetReserved(reserved);
428     int errCode = inMsg->SetExternalObject<>(packet);
429     if (errCode != E_OK) {
430         delete packet;
431         packet = nullptr;
432     }
433     return errCode;
434 }
435 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)436 int CommitHistorySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
437 {
438     if (inMsg == nullptr) {
439         return -E_INVALID_ARGS;
440     }
441     const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
442     if (packet == nullptr) {
443         return -E_INVALID_ARGS;
444     }
445 
446     if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_RESPONSE)) {
447         return -E_INVALID_ARGS;
448     }
449     len = packet->CalculateLen();
450     return E_OK;
451 }
452 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)453 int CommitHistorySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
454 {
455     if ((buffer == nullptr) || (inMsg == nullptr)) {
456         return -E_INVALID_ARGS;
457     }
458     const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
459     if ((packet == nullptr) || (length != packet->CalculateLen())) {
460         return -E_INVALID_ARGS;
461     }
462 
463     Parcel parcel(buffer, length);
464     int32_t ackErrCode;
465     std::vector<MultiVerCommitNode> commits;
466 
467     packet->GetData(commits);
468     packet->GetErrorCode(ackErrCode);
469     // errCode Serialization
470     parcel.WriteInt(ackErrCode);
471     parcel.WriteUInt32(packet->GetVersion());
472     parcel.EightByteAlign();
473     if (parcel.IsError()) { // almost success
474         return -E_INVALID_ARGS;
475     }
476     // commits vector Serialization
477     int errCode = parcel.WriteMultiVerCommits(commits);
478     if (errCode != E_OK) {
479         return -E_SECUREC_ERROR;
480     }
481     errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
482     if (errCode != E_OK) {
483         return -E_SECUREC_ERROR;
484     }
485     parcel.EightByteAlign();
486     if (parcel.IsError()) { // almost success
487         return -E_INVALID_ARGS;
488     }
489     return errCode;
490 }
491 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)492 int CommitHistorySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
493 {
494     std::vector<MultiVerCommitNode> commits;
495     uint32_t packLen = 0;
496     Parcel parcel(const_cast<uint8_t *>(buffer), length);
497     int32_t pktErrCode;
498     uint32_t version;
499     std::vector<uint64_t> reserved;
500 
501     // errCode DeSerialization
502     packLen += parcel.ReadInt(pktErrCode);
503     packLen += parcel.ReadUInt32(version);
504     parcel.EightByteAlign();
505     if (parcel.IsError()) {
506         return -E_PARSE_FAIL;
507     }
508     packLen = Parcel::GetEightByteAlign(packLen);
509     // commits vector DeSerialization
510     packLen += parcel.ReadMultiVerCommits(commits);
511     packLen += parcel.ReadVector<uint64_t>(reserved);
512     packLen = Parcel::GetEightByteAlign(packLen);
513     if (packLen != length || parcel.IsError()) {
514         LOGE("CommitHistorySync::AckPacketDeSerialization : packet len error, input len = %u, cal len = %u",
515             length, packLen);
516         return -E_INVALID_ARGS;
517     }
518     CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
519     if (packet == nullptr) {
520         LOGE("CommitHistorySync::AckPacketDeSerialization : new packet error");
521         return -E_OUT_OF_MEMORY;
522     }
523     packet->SetData(commits);
524     packet->SetErrorCode(pktErrCode);
525     packet->SetVersion(version);
526     packet->SetReserved(reserved);
527     int errCode = inMsg->SetExternalObject<>(packet);
528     if (errCode != E_OK) {
529         delete packet;
530         packet = nullptr;
531     }
532     return errCode;
533 }
534 
IsPacketValid(const Message * inMsg,uint16_t messageType)535 bool CommitHistorySync::IsPacketValid(const Message *inMsg, uint16_t messageType)
536 {
537     if ((inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
538         return false;
539     }
540     if (messageType != inMsg->GetMessageType()) {
541         return false;
542     }
543     return true;
544 }
545 
Send(const DeviceID & deviceId,const Message * inMsg)546 int CommitHistorySync::Send(const DeviceID &deviceId, const Message *inMsg)
547 {
548     SendConfig conf = {false, false, SEND_TIME_OUT, {}};
549     int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
550     if (errCode != E_OK) {
551         LOGE("CommitHistorySync::Send ERR! err = %d", errCode);
552     }
553     return errCode;
554 }
555 
GetDeviceLatestCommit(std::map<std::string,MultiVerCommitNode> & commitMap)556 int CommitHistorySync::GetDeviceLatestCommit(std::map<std::string, MultiVerCommitNode> &commitMap)
557 {
558     std::map<std::string, MultiVerCommitNode> readCommitMap;
559     int errCode = storagePtr_->GetDeviceLatestCommit(readCommitMap);
560     if (errCode != E_OK) {
561         return errCode;
562     }
563 
564     std::string localDevice;
565     errCode = GetLocalDeviceInfo(localDevice);
566     LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
567     if (errCode != E_OK) {
568         return errCode;
569     }
570 
571     for (auto &item : readCommitMap) {
572         errCode = storagePtr_->TransferSyncCommitDevInfo(item.second, localDevice, false);
573         if (errCode != E_OK) {
574             break;
575         }
576         commitMap.insert(std::make_pair(item.second.deviceInfo, item.second));
577     }
578 
579     return errCode;
580 }
581 
GetCommitTree(const std::map<std::string,MultiVerCommitNode> & commitMap,std::vector<MultiVerCommitNode> & commits)582 int CommitHistorySync::GetCommitTree(const std::map<std::string, MultiVerCommitNode> &commitMap,
583     std::vector<MultiVerCommitNode> &commits)
584 {
585     std::map<std::string, MultiVerCommitNode> newCommitMap;
586 
587     std::string localDevice;
588     int errCode = GetLocalDeviceInfo(localDevice);
589     LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
590     if (errCode != E_OK) {
591         return errCode;
592     }
593 
594     for (const auto &item : commitMap) {
595         MultiVerCommitNode commitNode = item.second;
596         errCode = storagePtr_->TransferSyncCommitDevInfo(commitNode, localDevice, true);
597         if (errCode != E_OK) {
598             return errCode;
599         }
600         newCommitMap.insert(std::make_pair(commitNode.deviceInfo, commitNode));
601     }
602 
603     errCode = storagePtr_->GetCommitTree(newCommitMap, commits);
604     if (errCode != E_OK) {
605         return errCode;
606     }
607     for (auto &commit : commits) {
608         errCode = storagePtr_->TransferSyncCommitDevInfo(commit, localDevice, false);
609         if (errCode != E_OK) {
610             break;
611         }
612     }
613     return errCode;
614 }
615 
SendRequestPacket(const MultiVerSyncTaskContext * context,std::map<std::string,MultiVerCommitNode> & commitMap)616 int CommitHistorySync::SendRequestPacket(const MultiVerSyncTaskContext *context,
617     std::map<std::string, MultiVerCommitNode> &commitMap)
618 {
619     CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
620     if (packet == nullptr) {
621         LOGE("CommitHistorySync::SendRequestPacket : new packet error");
622         return -E_OUT_OF_MEMORY;
623     }
624     packet->SetCommitMap(commitMap);
625     packet->SetVersion(SOFTWARE_VERSION_CURRENT);
626     Message *message = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
627     if (message == nullptr) {
628         LOGE("CommitHistorySync::SendRequestPacket : new message error");
629         delete packet;
630         packet = nullptr;
631         return -E_OUT_OF_MEMORY;
632     }
633     message->SetMessageType(TYPE_REQUEST);
634     message->SetTarget(context->GetDeviceId());
635     int errCode = message->SetExternalObject(packet);
636     if (errCode != E_OK) {
637         delete packet;
638         packet = nullptr;
639         delete message;
640         message = nullptr;
641         LOGE("CommitHistorySync::SendRequestPacket : SetExternalObject failed errCode:%d", errCode);
642         return errCode;
643     }
644     message->SetSessionId(context->GetRequestSessionId());
645     message->SetSequenceId(context->GetSequenceId());
646 
647     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
648     if (performance != nullptr) {
649         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
650     }
651     errCode = Send(message->GetTarget(), message);
652     if (errCode != E_OK) {
653         LOGE("CommitHistorySync::SendRequestPacket : Send failed errCode:%d", errCode);
654         delete message;
655         message = nullptr;
656     }
657     return errCode;
658 }
659 
SendAckPacket(const MultiVerSyncTaskContext * context,std::vector<MultiVerCommitNode> & commits,int ackCode,const Message * message)660 int CommitHistorySync::SendAckPacket(const MultiVerSyncTaskContext *context,
661     std::vector<MultiVerCommitNode> &commits, int ackCode, const Message *message)
662 {
663     if (message == nullptr) {
664         LOGE("CommitHistorySync::SendAckPacket : message is nullptr");
665         return -E_INVALID_ARGS;
666     }
667     CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
668     if (packet == nullptr) {
669         LOGE("CommitHistorySync::SendAckPacket : packet is nullptr");
670         return -E_OUT_OF_MEMORY;
671     }
672     Message *ackMessage = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
673     if (ackMessage == nullptr) {
674         LOGE("CommitHistorySync::SendAckPacket : new message error");
675         delete packet;
676         packet = nullptr;
677         return -E_OUT_OF_MEMORY;
678     }
679 
680     packet->SetData(commits);
681     packet->SetErrorCode(static_cast<int32_t>(ackCode));
682     packet->SetVersion(SOFTWARE_VERSION_CURRENT);
683     ackMessage->SetMessageType(TYPE_RESPONSE);
684     ackMessage->SetTarget(context->GetDeviceId());
685     int errCode = ackMessage->SetExternalObject(packet);
686     if (errCode != E_OK) {
687         delete packet;
688         packet = nullptr;
689         delete ackMessage;
690         ackMessage = nullptr;
691         LOGE("CommitHistorySync::SendAckPacket : SetExternalObject failed errCode:%d", errCode);
692         return errCode;
693     }
694     ackMessage->SetSequenceId(message->GetSequenceId());
695     ackMessage->SetSessionId(message->GetSessionId());
696     errCode = Send(ackMessage->GetTarget(), ackMessage);
697     if (errCode != E_OK) {
698         LOGE("CommitHistorySync::SendAckPacket : Send failed errCode:%d", errCode);
699         delete ackMessage;
700         ackMessage = nullptr;
701     }
702     return errCode;
703 }
704 
GetLocalDeviceInfo(std::string & deviceInfo)705 int CommitHistorySync::GetLocalDeviceInfo(std::string &deviceInfo)
706 {
707     return communicateHandle_->GetLocalIdentity(deviceInfo);
708 }
709 
RunPermissionCheck(const std::string & deviceId) const710 int CommitHistorySync::RunPermissionCheck(const std::string &deviceId) const
711 {
712     std::string appId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
713     std::string userId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
714     std::string storeId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
715     uint8_t flag = CHECK_FLAG_SEND;
716     PermissionCheckParam param = { userId, appId, storeId, deviceId };
717     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(param, flag);
718     if (errCode != E_OK) {
719         LOGE("[CommitHistorySync] RunPermissionCheck not pass errCode:%d, flag:%d", errCode, flag);
720         return -E_NOT_PERMIT;
721     }
722     return errCode;
723 }
724 }
725 #endif