1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_storage_executor.h"
18 
19 #include <openssl/rand.h>
20 
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "multi_ver_natural_store.h"
25 #include "multi_ver_natural_store_commit_notify_data.h"
26 #include "multi_ver_natural_store_transfer_data.h"
27 #include "value_hash_calc.h"
28 
29 namespace DistributedDB {
MultiVerStorageExecutor(IKvDB * kvDB,IKvDBMultiVerDataStorage * dataStorage,IKvDBCommitStorage * commitStorage,MultiVerKvDataStorage * kvDataStorage,bool writable)30 MultiVerStorageExecutor::MultiVerStorageExecutor(IKvDB *kvDB, IKvDBMultiVerDataStorage *dataStorage,
31     IKvDBCommitStorage *commitStorage, MultiVerKvDataStorage *kvDataStorage, bool writable)
32     : StorageExecutor(writable),
33       kvDB_(kvDB),
34       dataStorage_(dataStorage),
35       commitStorage_(commitStorage),
36       kvDataStorage_(kvDataStorage),
37       transaction_(nullptr),
38       sliceTransaction_(nullptr)
39 {}
40 
~MultiVerStorageExecutor()41 MultiVerStorageExecutor::~MultiVerStorageExecutor()
42 {
43     kvDB_ = nullptr;
44     dataStorage_ = nullptr;
45     commitStorage_ = nullptr;
46     kvDataStorage_ = nullptr;
47     transaction_ = nullptr;
48 }
49 
Reset()50 int MultiVerStorageExecutor::Reset()
51 {
52     return E_OK;
53 }
54 
PutMetaData(const Key & key,const Value & value)55 int MultiVerStorageExecutor::PutMetaData(const Key &key, const Value &value)
56 {
57     if (kvDataStorage_ == nullptr) {
58         return -E_INVALID_DB;
59     }
60 
61     int errCode = kvDataStorage_->PutMetaData(key, value);
62     return CheckCorruptedStatus(errCode);
63 }
64 
GetMetaData(const Key & key,Value & value) const65 int MultiVerStorageExecutor::GetMetaData(const Key &key, Value &value) const
66 {
67     if (kvDataStorage_ == nullptr) {
68         return -E_INVALID_DB;
69     }
70 
71     int errCode = kvDataStorage_->GetMetaData(key, value);
72     return CheckCorruptedStatus(errCode);
73 }
74 
GetDeviceLatestCommit(std::map<std::string,MultiVerCommitNode> & commitMap) const75 int MultiVerStorageExecutor::GetDeviceLatestCommit(std::map<std::string, MultiVerCommitNode> &commitMap) const
76 {
77     if (commitStorage_ == nullptr) {
78         LOGE("The commit history module is null.");
79         return -E_INVALID_DB;
80     }
81     std::map<DeviceID, IKvDBCommit *> latestCommits;
82     int errCode = commitStorage_->GetLatestCommits(latestCommits);
83     if (errCode != E_OK) {
84         LOGE("Get latest commits failed:%d", errCode);
85         return CheckCorruptedStatus(errCode);
86     }
87     for (auto &latestCommit : latestCommits) {
88         uint64_t localFlag = (latestCommit.second->GetLocalFlag() ?
89             MultiVerCommitNode::LOCAL_FLAG : MultiVerCommitNode::NON_LOCAL_FLAG);
90         MultiVerCommitNode commit = {
91             latestCommit.second->GetCommitId(), // commitId
92             latestCommit.second->GetLeftParentId(), // leftParent
93             latestCommit.second->GetRightParentId(), // rightParent
94             latestCommit.second->GetTimestamp(), // timestamp
95             latestCommit.second->GetCommitVersion(), // version
96             localFlag, // isLocal
97             latestCommit.second->GetDeviceInfo() // deviceInfo
98         };
99 
100         commitStorage_->ReleaseCommit(latestCommit.second);
101         latestCommit.second = nullptr;
102         commitMap.insert(std::make_pair(latestCommit.first, std::move(commit)));
103     }
104     latestCommits.clear();
105     return E_OK;
106 }
107 
GetCommitTree(const std::map<std::string,MultiVerCommitNode> & commitMap,std::vector<MultiVerCommitNode> & commits) const108 int MultiVerStorageExecutor::GetCommitTree(const std::map<std::string, MultiVerCommitNode> &commitMap,
109     std::vector<MultiVerCommitNode> &commits) const
110 {
111     if (commitStorage_ == nullptr) {
112         LOGE("The commit history module is null.");
113         return -E_INVALID_DB;
114     }
115     std::map<DeviceID, CommitID> latestCommits;
116     for (auto &latestCommit : commitMap) {
117         latestCommits.insert(std::make_pair(latestCommit.first, latestCommit.second.commitId));
118     }
119     std::list<IKvDBCommit *> commitTree;
120     int errCode = commitStorage_->GetCommitTree(latestCommits, commitTree);
121     if (errCode != E_OK) {
122         LOGE("Get commit tree failed:%d", errCode);
123         return CheckCorruptedStatus(errCode);
124     }
125     LOGD("Get commit tree size:%zu", commitTree.size());
126     for (auto &commitNode : commitTree) {
127         if (commitNode == nullptr) {
128             continue;
129         }
130         uint64_t localFlag = (commitNode->GetLocalFlag() ?
131             MultiVerCommitNode::LOCAL_FLAG : MultiVerCommitNode::NON_LOCAL_FLAG);
132         MultiVerCommitNode commit = {
133             commitNode->GetCommitId(), // commitId
134             commitNode->GetLeftParentId(), // leftParent
135             commitNode->GetRightParentId(), // rightParent
136             commitNode->GetTimestamp(), // timestamp
137             commitNode->GetCommitVersion(), // version
138             localFlag, // isLocal
139             commitNode->GetDeviceInfo() // deviceInfo
140         };
141 
142         commitStorage_->ReleaseCommit(commitNode);
143         commitNode = nullptr;
144         commits.push_back(std::move(commit));
145     }
146     commitTree.clear();
147     return E_OK;
148 }
149 
GetCommitData(const MultiVerCommitNode & commit,std::vector<MultiVerKvEntry * > & entries) const150 int MultiVerStorageExecutor::GetCommitData(const MultiVerCommitNode &commit,
151     std::vector<MultiVerKvEntry *> &entries) const
152 {
153     if ((commitStorage_ == nullptr) || (dataStorage_ == nullptr)) {
154         return -E_INVALID_DB;
155     }
156     // call the putting value method.
157     CommitID commitId = commit.commitId;
158     int errCode = E_OK;
159     Version version;
160     IKvDBCommit *commitNode = commitStorage_->GetCommit(commitId, errCode);
161     if (commitNode == nullptr) {
162         LOGE("Failed to get the commit:%d", errCode);
163         return CheckCorruptedStatus(errCode);
164     }
165 
166     // Get the commit and the version.
167     std::string devInfo = commitNode->GetDeviceInfo();
168     version = commitNode->GetCommitVersion();
169     commitStorage_->ReleaseCommit(commitNode);
170     commitNode = nullptr;
171     if (devInfo.size() != MULTI_VER_TAG_SIZE) {
172         LOGD("skip the foreign data");
173         entries.clear();
174         entries.shrink_to_fit();
175         return E_OK;
176     }
177 
178     IKvDBMultiVerTransaction *transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, version, errCode);
179     if (transaction == nullptr) {
180         LOGE("Failed to get the transaction:%d", errCode);
181         goto END;
182     }
183 
184     errCode = transaction->GetEntriesByVersion(version, entries);
185     if (errCode != E_OK) {
186         LOGE("Get entries by version failed:%d", errCode);
187     }
188 END:
189     if (transaction != nullptr) {
190         dataStorage_->ReleaseTransaction(transaction);
191         transaction = nullptr;
192     }
193     return CheckCorruptedStatus(errCode);
194 }
195 
IsCommitExisted(const MultiVerCommitNode & commit,int & errCode) const196 bool MultiVerStorageExecutor::IsCommitExisted(const MultiVerCommitNode &commit, int &errCode) const
197 {
198     if ((commitStorage_ == nullptr) || (dataStorage_ == nullptr)) {
199         LOGE("The commit history module or data storage is null.");
200         return false;
201     }
202     auto readCommit = commitStorage_->GetCommit(commit.commitId, errCode);
203     if (readCommit == nullptr) {
204         return false;
205     }
206     commitStorage_->ReleaseCommit(readCommit);
207 
208     bool result = false;
209     std::vector<MultiVerKvEntry *> entries;
210     auto transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, commit.version, errCode);
211     if (transaction == nullptr) {
212         LOGE("Failed to get the transaction:%d", errCode);
213         goto END;
214     }
215 
216     errCode = transaction->GetEntriesByVersion(commit.version, entries);
217     if (errCode != E_OK) {
218         LOGE("Get entries by version failed:%d", errCode);
219         goto END;
220     }
221     if (!entries.empty()) {
222         result = true;
223     }
224 END:
225     if (errCode != E_OK) {
226         result = false;
227     }
228     if (transaction != nullptr) {
229         dataStorage_->ReleaseTransaction(transaction);
230     }
231 
232     ReleaseMultiVerKvEntries(entries);
233     errCode = CheckCorruptedStatus(errCode);
234     return result;
235 }
236 
IsValueSliceExisted(const ValueSliceHash & value,int & errCode) const237 bool MultiVerStorageExecutor::IsValueSliceExisted(const ValueSliceHash &value, int &errCode) const
238 {
239     if (kvDataStorage_ == nullptr) {
240         errCode = -E_INVALID_DB;
241         return false;
242     }
243     auto sliceTransaction = kvDataStorage_->GetSliceTransaction(false, errCode);
244     if (sliceTransaction == nullptr) {
245         (void)(CheckCorruptedStatus(errCode));
246         return false;
247     }
248     Value valueReal;
249     errCode = sliceTransaction->GetData(value, valueReal);
250     kvDataStorage_->ReleaseSliceTransaction(sliceTransaction);
251     if (errCode == E_OK) {
252         return true;
253     }
254     (void)(CheckCorruptedStatus(errCode));
255     return false;
256 }
257 
GetValueSlice(const ValueSliceHash & hashValue,ValueSlice & sliceValue) const258 int MultiVerStorageExecutor::GetValueSlice(const ValueSliceHash &hashValue, ValueSlice &sliceValue) const
259 {
260     return GetValueSliceInner(nullptr, hashValue, sliceValue);
261 }
262 
PutValueSlice(const ValueSliceHash & hashValue,const ValueSlice & sliceValue,bool isAddCount)263 int MultiVerStorageExecutor::PutValueSlice(const ValueSliceHash &hashValue, const ValueSlice &sliceValue,
264     bool isAddCount)
265 {
266     return PutValueSliceInner(nullptr, hashValue, sliceValue, isAddCount);
267 }
268 
GetValueSliceInner(const SliceTransaction * sliceTransaction,const ValueSliceHash & hashValue,ValueSlice & sliceValue) const269 int MultiVerStorageExecutor::GetValueSliceInner(const SliceTransaction *sliceTransaction,
270     const ValueSliceHash &hashValue, ValueSlice &sliceValue) const
271 {
272     int errCode;
273     if (sliceTransaction != nullptr) {
274         errCode = sliceTransaction->GetData(hashValue, sliceValue);
275         return CheckCorruptedStatus(errCode);
276     }
277     if (kvDataStorage_ == nullptr) {
278         return -E_INVALID_DB;
279     }
280 
281     auto sliceTransact = kvDataStorage_->GetSliceTransaction(false, errCode);
282     if (sliceTransact == nullptr) {
283         return CheckCorruptedStatus(errCode);
284     }
285 
286     errCode = sliceTransact->GetData(hashValue, sliceValue);
287     kvDataStorage_->ReleaseSliceTransaction(sliceTransact);
288     return CheckCorruptedStatus(errCode);
289 }
290 
PutValueSliceInner(SliceTransaction * sliceTransaction,const ValueSliceHash & hashValue,const ValueSlice & sliceValue,bool isAddCount)291 int MultiVerStorageExecutor::PutValueSliceInner(SliceTransaction *sliceTransaction, const ValueSliceHash &hashValue,
292     const ValueSlice &sliceValue, bool isAddCount)
293 {
294     int errCode;
295     if (sliceTransaction != nullptr) {
296         errCode = sliceTransaction->PutData(hashValue, sliceValue, isAddCount);
297         return CheckCorruptedStatus(errCode);
298     }
299 
300     if (kvDataStorage_ == nullptr) {
301         return -E_INVALID_DB;
302     }
303 
304     auto transaction = kvDataStorage_->GetSliceTransaction(true, errCode);
305     if (transaction == nullptr) {
306         return CheckCorruptedStatus(errCode);
307     }
308 
309     errCode = transaction->PutData(hashValue, sliceValue, isAddCount);
310     kvDataStorage_->ReleaseSliceTransaction(transaction);
311     return CheckCorruptedStatus(errCode);
312 }
313 
DeleteValueSliceInner(SliceTransaction * sliceTransaction,const ValueSliceHash & hashValue)314 int MultiVerStorageExecutor::DeleteValueSliceInner(SliceTransaction *sliceTransaction,
315     const ValueSliceHash &hashValue)
316 {
317     int errCode;
318     if (sliceTransaction != nullptr) {
319         errCode = sliceTransaction->DeleteData(hashValue);
320         return CheckCorruptedStatus(errCode);
321     }
322 
323     if (kvDataStorage_ == nullptr) {
324         return -E_INVALID_DB;
325     }
326 
327     auto transaction = kvDataStorage_->GetSliceTransaction(true, errCode);
328     if (transaction == nullptr) {
329         return CheckCorruptedStatus(errCode);
330     }
331 
332     errCode = transaction->DeleteData(hashValue);
333     kvDataStorage_->ReleaseSliceTransaction(transaction);
334     return CheckCorruptedStatus(errCode);
335 }
336 
StartSliceTransaction()337 int MultiVerStorageExecutor::StartSliceTransaction()
338 {
339     if (kvDataStorage_ == nullptr) {
340         return -E_INVALID_DB;
341     }
342     if (sliceTransaction_ != nullptr) {
343         return -E_UNEXPECTED_DATA;
344     }
345     int errCode;
346     sliceTransaction_ = kvDataStorage_->GetSliceTransaction(true, errCode);
347     if (sliceTransaction_ == nullptr) {
348         return errCode;
349     }
350     errCode = sliceTransaction_->StartTransaction();
351     if (errCode != E_OK) {
352         kvDataStorage_->ReleaseSliceTransaction(sliceTransaction_);
353     }
354     return errCode;
355 }
356 
CommitSliceTransaction()357 int MultiVerStorageExecutor::CommitSliceTransaction()
358 {
359     if (sliceTransaction_ == nullptr) {
360         return -E_UNEXPECTED_DATA;
361     }
362     int errCode = sliceTransaction_->CommitTransaction();
363     if (errCode != E_OK) {
364         LOGE("Commit slice transaction failed:%d", errCode);
365     }
366     if (kvDataStorage_ == nullptr) {
367         return -E_INVALID_DB;
368     }
369     kvDataStorage_->ReleaseSliceTransaction(sliceTransaction_);
370     sliceTransaction_ = nullptr;
371     return errCode;
372 }
373 
RollbackSliceTransaction()374 int MultiVerStorageExecutor::RollbackSliceTransaction()
375 {
376     if (sliceTransaction_ == nullptr) {
377         return -E_UNEXPECTED_DATA;
378     }
379     int errCode = sliceTransaction_->RollbackTransaction();
380     if (errCode != E_OK) {
381         LOGE("Commit slice transaction failed:%d", errCode);
382     }
383     if (kvDataStorage_ == nullptr) {
384         return -E_INVALID_DB;
385     }
386     kvDataStorage_->ReleaseSliceTransaction(sliceTransaction_);
387     sliceTransaction_ = nullptr;
388     return errCode;
389 }
390 
ReInitTransactionVersion(const MultiVerCommitNode & commit)391 int MultiVerStorageExecutor::ReInitTransactionVersion(const MultiVerCommitNode &commit)
392 {
393     if (commitStorage_ == nullptr) {
394         LOGE("The commit history module is null when reinit transaction version.");
395         return -E_INVALID_DB;
396     }
397     int errCode = StartTransaction();
398     if (errCode != E_OK) {
399         LOGE("Start transaction failed:%d", errCode);
400         return errCode;
401     }
402     auto readCommit = commitStorage_->GetCommit(commit.commitId, errCode);
403     if (readCommit == nullptr) {
404         if (errCode != -E_NOT_FOUND) {
405             RollBackTransaction();
406             LOGE("Get the commit error:%d", errCode);
407             return errCode;
408         } else {
409             errCode = E_OK;
410         }
411     } else {
412         LOGD("Reput the version:%" PRIu64, readCommit->GetCommitVersion());
413         transaction_->SetVersion(readCommit->GetCommitVersion());
414         commitStorage_->ReleaseCommit(readCommit);
415     }
416 
417     if (errCode != E_OK) {
418         RollBackTransaction();
419     }
420     return errCode;
421 }
422 
AddSliceDataCount(const std::vector<Value> & values)423 int MultiVerStorageExecutor::AddSliceDataCount(const std::vector<Value> &values)
424 {
425     for (const auto &item : values) {
426         MultiVerValueObject valueObject;
427         int errCode = valueObject.DeSerialData(item);
428         if (errCode != E_OK) {
429             return errCode;
430         }
431         if (!valueObject.IsHash()) {
432             continue;
433         }
434         std::vector<ValueSliceHash> valueHashList;
435         valueObject.GetValueHash(valueHashList);
436         for (const auto &iter : valueHashList) {
437             Value filledData;
438             errCode = PutValueSliceInner(sliceTransaction_, iter, filledData, true);
439             if (errCode != E_OK) {
440                 LOGE("Add the slice value count failed:%d", errCode);
441                 return errCode;
442             }
443         }
444     }
445     return E_OK;
446 }
PutCommitData(const MultiVerCommitNode & commit,const std::vector<MultiVerKvEntry * > & entries,const std::string & deviceName)447 int MultiVerStorageExecutor::PutCommitData(const MultiVerCommitNode &commit,
448     const std::vector<MultiVerKvEntry *> &entries, const std::string &deviceName)
449 {
450     // Update the version while the commit has been put.
451     int errCode = ReInitTransactionVersion(commit);
452     if (errCode != E_OK) {
453         return CheckCorruptedStatus(errCode);
454     }
455     errCode = StartSliceTransaction();
456     if (errCode != E_OK) {
457         RollBackTransaction();
458         return CheckCorruptedStatus(errCode);
459     }
460 
461     if (transaction_ == nullptr) {
462         return -E_INVALID_DB;
463     }
464 
465     std::vector<Value> values;
466     errCode = transaction_->PutBatch(entries, false, values);
467     if (errCode != E_OK) {
468         LOGE("Put batch synced data failed:%d", errCode);
469         goto END;
470     }
471     errCode = AddSliceDataCount(values);
472     if (errCode != E_OK) {
473         goto END;
474     }
475     errCode = CommitSliceTransaction();
476     if (errCode != E_OK) {
477         RollBackTransaction();
478     } else {
479         errCode = CommitTransaction(commit, false);
480     }
481     return CheckCorruptedStatus(errCode);
482 END:
483     if (errCode != E_OK) {
484         (void)(RollbackSliceTransaction());
485         RollBackTransaction();
486     }
487     return CheckCorruptedStatus(errCode);
488 }
489 
MergeSyncCommit(const MultiVerCommitNode & commit,const std::vector<MultiVerCommitNode> & commits)490 int MultiVerStorageExecutor::MergeSyncCommit(const MultiVerCommitNode &commit,
491     const std::vector<MultiVerCommitNode> &commits)
492 {
493     if (commits.empty()) {
494         return E_OK;
495     }
496     // if all the nodes have two parents, no need to merge.
497     bool isAllMerged = true;
498     for (const auto &item : commits) {
499         if (item.rightParent.empty()) {
500             isAllMerged = false;
501         }
502     }
503 
504     if (isAllMerged) {
505         LOGI("all nodes have been merged");
506         return E_OK;
507     }
508 
509     int errCode = MergeCommits(commits);
510     return CheckCorruptedStatus(errCode);
511 }
512 
MergeOneCommit(const MultiVerCommitNode & commit)513 int MultiVerStorageExecutor::MergeOneCommit(const MultiVerCommitNode &commit)
514 {
515     std::vector<MultiVerKvEntry *> entries;
516     int errCode = GetResolvedConflictEntries(commit, entries);
517     if (errCode != E_OK) {
518         return errCode;
519     }
520 
521     if (transaction_ == nullptr) {
522         return -E_INVALID_DB;
523     }
524 
525     std::vector<Value> values;
526     errCode = transaction_->PutBatch(entries, true, values);
527     if (errCode != E_OK) {
528         goto END;
529     }
530 
531     errCode = AddSliceDataCount(values);
532 END:
533     ReleaseMultiVerKvEntries(entries);
534     return errCode;
535 }
536 
MergeCommits(const std::vector<MultiVerCommitNode> & commits)537 int MultiVerStorageExecutor::MergeCommits(const std::vector<MultiVerCommitNode> &commits)
538 {
539     const MultiVerCommitNode &rootCommitNode = commits.back();
540     std::string rootNodeDeviceInfo = rootCommitNode.deviceInfo;
541     if (rootNodeDeviceInfo.size() != SHA256_DIGEST_LENGTH + MULTI_VER_TAG_SIZE) {
542         return -E_UNEXPECTED_DATA;
543     }
544     int errCode = StartTransaction();
545     if (errCode != E_OK) {
546         return errCode;
547     }
548     errCode = StartSliceTransaction();
549     if (errCode != E_OK) {
550         RollBackTransaction();
551         return errCode;
552     }
553     for (const auto &item : commits) {
554         // only need to merge the node data which is from the same device
555         if (item.deviceInfo.size() != SHA256_DIGEST_LENGTH + MULTI_VER_TAG_SIZE &&
556             item.deviceInfo.size() != MULTI_VER_TAG_SIZE) {
557             errCode = -E_UNEXPECTED_DATA;
558             break;
559         }
560         if (item.deviceInfo.size() == MULTI_VER_TAG_SIZE ||
561             item.deviceInfo.compare(0, SHA256_DIGEST_LENGTH, rootNodeDeviceInfo, 0, SHA256_DIGEST_LENGTH) != 0) {
562             LOGD("Skip the version:%" PRIu64, item.version);
563             continue;
564         }
565         errCode = MergeOneCommit(item);
566         if (errCode != E_OK) {
567             break;
568         }
569     }
570 
571     if (errCode != E_OK) {
572         (void)(RollbackSliceTransaction());
573         errCode = RollBackTransaction();
574     } else {
575         errCode = CommitSliceTransaction();
576         if (errCode == E_OK) {
577             errCode = CommitTransaction(rootCommitNode, true);
578         } else {
579             LOGE("Commit the slice transaction error, rollback the data transaction");
580             RollBackTransaction();
581         }
582     }
583     return errCode;
584 }
585 
GetDiffEntries(const CommitID & begin,const CommitID & end,MultiVerDiffData & data) const586 int MultiVerStorageExecutor::GetDiffEntries(const CommitID &begin, const CommitID &end, MultiVerDiffData &data) const
587 {
588     if ((commitStorage_ == nullptr) || (dataStorage_ == nullptr)) {
589         return -E_INVALID_DB;
590     }
591 
592     int errCode = E_OK;
593     Version verBegin;
594     if (begin.empty()) {
595         verBegin = 0;
596     } else {
597         IKvDBCommit *commitBegin = commitStorage_->GetCommit(begin, errCode);
598         if (commitBegin == nullptr) {
599             verBegin = 0;
600         } else {
601             verBegin = commitBegin->GetCommitVersion();
602         }
603         commitStorage_->ReleaseCommit(commitBegin);
604         commitBegin = nullptr;
605     }
606 
607     IKvDBCommit *commitEnd = commitStorage_->GetCommit(end, errCode);
608     if (commitEnd == nullptr) {
609         return CheckCorruptedStatus(errCode);
610     }
611 
612     Version verEnd = commitEnd->GetCommitVersion();
613     commitStorage_->ReleaseCommit(commitEnd);
614     commitEnd = nullptr;
615 
616     IKvDBMultiVerTransaction *transaction =
617         dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, verBegin, errCode);
618     if (transaction == nullptr) {
619         LOGE("Get diff data start read failed:%d", errCode);
620         return CheckCorruptedStatus(errCode);
621     }
622 
623     errCode = transaction->GetDiffEntries(verBegin, verEnd, data);
624     if (errCode != E_OK) {
625         LOGE("get diff entries failed:%d", errCode);
626         goto END;
627     }
628 
629     errCode = TransferDiffEntries(data);
630 END:
631     dataStorage_->ReleaseTransaction(transaction);
632     return CheckCorruptedStatus(errCode);
633 }
634 
Get(const Key & key,Value & value) const635 int MultiVerStorageExecutor::Get(const Key &key, Value &value) const
636 {
637     if (dataStorage_ == nullptr) {
638         return -E_INVALID_DB;
639     }
640     int errCode = E_OK;
641     auto transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, readVersion_, errCode);
642     if (transaction == nullptr) {
643         LOGE("Get read transaction failed:%d", errCode);
644         return CheckCorruptedStatus(errCode);
645     }
646 
647     Value rawValue;
648     errCode = transaction->Get(key, rawValue);
649 
650     dataStorage_->ReleaseTransaction(transaction);
651     if (errCode != E_OK) {
652         return CheckCorruptedStatus(errCode);
653     }
654 
655     return TransferToUserValue(rawValue, value);
656 }
657 
GetEntries(const Key & keyPrefix,std::vector<Entry> & entries) const658 int MultiVerStorageExecutor::GetEntries(const Key &keyPrefix, std::vector<Entry> &entries) const
659 {
660     if (dataStorage_ == nullptr) {
661         return -E_INVALID_DB;
662     }
663     int errCode = E_OK;
664     auto transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, readVersion_, errCode);
665     if (transaction == nullptr) {
666         LOGE("Get read transaction failed:%d", errCode);
667         return CheckCorruptedStatus(errCode);
668     }
669 
670     errCode = transaction->GetEntries(keyPrefix, entries);
671 
672     dataStorage_->ReleaseTransaction(transaction);
673     if (errCode != E_OK) {
674         return CheckCorruptedStatus(errCode);
675     }
676 
677     for (auto &item : entries) {
678         Value userValue;
679         errCode = TransferToUserValue(item.value, userValue);
680         if (errCode != E_OK) {
681             entries.clear();
682             entries.shrink_to_fit();
683             break;
684         }
685         std::swap(userValue, item.value);
686     }
687 
688     return CheckCorruptedStatus(errCode);
689 }
690 
Put(const Key & key,const Value & value)691 int MultiVerStorageExecutor::Put(const Key &key, const Value &value)
692 {
693     if (transaction_ == nullptr) {
694         return -E_INVALID_DB;
695     }
696     Value savedValue;
697     int errCode = TransferToSavedValue(value, savedValue);
698     if (errCode != E_OK) {
699         return CheckCorruptedStatus(errCode);
700     }
701     errCode = transaction_->Put(key, savedValue);
702     return CheckCorruptedStatus(errCode);
703 }
704 
Delete(const Key & key)705 int MultiVerStorageExecutor::Delete(const Key &key)
706 {
707     if (transaction_ == nullptr) {
708         return -E_INVALID_DB;
709     }
710 
711     int errCode = transaction_->Delete(key);
712     return CheckCorruptedStatus(errCode);
713 }
714 
Clear()715 int MultiVerStorageExecutor::Clear()
716 {
717     if (transaction_ == nullptr) {
718         return -E_INVALID_DB;
719     }
720 
721     int errCode = transaction_->Clear();
722     return CheckCorruptedStatus(errCode);
723 }
724 
StartAllDbTransaction()725 int MultiVerStorageExecutor::StartAllDbTransaction()
726 {
727     if (dataStorage_ == nullptr || commitStorage_ == nullptr) {
728         return -E_INVALID_DB;
729     }
730 
731     IKvDBMultiVerTransaction *transaction = nullptr;
732     int errCode = dataStorage_->StartWrite(KvDataType::KV_DATA_SYNC_P2P, transaction);
733     if (transaction == nullptr) {
734         LOGE("start write transaction failed:%d", errCode);
735         return CheckCorruptedStatus(errCode);
736     }
737 
738     // start data storage transaction
739     Version maxVersion = static_cast<MultiVerNaturalStore *>(kvDB_)->GetMaxCommitVersion();
740     transaction->SetVersion(maxVersion);
741 
742     errCode = transaction->StartTransaction();
743     if (errCode != E_OK) {
744         LOGE("Start dataStorage transaction failed:%d", errCode);
745         goto END;
746     }
747 
748     // start commit history transaction
749     errCode = commitStorage_->StartVacuum();
750     if (errCode != E_OK) {
751         transaction->RollBackTransaction();
752         LOGE("Start commitStorage transaction failed:%d", errCode);
753         goto END;
754     }
755 
756     // start slice data transaction
757     errCode = StartSliceTransaction();
758     if (errCode != E_OK) {
759         transaction->RollBackTransaction();
760         commitStorage_->CancelVacuum();
761         LOGE("Start kvDataStorage transaction failed:%d", errCode);
762         goto END;
763     }
764     transaction_ = transaction;
765 END:
766     if (errCode != E_OK) {
767         dataStorage_->ReleaseTransaction(transaction);
768         transaction = nullptr;
769         transaction_ = nullptr;
770         return CheckCorruptedStatus(errCode);
771     }
772 
773     return errCode;
774 }
775 
StartTransaction(MultiTransactionType type)776 int MultiVerStorageExecutor::StartTransaction(MultiTransactionType type)
777 {
778     if (type == MultiTransactionType::ALL_DATA) {
779         return StartAllDbTransaction();
780     }
781 
782     if (dataStorage_ == nullptr) {
783         return -E_INVALID_DB;
784     }
785     IKvDBMultiVerTransaction *transaction = nullptr;
786     int errCode = dataStorage_->StartWrite(KvDataType::KV_DATA_SYNC_P2P, transaction);
787     if (transaction == nullptr) {
788         LOGE("start write transaction failed:%d", errCode);
789         return CheckCorruptedStatus(errCode);
790     }
791 
792     // Get the current max version, and the current version is max version + 1.
793     Version maxVersion = static_cast<MultiVerNaturalStore *>(kvDB_)->GetMaxCommitVersion();
794     transaction->SetVersion(++maxVersion);
795     errCode = transaction->StartTransaction();
796     if (errCode != E_OK) {
797         dataStorage_->ReleaseTransaction(transaction);
798         transaction = nullptr;
799         LOGE("Start transaction failed:%d", errCode);
800         return CheckCorruptedStatus(errCode);
801     }
802     transaction_ = transaction;
803     return E_OK;
804 }
805 
CommitAllDbTransaction()806 int MultiVerStorageExecutor::CommitAllDbTransaction()
807 {
808     if (dataStorage_ == nullptr || commitStorage_ == nullptr || transaction_ == nullptr) {
809         return -E_INVALID_DB;
810     }
811 
812     int errCode = transaction_->CommitTransaction();
813     if (errCode != E_OK) {
814         (void)(RollbackSliceTransaction());
815         commitStorage_->CancelVacuum();
816         LOGE("commit phase one failed:%d", errCode);
817         goto END;
818     }
819 
820     // start slice data transaction
821     errCode = CommitSliceTransaction();
822     if (errCode != E_OK) {
823         commitStorage_->CancelVacuum();
824         LOGE("Finish kvDataStorage transaction failed:%d", errCode);
825         goto END;
826     }
827 
828     // start commit history transaction
829     errCode = commitStorage_->FinishVacuum();
830     if (errCode != E_OK) {
831         LOGE("Finish commitStorage transaction failed:%d", errCode);
832         goto END;
833     }
834 
835 END:
836     dataStorage_->ReleaseTransaction(transaction_);
837     transaction_ = nullptr;
838 
839     return CheckCorruptedStatus(errCode);
840 }
841 
CommitTransaction(MultiTransactionType type)842 int MultiVerStorageExecutor::CommitTransaction(MultiTransactionType type)
843 {
844     if (type == MultiTransactionType::ALL_DATA) {
845         return CommitAllDbTransaction();
846     }
847 
848     if ((dataStorage_ == nullptr) || (transaction_ == nullptr)) {
849         return -E_INVALID_DB;
850     }
851     UpdateVerTimestamp multiVerTimestamp = {static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTimestamp(), true};
852     Version commitVersion;
853     CommitID commitId;
854     int errCode = E_OK;
855     bool isDataChanged = transaction_->IsDataChanged();
856     if (!isDataChanged) {
857         transaction_->RollBackTransaction();
858         goto END;
859     }
860 
861     errCode = dataStorage_->CommitWritePhaseOne(transaction_, multiVerTimestamp);
862     if (errCode != E_OK) {
863         LOGE("commit phase one failed:%d", errCode);
864         goto END;
865     }
866 
867     commitVersion = transaction_->GetVersion();
868     errCode = FillAndCommitLogEntry(commitVersion, commitId, multiVerTimestamp.timestamp);
869     if (errCode != E_OK) {
870         LOGE("rollback commit phase one failed:%d", errCode);
871         dataStorage_->RollbackWritePhaseOne(transaction_, commitVersion);
872         goto END;
873     }
874     LOGD("local commit version:%" PRIu64, commitVersion);
875     static_cast<MultiVerNaturalStore *>(kvDB_)->SetMaxTimestamp(multiVerTimestamp.timestamp);
876     dataStorage_->CommitWritePhaseTwo(transaction_);
877     static_cast<MultiVerNaturalStore *>(kvDB_)->SetMaxCommitVersion(commitVersion);
878 END:
879     dataStorage_->ReleaseTransaction(transaction_);
880     transaction_ = nullptr;
881     if (errCode == E_OK && isDataChanged) {
882         CommitNotifiedData(commitId);
883     }
884 
885     return CheckCorruptedStatus(errCode);
886 }
887 
RollBackAllDbTransaction()888 int MultiVerStorageExecutor::RollBackAllDbTransaction()
889 {
890     if ((dataStorage_ == nullptr) || (commitStorage_ == nullptr)) {
891         return -E_INVALID_DB;
892     }
893     int errCode = dataStorage_->RollbackWrite(transaction_);
894     if (errCode != E_OK) {
895         LOGE("Data storage rollback fail!");
896         (void)(commitStorage_->CancelVacuum());
897         (void)(RollbackSliceTransaction());
898         goto END;
899     }
900 
901     errCode = commitStorage_->CancelVacuum();
902     if (errCode != E_OK) {
903         LOGE("Commit storage rollback fail!");
904         (void)(RollbackSliceTransaction());
905         goto END;
906     }
907 
908     errCode = RollbackSliceTransaction();
909     if (errCode != E_OK) {
910         LOGE("Value slice rollback fail!");
911     }
912 
913 END:
914     dataStorage_->ReleaseTransaction(transaction_);
915     transaction_ = nullptr;
916     return CheckCorruptedStatus(errCode);
917 }
918 
RollBackTransaction(MultiTransactionType type)919 int MultiVerStorageExecutor::RollBackTransaction(MultiTransactionType type)
920 {
921     if (dataStorage_ == nullptr || transaction_ == nullptr) {
922         LOGE("invalid transaction for rollback");
923         return -E_INVALID_DB;
924     }
925 
926     if (type == MultiTransactionType::ALL_DATA) {
927         return RollBackAllDbTransaction();
928     }
929 
930     int errCode = dataStorage_->RollbackWrite(transaction_);
931     dataStorage_->ReleaseTransaction(transaction_);
932     transaction_ = nullptr;
933     return CheckCorruptedStatus(errCode);
934 }
935 
Close()936 void MultiVerStorageExecutor::Close()
937 {
938     MultiVerStorageExecutor *handle = this;
939 
940     MultiVerNaturalStore *multiVerNatureStore = static_cast<MultiVerNaturalStore *>(kvDB_);
941     if (multiVerNatureStore == nullptr) {
942         return;
943     }
944 
945     if (readVersion_ != 0) {
946         multiVerNatureStore->RemoveVersionConstraintFromList(readVersion_);
947         readVersion_ = 0;
948     }
949     multiVerNatureStore->ReleaseHandle(handle);
950 }
951 
InitCurrentReadVersion()952 int MultiVerStorageExecutor::InitCurrentReadVersion()
953 {
954     if (commitStorage_ == nullptr) {
955         return -E_INVALID_DB;
956     }
957     int errCode = E_OK;
958     CommitID commitId = commitStorage_->GetHeader(errCode);
959     if (errCode != E_OK) {
960         return CheckCorruptedStatus(errCode);
961     }
962 
963     Version version = 0;
964     // if no head, just use the initial version.
965     if (!commitId.empty()) {
966         IKvDBCommit *commit = commitStorage_->GetCommit(commitId, errCode);
967         if (commit == nullptr) {
968             LOGE("get the header commit failed:%d", errCode);
969             return CheckCorruptedStatus(errCode);
970         }
971 
972         version = commit->GetCommitVersion();
973         commitStorage_->ReleaseCommit(commit);
974         commit = nullptr;
975     }
976     readVersion_ = version;
977     return E_OK;
978 }
979 
TransferDiffEntries(MultiVerDiffData & data) const980 int MultiVerStorageExecutor::TransferDiffEntries(MultiVerDiffData &data) const
981 {
982     int errCode;
983     Value valueTmp;
984     for (auto &insertedItem : data.inserted) {
985         errCode = TransferToUserValue(insertedItem.value, valueTmp);
986         if (errCode != E_OK) {
987             return errCode;
988         }
989         std::swap(insertedItem.value, valueTmp);
990     }
991 
992     for (auto &updatedItem : data.updated) {
993         errCode = TransferToUserValue(updatedItem.value, valueTmp);
994         if (errCode != E_OK) {
995             return errCode;
996         }
997         std::swap(updatedItem.value, valueTmp);
998     }
999 
1000     for (auto &deletedItem : data.deleted) {
1001         errCode = TransferToUserValue(deletedItem.value, valueTmp);
1002         if (errCode != E_OK) {
1003             return errCode;
1004         }
1005         std::swap(deletedItem.value, valueTmp);
1006     }
1007 
1008     return E_OK;
1009 }
1010 
TransferToUserValue(const Value & savedValue,Value & value) const1011 int MultiVerStorageExecutor::TransferToUserValue(const Value &savedValue, Value &value) const
1012 {
1013     MultiVerValueObject valueObject;
1014     int errCode = valueObject.DeSerialData(savedValue);
1015     if (errCode != E_OK) {
1016         LOGE("Deserialize the multi ver saved value failed:%d", errCode);
1017         return errCode;
1018     }
1019     if (!valueObject.IsHash()) {
1020         return valueObject.GetValue(value);
1021     }
1022 
1023     std::vector<ValueSliceHash> sliceHashVect;
1024     errCode = valueObject.GetValueHash(sliceHashVect);
1025     if (errCode != E_OK) {
1026         return errCode;
1027     }
1028     value.clear();
1029     value.shrink_to_fit();
1030     for (const auto &item : sliceHashVect) {
1031         Value itemValue;
1032         errCode = GetValueSlice(item, itemValue);
1033         if (errCode != E_OK) {
1034             LOGE("Get hash entry error:%d", errCode);
1035             break;
1036         }
1037         value.insert(value.end(), itemValue.begin(), itemValue.end());
1038     }
1039 
1040     return errCode;
1041 }
1042 
TransferToValueObject(const Value & value,MultiVerValueObject & valueObject)1043 int MultiVerStorageExecutor::TransferToValueObject(const Value &value, MultiVerValueObject &valueObject)
1044 {
1045     MultiVerNaturalStoreTransferData splitData;
1046     std::vector<Value> partValues;
1047     // Segment data into blocks by fixed size
1048     // You can set Threshold and blocksize by SetSliceLengthThreshold, SetBlockSizeByte;
1049     int errCode = splitData.SegmentAndTransferValueToHash(value, partValues);
1050     if (errCode == E_OK) {
1051         valueObject.SetFlag(MultiVerValueObject::HASH_FLAG);
1052 
1053         // Tansfer blocks data to hash value list
1054         std::vector<ValueSliceHash> hashValues;
1055         ValueSliceHash hashValue;
1056         for (const auto &partValue : partValues) {
1057             if (DBCommon::CalcValueHash(partValue, hashValue) != E_OK) {
1058                 return -E_INTERNAL_ERROR;
1059             }
1060             // Put hash value into table
1061             errCode = PutValueSlice(hashValue, partValue, true);
1062             if (errCode != E_OK) {
1063                 return errCode;
1064             }
1065             hashValues.push_back(std::move(hashValue));
1066         }
1067 
1068         valueObject.SetValueHash(hashValues);
1069     } else {
1070         valueObject.SetFlag(0);
1071         valueObject.SetValue(value);
1072     }
1073     valueObject.SetDataLength(value.size());
1074     return E_OK;
1075 }
1076 
TransferToSavedValue(const Value & value,Value & savedValue)1077 int MultiVerStorageExecutor::TransferToSavedValue(const Value &value, Value &savedValue)
1078 {
1079     MultiVerValueObject valueObject;
1080     int errCode = TransferToValueObject(value, valueObject);
1081     if (errCode != E_OK) {
1082         LOGE("Failed to get the serialize data of value object:%d", errCode);
1083         return errCode;
1084     }
1085 
1086     errCode = valueObject.GetSerialData(savedValue);
1087     if (errCode != E_OK) {
1088         LOGE("failed to get the serialize data of savedValue:%d", errCode);
1089         return errCode;
1090     }
1091 
1092     return E_OK;
1093 }
1094 
GetResolvedConflictEntries(const MultiVerCommitNode & commitItem,std::vector<MultiVerKvEntry * > & entries) const1095 int MultiVerStorageExecutor::GetResolvedConflictEntries(const MultiVerCommitNode &commitItem,
1096     std::vector<MultiVerKvEntry *> &entries) const
1097 {
1098     if (commitStorage_ == nullptr) {
1099         return -E_INVALID_DB;
1100     }
1101     int errCode = E_OK;
1102     auto commit = commitStorage_->GetCommit(commitItem.commitId, errCode);
1103     if (commit == nullptr) {
1104         LOGE("failed to get the commit in merge:%d", errCode);
1105         return errCode;
1106     }
1107     entries.clear();
1108     entries.shrink_to_fit();
1109     Version version = commit->GetCommitVersion();
1110     LOGD("Version is %" PRIu64, version);
1111     if (transaction_ != nullptr) {
1112         errCode = transaction_->GetEntriesByVersion(version, entries);
1113         if (errCode != E_OK) {
1114             LOGE("failed to get the entries by version:%d", errCode);
1115         }
1116     }
1117     commitStorage_->ReleaseCommit(commit);
1118     return errCode;
1119 }
1120 
CommitNotifiedData(const CommitID & commitId)1121 void MultiVerStorageExecutor::CommitNotifiedData(const CommitID &commitId)
1122 {
1123     CommitID startId;
1124     Version currentVersion = 0;
1125     int errCode = GetParentCommitId(commitId, startId, currentVersion);
1126     if (errCode != E_OK || currentVersion == 0) { // make sure that the version - 1 is valid.
1127         LOGE("Notify: get the parent commit failed:%d", errCode);
1128         return;
1129     }
1130     MultiVerNaturalStoreCommitNotifyData *committedData =
1131         new (std::nothrow) MultiVerNaturalStoreCommitNotifyData(
1132         static_cast<MultiVerNaturalStore *>(kvDB_), startId, commitId, currentVersion - 1);
1133     if (committedData != nullptr) {
1134         static_cast<MultiVerNaturalStore *>(kvDB_)->AddVersionConstraintToList(currentVersion - 1);
1135         static_cast<MultiVerNaturalStore *>(kvDB_)->CommitNotify(NATURAL_STORE_COMMIT_EVENT, committedData);
1136         committedData->DecObjRef(committedData);
1137         committedData = nullptr;
1138     } else {
1139         LOGE("Failed to do commit notify because of OOM.");
1140     }
1141 }
1142 
GetParentCommitId(const CommitID & commitId,CommitID & parentId,Version & curVersion) const1143 int MultiVerStorageExecutor::GetParentCommitId(const CommitID &commitId, CommitID &parentId, Version &curVersion) const
1144 {
1145     if (commitStorage_ == nullptr) {
1146         return -E_INVALID_DB;
1147     }
1148     int errCode = E_OK;
1149     IKvDBCommit *commit = commitStorage_->GetCommit(commitId, errCode);
1150     if (commit == nullptr) {
1151         LOGE("Get commit failed while getting the parent id:%d", errCode);
1152         return CheckCorruptedStatus(errCode);
1153     }
1154 
1155     parentId = commit->GetLeftParentId();
1156     curVersion = commit->GetCommitVersion();
1157     commitStorage_->ReleaseCommit(commit);
1158     commit = nullptr;
1159     return E_OK;
1160 }
1161 
AllocNewCommitId(CommitID & commitId) const1162 int MultiVerStorageExecutor::AllocNewCommitId(CommitID &commitId) const
1163 {
1164     // Only for allocate for temporary.
1165     commitId.resize(COMMIT_ID_LENGTH);
1166     RAND_bytes(commitId.data(), COMMIT_ID_LENGTH);
1167     return E_OK;
1168 }
1169 
FillAndCommitLogEntry(const Version & versionInfo,CommitID & commitId,uint64_t timestamp) const1170 int MultiVerStorageExecutor::FillAndCommitLogEntry(const Version &versionInfo, CommitID &commitId,
1171     uint64_t timestamp) const
1172 {
1173     if (kvDB_ == nullptr || commitStorage_ == nullptr) {
1174         return -E_INVALID_DB;
1175     }
1176     // Get the commit id.
1177     int errCode = E_OK;
1178     IKvDBCommit *commit = commitStorage_->AllocCommit(errCode);
1179     if (commit == nullptr) {
1180         LOGE("Failed to alloc the commit locally:%d", errCode);
1181         return errCode;
1182     }
1183 
1184     (void)(AllocNewCommitId(commitId));
1185     std::vector<uint8_t> vectTag;
1186     static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTag(vectTag);
1187     std::string strTag(vectTag.begin(), vectTag.end());
1188 
1189     // Get the commit struct.
1190     CommitID header = commitStorage_->GetHeader(errCode);
1191     if (errCode != E_OK) {
1192         goto END;
1193     }
1194 
1195     commit->SetLeftParentId(header);
1196     commit->SetCommitId(commitId);
1197     commit->SetCommitVersion(versionInfo);
1198     commit->SetLocalFlag(true);
1199     commit->SetTimestamp(timestamp);
1200     commit->SetDeviceInfo(strTag);
1201 
1202     // write the commit history.
1203     errCode = commitStorage_->AddCommit(*commit, true);
1204     if (errCode != E_OK) {
1205         LOGE("Add commit history failed:%d", errCode);
1206     }
1207 
1208 END:
1209     if (commit != nullptr) {
1210         commitStorage_->ReleaseCommit(commit);
1211         commit = nullptr;
1212     }
1213     return errCode;
1214 }
1215 
FillCommitByForeign(IKvDBCommit * commit,const MultiVerCommitNode & multiVerCommit,const Version & versionInfo,const CommitID & commitId,bool isMerge) const1216 int MultiVerStorageExecutor::FillCommitByForeign(IKvDBCommit *commit,
1217     const MultiVerCommitNode &multiVerCommit, const Version &versionInfo, const CommitID &commitId, bool isMerge) const
1218 {
1219     if (isMerge) {
1220         if (commitStorage_ == nullptr || kvDB_ == nullptr) {
1221             return -E_INVALID_DB;
1222         }
1223 
1224         int errCode = E_OK;
1225         CommitID header = commitStorage_->GetHeader(errCode);
1226         if (errCode != E_OK) {
1227             return errCode;
1228         }
1229         std::vector<uint8_t> vectTag;
1230         static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTag(vectTag);
1231         std::string strTag(vectTag.begin(), vectTag.end());
1232 
1233         commit->SetCommitId(commitId);
1234         commit->SetLeftParentId(header);
1235         commit->SetRightParentId(multiVerCommit.commitId);
1236         commit->SetLocalFlag(true);
1237         Timestamp timestamp = static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTimestamp();
1238         commit->SetTimestamp(timestamp);
1239         commit->SetDeviceInfo(strTag);
1240     } else {
1241         commit->SetCommitId(multiVerCommit.commitId);
1242         commit->SetLeftParentId(multiVerCommit.leftParent);
1243         commit->SetRightParentId(multiVerCommit.rightParent);
1244         commit->SetTimestamp(multiVerCommit.timestamp);
1245         commit->SetLocalFlag(false);
1246         commit->SetDeviceInfo(multiVerCommit.deviceInfo);
1247     }
1248 
1249     commit->SetCommitVersion(versionInfo);
1250     return E_OK;
1251 }
1252 
FillAndCommitLogEntry(const Version & versionInfo,const MultiVerCommitNode & multiVerCommit,CommitID & commitId,bool isMerge,Timestamp & timestamp) const1253 int MultiVerStorageExecutor::FillAndCommitLogEntry(const Version &versionInfo,
1254     const MultiVerCommitNode &multiVerCommit, CommitID &commitId, bool isMerge, Timestamp &timestamp) const
1255 {
1256     if (commitStorage_ == nullptr) {
1257         return -E_INVALID_DB;
1258     }
1259     int errCode = E_OK;
1260     IKvDBCommit *commit = commitStorage_->AllocCommit(errCode);
1261     if (commit == nullptr) {
1262         return errCode;
1263     }
1264 
1265     if (isMerge) {
1266         (void)(AllocNewCommitId(commitId));
1267     }
1268 
1269     errCode = FillCommitByForeign(commit, multiVerCommit, versionInfo, commitId, isMerge);
1270     if (errCode != E_OK) {
1271         LOGE("Failed to fill the sync commit:%d", errCode);
1272         goto END;
1273     }
1274 
1275     timestamp = isMerge ? static_cast<MultiVerNaturalStore *>(kvDB_)->GetCurrentTimestamp() : multiVerCommit.timestamp;
1276     commit->SetTimestamp(timestamp);
1277 
1278     // write the commit history.
1279     errCode = commitStorage_->AddCommit(*commit, isMerge);
1280     if (errCode != E_OK) {
1281         LOGE("Add commit history failed:%d", errCode);
1282     }
1283 END:
1284     if (commit != nullptr) {
1285         commitStorage_->ReleaseCommit(commit);
1286         commit = nullptr;
1287     }
1288 
1289     return errCode;
1290 }
1291 
CommitTransaction(const MultiVerCommitNode & multiVerCommit,bool isMerge)1292 int MultiVerStorageExecutor::CommitTransaction(const MultiVerCommitNode &multiVerCommit, bool isMerge)
1293 {
1294     if ((transaction_ == nullptr) || (dataStorage_ == nullptr)) {
1295         LOGE("invalid transaction for commit");
1296         return -E_INVALID_DB;
1297     }
1298 
1299     Version commitVersion;
1300     CommitID commitId;
1301     UpdateVerTimestamp multiVerTimestamp = {0ULL, false};
1302     bool isDataChanged = transaction_->IsDataChanged();
1303 
1304     int errCode = dataStorage_->CommitWritePhaseOne(transaction_, multiVerTimestamp);
1305     if (errCode != E_OK) {
1306         LOGE("commit phase one failed:%d", errCode);
1307         goto END;
1308     }
1309 
1310     commitVersion = transaction_->GetVersion();
1311     errCode = FillAndCommitLogEntry(commitVersion, multiVerCommit, commitId, isMerge, multiVerTimestamp.timestamp);
1312     if (errCode != E_OK) {
1313         LOGE("rollback commit phase one failed:%d", errCode);
1314         dataStorage_->RollbackWritePhaseOne(transaction_, commitVersion);
1315         goto END;
1316     }
1317 
1318     dataStorage_->CommitWritePhaseTwo(transaction_);
1319     static_cast<MultiVerNaturalStore *>(kvDB_)->SetMaxTimestamp(multiVerTimestamp.timestamp);
1320     static_cast<MultiVerNaturalStore *>(kvDB_)->SetMaxCommitVersion(commitVersion);
1321     LOGD("sync commit version:%" PRIu64, commitVersion);
1322 END:
1323     dataStorage_->ReleaseTransaction(transaction_);
1324     transaction_ = nullptr;
1325 
1326     if (errCode == E_OK && isMerge && isDataChanged) {
1327         CommitNotifiedData(commitId);
1328     }
1329 
1330     return CheckCorruptedStatus(errCode);
1331 }
1332 
ReleaseMultiVerKvEntries(std::vector<MultiVerKvEntry * > & entries)1333 void MultiVerStorageExecutor::ReleaseMultiVerKvEntries(std::vector<MultiVerKvEntry *> &entries)
1334 {
1335     for (auto &item : entries) {
1336         if (item != nullptr) {
1337             delete item;
1338             item = nullptr;
1339         }
1340     }
1341     entries.clear();
1342     entries.shrink_to_fit();
1343 }
1344 
GetCurrentReadVersion() const1345 Version MultiVerStorageExecutor::GetCurrentReadVersion() const
1346 {
1347     return readVersion_;
1348 }
1349 
GetAllCommitsInTree(std::list<MultiVerCommitNode> & commits) const1350 int MultiVerStorageExecutor::GetAllCommitsInTree(std::list<MultiVerCommitNode> &commits) const
1351 {
1352     if (commitStorage_ == nullptr) {
1353         return -E_INVALID_DB;
1354     }
1355 
1356     return commitStorage_->GetAllCommitsInTree(commits);
1357 }
1358 
GetEntriesByVersion(Version version,std::list<MultiVerTrimedVersionData> & data) const1359 int MultiVerStorageExecutor::GetEntriesByVersion(Version version, std::list<MultiVerTrimedVersionData> &data) const
1360 {
1361     if (dataStorage_ == nullptr) {
1362         return -E_INVALID_DB;
1363     }
1364 
1365     int errCode = E_OK;
1366     IKvDBMultiVerTransaction *transaction = nullptr;
1367     if (transaction_ == nullptr) {
1368         transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, version, errCode);
1369         if (transaction == nullptr) {
1370             LOGE("Failed to get the transaction:%d", errCode);
1371             goto END;
1372         }
1373     } else {
1374         transaction = transaction_;
1375     }
1376 
1377     // Note that the transaction fails and the parameters are empty.
1378     errCode = transaction->GetEntriesByVersion(version, data);
1379 END:
1380     if (transaction != transaction_) {
1381         dataStorage_->ReleaseTransaction(transaction);
1382         transaction = nullptr;
1383     }
1384     return CheckCorruptedStatus(errCode);
1385 }
1386 
GetOverwrittenClearTypeEntries(Version clearVersion,std::list<MultiVerTrimedVersionData> & data) const1387 int MultiVerStorageExecutor::GetOverwrittenClearTypeEntries(Version clearVersion,
1388     std::list<MultiVerTrimedVersionData> &data) const
1389 {
1390     if (dataStorage_ == nullptr) {
1391         return -E_INVALID_DB;
1392     }
1393 
1394     int errCode = E_OK;
1395     IKvDBMultiVerTransaction *transaction = nullptr;
1396     if (transaction_ == nullptr) {
1397         transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, clearVersion, errCode);
1398         if (transaction == nullptr) {
1399             LOGE("Failed to get the transaction:%d", errCode);
1400             goto END;
1401         }
1402     } else {
1403         transaction = transaction_;
1404     }
1405 
1406     errCode = transaction->GetOverwrittenClearTypeEntries(clearVersion, data);
1407 END:
1408     if (transaction != transaction_) {
1409         dataStorage_->ReleaseTransaction(transaction);
1410         transaction = nullptr;
1411     }
1412 
1413     return CheckCorruptedStatus(errCode);
1414 }
1415 
GetOverwrittenNonClearTypeEntries(Version version,const Key & hashKey,std::list<MultiVerTrimedVersionData> & data) const1416 int MultiVerStorageExecutor::GetOverwrittenNonClearTypeEntries(Version version, const Key &hashKey,
1417     std::list<MultiVerTrimedVersionData> &data) const
1418 {
1419     if (dataStorage_ == nullptr) {
1420         return -E_INVALID_DB;
1421     }
1422 
1423     int errCode = E_OK;
1424     IKvDBMultiVerTransaction *transaction = nullptr;
1425     if (transaction_ == nullptr) {
1426         transaction = dataStorage_->StartRead(KvDataType::KV_DATA_SYNC_P2P, version, errCode);
1427         if (transaction == nullptr) {
1428             LOGE("Failed to get the transaction:%d", errCode);
1429             goto END;
1430         }
1431     } else {
1432         transaction = transaction_;
1433     }
1434 
1435     errCode = transaction->GetOverwrittenNonClearTypeEntries(version, hashKey, data);
1436 END:
1437     if (transaction != transaction_) {
1438         dataStorage_->ReleaseTransaction(transaction);
1439         transaction = nullptr;
1440     }
1441 
1442     return CheckCorruptedStatus(errCode);
1443 }
1444 
DeleteEntriesByHashKey(Version version,const Key & hashKey)1445 int MultiVerStorageExecutor::DeleteEntriesByHashKey(Version version, const Key &hashKey)
1446 {
1447     if (transaction_ == nullptr) {
1448         LOGI("You need start transaction before this operation!");
1449         return -E_NOT_PERMIT;
1450     }
1451 
1452     Value savedValue;
1453     int errCode = transaction_->GetValueForTrimSlice(hashKey, version, savedValue);
1454     if (errCode != E_OK) {
1455         return CheckCorruptedStatus(errCode);
1456     }
1457 
1458     errCode = transaction_->DeleteEntriesByHashKey(version, hashKey);
1459     if (errCode != E_OK) {
1460         return CheckCorruptedStatus(errCode);
1461     }
1462 
1463     MultiVerValueObject valueObject;
1464     errCode = valueObject.DeSerialData(savedValue);
1465     // savedValue empty is del or clear record
1466     if (!valueObject.IsHash() || savedValue.empty()) {
1467         return E_OK;
1468     }
1469     if (errCode != E_OK) {
1470         return errCode;
1471     }
1472 
1473     std::vector<ValueSliceHash> sliceHashVect;
1474     errCode = valueObject.GetValueHash(sliceHashVect);
1475     if (errCode != E_OK) {
1476         return errCode;
1477     }
1478 
1479     for (const auto &item : sliceHashVect) {
1480         errCode = DeleteValueSliceInner(sliceTransaction_, item);
1481         if (errCode != E_OK) {
1482             LOGI("Value slice delete fail!");
1483             break;
1484         }
1485     }
1486 
1487     return CheckCorruptedStatus(errCode);
1488 }
1489 
UpdateTrimedFlag(Version version,const Key & hashKey)1490 int MultiVerStorageExecutor::UpdateTrimedFlag(Version version, const Key &hashKey)
1491 {
1492     (void)version;
1493     (void)hashKey;
1494     return E_OK;
1495 }
1496 
UpdateTrimedFlag(const CommitID & commit)1497 int MultiVerStorageExecutor::UpdateTrimedFlag(const CommitID &commit)
1498 {
1499     (void)commit;
1500     return E_OK;
1501 }
1502 } // namespace DistributedDB
1503 #endif
1504