1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "commit_history_sync.h"
18
19 #include "sync_engine.h"
20 #include "parcel.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25
26 namespace DistributedDB {
27 // Class CommitHistorySyncRequestPacket
CalculateLen() const28 uint32_t CommitHistorySyncRequestPacket::CalculateLen() const
29 {
30 uint64_t len = Parcel::GetUInt64Len();
31 // commitMap len
32 for (const auto &iter : commitMap_) {
33 len += Parcel::GetStringLen(iter.first);
34 len += Parcel::GetMultiVerCommitLen(iter.second);
35 if (len > INT32_MAX) {
36 return 0;
37 }
38 }
39 len += Parcel::GetUInt32Len(); // version
40 len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
41 len = Parcel::GetEightByteAlign(len);
42 if (len > INT32_MAX) {
43 return 0;
44 }
45 return len;
46 }
47
SetCommitMap(std::map<std::string,MultiVerCommitNode> & inMap)48 void CommitHistorySyncRequestPacket::SetCommitMap(std::map<std::string, MultiVerCommitNode> &inMap)
49 {
50 commitMap_ = std::move(inMap);
51 }
52
GetCommitMap(std::map<std::string,MultiVerCommitNode> & outMap) const53 void CommitHistorySyncRequestPacket::GetCommitMap(std::map<std::string, MultiVerCommitNode> &outMap) const
54 {
55 outMap = commitMap_;
56 }
57
SetVersion(uint32_t version)58 void CommitHistorySyncRequestPacket::SetVersion(uint32_t version)
59 {
60 version_ = version;
61 }
62
GetVersion() const63 uint32_t CommitHistorySyncRequestPacket::GetVersion() const
64 {
65 return version_;
66 }
67
SetReserved(std::vector<uint64_t> & reserved)68 void CommitHistorySyncRequestPacket::SetReserved(std::vector<uint64_t> &reserved)
69 {
70 reserved_ = std::move(reserved);
71 }
72
GetReserved() const73 std::vector<uint64_t> CommitHistorySyncRequestPacket::GetReserved() const
74 {
75 return reserved_;
76 }
77
CalculateLen() const78 uint32_t CommitHistorySyncAckPacket::CalculateLen() const
79 {
80 uint64_t len = Parcel::GetIntLen(); // errCode
81 len += Parcel::GetUInt32Len(); // version
82 len = Parcel::GetEightByteAlign(len);
83
84 // commits vector len
85 len += Parcel::GetMultiVerCommitsLen(commits_);
86 len += Parcel::GetVectorLen<uint64_t>(reserved_); // reserved
87 len = Parcel::GetEightByteAlign(len);
88 if (len > INT32_MAX) {
89 return 0;
90 }
91 return len;
92 }
93
SetData(std::vector<MultiVerCommitNode> & inData)94 void CommitHistorySyncAckPacket::SetData(std::vector<MultiVerCommitNode> &inData)
95 {
96 commits_ = std::move(inData);
97 }
98
GetData(std::vector<MultiVerCommitNode> & outData) const99 void CommitHistorySyncAckPacket::GetData(std::vector<MultiVerCommitNode> &outData) const
100 {
101 outData = commits_;
102 }
103
SetErrorCode(int32_t errCode)104 void CommitHistorySyncAckPacket::SetErrorCode(int32_t errCode)
105 {
106 errorCode_ = errCode;
107 }
108
GetErrorCode(int32_t & errCode) const109 void CommitHistorySyncAckPacket::GetErrorCode(int32_t &errCode) const
110 {
111 errCode = errorCode_;
112 }
113
SetVersion(uint32_t version)114 void CommitHistorySyncAckPacket::SetVersion(uint32_t version)
115 {
116 version_ = version;
117 }
118
GetVersion() const119 uint32_t CommitHistorySyncAckPacket::GetVersion() const
120 {
121 return version_;
122 }
123
SetReserved(std::vector<uint64_t> & reserved)124 void CommitHistorySyncAckPacket::SetReserved(std::vector<uint64_t> &reserved)
125 {
126 reserved_ = std::move(reserved);
127 }
128
GetReserved() const129 std::vector<uint64_t> CommitHistorySyncAckPacket::GetReserved() const
130 {
131 return reserved_;
132 }
133
134 // Class CommitHistorySync
~CommitHistorySync()135 CommitHistorySync::~CommitHistorySync()
136 {
137 storagePtr_ = nullptr;
138 communicateHandle_ = nullptr;
139 }
140
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)141 int CommitHistorySync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
142 {
143 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
144 return -E_MESSAGE_ID_ERROR;
145 }
146
147 switch (inMsg->GetMessageType()) {
148 case TYPE_REQUEST:
149 return RequestPacketSerialization(buffer, length, inMsg);
150 case TYPE_RESPONSE:
151 return AckPacketSerialization(buffer, length, inMsg);
152 default:
153 return -E_MESSAGE_TYPE_ERROR;
154 }
155 }
156
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)157 int CommitHistorySync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
158 {
159 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
160 return -E_MESSAGE_ID_ERROR;
161 }
162
163 switch (inMsg->GetMessageType()) {
164 case TYPE_REQUEST:
165 return RequestPacketDeSerialization(buffer, length, inMsg);
166 case TYPE_RESPONSE:
167 return AckPacketDeSerialization(buffer, length, inMsg);
168 default:
169 return -E_MESSAGE_TYPE_ERROR;
170 }
171 }
172
CalculateLen(const Message * inMsg)173 uint32_t CommitHistorySync::CalculateLen(const Message *inMsg)
174 {
175 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
176 return 0;
177 }
178
179 uint32_t len = 0;
180 int errCode = E_OK;
181 switch (inMsg->GetMessageType()) {
182 case TYPE_REQUEST:
183 errCode = RequestPacketCalculateLen(inMsg, len);
184 if (errCode != E_OK) {
185 return 0;
186 }
187 return len;
188 case TYPE_RESPONSE:
189 errCode = AckPacketCalculateLen(inMsg, len);
190 if (errCode != E_OK) {
191 return 0;
192 }
193 return len;
194 default:
195 return 0;
196 }
197 }
198
RegisterTransformFunc()199 int CommitHistorySync::RegisterTransformFunc()
200 {
201 TransformFunc func;
202 func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
203 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
204 return Serialization(buffer, length, inMsg);
205 };
206 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
207 return DeSerialization(buffer, length, inMsg);
208 };
209 return MessageTransform::RegTransformFunction(COMMIT_HISTORY_SYNC_MESSAGE, func);
210 }
211
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)212 int CommitHistorySync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
213 {
214 if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
215 return -E_INVALID_ARGS;
216 }
217 storagePtr_ = storagePtr;
218 communicateHandle_ = communicateHandle;
219 return E_OK;
220 }
221
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const222 void CommitHistorySync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
223 {
224 (void)context;
225 (void)message;
226 return;
227 }
228
SyncStart(MultiVerSyncTaskContext * context)229 int CommitHistorySync::SyncStart(MultiVerSyncTaskContext *context)
230 {
231 if (context == nullptr) {
232 return -E_INVALID_ARGS;
233 }
234 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
235 if (performance != nullptr) {
236 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
237 }
238 std::map<std::string, MultiVerCommitNode> commitMap;
239 int errCode = GetDeviceLatestCommit(commitMap);
240 if (performance != nullptr) {
241 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_DEVICE_LATEST_COMMIT);
242 }
243 if ((errCode != E_OK) && (errCode != -E_NOT_FOUND)) {
244 return errCode;
245 }
246
247 LOGD("CommitHistorySync::commitMap size = %zu, dst=%s{private}", commitMap.size(), context->GetDeviceId().c_str());
248 return SendRequestPacket(context, commitMap);
249 }
250
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)251 int CommitHistorySync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
252 {
253 if (!IsPacketValid(message, TYPE_REQUEST) || context == nullptr) {
254 return -E_INVALID_ARGS;
255 }
256 const CommitHistorySyncRequestPacket *packet = message->GetObject<CommitHistorySyncRequestPacket>();
257 if (packet == nullptr) {
258 return -E_INVALID_ARGS;
259 }
260 std::vector<MultiVerCommitNode> commits;
261 int errCode = RunPermissionCheck(context->GetDeviceId());
262 if (errCode == -E_NOT_PERMIT) {
263 LOGE("CommitHistorySync::RequestRecvCallback RunPermissionCheck not pass");
264 SendAckPacket(context, commits, errCode, message);
265 return errCode;
266 }
267 std::map<std::string, MultiVerCommitNode> commitMap;
268 packet->GetCommitMap(commitMap);
269 uint32_t ver = packet->GetVersion();
270 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
271 if (performance != nullptr) {
272 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
273 }
274 errCode = GetCommitTree(commitMap, commits);
275 if (performance != nullptr) {
276 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_TREE);
277 }
278 if (errCode != E_OK) {
279 LOGE("CommitHistorySync::RequestRecvCallback : GetCommitTree ERR, errno = %d", errCode);
280 }
281
282 errCode = SendAckPacket(context, commits, errCode, message);
283 LOGD("CommitHistorySync::RequestRecvCallback:SendAckPacket, errno = %d, dst=%s{private}, ver = %" PRIu32
284 ", myversion = %" PRIu32, errCode, context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
285 if (errCode == E_OK) {
286 if (commitMap.empty()) {
287 LOGD("[CommitHistorySync][RequestRecvCallback] no need to start SyncResponse");
288 return -E_NOT_FOUND;
289 }
290 }
291 return errCode;
292 }
293
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)294 int CommitHistorySync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
295 {
296 if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
297 return -E_INVALID_ARGS;
298 }
299
300 std::vector<MultiVerCommitNode> commits;
301 int32_t errCode;
302
303 const CommitHistorySyncAckPacket *packet = message->GetObject<CommitHistorySyncAckPacket>();
304 if (packet == nullptr) {
305 return -E_INVALID_ARGS;
306 }
307 packet->GetErrorCode(errCode);
308 if (errCode == -E_NOT_PERMIT) {
309 LOGE("CommitHistorySync::AckRecvCallback RunPermissionCheck not pass");
310 return errCode;
311 }
312 packet->GetData(commits);
313 uint32_t ver = packet->GetVersion();
314 context->SetCommits(commits);
315 context->SetCommitIndex(0);
316 context->SetCommitsSize(static_cast<int>(commits.size()));
317 LOGD("CommitHistorySync::AckRecvCallback end, CommitsSize = %zu, dst = %s{private}, ver = %d, myversion = %u",
318 commits.size(), context->GetDeviceId().c_str(), ver, SOFTWARE_VERSION_CURRENT);
319 return E_OK;
320 }
321
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)322 int CommitHistorySync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
323 {
324 if (inMsg == nullptr) {
325 return -E_INVALID_ARGS;
326 }
327 const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
328 if (packet == nullptr) {
329 return -E_INVALID_ARGS;
330 }
331
332 if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_REQUEST)) {
333 return -E_INVALID_ARGS;
334 }
335 len = packet->CalculateLen();
336 return E_OK;
337 }
338
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)339 int CommitHistorySync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
340 {
341 if ((buffer == nullptr) || (inMsg == nullptr)) {
342 return -E_INVALID_ARGS;
343 }
344 const CommitHistorySyncRequestPacket *packet = inMsg->GetObject<CommitHistorySyncRequestPacket>();
345 if ((packet == nullptr) || (length != packet->CalculateLen())) {
346 return -E_INVALID_ARGS;
347 }
348
349 Parcel parcel(buffer, length);
350 std::map<std::string, MultiVerCommitNode> commitMap;
351 packet->GetCommitMap(commitMap);
352
353 int errCode = parcel.WriteUInt64(commitMap.size());
354 if (errCode != E_OK) {
355 return -E_SECUREC_ERROR;
356 }
357 // commitMap Serialization
358 for (auto &iter : commitMap) {
359 errCode = parcel.WriteString(iter.first);
360 if (errCode != E_OK) {
361 return -E_SECUREC_ERROR;
362 }
363 errCode = parcel.WriteMultiVerCommit(iter.second);
364 if (errCode != E_OK) {
365 return -E_SECUREC_ERROR;
366 }
367 }
368 errCode = parcel.WriteUInt32(packet->GetVersion());
369 if (errCode != E_OK) {
370 return -E_SECUREC_ERROR;
371 }
372 errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
373 if (errCode != E_OK) {
374 return -E_SECUREC_ERROR;
375 }
376 parcel.EightByteAlign();
377 if (parcel.IsError()) { // almost success
378 return -E_INVALID_ARGS;
379 }
380 return errCode;
381 }
382
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)383 int CommitHistorySync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
384 {
385 if ((buffer == nullptr) || (inMsg == nullptr)) {
386 return -E_INVALID_ARGS;
387 }
388
389 uint64_t packLen = 0;
390 uint64_t len = 0;
391 Parcel parcel(const_cast<uint8_t *>(buffer), length);
392 packLen += parcel.ReadUInt64(len);
393 if (len > DBConstant::MAX_DEVICES_SIZE) {
394 LOGE("CommitHistorySync::RequestPacketDeSerialization : commitMap size too large = %" PRIu64, len);
395 return -E_INVALID_ARGS;
396 }
397 // commitMap DeSerialization
398 std::map<std::string, MultiVerCommitNode> commitMap;
399 while (len > 0) {
400 std::string key;
401 MultiVerCommitNode val;
402 packLen += parcel.ReadString(key);
403 packLen += parcel.ReadMultiVerCommit(val);
404 commitMap[key] = val;
405 len--;
406 if (parcel.IsError()) {
407 return -E_INVALID_ARGS;
408 }
409 }
410 uint32_t version;
411 std::vector<uint64_t> reserved;
412 packLen += parcel.ReadUInt32(version);
413 packLen += parcel.ReadVector<uint64_t>(reserved);
414 packLen = Parcel::GetEightByteAlign(packLen);
415 if (packLen != length || parcel.IsError()) {
416 LOGE("CommitHistorySync::RequestPacketDeSerialization : length error, input len = %" PRIu32
417 ", cac len = %" PRIu64, length, packLen);
418 return -E_INVALID_ARGS;
419 }
420 CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
421 if (packet == nullptr) {
422 LOGE("CommitHistorySync::RequestPacketDeSerialization : new packet error");
423 return -E_OUT_OF_MEMORY;
424 }
425 packet->SetCommitMap(commitMap);
426 packet->SetVersion(version);
427 packet->SetReserved(reserved);
428 int errCode = inMsg->SetExternalObject<>(packet);
429 if (errCode != E_OK) {
430 delete packet;
431 packet = nullptr;
432 }
433 return errCode;
434 }
435
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)436 int CommitHistorySync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
437 {
438 if (inMsg == nullptr) {
439 return -E_INVALID_ARGS;
440 }
441 const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
442 if (packet == nullptr) {
443 return -E_INVALID_ARGS;
444 }
445
446 if ((inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE) || (inMsg->GetMessageType() != TYPE_RESPONSE)) {
447 return -E_INVALID_ARGS;
448 }
449 len = packet->CalculateLen();
450 return E_OK;
451 }
452
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)453 int CommitHistorySync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
454 {
455 if ((buffer == nullptr) || (inMsg == nullptr)) {
456 return -E_INVALID_ARGS;
457 }
458 const CommitHistorySyncAckPacket *packet = inMsg->GetObject<CommitHistorySyncAckPacket>();
459 if ((packet == nullptr) || (length != packet->CalculateLen())) {
460 return -E_INVALID_ARGS;
461 }
462
463 Parcel parcel(buffer, length);
464 int32_t ackErrCode;
465 std::vector<MultiVerCommitNode> commits;
466
467 packet->GetData(commits);
468 packet->GetErrorCode(ackErrCode);
469 // errCode Serialization
470 parcel.WriteInt(ackErrCode);
471 parcel.WriteUInt32(packet->GetVersion());
472 parcel.EightByteAlign();
473 if (parcel.IsError()) { // almost success
474 return -E_INVALID_ARGS;
475 }
476 // commits vector Serialization
477 int errCode = parcel.WriteMultiVerCommits(commits);
478 if (errCode != E_OK) {
479 return -E_SECUREC_ERROR;
480 }
481 errCode = parcel.WriteVector<uint64_t>(packet->GetReserved());
482 if (errCode != E_OK) {
483 return -E_SECUREC_ERROR;
484 }
485 parcel.EightByteAlign();
486 if (parcel.IsError()) { // almost success
487 return -E_INVALID_ARGS;
488 }
489 return errCode;
490 }
491
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)492 int CommitHistorySync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
493 {
494 std::vector<MultiVerCommitNode> commits;
495 uint32_t packLen = 0;
496 Parcel parcel(const_cast<uint8_t *>(buffer), length);
497 int32_t pktErrCode;
498 uint32_t version;
499 std::vector<uint64_t> reserved;
500
501 // errCode DeSerialization
502 packLen += parcel.ReadInt(pktErrCode);
503 packLen += parcel.ReadUInt32(version);
504 parcel.EightByteAlign();
505 if (parcel.IsError()) {
506 return -E_PARSE_FAIL;
507 }
508 packLen = Parcel::GetEightByteAlign(packLen);
509 // commits vector DeSerialization
510 packLen += parcel.ReadMultiVerCommits(commits);
511 packLen += parcel.ReadVector<uint64_t>(reserved);
512 packLen = Parcel::GetEightByteAlign(packLen);
513 if (packLen != length || parcel.IsError()) {
514 LOGE("CommitHistorySync::AckPacketDeSerialization : packet len error, input len = %u, cal len = %u",
515 length, packLen);
516 return -E_INVALID_ARGS;
517 }
518 CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
519 if (packet == nullptr) {
520 LOGE("CommitHistorySync::AckPacketDeSerialization : new packet error");
521 return -E_OUT_OF_MEMORY;
522 }
523 packet->SetData(commits);
524 packet->SetErrorCode(pktErrCode);
525 packet->SetVersion(version);
526 packet->SetReserved(reserved);
527 int errCode = inMsg->SetExternalObject<>(packet);
528 if (errCode != E_OK) {
529 delete packet;
530 packet = nullptr;
531 }
532 return errCode;
533 }
534
IsPacketValid(const Message * inMsg,uint16_t messageType)535 bool CommitHistorySync::IsPacketValid(const Message *inMsg, uint16_t messageType)
536 {
537 if ((inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
538 return false;
539 }
540 if (messageType != inMsg->GetMessageType()) {
541 return false;
542 }
543 return true;
544 }
545
Send(const DeviceID & deviceId,const Message * inMsg)546 int CommitHistorySync::Send(const DeviceID &deviceId, const Message *inMsg)
547 {
548 SendConfig conf = {false, false, SEND_TIME_OUT, {}};
549 int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
550 if (errCode != E_OK) {
551 LOGE("CommitHistorySync::Send ERR! err = %d", errCode);
552 }
553 return errCode;
554 }
555
GetDeviceLatestCommit(std::map<std::string,MultiVerCommitNode> & commitMap)556 int CommitHistorySync::GetDeviceLatestCommit(std::map<std::string, MultiVerCommitNode> &commitMap)
557 {
558 std::map<std::string, MultiVerCommitNode> readCommitMap;
559 int errCode = storagePtr_->GetDeviceLatestCommit(readCommitMap);
560 if (errCode != E_OK) {
561 return errCode;
562 }
563
564 std::string localDevice;
565 errCode = GetLocalDeviceInfo(localDevice);
566 LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
567 if (errCode != E_OK) {
568 return errCode;
569 }
570
571 for (auto &item : readCommitMap) {
572 errCode = storagePtr_->TransferSyncCommitDevInfo(item.second, localDevice, false);
573 if (errCode != E_OK) {
574 break;
575 }
576 commitMap.insert(std::make_pair(item.second.deviceInfo, item.second));
577 }
578
579 return errCode;
580 }
581
GetCommitTree(const std::map<std::string,MultiVerCommitNode> & commitMap,std::vector<MultiVerCommitNode> & commits)582 int CommitHistorySync::GetCommitTree(const std::map<std::string, MultiVerCommitNode> &commitMap,
583 std::vector<MultiVerCommitNode> &commits)
584 {
585 std::map<std::string, MultiVerCommitNode> newCommitMap;
586
587 std::string localDevice;
588 int errCode = GetLocalDeviceInfo(localDevice);
589 LOGD("GetLocalDeviceInfo : %s{private}, errCode = %d", localDevice.c_str(), errCode);
590 if (errCode != E_OK) {
591 return errCode;
592 }
593
594 for (const auto &item : commitMap) {
595 MultiVerCommitNode commitNode = item.second;
596 errCode = storagePtr_->TransferSyncCommitDevInfo(commitNode, localDevice, true);
597 if (errCode != E_OK) {
598 return errCode;
599 }
600 newCommitMap.insert(std::make_pair(commitNode.deviceInfo, commitNode));
601 }
602
603 errCode = storagePtr_->GetCommitTree(newCommitMap, commits);
604 if (errCode != E_OK) {
605 return errCode;
606 }
607 for (auto &commit : commits) {
608 errCode = storagePtr_->TransferSyncCommitDevInfo(commit, localDevice, false);
609 if (errCode != E_OK) {
610 break;
611 }
612 }
613 return errCode;
614 }
615
SendRequestPacket(const MultiVerSyncTaskContext * context,std::map<std::string,MultiVerCommitNode> & commitMap)616 int CommitHistorySync::SendRequestPacket(const MultiVerSyncTaskContext *context,
617 std::map<std::string, MultiVerCommitNode> &commitMap)
618 {
619 CommitHistorySyncRequestPacket *packet = new (std::nothrow) CommitHistorySyncRequestPacket();
620 if (packet == nullptr) {
621 LOGE("CommitHistorySync::SendRequestPacket : new packet error");
622 return -E_OUT_OF_MEMORY;
623 }
624 packet->SetCommitMap(commitMap);
625 packet->SetVersion(SOFTWARE_VERSION_CURRENT);
626 Message *message = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
627 if (message == nullptr) {
628 LOGE("CommitHistorySync::SendRequestPacket : new message error");
629 delete packet;
630 packet = nullptr;
631 return -E_OUT_OF_MEMORY;
632 }
633 message->SetMessageType(TYPE_REQUEST);
634 message->SetTarget(context->GetDeviceId());
635 int errCode = message->SetExternalObject(packet);
636 if (errCode != E_OK) {
637 delete packet;
638 packet = nullptr;
639 delete message;
640 message = nullptr;
641 LOGE("CommitHistorySync::SendRequestPacket : SetExternalObject failed errCode:%d", errCode);
642 return errCode;
643 }
644 message->SetSessionId(context->GetRequestSessionId());
645 message->SetSequenceId(context->GetSequenceId());
646
647 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
648 if (performance != nullptr) {
649 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
650 }
651 errCode = Send(message->GetTarget(), message);
652 if (errCode != E_OK) {
653 LOGE("CommitHistorySync::SendRequestPacket : Send failed errCode:%d", errCode);
654 delete message;
655 message = nullptr;
656 }
657 return errCode;
658 }
659
SendAckPacket(const MultiVerSyncTaskContext * context,std::vector<MultiVerCommitNode> & commits,int ackCode,const Message * message)660 int CommitHistorySync::SendAckPacket(const MultiVerSyncTaskContext *context,
661 std::vector<MultiVerCommitNode> &commits, int ackCode, const Message *message)
662 {
663 if (message == nullptr) {
664 LOGE("CommitHistorySync::SendAckPacket : message is nullptr");
665 return -E_INVALID_ARGS;
666 }
667 CommitHistorySyncAckPacket *packet = new (std::nothrow) CommitHistorySyncAckPacket();
668 if (packet == nullptr) {
669 LOGE("CommitHistorySync::SendAckPacket : packet is nullptr");
670 return -E_OUT_OF_MEMORY;
671 }
672 Message *ackMessage = new (std::nothrow) Message(COMMIT_HISTORY_SYNC_MESSAGE);
673 if (ackMessage == nullptr) {
674 LOGE("CommitHistorySync::SendAckPacket : new message error");
675 delete packet;
676 packet = nullptr;
677 return -E_OUT_OF_MEMORY;
678 }
679
680 packet->SetData(commits);
681 packet->SetErrorCode(static_cast<int32_t>(ackCode));
682 packet->SetVersion(SOFTWARE_VERSION_CURRENT);
683 ackMessage->SetMessageType(TYPE_RESPONSE);
684 ackMessage->SetTarget(context->GetDeviceId());
685 int errCode = ackMessage->SetExternalObject(packet);
686 if (errCode != E_OK) {
687 delete packet;
688 packet = nullptr;
689 delete ackMessage;
690 ackMessage = nullptr;
691 LOGE("CommitHistorySync::SendAckPacket : SetExternalObject failed errCode:%d", errCode);
692 return errCode;
693 }
694 ackMessage->SetSequenceId(message->GetSequenceId());
695 ackMessage->SetSessionId(message->GetSessionId());
696 errCode = Send(ackMessage->GetTarget(), ackMessage);
697 if (errCode != E_OK) {
698 LOGE("CommitHistorySync::SendAckPacket : Send failed errCode:%d", errCode);
699 delete ackMessage;
700 ackMessage = nullptr;
701 }
702 return errCode;
703 }
704
GetLocalDeviceInfo(std::string & deviceInfo)705 int CommitHistorySync::GetLocalDeviceInfo(std::string &deviceInfo)
706 {
707 return communicateHandle_->GetLocalIdentity(deviceInfo);
708 }
709
RunPermissionCheck(const std::string & deviceId) const710 int CommitHistorySync::RunPermissionCheck(const std::string &deviceId) const
711 {
712 std::string appId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, "");
713 std::string userId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, "");
714 std::string storeId = storagePtr_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, "");
715 uint8_t flag = CHECK_FLAG_SEND;
716 PermissionCheckParam param = { userId, appId, storeId, deviceId };
717 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(param, flag);
718 if (errCode != E_OK) {
719 LOGE("[CommitHistorySync] RunPermissionCheck not pass errCode:%d, flag:%d", errCode, flag);
720 return -E_NOT_PERMIT;
721 }
722 return errCode;
723 }
724 }
725 #endif