1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_storage_executor.h"
18
19 #include <openssl/rand.h>
20
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "multi_ver_natural_store.h"
25 #include "multi_ver_natural_store_commit_notify_data.h"
26 #include "multi_ver_natural_store_transfer_data.h"
27 #include "value_hash_calc.h"
28
29 namespace DistributedDB {
MultiVerStorageExecutor(IKvDB * kvDB,IKvDBMultiVerDataStorage * dataStorage,IKvDBCommitStorage * commitStorage,MultiVerKvDataStorage * kvDataStorage,bool writable)30 MultiVerStorageExecutor::MultiVerStorageExecutor(IKvDB *kvDB, IKvDBMultiVerDataStorage *dataStorage,
31 IKvDBCommitStorage *commitStorage, MultiVerKvDataStorage *kvDataStorage, bool writable)
32 : StorageExecutor(writable),
33 kvDB_(kvDB),
34 dataStorage_(dataStorage),
35 commitStorage_(commitStorage),
36 kvDataStorage_(kvDataStorage),
37 transaction_(nullptr),
38 sliceTransaction_(nullptr)
39 {}
40
~MultiVerStorageExecutor()41 MultiVerStorageExecutor::~MultiVerStorageExecutor()
42 {
43 kvDB_ = nullptr;
44 dataStorage_ = nullptr;
45 commitStorage_ = nullptr;
46 kvDataStorage_ = nullptr;
47 transaction_ = nullptr;
48 }
49
Reset()50 int MultiVerStorageExecutor::Reset()
51 {
52 return E_OK;
53 }
54
PutMetaData(const Key & key,const Value & value)55 int MultiVerStorageExecutor::PutMetaData(const Key &key, const Value &value)
56 {
57 if (kvDataStorage_ == nullptr) {
58 return -E_INVALID_DB;
59 }
60
61 int errCode = kvDataStorage_->PutMetaData(key, value);
62 return CheckCorruptedStatus(errCode);
63 }
64
GetMetaData(const Key & key,Value & value) const65 int MultiVerStorageExecutor::GetMetaData(const Key &key, Value &value) const
66 {
67 if (kvDataStorage_ == nullptr) {
68 return -E_INVALID_DB;
69 }
70
71 int errCode = kvDataStorage_->GetMetaData(key, value);
72 return CheckCorruptedStatus(errCode);
73 }
74
GetDeviceLatestCommit(std::map<std::string,MultiVerCommitNode> & commitMap) const75 int MultiVerStorageExecutor::GetDeviceLatestCommit(std::map<std::string, MultiVerCommitNode> &commitMap) const
76 {
77 if (commitStorage_ == nullptr) {
78 LOGE("The commit history module is null.");
79 return -E_INVALID_DB;
80 }
81 std::map<DeviceID, IKvDBCommit *> latestCommits;
82 int errCode = commitStorage_->GetLatestCommits(latestCommits);
83 if (errCode != E_OK) {
84 LOGE("Get latest commits failed:%d", errCode);
85 return CheckCorruptedStatus(errCode);
86 }
87 for (auto &latestCommit : latestCommits) {
88 uint64_t localFlag = (latestCommit.second->GetLocalFlag() ?
89 MultiVerCommitNode::LOCAL_FLAG : MultiVerCommitNode::NON_LOCAL_FLAG);
90 MultiVerCommitNode commit = {
91 latestCommit.second->GetCommitId(), // commitId
92 latestCommit.second->GetLeftParentId(), // leftParent
93 latestCommit.second->GetRightParentId(), // rightParent
94 latestCommit.second->GetTimestamp(), // timestamp
95 latestCommit.second->GetCommitVersion(), // version
96 localFlag, // isLocal
97 latestCommit.second->GetDeviceInfo() // deviceInfo
98 };
99
100 commitStorage_->ReleaseCommit(latestCommit.second);
101 latestCommit.second = nullptr;
102 commitMap.insert(std::make_pair(latestCommit.first, std::move(commit)));
103 }
104 latestCommits.clear();
105 return E_OK;
106 }
107
GetCommitTree(const std::map<std::string,MultiVerCommitNode> & commitMap,std::vector<MultiVerCommitNode> & commits) const108 int MultiVerStorageExecutor::GetCommitTree(const std::map<std::string, MultiVerCommitNode> &commitMap,
109 std::vector<MultiVerCommitNode> &commits) const
110 {
111 if (commitStorage_ == nullptr) {
112 LOGE("The commit history module is null.");
113 return -E_INVALID_DB;
114 }
115 std::map<DeviceID, CommitID> latestCommits;
116 for (auto &latestCommit : commitMap) {
117 latestCommits.insert(std::make_pair(latestCommit.first, latestCommit.second.commitId));
118 }
119 std::list<IKvDBCommit *> commitTree;
120 int errCode = commitStorage_->GetCommitTree(latestCommits, commitTree);
121 if (errCode != E_OK) {
122 LOGE("Get commit tree failed:%d", errCode);
123 return CheckCorruptedStatus(errCode);
124 }
125 LOGD("Get commit tree size:%zu", commitTree.size());
126 for (auto &commitNode : commitTree) {
127 if (commitNode == nullptr) {
128 continue;
129 }
130 uint64_t localFlag = (commitNode->GetLocalFlag() ?
131 MultiVerCommitNode::LOCAL_FLAG : MultiVerCommitNode::NON_LOCAL_FLAG);
132 MultiVerCommitNode commit = {
133 commitNode->GetCommitId(), // commitId
134 commitNode->GetLeftParentId(), // leftParent
135 commitNode->GetRightParentId(), // rightParent
136 commitNode->GetTimestamp(), // timestamp
137 commitNode->GetCommitVersion(), // version
138 localFlag, // isLocal
139 commitNode->GetDeviceInfo() // deviceInfo
140 };
141
142 commitStorage_->ReleaseCommit(commitNode);
143 commitNode = nullptr;
144 commits.push_back(std::move(commit));
145 }
146 commitTree.clear();
147 return E_OK;
148 }
149
GetCommitData(const MultiVerCommitNode & commit,std::vector<MultiVerKvEntry * > & entries) const150 int MultiVerStorageExecutor::GetCommitData(const MultiVerCommitNode &commit,
151 std::vector<MultiVerKvEntry *> &entries) const
152 {
153 if ((commitStorage_ == nullptr) || (dataStorage_ == nullptr)) {
154 return -E_INVALID_DB;
155 }
156 // call the putting value method.
157 CommitID commitId = commit.commitId;
158 int errCode = E_OK;
159 Version version;
160 IKvDBCommit *commitNode = commitStorage_->GetCommit(commitId, errCode);
161 if (commitNode == nullptr) {
162 LOGE("Failed to get the commit:%d", errCode);
163 return CheckCorruptedStatus(errCode);
164 }
165
166 // Get the commit and the version.
167 std::string devInfo = commitNode->GetDeviceInfo();
168 version = commitNode->GetCommitVersion();
169 commitStorage_->ReleaseCommit(commitNode);
170 commitNode = nullptr;
171 if (devInfo.size() != MULTI_VER_TAG_SIZE) {
172 LOGD("skip the foreign data");
173 entries.clear();
174 entries.shrink_to_fit();
175 return E_OK;
176 }
177
178 IKvDBMultiVerTransaction *transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, version, errCode);
179 if (transaction == nullptr) {
180 LOGE("Failed to get the transaction:%d", errCode);
181 goto END;
182 }
183
184 errCode = transaction->GetEntriesByVersion(version, entries);
185 if (errCode != E_OK) {
186 LOGE("Get entries by version failed:%d", errCode);
187 }
188 END:
189 if (transaction != nullptr) {
190 dataStorage_->ReleaseTransaction(transaction);
191 transaction = nullptr;
192 }
193 return CheckCorruptedStatus(errCode);
194 }
195
IsCommitExisted(const MultiVerCommitNode & commit,int & errCode) const196 bool MultiVerStorageExecutor::IsCommitExisted(const MultiVerCommitNode &commit, int &errCode) const
197 {
198 if ((commitStorage_ == nullptr) || (dataStorage_ == nullptr)) {
199 LOGE("The commit history module or data storage is null.");
200 return false;
201 }
202 auto readCommit = commitStorage_->GetCommit(commit.commitId, errCode);
203 if (readCommit == nullptr) {
204 return false;
205 }
206 commitStorage_->ReleaseCommit(readCommit);
207
208 bool result = false;
209 std::vector<MultiVerKvEntry *> entries;
210 auto transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, commit.version, errCode);
211 if (transaction == nullptr) {
212 LOGE("Failed to get the transaction:%d", errCode);
213 goto END;
214 }
215
216 errCode = transaction->GetEntriesByVersion(commit.version, entries);
217 if (errCode != E_OK) {
218 LOGE("Get entries by version failed:%d", errCode);
219 goto END;
220 }
221 if (!entries.empty()) {
222 result = true;
223 }
224 END:
225 if (errCode != E_OK) {
226 result = false;
227 }
228 if (transaction != nullptr) {
229 dataStorage_->ReleaseTransaction(transaction);
230 }
231
232 ReleaseMultiVerKvEntries(entries);
233 errCode = CheckCorruptedStatus(errCode);
234 return result;
235 }
236
IsValueSliceExisted(const ValueSliceHash & value,int & errCode) const237 bool MultiVerStorageExecutor::IsValueSliceExisted(const ValueSliceHash &value, int &errCode) const
238 {
239 if (kvDataStorage_ == nullptr) {
240 errCode = -E_INVALID_DB;
241 return false;
242 }
243 auto sliceTransaction = kvDataStorage_->GetSliceTransaction(false, errCode);
244 if (sliceTransaction == nullptr) {
245 (void)(CheckCorruptedStatus(errCode));
246 return false;
247 }
248 Value valueReal;
249 errCode = sliceTransaction->GetData(value, valueReal);
250 kvDataStorage_->ReleaseSliceTransaction(sliceTransaction);
251 if (errCode == E_OK) {
252 return true;
253 }
254 (void)(CheckCorruptedStatus(errCode));
255 return false;
256 }
257
GetValueSlice(const ValueSliceHash & hashValue,ValueSlice & sliceValue) const258 int MultiVerStorageExecutor::GetValueSlice(const ValueSliceHash &hashValue, ValueSlice &sliceValue) const
259 {
260 return GetValueSliceInner(nullptr, hashValue, sliceValue);
261 }
262
PutValueSlice(const ValueSliceHash & hashValue,const ValueSlice & sliceValue,bool isAddCount)263 int MultiVerStorageExecutor::PutValueSlice(const ValueSliceHash &hashValue, const ValueSlice &sliceValue,
264 bool isAddCount)
265 {
266 return PutValueSliceInner(nullptr, hashValue, sliceValue, isAddCount);
267 }
268
GetValueSliceInner(const SliceTransaction * sliceTransaction,const ValueSliceHash & hashValue,ValueSlice & sliceValue) const269 int MultiVerStorageExecutor::GetValueSliceInner(const SliceTransaction *sliceTransaction,
270 const ValueSliceHash &hashValue, ValueSlice &sliceValue) const
271 {
272 int errCode;
273 if (sliceTransaction != nullptr) {
274 errCode = sliceTransaction->GetData(hashValue, sliceValue);
275 return CheckCorruptedStatus(errCode);
276 }
277 if (kvDataStorage_ == nullptr) {
278 return -E_INVALID_DB;
279 }
280
281 auto sliceTransact = kvDataStorage_->GetSliceTransaction(false, errCode);
282 if (sliceTransact == nullptr) {
283 return CheckCorruptedStatus(errCode);
284 }
285
286 errCode = sliceTransact->GetData(hashValue, sliceValue);
287 kvDataStorage_->ReleaseSliceTransaction(sliceTransact);
288 return CheckCorruptedStatus(errCode);
289 }
290
PutValueSliceInner(SliceTransaction * sliceTransaction,const ValueSliceHash & hashValue,const ValueSlice & sliceValue,bool isAddCount)291 int MultiVerStorageExecutor::PutValueSliceInner(SliceTransaction *sliceTransaction, const ValueSliceHash &hashValue,
292 const ValueSlice &sliceValue, bool isAddCount)
293 {
294 int errCode;
295 if (sliceTransaction != nullptr) {
296 errCode = sliceTransaction->PutData(hashValue, sliceValue, isAddCount);
297 return CheckCorruptedStatus(errCode);
298 }
299
300 if (kvDataStorage_ == nullptr) {
301 return -E_INVALID_DB;
302 }
303
304 auto transaction = kvDataStorage_->GetSliceTransaction(true, errCode);
305 if (transaction == nullptr) {
306 return CheckCorruptedStatus(errCode);
307 }
308
309 errCode = transaction->PutData(hashValue, sliceValue, isAddCount);
310 kvDataStorage_->ReleaseSliceTransaction(transaction);
311 return CheckCorruptedStatus(errCode);
312 }
313
DeleteValueSliceInner(SliceTransaction * sliceTransaction,const ValueSliceHash & hashValue)314 int MultiVerStorageExecutor::DeleteValueSliceInner(SliceTransaction *sliceTransaction,
315 const ValueSliceHash &hashValue)
316 {
317 int errCode;
318 if (sliceTransaction != nullptr) {
319 errCode = sliceTransaction->DeleteData(hashValue);
320 return CheckCorruptedStatus(errCode);
321 }
322
323 if (kvDataStorage_ == nullptr) {
324 return -E_INVALID_DB;
325 }
326
327 auto transaction = kvDataStorage_->GetSliceTransaction(true, errCode);
328 if (transaction == nullptr) {
329 return CheckCorruptedStatus(errCode);
330 }
331
332 errCode = transaction->DeleteData(hashValue);
333 kvDataStorage_->ReleaseSliceTransaction(transaction);
334 return CheckCorruptedStatus(errCode);
335 }
336
StartSliceTransaction()337 int MultiVerStorageExecutor::StartSliceTransaction()
338 {
339 if (kvDataStorage_ == nullptr) {
340 return -E_INVALID_DB;
341 }
342 if (sliceTransaction_ != nullptr) {
343 return -E_UNEXPECTED_DATA;
344 }
345 int errCode;
346 sliceTransaction_ = kvDataStorage_->GetSliceTransaction(true, errCode);
347 if (sliceTransaction_ == nullptr) {
348 return errCode;
349 }
350 errCode = sliceTransaction_->StartTransaction();
351 if (errCode != E_OK) {
352 kvDataStorage_->ReleaseSliceTransaction(sliceTransaction_);
353 }
354 return errCode;
355 }
356
CommitSliceTransaction()357 int MultiVerStorageExecutor::CommitSliceTransaction()
358 {
359 if (sliceTransaction_ == nullptr) {
360 return -E_UNEXPECTED_DATA;
361 }
362 int errCode = sliceTransaction_->CommitTransaction();
363 if (errCode != E_OK) {
364 LOGE("Commit slice transaction failed:%d", errCode);
365 }
366 if (kvDataStorage_ == nullptr) {
367 return -E_INVALID_DB;
368 }
369 kvDataStorage_->ReleaseSliceTransaction(sliceTransaction_);
370 sliceTransaction_ = nullptr;
371 return errCode;
372 }
373
RollbackSliceTransaction()374 int MultiVerStorageExecutor::RollbackSliceTransaction()
375 {
376 if (sliceTransaction_ == nullptr) {
377 return -E_UNEXPECTED_DATA;
378 }
379 int errCode = sliceTransaction_->RollbackTransaction();
380 if (errCode != E_OK) {
381 LOGE("Commit slice transaction failed:%d", errCode);
382 }
383 if (kvDataStorage_ == nullptr) {
384 return -E_INVALID_DB;
385 }
386 kvDataStorage_->ReleaseSliceTransaction(sliceTransaction_);
387 sliceTransaction_ = nullptr;
388 return errCode;
389 }
390
ReInitTransactionVersion(const MultiVerCommitNode & commit)391 int MultiVerStorageExecutor::ReInitTransactionVersion(const MultiVerCommitNode &commit)
392 {
393 if (commitStorage_ == nullptr) {
394 LOGE("The commit history module is null when reinit transaction version.");
395 return -E_INVALID_DB;
396 }
397 int errCode = StartTransaction();
398 if (errCode != E_OK) {
399 LOGE("Start transaction failed:%d", errCode);
400 return errCode;
401 }
402 auto readCommit = commitStorage_->GetCommit(commit.commitId, errCode);
403 if (readCommit == nullptr) {
404 if (errCode != -E_NOT_FOUND) {
405 RollBackTransaction();
406 LOGE("Get the commit error:%d", errCode);
407 return errCode;
408 } else {
409 errCode = E_OK;
410 }
411 } else {
412 LOGD("Reput the version:%" PRIu64, readCommit->GetCommitVersion());
413 transaction_->SetVersion(readCommit->GetCommitVersion());
414 commitStorage_->ReleaseCommit(readCommit);
415 }
416
417 if (errCode != E_OK) {
418 RollBackTransaction();
419 }
420 return errCode;
421 }
422
AddSliceDataCount(const std::vector<Value> & values)423 int MultiVerStorageExecutor::AddSliceDataCount(const std::vector<Value> &values)
424 {
425 for (const auto &item : values) {
426 MultiVerValueObject valueObject;
427 int errCode = valueObject.DeSerialData(item);
428 if (errCode != E_OK) {
429 return errCode;
430 }
431 if (!valueObject.IsHash()) {
432 continue;
433 }
434 std::vector<ValueSliceHash> valueHashList;
435 valueObject.GetValueHash(valueHashList);
436 for (const auto &iter : valueHashList) {
437 Value filledData;
438 errCode = PutValueSliceInner(sliceTransaction_, iter, filledData, true);
439 if (errCode != E_OK) {
440 LOGE("Add the slice value count failed:%d", errCode);
441 return errCode;
442 }
443 }
444 }
445 return E_OK;
446 }
PutCommitData(const MultiVerCommitNode & commit,const std::vector<MultiVerKvEntry * > & entries,const std::string & deviceName)447 int MultiVerStorageExecutor::PutCommitData(const MultiVerCommitNode &commit,
448 const std::vector<MultiVerKvEntry *> &entries, const std::string &deviceName)
449 {
450 // Update the version while the commit has been put.
451 int errCode = ReInitTransactionVersion(commit);
452 if (errCode != E_OK) {
453 return CheckCorruptedStatus(errCode);
454 }
455 errCode = StartSliceTransaction();
456 if (errCode != E_OK) {
457 RollBackTransaction();
458 return CheckCorruptedStatus(errCode);
459 }
460
461 if (transaction_ == nullptr) {
462 return -E_INVALID_DB;
463 }
464
465 std::vector<Value> values;
466 errCode = transaction_->PutBatch(entries, false, values);
467 if (errCode != E_OK) {
468 LOGE("Put batch synced data failed:%d", errCode);
469 goto END;
470 }
471 errCode = AddSliceDataCount(values);
472 if (errCode != E_OK) {
473 goto END;
474 }
475 errCode = CommitSliceTransaction();
476 if (errCode != E_OK) {
477 RollBackTransaction();
478 } else {
479 errCode = CommitTransaction(commit, false);
480 }
481 return CheckCorruptedStatus(errCode);
482 END:
483 if (errCode != E_OK) {
484 (void)(RollbackSliceTransaction());
485 RollBackTransaction();
486 }
487 return CheckCorruptedStatus(errCode);
488 }
489
MergeSyncCommit(const MultiVerCommitNode & commit,const std::vector<MultiVerCommitNode> & commits)490 int MultiVerStorageExecutor::MergeSyncCommit(const MultiVerCommitNode &commit,
491 const std::vector<MultiVerCommitNode> &commits)
492 {
493 if (commits.empty()) {
494 return E_OK;
495 }
496 // if all the nodes have two parents, no need to merge.
497 bool isAllMerged = true;
498 for (const auto &item : commits) {
499 if (item.rightParent.empty()) {
500 isAllMerged = false;
501 }
502 }
503
504 if (isAllMerged) {
505 LOGI("all nodes have been merged");
506 return E_OK;
507 }
508
509 int errCode = MergeCommits(commits);
510 return CheckCorruptedStatus(errCode);
511 }
512
MergeOneCommit(const MultiVerCommitNode & commit)513 int MultiVerStorageExecutor::MergeOneCommit(const MultiVerCommitNode &commit)
514 {
515 std::vector<MultiVerKvEntry *> entries;
516 int errCode = GetResolvedConflictEntries(commit, entries);
517 if (errCode != E_OK) {
518 return errCode;
519 }
520
521 if (transaction_ == nullptr) {
522 return -E_INVALID_DB;
523 }
524
525 std::vector<Value> values;
526 errCode = transaction_->PutBatch(entries, true, values);
527 if (errCode != E_OK) {
528 goto END;
529 }
530
531 errCode = AddSliceDataCount(values);
532 END:
533 ReleaseMultiVerKvEntries(entries);
534 return errCode;
535 }
536
MergeCommits(const std::vector<MultiVerCommitNode> & commits)537 int MultiVerStorageExecutor::MergeCommits(const std::vector<MultiVerCommitNode> &commits)
538 {
539 const MultiVerCommitNode &rootCommitNode = commits.back();
540 std::string rootNodeDeviceInfo = rootCommitNode.deviceInfo;
541 if (rootNodeDeviceInfo.size() != SHA256_DIGEST_LENGTH + MULTI_VER_TAG_SIZE) {
542 return -E_UNEXPECTED_DATA;
543 }
544 int errCode = StartTransaction();
545 if (errCode != E_OK) {
546 return errCode;
547 }
548 errCode = StartSliceTransaction();
549 if (errCode != E_OK) {
550 RollBackTransaction();
551 return errCode;
552 }
553 for (const auto &item : commits) {
554 // only need to merge the node data which is from the same device
555 if (item.deviceInfo.size() != SHA256_DIGEST_LENGTH + MULTI_VER_TAG_SIZE &&
556 item.deviceInfo.size() != MULTI_VER_TAG_SIZE) {
557 errCode = -E_UNEXPECTED_DATA;
558 break;
559 }
560 if (item.deviceInfo.size() == MULTI_VER_TAG_SIZE ||
561 item.deviceInfo.compare(0, SHA256_DIGEST_LENGTH, rootNodeDeviceInfo, 0, SHA256_DIGEST_LENGTH) != 0) {
562 LOGD("Skip the version:%" PRIu64, item.version);
563 continue;
564 }
565 errCode = MergeOneCommit(item);
566 if (errCode != E_OK) {
567 break;
568 }
569 }
570
571 if (errCode != E_OK) {
572 (void)(RollbackSliceTransaction());
573 errCode = RollBackTransaction();
574 } else {
575 errCode = CommitSliceTransaction();
576 if (errCode == E_OK) {
577 errCode = CommitTransaction(rootCommitNode, true);
578 } else {
579 LOGE("Commit the slice transaction error, rollback the data transaction");
580 RollBackTransaction();
581 }
582 }
583 return errCode;
584 }
585
GetDiffEntries(const CommitID & begin,const CommitID & end,MultiVerDiffData & data) const586 int MultiVerStorageExecutor::GetDiffEntries(const CommitID &begin, const CommitID &end, MultiVerDiffData &data) const
587 {
588 if ((commitStorage_ == nullptr) || (dataStorage_ == nullptr)) {
589 return -E_INVALID_DB;
590 }
591
592 int errCode = E_OK;
593 Version verBegin;
594 if (begin.empty()) {
595 verBegin = 0;
596 } else {
597 IKvDBCommit *commitBegin = commitStorage_->GetCommit(begin, errCode);
598 if (commitBegin == nullptr) {
599 verBegin = 0;
600 } else {
601 verBegin = commitBegin->GetCommitVersion();
602 }
603 commitStorage_->ReleaseCommit(commitBegin);
604 commitBegin = nullptr;
605 }
606
607 IKvDBCommit *commitEnd = commitStorage_->GetCommit(end, errCode);
608 if (commitEnd == nullptr) {
609 return CheckCorruptedStatus(errCode);
610 }
611
612 Version verEnd = commitEnd->GetCommitVersion();
613 commitStorage_->ReleaseCommit(commitEnd);
614 commitEnd = nullptr;
615
616 IKvDBMultiVerTransaction *transaction =
617 dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, verBegin, errCode);
618 if (transaction == nullptr) {
619 LOGE("Get diff data start read failed:%d", errCode);
620 return CheckCorruptedStatus(errCode);
621 }
622
623 errCode = transaction->GetDiffEntries(verBegin, verEnd, data);
624 if (errCode != E_OK) {
625 LOGE("get diff entries failed:%d", errCode);
626 goto END;
627 }
628
629 errCode = TransferDiffEntries(data);
630 END:
631 dataStorage_->ReleaseTransaction(transaction);
632 return CheckCorruptedStatus(errCode);
633 }
634
Get(const Key & key,Value & value) const635 int MultiVerStorageExecutor::Get(const Key &key, Value &value) const
636 {
637 if (dataStorage_ == nullptr) {
638 return -E_INVALID_DB;
639 }
640 int errCode = E_OK;
641 auto transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, readVersion_, errCode);
642 if (transaction == nullptr) {
643 LOGE("Get read transaction failed:%d", errCode);
644 return CheckCorruptedStatus(errCode);
645 }
646
647 Value rawValue;
648 errCode = transaction->Get(key, rawValue);
649
650 dataStorage_->ReleaseTransaction(transaction);
651 if (errCode != E_OK) {
652 return CheckCorruptedStatus(errCode);
653 }
654
655 return TransferToUserValue(rawValue, value);
656 }
657
GetEntries(const Key & keyPrefix,std::vector<Entry> & entries) const658 int MultiVerStorageExecutor::GetEntries(const Key &keyPrefix, std::vector<Entry> &entries) const
659 {
660 if (dataStorage_ == nullptr) {
661 return -E_INVALID_DB;
662 }
663 int errCode = E_OK;
664 auto transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, readVersion_, errCode);
665 if (transaction == nullptr) {
666 LOGE("Get read transaction failed:%d", errCode);
667 return CheckCorruptedStatus(errCode);
668 }
669
670 errCode = transaction->GetEntries(keyPrefix, entries);
671
672 dataStorage_->ReleaseTransaction(transaction);
673 if (errCode != E_OK) {
674 return CheckCorruptedStatus(errCode);
675 }
676
677 for (auto &item : entries) {
678 Value userValue;
679 errCode = TransferToUserValue(item.value, userValue);
680 if (errCode != E_OK) {
681 entries.clear();
682 entries.shrink_to_fit();
683 break;
684 }
685 std::swap(userValue, item.value);
686 }
687
688 return CheckCorruptedStatus(errCode);
689 }
690
Put(const Key & key,const Value & value)691 int MultiVerStorageExecutor::Put(const Key &key, const Value &value)
692 {
693 if (transaction_ == nullptr) {
694 return -E_INVALID_DB;
695 }
696 Value savedValue;
697 int errCode = TransferToSavedValue(value, savedValue);
698 if (errCode != E_OK) {
699 return CheckCorruptedStatus(errCode);
700 }
701 errCode = transaction_->Put(key, savedValue);
702 return CheckCorruptedStatus(errCode);
703 }
704
Delete(const Key & key)705 int MultiVerStorageExecutor::Delete(const Key &key)
706 {
707 if (transaction_ == nullptr) {
708 return -E_INVALID_DB;
709 }
710
711 int errCode = transaction_->Delete(key);
712 return CheckCorruptedStatus(errCode);
713 }
714
Clear()715 int MultiVerStorageExecutor::Clear()
716 {
717 if (transaction_ == nullptr) {
718 return -E_INVALID_DB;
719 }
720
721 int errCode = transaction_->Clear();
722 return CheckCorruptedStatus(errCode);
723 }
724
StartAllDbTransaction()725 int MultiVerStorageExecutor::StartAllDbTransaction()
726 {
727 if (dataStorage_ == nullptr || commitStorage_ == nullptr) {
728 return -E_INVALID_DB;
729 }
730
731 IKvDBMultiVerTransaction *transaction = nullptr;
732 int errCode = dataStorage_->StartWrite(KvDataType::KV_DATA_SYNC_P2P, transaction);
733 if (transaction == nullptr) {
734 LOGE("start write transaction failed:%d", errCode);
735 return CheckCorruptedStatus(errCode);
736 }
737
738 // start data storage transaction
739 Version maxVersion = static_cast<MultiVerNaturalStore *>(kvDB_)->GetMaxCommitVersion();
740 transaction->SetVersion(maxVersion);
741
742 errCode = transaction->StartTransaction();
743 if (errCode != E_OK) {
744 LOGE("Start dataStorage transaction failed:%d", errCode);
745 goto END;
746 }
747
748 // start commit history transaction
749 errCode = commitStorage_->StartVacuum();
750 if (errCode != E_OK) {
751 transaction->RollBackTransaction();
752 LOGE("Start commitStorage transaction failed:%d", errCode);
753 goto END;
754 }
755
756 // start slice data transaction
757 errCode = StartSliceTransaction();
758 if (errCode != E_OK) {
759 transaction->RollBackTransaction();
760 commitStorage_->CancelVacuum();
761 LOGE("Start kvDataStorage transaction failed:%d", errCode);
762 goto END;
763 }
764 transaction_ = transaction;
765 END:
766 if (errCode != E_OK) {
767 dataStorage_->ReleaseTransaction(transaction);
768 transaction = nullptr;
769 transaction_ = nullptr;
770 return CheckCorruptedStatus(errCode);
771 }
772
773 return errCode;
774 }
775
StartTransaction(MultiTransactionType type)776 int MultiVerStorageExecutor::StartTransaction(MultiTransactionType type)
777 {
778 if (type == MultiTransactionType::ALL_DATA) {
779 return StartAllDbTransaction();
780 }
781
782 if (dataStorage_ == nullptr) {
783 return -E_INVALID_DB;
784 }
785 IKvDBMultiVerTransaction *transaction = nullptr;
786 int errCode = dataStorage_->StartWrite(KvDataType::KV_DATA_SYNC_P2P, transaction);
787 if (transaction == nullptr) {
788 LOGE("start write transaction failed:%d", errCode);
789 return CheckCorruptedStatus(errCode);
790 }
791
792 // Get the current max version, and the current version is max version + 1.
793 Version maxVersion = static_cast<MultiVerNaturalStore *>(kvDB_)->GetMaxCommitVersion();
794 transaction->SetVersion(++maxVersion);
795 errCode = transaction->StartTransaction();
796 if (errCode != E_OK) {
797 dataStorage_->ReleaseTransaction(transaction);
798 transaction = nullptr;
799 LOGE("Start transaction failed:%d", errCode);
800 return CheckCorruptedStatus(errCode);
801 }
802 transaction_ = transaction;
803 return E_OK;
804 }
805
CommitAllDbTransaction()806 int MultiVerStorageExecutor::CommitAllDbTransaction()
807 {
808 if (dataStorage_ == nullptr || commitStorage_ == nullptr || transaction_ == nullptr) {
809 return -E_INVALID_DB;
810 }
811
812 int errCode = transaction_->CommitTransaction();
813 if (errCode != E_OK) {
814 (void)(RollbackSliceTransaction());
815 commitStorage_->CancelVacuum();
816 LOGE("commit phase one failed:%d", errCode);
817 goto END;
818 }
819
820 // start slice data transaction
821 errCode = CommitSliceTransaction();
822 if (errCode != E_OK) {
823 commitStorage_->CancelVacuum();
824 LOGE("Finish kvDataStorage transaction failed:%d", errCode);
825 goto END;
826 }
827
828 // start commit history transaction
829 errCode = commitStorage_->FinishVacuum();
830 if (errCode != E_OK) {
831 LOGE("Finish commitStorage transaction failed:%d", errCode);
832 goto END;
833 }
834
835 END:
836 dataStorage_->ReleaseTransaction(transaction_);
837 transaction_ = nullptr;
838
839 return CheckCorruptedStatus(errCode);
840 }
841
CommitTransaction(MultiTransactionType type)842 int MultiVerStorageExecutor::CommitTransaction(MultiTransactionType type)
843 {
844 if (type == MultiTransactionType::ALL_DATA) {
845 return CommitAllDbTransaction();
846 }
847
848 if ((dataStorage_ == nullptr) || (transaction_ == nullptr)) {
849 return -E_INVALID_DB;
850 }
851 UpdateVerTimestamp multiVerTimestamp = {static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTimestamp(), true};
852 Version commitVersion;
853 CommitID commitId;
854 int errCode = E_OK;
855 bool isDataChanged = transaction_->IsDataChanged();
856 if (!isDataChanged) {
857 transaction_->RollBackTransaction();
858 goto END;
859 }
860
861 errCode = dataStorage_->CommitWritePhaseOne(transaction_, multiVerTimestamp);
862 if (errCode != E_OK) {
863 LOGE("commit phase one failed:%d", errCode);
864 goto END;
865 }
866
867 commitVersion = transaction_->GetVersion();
868 errCode = FillAndCommitLogEntry(commitVersion, commitId, multiVerTimestamp.timestamp);
869 if (errCode != E_OK) {
870 LOGE("rollback commit phase one failed:%d", errCode);
871 dataStorage_->RollbackWritePhaseOne(transaction_, commitVersion);
872 goto END;
873 }
874 LOGD("local commit version:%" PRIu64, commitVersion);
875 static_cast<MultiVerNaturalStore *>(kvDB_)->SetMaxTimestamp(multiVerTimestamp.timestamp);
876 dataStorage_->CommitWritePhaseTwo(transaction_);
877 static_cast<MultiVerNaturalStore *>(kvDB_)->SetMaxCommitVersion(commitVersion);
878 END:
879 dataStorage_->ReleaseTransaction(transaction_);
880 transaction_ = nullptr;
881 if (errCode == E_OK && isDataChanged) {
882 CommitNotifiedData(commitId);
883 }
884
885 return CheckCorruptedStatus(errCode);
886 }
887
RollBackAllDbTransaction()888 int MultiVerStorageExecutor::RollBackAllDbTransaction()
889 {
890 if ((dataStorage_ == nullptr) || (commitStorage_ == nullptr)) {
891 return -E_INVALID_DB;
892 }
893 int errCode = dataStorage_->RollbackWrite(transaction_);
894 if (errCode != E_OK) {
895 LOGE("Data storage rollback fail!");
896 (void)(commitStorage_->CancelVacuum());
897 (void)(RollbackSliceTransaction());
898 goto END;
899 }
900
901 errCode = commitStorage_->CancelVacuum();
902 if (errCode != E_OK) {
903 LOGE("Commit storage rollback fail!");
904 (void)(RollbackSliceTransaction());
905 goto END;
906 }
907
908 errCode = RollbackSliceTransaction();
909 if (errCode != E_OK) {
910 LOGE("Value slice rollback fail!");
911 }
912
913 END:
914 dataStorage_->ReleaseTransaction(transaction_);
915 transaction_ = nullptr;
916 return CheckCorruptedStatus(errCode);
917 }
918
RollBackTransaction(MultiTransactionType type)919 int MultiVerStorageExecutor::RollBackTransaction(MultiTransactionType type)
920 {
921 if (dataStorage_ == nullptr || transaction_ == nullptr) {
922 LOGE("invalid transaction for rollback");
923 return -E_INVALID_DB;
924 }
925
926 if (type == MultiTransactionType::ALL_DATA) {
927 return RollBackAllDbTransaction();
928 }
929
930 int errCode = dataStorage_->RollbackWrite(transaction_);
931 dataStorage_->ReleaseTransaction(transaction_);
932 transaction_ = nullptr;
933 return CheckCorruptedStatus(errCode);
934 }
935
Close()936 void MultiVerStorageExecutor::Close()
937 {
938 MultiVerStorageExecutor *handle = this;
939
940 MultiVerNaturalStore *multiVerNatureStore = static_cast<MultiVerNaturalStore *>(kvDB_);
941 if (multiVerNatureStore == nullptr) {
942 return;
943 }
944
945 if (readVersion_ != 0) {
946 multiVerNatureStore->RemoveVersionConstraintFromList(readVersion_);
947 readVersion_ = 0;
948 }
949 multiVerNatureStore->ReleaseHandle(handle);
950 }
951
InitCurrentReadVersion()952 int MultiVerStorageExecutor::InitCurrentReadVersion()
953 {
954 if (commitStorage_ == nullptr) {
955 return -E_INVALID_DB;
956 }
957 int errCode = E_OK;
958 CommitID commitId = commitStorage_->GetHeader(errCode);
959 if (errCode != E_OK) {
960 return CheckCorruptedStatus(errCode);
961 }
962
963 Version version = 0;
964 // if no head, just use the initial version.
965 if (!commitId.empty()) {
966 IKvDBCommit *commit = commitStorage_->GetCommit(commitId, errCode);
967 if (commit == nullptr) {
968 LOGE("get the header commit failed:%d", errCode);
969 return CheckCorruptedStatus(errCode);
970 }
971
972 version = commit->GetCommitVersion();
973 commitStorage_->ReleaseCommit(commit);
974 commit = nullptr;
975 }
976 readVersion_ = version;
977 return E_OK;
978 }
979
TransferDiffEntries(MultiVerDiffData & data) const980 int MultiVerStorageExecutor::TransferDiffEntries(MultiVerDiffData &data) const
981 {
982 int errCode;
983 Value valueTmp;
984 for (auto &insertedItem : data.inserted) {
985 errCode = TransferToUserValue(insertedItem.value, valueTmp);
986 if (errCode != E_OK) {
987 return errCode;
988 }
989 std::swap(insertedItem.value, valueTmp);
990 }
991
992 for (auto &updatedItem : data.updated) {
993 errCode = TransferToUserValue(updatedItem.value, valueTmp);
994 if (errCode != E_OK) {
995 return errCode;
996 }
997 std::swap(updatedItem.value, valueTmp);
998 }
999
1000 for (auto &deletedItem : data.deleted) {
1001 errCode = TransferToUserValue(deletedItem.value, valueTmp);
1002 if (errCode != E_OK) {
1003 return errCode;
1004 }
1005 std::swap(deletedItem.value, valueTmp);
1006 }
1007
1008 return E_OK;
1009 }
1010
TransferToUserValue(const Value & savedValue,Value & value) const1011 int MultiVerStorageExecutor::TransferToUserValue(const Value &savedValue, Value &value) const
1012 {
1013 MultiVerValueObject valueObject;
1014 int errCode = valueObject.DeSerialData(savedValue);
1015 if (errCode != E_OK) {
1016 LOGE("Deserialize the multi ver saved value failed:%d", errCode);
1017 return errCode;
1018 }
1019 if (!valueObject.IsHash()) {
1020 return valueObject.GetValue(value);
1021 }
1022
1023 std::vector<ValueSliceHash> sliceHashVect;
1024 errCode = valueObject.GetValueHash(sliceHashVect);
1025 if (errCode != E_OK) {
1026 return errCode;
1027 }
1028 value.clear();
1029 value.shrink_to_fit();
1030 for (const auto &item : sliceHashVect) {
1031 Value itemValue;
1032 errCode = GetValueSlice(item, itemValue);
1033 if (errCode != E_OK) {
1034 LOGE("Get hash entry error:%d", errCode);
1035 break;
1036 }
1037 value.insert(value.end(), itemValue.begin(), itemValue.end());
1038 }
1039
1040 return errCode;
1041 }
1042
TransferToValueObject(const Value & value,MultiVerValueObject & valueObject)1043 int MultiVerStorageExecutor::TransferToValueObject(const Value &value, MultiVerValueObject &valueObject)
1044 {
1045 MultiVerNaturalStoreTransferData splitData;
1046 std::vector<Value> partValues;
1047 // Segment data into blocks by fixed size
1048 // You can set Threshold and blocksize by SetSliceLengthThreshold, SetBlockSizeByte;
1049 int errCode = splitData.SegmentAndTransferValueToHash(value, partValues);
1050 if (errCode == E_OK) {
1051 valueObject.SetFlag(MultiVerValueObject::HASH_FLAG);
1052
1053 // Tansfer blocks data to hash value list
1054 std::vector<ValueSliceHash> hashValues;
1055 ValueSliceHash hashValue;
1056 for (const auto &partValue : partValues) {
1057 if (DBCommon::CalcValueHash(partValue, hashValue) != E_OK) {
1058 return -E_INTERNAL_ERROR;
1059 }
1060 // Put hash value into table
1061 errCode = PutValueSlice(hashValue, partValue, true);
1062 if (errCode != E_OK) {
1063 return errCode;
1064 }
1065 hashValues.push_back(std::move(hashValue));
1066 }
1067
1068 valueObject.SetValueHash(hashValues);
1069 } else {
1070 valueObject.SetFlag(0);
1071 valueObject.SetValue(value);
1072 }
1073 valueObject.SetDataLength(value.size());
1074 return E_OK;
1075 }
1076
TransferToSavedValue(const Value & value,Value & savedValue)1077 int MultiVerStorageExecutor::TransferToSavedValue(const Value &value, Value &savedValue)
1078 {
1079 MultiVerValueObject valueObject;
1080 int errCode = TransferToValueObject(value, valueObject);
1081 if (errCode != E_OK) {
1082 LOGE("Failed to get the serialize data of value object:%d", errCode);
1083 return errCode;
1084 }
1085
1086 errCode = valueObject.GetSerialData(savedValue);
1087 if (errCode != E_OK) {
1088 LOGE("failed to get the serialize data of savedValue:%d", errCode);
1089 return errCode;
1090 }
1091
1092 return E_OK;
1093 }
1094
GetResolvedConflictEntries(const MultiVerCommitNode & commitItem,std::vector<MultiVerKvEntry * > & entries) const1095 int MultiVerStorageExecutor::GetResolvedConflictEntries(const MultiVerCommitNode &commitItem,
1096 std::vector<MultiVerKvEntry *> &entries) const
1097 {
1098 if (commitStorage_ == nullptr) {
1099 return -E_INVALID_DB;
1100 }
1101 int errCode = E_OK;
1102 auto commit = commitStorage_->GetCommit(commitItem.commitId, errCode);
1103 if (commit == nullptr) {
1104 LOGE("failed to get the commit in merge:%d", errCode);
1105 return errCode;
1106 }
1107 entries.clear();
1108 entries.shrink_to_fit();
1109 Version version = commit->GetCommitVersion();
1110 LOGD("Version is %" PRIu64, version);
1111 if (transaction_ != nullptr) {
1112 errCode = transaction_->GetEntriesByVersion(version, entries);
1113 if (errCode != E_OK) {
1114 LOGE("failed to get the entries by version:%d", errCode);
1115 }
1116 }
1117 commitStorage_->ReleaseCommit(commit);
1118 return errCode;
1119 }
1120
CommitNotifiedData(const CommitID & commitId)1121 void MultiVerStorageExecutor::CommitNotifiedData(const CommitID &commitId)
1122 {
1123 CommitID startId;
1124 Version currentVersion = 0;
1125 int errCode = GetParentCommitId(commitId, startId, currentVersion);
1126 if (errCode != E_OK || currentVersion == 0) { // make sure that the version - 1 is valid.
1127 LOGE("Notify: get the parent commit failed:%d", errCode);
1128 return;
1129 }
1130 MultiVerNaturalStoreCommitNotifyData *committedData =
1131 new (std::nothrow) MultiVerNaturalStoreCommitNotifyData(
1132 static_cast<MultiVerNaturalStore *>(kvDB_), startId, commitId, currentVersion - 1);
1133 if (committedData != nullptr) {
1134 static_cast<MultiVerNaturalStore *>(kvDB_)->AddVersionConstraintToList(currentVersion - 1);
1135 static_cast<MultiVerNaturalStore *>(kvDB_)->CommitNotify(NATURAL_STORE_COMMIT_EVENT, committedData);
1136 committedData->DecObjRef(committedData);
1137 committedData = nullptr;
1138 } else {
1139 LOGE("Failed to do commit notify because of OOM.");
1140 }
1141 }
1142
GetParentCommitId(const CommitID & commitId,CommitID & parentId,Version & curVersion) const1143 int MultiVerStorageExecutor::GetParentCommitId(const CommitID &commitId, CommitID &parentId, Version &curVersion) const
1144 {
1145 if (commitStorage_ == nullptr) {
1146 return -E_INVALID_DB;
1147 }
1148 int errCode = E_OK;
1149 IKvDBCommit *commit = commitStorage_->GetCommit(commitId, errCode);
1150 if (commit == nullptr) {
1151 LOGE("Get commit failed while getting the parent id:%d", errCode);
1152 return CheckCorruptedStatus(errCode);
1153 }
1154
1155 parentId = commit->GetLeftParentId();
1156 curVersion = commit->GetCommitVersion();
1157 commitStorage_->ReleaseCommit(commit);
1158 commit = nullptr;
1159 return E_OK;
1160 }
1161
AllocNewCommitId(CommitID & commitId) const1162 int MultiVerStorageExecutor::AllocNewCommitId(CommitID &commitId) const
1163 {
1164 // Only for allocate for temporary.
1165 commitId.resize(COMMIT_ID_LENGTH);
1166 RAND_bytes(commitId.data(), COMMIT_ID_LENGTH);
1167 return E_OK;
1168 }
1169
FillAndCommitLogEntry(const Version & versionInfo,CommitID & commitId,uint64_t timestamp) const1170 int MultiVerStorageExecutor::FillAndCommitLogEntry(const Version &versionInfo, CommitID &commitId,
1171 uint64_t timestamp) const
1172 {
1173 if (kvDB_ == nullptr || commitStorage_ == nullptr) {
1174 return -E_INVALID_DB;
1175 }
1176 // Get the commit id.
1177 int errCode = E_OK;
1178 IKvDBCommit *commit = commitStorage_->AllocCommit(errCode);
1179 if (commit == nullptr) {
1180 LOGE("Failed to alloc the commit locally:%d", errCode);
1181 return errCode;
1182 }
1183
1184 (void)(AllocNewCommitId(commitId));
1185 std::vector<uint8_t> vectTag;
1186 static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTag(vectTag);
1187 std::string strTag(vectTag.begin(), vectTag.end());
1188
1189 // Get the commit struct.
1190 CommitID header = commitStorage_->GetHeader(errCode);
1191 if (errCode != E_OK) {
1192 goto END;
1193 }
1194
1195 commit->SetLeftParentId(header);
1196 commit->SetCommitId(commitId);
1197 commit->SetCommitVersion(versionInfo);
1198 commit->SetLocalFlag(true);
1199 commit->SetTimestamp(timestamp);
1200 commit->SetDeviceInfo(strTag);
1201
1202 // write the commit history.
1203 errCode = commitStorage_->AddCommit(*commit, true);
1204 if (errCode != E_OK) {
1205 LOGE("Add commit history failed:%d", errCode);
1206 }
1207
1208 END:
1209 if (commit != nullptr) {
1210 commitStorage_->ReleaseCommit(commit);
1211 commit = nullptr;
1212 }
1213 return errCode;
1214 }
1215
FillCommitByForeign(IKvDBCommit * commit,const MultiVerCommitNode & multiVerCommit,const Version & versionInfo,const CommitID & commitId,bool isMerge) const1216 int MultiVerStorageExecutor::FillCommitByForeign(IKvDBCommit *commit,
1217 const MultiVerCommitNode &multiVerCommit, const Version &versionInfo, const CommitID &commitId, bool isMerge) const
1218 {
1219 if (isMerge) {
1220 if (commitStorage_ == nullptr || kvDB_ == nullptr) {
1221 return -E_INVALID_DB;
1222 }
1223
1224 int errCode = E_OK;
1225 CommitID header = commitStorage_->GetHeader(errCode);
1226 if (errCode != E_OK) {
1227 return errCode;
1228 }
1229 std::vector<uint8_t> vectTag;
1230 static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTag(vectTag);
1231 std::string strTag(vectTag.begin(), vectTag.end());
1232
1233 commit->SetCommitId(commitId);
1234 commit->SetLeftParentId(header);
1235 commit->SetRightParentId(multiVerCommit.commitId);
1236 commit->SetLocalFlag(true);
1237 Timestamp timestamp = static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTimestamp();
1238 commit->SetTimestamp(timestamp);
1239 commit->SetDeviceInfo(strTag);
1240 } else {
1241 commit->SetCommitId(multiVerCommit.commitId);
1242 commit->SetLeftParentId(multiVerCommit.leftParent);
1243 commit->SetRightParentId(multiVerCommit.rightParent);
1244 commit->SetTimestamp(multiVerCommit.timestamp);
1245 commit->SetLocalFlag(false);
1246 commit->SetDeviceInfo(multiVerCommit.deviceInfo);
1247 }
1248
1249 commit->SetCommitVersion(versionInfo);
1250 return E_OK;
1251 }
1252
FillAndCommitLogEntry(const Version & versionInfo,const MultiVerCommitNode & multiVerCommit,CommitID & commitId,bool isMerge,Timestamp & timestamp) const1253 int MultiVerStorageExecutor::FillAndCommitLogEntry(const Version &versionInfo,
1254 const MultiVerCommitNode &multiVerCommit, CommitID &commitId, bool isMerge, Timestamp ×tamp) const
1255 {
1256 if (commitStorage_ == nullptr) {
1257 return -E_INVALID_DB;
1258 }
1259 int errCode = E_OK;
1260 IKvDBCommit *commit = commitStorage_->AllocCommit(errCode);
1261 if (commit == nullptr) {
1262 return errCode;
1263 }
1264
1265 if (isMerge) {
1266 (void)(AllocNewCommitId(commitId));
1267 }
1268
1269 errCode = FillCommitByForeign(commit, multiVerCommit, versionInfo, commitId, isMerge);
1270 if (errCode != E_OK) {
1271 LOGE("Failed to fill the sync commit:%d", errCode);
1272 goto END;
1273 }
1274
1275 timestamp = isMerge ? static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTimestamp() : multiVerCommit.timestamp;
1276 commit->SetTimestamp(timestamp);
1277
1278 // write the commit history.
1279 errCode = commitStorage_->AddCommit(*commit, isMerge);
1280 if (errCode != E_OK) {
1281 LOGE("Add commit history failed:%d", errCode);
1282 }
1283 END:
1284 if (commit != nullptr) {
1285 commitStorage_->ReleaseCommit(commit);
1286 commit = nullptr;
1287 }
1288
1289 return errCode;
1290 }
1291
CommitTransaction(const MultiVerCommitNode & multiVerCommit,bool isMerge)1292 int MultiVerStorageExecutor::CommitTransaction(const MultiVerCommitNode &multiVerCommit, bool isMerge)
1293 {
1294 if ((transaction_ == nullptr) || (dataStorage_ == nullptr)) {
1295 LOGE("invalid transaction for commit");
1296 return -E_INVALID_DB;
1297 }
1298
1299 Version commitVersion;
1300 CommitID commitId;
1301 UpdateVerTimestamp multiVerTimestamp = {0ULL, false};
1302 bool isDataChanged = transaction_->IsDataChanged();
1303
1304 int errCode = dataStorage_->CommitWritePhaseOne(transaction_, multiVerTimestamp);
1305 if (errCode != E_OK) {
1306 LOGE("commit phase one failed:%d", errCode);
1307 goto END;
1308 }
1309
1310 commitVersion = transaction_->GetVersion();
1311 errCode = FillAndCommitLogEntry(commitVersion, multiVerCommit, commitId, isMerge, multiVerTimestamp.timestamp);
1312 if (errCode != E_OK) {
1313 LOGE("rollback commit phase one failed:%d", errCode);
1314 dataStorage_->RollbackWritePhaseOne(transaction_, commitVersion);
1315 goto END;
1316 }
1317
1318 dataStorage_->CommitWritePhaseTwo(transaction_);
1319 static_cast<MultiVerNaturalStore *>(kvDB_)->SetMaxTimestamp(multiVerTimestamp.timestamp);
1320 static_cast<MultiVerNaturalStore *>(kvDB_)->SetMaxCommitVersion(commitVersion);
1321 LOGD("sync commit version:%" PRIu64, commitVersion);
1322 END:
1323 dataStorage_->ReleaseTransaction(transaction_);
1324 transaction_ = nullptr;
1325
1326 if (errCode == E_OK && isMerge && isDataChanged) {
1327 CommitNotifiedData(commitId);
1328 }
1329
1330 return CheckCorruptedStatus(errCode);
1331 }
1332
ReleaseMultiVerKvEntries(std::vector<MultiVerKvEntry * > & entries)1333 void MultiVerStorageExecutor::ReleaseMultiVerKvEntries(std::vector<MultiVerKvEntry *> &entries)
1334 {
1335 for (auto &item : entries) {
1336 if (item != nullptr) {
1337 delete item;
1338 item = nullptr;
1339 }
1340 }
1341 entries.clear();
1342 entries.shrink_to_fit();
1343 }
1344
GetCurrentReadVersion() const1345 Version MultiVerStorageExecutor::GetCurrentReadVersion() const
1346 {
1347 return readVersion_;
1348 }
1349
GetAllCommitsInTree(std::list<MultiVerCommitNode> & commits) const1350 int MultiVerStorageExecutor::GetAllCommitsInTree(std::list<MultiVerCommitNode> &commits) const
1351 {
1352 if (commitStorage_ == nullptr) {
1353 return -E_INVALID_DB;
1354 }
1355
1356 return commitStorage_->GetAllCommitsInTree(commits);
1357 }
1358
GetEntriesByVersion(Version version,std::list<MultiVerTrimedVersionData> & data) const1359 int MultiVerStorageExecutor::GetEntriesByVersion(Version version, std::list<MultiVerTrimedVersionData> &data) const
1360 {
1361 if (dataStorage_ == nullptr) {
1362 return -E_INVALID_DB;
1363 }
1364
1365 int errCode = E_OK;
1366 IKvDBMultiVerTransaction *transaction = nullptr;
1367 if (transaction_ == nullptr) {
1368 transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, version, errCode);
1369 if (transaction == nullptr) {
1370 LOGE("Failed to get the transaction:%d", errCode);
1371 goto END;
1372 }
1373 } else {
1374 transaction = transaction_;
1375 }
1376
1377 // Note that the transaction fails and the parameters are empty.
1378 errCode = transaction->GetEntriesByVersion(version, data);
1379 END:
1380 if (transaction != transaction_) {
1381 dataStorage_->ReleaseTransaction(transaction);
1382 transaction = nullptr;
1383 }
1384 return CheckCorruptedStatus(errCode);
1385 }
1386
GetOverwrittenClearTypeEntries(Version clearVersion,std::list<MultiVerTrimedVersionData> & data) const1387 int MultiVerStorageExecutor::GetOverwrittenClearTypeEntries(Version clearVersion,
1388 std::list<MultiVerTrimedVersionData> &data) const
1389 {
1390 if (dataStorage_ == nullptr) {
1391 return -E_INVALID_DB;
1392 }
1393
1394 int errCode = E_OK;
1395 IKvDBMultiVerTransaction *transaction = nullptr;
1396 if (transaction_ == nullptr) {
1397 transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, clearVersion, errCode);
1398 if (transaction == nullptr) {
1399 LOGE("Failed to get the transaction:%d", errCode);
1400 goto END;
1401 }
1402 } else {
1403 transaction = transaction_;
1404 }
1405
1406 errCode = transaction->GetOverwrittenClearTypeEntries(clearVersion, data);
1407 END:
1408 if (transaction != transaction_) {
1409 dataStorage_->ReleaseTransaction(transaction);
1410 transaction = nullptr;
1411 }
1412
1413 return CheckCorruptedStatus(errCode);
1414 }
1415
GetOverwrittenNonClearTypeEntries(Version version,const Key & hashKey,std::list<MultiVerTrimedVersionData> & data) const1416 int MultiVerStorageExecutor::GetOverwrittenNonClearTypeEntries(Version version, const Key &hashKey,
1417 std::list<MultiVerTrimedVersionData> &data) const
1418 {
1419 if (dataStorage_ == nullptr) {
1420 return -E_INVALID_DB;
1421 }
1422
1423 int errCode = E_OK;
1424 IKvDBMultiVerTransaction *transaction = nullptr;
1425 if (transaction_ == nullptr) {
1426 transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, version, errCode);
1427 if (transaction == nullptr) {
1428 LOGE("Failed to get the transaction:%d", errCode);
1429 goto END;
1430 }
1431 } else {
1432 transaction = transaction_;
1433 }
1434
1435 errCode = transaction->GetOverwrittenNonClearTypeEntries(version, hashKey, data);
1436 END:
1437 if (transaction != transaction_) {
1438 dataStorage_->ReleaseTransaction(transaction);
1439 transaction = nullptr;
1440 }
1441
1442 return CheckCorruptedStatus(errCode);
1443 }
1444
DeleteEntriesByHashKey(Version version,const Key & hashKey)1445 int MultiVerStorageExecutor::DeleteEntriesByHashKey(Version version, const Key &hashKey)
1446 {
1447 if (transaction_ == nullptr) {
1448 LOGI("You need start transaction before this operation!");
1449 return -E_NOT_PERMIT;
1450 }
1451
1452 Value savedValue;
1453 int errCode = transaction_->GetValueForTrimSlice(hashKey, version, savedValue);
1454 if (errCode != E_OK) {
1455 return CheckCorruptedStatus(errCode);
1456 }
1457
1458 errCode = transaction_->DeleteEntriesByHashKey(version, hashKey);
1459 if (errCode != E_OK) {
1460 return CheckCorruptedStatus(errCode);
1461 }
1462
1463 MultiVerValueObject valueObject;
1464 errCode = valueObject.DeSerialData(savedValue);
1465 // savedValue empty is del or clear record
1466 if (!valueObject.IsHash() || savedValue.empty()) {
1467 return E_OK;
1468 }
1469 if (errCode != E_OK) {
1470 return errCode;
1471 }
1472
1473 std::vector<ValueSliceHash> sliceHashVect;
1474 errCode = valueObject.GetValueHash(sliceHashVect);
1475 if (errCode != E_OK) {
1476 return errCode;
1477 }
1478
1479 for (const auto &item : sliceHashVect) {
1480 errCode = DeleteValueSliceInner(sliceTransaction_, item);
1481 if (errCode != E_OK) {
1482 LOGI("Value slice delete fail!");
1483 break;
1484 }
1485 }
1486
1487 return CheckCorruptedStatus(errCode);
1488 }
1489
UpdateTrimedFlag(Version version,const Key & hashKey)1490 int MultiVerStorageExecutor::UpdateTrimedFlag(Version version, const Key &hashKey)
1491 {
1492 (void)version;
1493 (void)hashKey;
1494 return E_OK;
1495 }
1496
UpdateTrimedFlag(const CommitID & commit)1497 int MultiVerStorageExecutor::UpdateTrimedFlag(const CommitID &commit)
1498 {
1499 (void)commit;
1500 return E_OK;
1501 }
1502 } // namespace DistributedDB
1503 #endif
1504