1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "sqlite_cloud_kv_store.h"
16 
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "query_utils.h"
22 #include "runtime_context.h"
23 #include "sqlite_cloud_kv_executor_utils.h"
24 #include "sqlite_single_ver_continue_token.h"
25 
26 namespace DistributedDB {
SqliteCloudKvStore(KvStorageHandle * handle)27 SqliteCloudKvStore::SqliteCloudKvStore(KvStorageHandle *handle)
28     : storageHandle_(handle), transactionHandle_(nullptr)
29 {
30 }
31 
GetMetaData(const Key & key,Value & value) const32 int SqliteCloudKvStore::GetMetaData(const Key &key, Value &value) const
33 {
34     return storageHandle_->GetMetaData(key, value);
35 }
36 
PutMetaData(const Key & key,const Value & value)37 int SqliteCloudKvStore::PutMetaData(const Key &key, const Value &value)
38 {
39     return storageHandle_->PutMetaData(key, value, false);
40 }
41 
ChkSchema(const TableName & tableName)42 int SqliteCloudKvStore::ChkSchema(const TableName &tableName)
43 {
44     return E_OK;
45 }
46 
SetCloudDbSchema(const DataBaseSchema & schema)47 int SqliteCloudKvStore::SetCloudDbSchema(const DataBaseSchema &schema)
48 {
49     return E_OK;
50 }
51 
SetCloudDbSchema(const std::map<std::string,DataBaseSchema> & schema)52 int SqliteCloudKvStore::SetCloudDbSchema(const std::map<std::string, DataBaseSchema> &schema)
53 {
54     std::lock_guard<std::mutex> autoLock(schemaMutex_);
55     if (!CheckSchema(schema)) {
56         return -E_INVALID_SCHEMA;
57     }
58     schema_ = schema;
59     return E_OK;
60 }
61 
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)62 int SqliteCloudKvStore::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
63 {
64     std::lock_guard<std::mutex> autoLock(schemaMutex_);
65     cloudSchema = std::make_shared<DataBaseSchema>(schema_[user_]);
66     return E_OK;
67 }
68 
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)69 int SqliteCloudKvStore::GetCloudTableSchema(const TableName &tableName,
70     TableSchema &tableSchema)
71 {
72     std::lock_guard<std::mutex> autoLock(schemaMutex_);
73     if (schema_.find(user_) == schema_.end()) {
74         LOGE("[SqliteCloudKvStore] not set cloud schema");
75         return -E_SCHEMA_MISMATCH;
76     }
77     auto it = std::find_if(schema_[user_].tables.begin(), schema_[user_].tables.end(), [&](const auto &table) {
78         return table.name == tableName;
79     });
80     if (it != schema_[user_].tables.end()) {
81         tableSchema = *it;
82         return E_OK;
83     }
84     LOGW("[SqliteCloudKvStore] not found table schema");
85     return -E_NOT_FOUND;
86 }
87 
StartTransaction(TransactType type)88 int SqliteCloudKvStore::StartTransaction(TransactType type)
89 {
90     {
91         std::lock_guard<std::mutex> autoLock(transactionMutex_);
92         if (transactionHandle_ != nullptr) {
93             LOGW("[SqliteCloudKvStore] transaction has been started");
94             return E_OK;
95         }
96     }
97     auto [errCode, handle] = storageHandle_->GetStorageExecutor(type == TransactType::IMMEDIATE);
98     if (errCode != E_OK) {
99         return errCode;
100     }
101     if (handle == nullptr) {
102         LOGE("[SqliteCloudKvStore] get handle return null");
103         return -E_INTERNAL_ERROR;
104     }
105     errCode = handle->StartTransaction(type);
106     std::lock_guard<std::mutex> autoLock(transactionMutex_);
107     transactionHandle_ = handle;
108     LOGD("[SqliteCloudKvStore] start transaction!");
109     return errCode;
110 }
111 
Commit()112 int SqliteCloudKvStore::Commit()
113 {
114     SQLiteSingleVerStorageExecutor *handle;
115     {
116         std::lock_guard<std::mutex> autoLock(transactionMutex_);
117         if (transactionHandle_ == nullptr) {
118             LOGW("[SqliteCloudKvStore] no need to commit, transaction has not been started");
119             return E_OK;
120         }
121         handle = transactionHandle_;
122         transactionHandle_ = nullptr;
123     }
124     int errCode = handle->Commit();
125     storageHandle_->RecycleStorageExecutor(handle);
126     LOGD("[SqliteCloudKvStore] commit transaction!");
127     return errCode;
128 }
129 
Rollback()130 int SqliteCloudKvStore::Rollback()
131 {
132     SQLiteSingleVerStorageExecutor *handle;
133     {
134         std::lock_guard<std::mutex> autoLock(transactionMutex_);
135         if (transactionHandle_ == nullptr) {
136             LOGW("[SqliteCloudKvStore] no need to rollback, transaction has not been started");
137             return E_OK;
138         }
139         handle = transactionHandle_;
140         transactionHandle_ = nullptr;
141     }
142     int errCode = handle->Rollback();
143     storageHandle_->RecycleStorageExecutor(handle);
144     LOGD("[SqliteCloudKvStore] rollback transaction!");
145     return errCode;
146 }
147 
148 int SqliteCloudKvStore::GetUploadCount([[gnu::unused]] const QuerySyncObject &query,
149     const Timestamp &timestamp, bool isCloudForcePush, [[gnu::unused]] bool isCompensatedTask,
150     int64_t &count)
151 {
152     auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
153     if (db == nullptr) {
154         LOGE("[SqliteCloudKvStore] get upload count without transaction");
155         return -E_INTERNAL_ERROR;
156     }
157     int errCode = E_OK;
158     std::tie(errCode, count) = SqliteCloudKvExecutorUtils::CountCloudData(db, isMemory, timestamp, user_,
159         isCloudForcePush);
160     return errCode;
161 }
162 
163 int SqliteCloudKvStore::GetAllUploadCount(const QuerySyncObject &query,
164     const std::vector<Timestamp> &timestampVec, bool isCloudForcePush, [[gnu::unused]] bool isCompensatedTask,
165     int64_t &count)
166 {
167     auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
168     if (db == nullptr) {
169         LOGE("[SqliteCloudKvStore] get upload count without transaction");
170         return -E_INTERNAL_ERROR;
171     }
172     int errCode = E_OK;
173     QuerySyncObject queryObj = query;
174     std::tie(errCode, count) = SqliteCloudKvExecutorUtils::CountAllCloudData({ db, isMemory }, timestampVec, user_,
175         isCloudForcePush, queryObj);
176     return errCode;
177 }
178 
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & object,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)179 int SqliteCloudKvStore::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object,
180     const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
181 {
182     SyncTimeRange timeRange;
183     // memory db use watermark
184     timeRange.beginTime = GetTransactionDbHandleAndMemoryStatus().second ? beginTime : 0;
185     auto token = new (std::nothrow) SQLiteSingleVerContinueToken(timeRange, object);
186     if (token == nullptr) {
187         LOGE("[SqliteCloudKvStore] create token failed");
188         return -E_OUT_OF_MEMORY;
189     }
190     token->SetUser(user_);
191     recorder_.SetUser(user_);
192     cloudDataResult.tableName = CloudDbConstant::CLOUD_KV_TABLE_NAME;
193     continueStmtToken = static_cast<ContinueToken>(token);
194     return GetCloudDataNext(continueStmtToken, cloudDataResult);
195 }
196 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)197 int SqliteCloudKvStore::GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
198 {
199     if (continueStmtToken == nullptr) {
200         LOGE("[SqliteCloudKvStore] token is null");
201         return -E_INVALID_ARGS;
202     }
203     auto token = static_cast<SQLiteSingleVerContinueToken *>(continueStmtToken);
204     if (!token->CheckValid()) {
205         LOGE("[SqliteCloudKvStore] token is invalid");
206         return -E_INVALID_ARGS;
207     }
208     auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
209     if (db == nullptr) {
210         LOGE("[SqliteCloudKvStore] the transaction has not been started, release the token");
211         ReleaseCloudDataToken(continueStmtToken);
212         return -E_INTERNAL_ERROR;
213     }
214     int errCode = SqliteCloudKvExecutorUtils::GetCloudData(GetCloudSyncConfig(), {db, isMemory}, recorder_, *token,
215         cloudDataResult);
216     if (errCode != -E_UNFINISHED) {
217         ReleaseCloudDataToken(continueStmtToken);
218     } else {
219         continueStmtToken = token;
220     }
221     return errCode;
222 }
223 
ReleaseCloudDataToken(ContinueToken & continueStmtToken)224 int SqliteCloudKvStore::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
225 {
226     if (continueStmtToken == nullptr) {
227         return E_OK;
228     }
229     auto token = static_cast<SQLiteSingleVerContinueToken *>(continueStmtToken);
230     if (!token->CheckValid()) {
231         return E_OK;
232     }
233     token->ReleaseCloudQueryStmt();
234     delete token;
235     continueStmtToken = nullptr;
236     return E_OK;
237 }
238 
239 int SqliteCloudKvStore::GetInfoByPrimaryKeyOrGid([[gnu::unused]] const std::string &tableName, const VBucket &vBucket,
240     DataInfoWithLog &dataInfoWithLog, [[gnu::unused]] VBucket &assetInfo)
241 {
242     auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
243     if (db == nullptr) {
244         LOGE("[SqliteCloudKvStore] the transaction has not been started");
245         return -E_INTERNAL_ERROR;
246     }
247     int errCode = E_OK;
248     std::tie(errCode, dataInfoWithLog) = SqliteCloudKvExecutorUtils::GetLogInfo(db, isMemory, vBucket, user_);
249     return errCode;
250 }
251 
252 int SqliteCloudKvStore::PutCloudSyncData([[gnu::unused]] const std::string &tableName, DownloadData &downloadData)
253 {
254     auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
255     if (db == nullptr) {
256         LOGE("[SqliteCloudKvStore] the transaction has not been started");
257         return -E_INTERNAL_ERROR;
258     }
259     downloadData.timeOffset = storageHandle_->GetLocalTimeOffsetForCloud();
260     return SqliteCloudKvExecutorUtils::PutCloudData(db, isMemory, downloadData);
261 }
262 
FillCloudLogAndAsset(OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)263 int SqliteCloudKvStore::FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset,
264     bool ignoreEmptyGid)
265 {
266     auto [errCode, handle] = storageHandle_->GetStorageExecutor(true);
267     if (errCode != E_OK) {
268         LOGE("[SqliteCloudKvStore] get handle failed %d when fill log", errCode);
269         return errCode;
270     }
271     if (handle->IsMemory()) {
272         errCode = Commit();
273         if (errCode != E_OK) {
274             LOGE("[SqliteCloudKvStore] commit failed %d before fill log", errCode);
275             storageHandle_->RecycleStorageExecutor(handle);
276             return errCode;
277         }
278     }
279     sqlite3 *db = nullptr;
280     (void)handle->GetDbHandle(db);
281     errCode = SqliteCloudKvExecutorUtils::FillCloudLog({db, ignoreEmptyGid}, opType, data, user_, recorder_);
282     int ret = E_OK;
283     if (handle->IsMemory()) {
284         ret = StartTransaction(TransactType::DEFERRED);
285         if (ret != E_OK) {
286             LOGE("[SqliteCloudKvStore] restart transaction failed %d", ret);
287         }
288     }
289     storageHandle_->RecycleStorageExecutor(handle);
290     return errCode == E_OK ? ret : errCode;
291 }
292 
FilterCloudVersionPrefixKey(std::vector<std::vector<Type>> & changeValList)293 void SqliteCloudKvStore::FilterCloudVersionPrefixKey(std::vector<std::vector<Type>> &changeValList)
294 {
295     changeValList.erase(std::remove_if(changeValList.begin(), changeValList.end(),
296         [&](const std::vector<Type> &existPkVal) {
297             bool isFilter = false;
298             for (auto type : existPkVal) {
299                 std::string prefixKey;
300                 int errCode = CloudStorageUtils::GetValueFromOneField(type, prefixKey);
301                 if (errCode != E_OK) {
302                     LOGE("[SqliteCloudKvStore] can not get key from changedData, %d", errCode);
303                     break;
304                 }
305                 isFilter = !prefixKey.empty() && prefixKey.find(CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY) == 0;
306                 if (isFilter) {
307                     break;
308                 }
309             }
310             return isFilter;
311         }), changeValList.end());
312 }
313 
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)314 void SqliteCloudKvStore::TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData,
315     bool isChangedData)
316 {
317     {
318         std::lock_guard<std::mutex> autoLock(observerMapMutex_);
319         if (cloudObserverMap_.empty()) {
320             return;
321         }
322     }
323     for (auto &changeValList : changedData.primaryData) {
324         FilterCloudVersionPrefixKey(changeValList);
325     }
326     RefObject::IncObjRef(this);
327     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, deviceName, changedData, isChangedData]() {
328         {
329             std::lock_guard<std::mutex> autoLock(observerMapMutex_);
330             for (const auto &item : cloudObserverMap_) {
331                 ChangedData observerChangeData = changedData;
332                 item.second(deviceName, std::move(observerChangeData), isChangedData);
333             }
334         }
335         RefObject::DecObjRef(this);
336     });
337     if (errCode != E_OK) {
338         LOGW("[SqliteCloudKvStore] Trigger observer action failed %d", errCode);
339         RefObject::DecObjRef(this);
340     }
341 }
342 
GetIdentify() const343 std::string SqliteCloudKvStore::GetIdentify() const
344 {
345     return "";
346 }
347 
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)348 int SqliteCloudKvStore::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
349     bool isCloudForcePush, bool isCompensatedTask, std::vector<std::string> &cloudGid)
350 {
351     auto[errCode, handle] = storageHandle_->GetStorageExecutor(false);
352     if (errCode != E_OK) {
353         LOGE("[SqliteCloudKvStore] get handle failed %d", errCode);
354         return errCode;
355     }
356     sqlite3 *db = nullptr;
357     (void)handle->GetDbHandle(db);
358     QuerySyncObject query = querySyncObject;
359     errCode = SqliteCloudKvExecutorUtils::QueryCloudGid(db, handle->IsMemory(), user_, query, cloudGid);
360     storageHandle_->RecycleStorageExecutor(handle);
361     if (errCode != E_OK) {
362         LOGE("[SqliteCloudKvStore] Query cloud gid failed %d", errCode);
363     }
364     return errCode;
365 }
366 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)367 int SqliteCloudKvStore::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess)
368 {
369     return E_OK;
370 }
371 
SetLogTriggerStatus(bool status)372 int SqliteCloudKvStore::SetLogTriggerStatus(bool status)
373 {
374     return E_OK;
375 }
376 
SetCursorIncFlag(bool status)377 int SqliteCloudKvStore::SetCursorIncFlag(bool status)
378 {
379     return E_OK;
380 }
381 
CheckQueryValid(const QuerySyncObject & query)382 int SqliteCloudKvStore::CheckQueryValid(const QuerySyncObject &query)
383 {
384     return E_OK;
385 }
386 
IsSharedTable(const std::string & tableName)387 bool SqliteCloudKvStore::IsSharedTable(const std::string &tableName)
388 {
389     return false;
390 }
391 
SetUser(const std::string & user)392 void SqliteCloudKvStore::SetUser(const std::string &user)
393 {
394     user_ = user;
395 }
396 
GetTransactionDbHandleAndMemoryStatus()397 std::pair<sqlite3 *, bool> SqliteCloudKvStore::GetTransactionDbHandleAndMemoryStatus()
398 {
399     std::lock_guard<std::mutex> autoLock(transactionMutex_);
400     if (transactionHandle_ == nullptr) {
401         return {nullptr, false};
402     }
403     sqlite3 *db = nullptr;
404     (void)transactionHandle_->GetDbHandle(db);
405     return {db, transactionHandle_->IsMemory()};
406 }
407 
RegisterObserverAction(const KvStoreObserver * observer,const ObserverAction & action)408 void SqliteCloudKvStore::RegisterObserverAction(const KvStoreObserver *observer, const ObserverAction &action)
409 {
410     std::lock_guard<std::mutex> autoLock(observerMapMutex_);
411     cloudObserverMap_[observer] = action;
412 }
413 
UnRegisterObserverAction(const KvStoreObserver * observer)414 void SqliteCloudKvStore::UnRegisterObserverAction(const KvStoreObserver *observer)
415 {
416     std::lock_guard<std::mutex> autoLock(observerMapMutex_);
417     cloudObserverMap_.erase(observer);
418 }
419 
GetCloudVersion(const std::string & device,std::map<std::string,std::string> & versionMap)420 int SqliteCloudKvStore::GetCloudVersion(const std::string &device, std::map<std::string, std::string> &versionMap)
421 {
422     auto[errCode, handle] = storageHandle_->GetStorageExecutor(false);
423     if (errCode != E_OK) {
424         LOGE("[SqliteCloudKvStore] get handle failed %d", errCode);
425         return errCode;
426     }
427     sqlite3 *db = nullptr;
428     (void)handle->GetDbHandle(db);
429     std::vector<VBucket> dataVector = {};
430     errCode = SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(db, handle->IsMemory(), user_, device, dataVector);
431     storageHandle_->RecycleStorageExecutor(handle);
432     if (errCode != E_OK) {
433         LOGE("[SqliteCloudKvStore] get cloud version record failed %d", errCode);
434         return errCode;
435     }
436     for (VBucket &data : dataVector) {
437         auto [errCodeNext, dataItem] = CloudStorageUtils::GetDataItemFromCloudVersionData(data);
438         if (errCodeNext != E_OK) {
439             LOGE("[SqliteCloudKvStore] get dataItem failed %d", errCodeNext);
440             return errCodeNext;
441         }
442         dataItem.dev = DBBase64Utils::DecodeIfNeed(dataItem.dev);
443         std::vector<uint8_t> blob = dataItem.value;
444         std::string version = std::string(blob.begin(), blob.end());
445         std::pair<std::string, std::string> versionPair = std::pair<std::string, std::string>(dataItem.dev, version);
446         versionMap.insert(versionPair);
447     }
448     return E_OK;
449 }
450 
GetLocalCloudVersion()451 std::pair<int, CloudSyncData> SqliteCloudKvStore::GetLocalCloudVersion()
452 {
453     std::pair<int, CloudSyncData> res;
454     auto &[errCode, data] = res;
455     Timestamp currentTime = storageHandle_->GetCurrentTimestamp();
456     TimeOffset timeOffset = storageHandle_->GetLocalTimeOffsetForCloud();
457     Timestamp rawSysTime = static_cast<Timestamp>(static_cast<TimeOffset>(currentTime) - timeOffset);
458     SQLiteSingleVerStorageExecutor *handle = nullptr;
459     std::tie(errCode, handle) = storageHandle_->GetStorageExecutor(false);
460     if (errCode != E_OK) {
461         LOGE("[SqliteCloudKvStore] get handle failed %d when fill log", errCode);
462         return res;
463     }
464     sqlite3 *db = nullptr;
465     (void)handle->GetDbHandle(db);
466     std::tie(errCode, data) = SqliteCloudKvExecutorUtils::GetLocalCloudVersion(db, handle->IsMemory(), user_);
467     data.isCloudVersionRecord = true;
468     storageHandle_->RecycleStorageExecutor(handle);
469     FillTimestamp(rawSysTime, currentTime, data.insData);
470     FillTimestamp(rawSysTime, currentTime, data.updData);
471     data.tableName = CloudDbConstant::CLOUD_KV_TABLE_NAME;
472     return res;
473 }
474 
FillTimestamp(Timestamp rawSystemTime,Timestamp virtualTime,CloudSyncBatch & syncBatch)475 void SqliteCloudKvStore::FillTimestamp(Timestamp rawSystemTime, Timestamp virtualTime, CloudSyncBatch &syncBatch)
476 {
477     for (auto &item : syncBatch.extend) {
478         item[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(rawSystemTime);
479         if (item.find(CloudDbConstant::CREATE_FIELD) == item.end()) {
480             item[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(rawSystemTime);
481             item[CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME] = static_cast<int64_t>(virtualTime);
482         }
483     }
484 }
485 
CheckSchema(std::map<std::string,DataBaseSchema> schema)486 bool SqliteCloudKvStore::CheckSchema(std::map<std::string, DataBaseSchema> schema)
487 {
488     if (schema.size() == 0) {
489         LOGE("[SqliteCloudKvStore] empty schema.");
490         return false;
491     }
492     for (auto it = schema.begin(); it != schema.end(); it++) {
493         std::vector<TableSchema> tables = it->second.tables;
494         if (tables.size() != 1) {
495             LOGE("[SqliteCloudKvStore] invalid tables num: %zu", tables.size());
496             return false;
497         }
498         TableSchema actualTable = tables[0];
499         std::string expectTableName = CloudDbConstant::CLOUD_KV_TABLE_NAME;
500         std::string expectSharedTableName = "";
501         std::vector<Field> expectFields = {
502             {CloudDbConstant::CLOUD_KV_FIELD_KEY, TYPE_INDEX<std::string>, true, true},
503             {CloudDbConstant::CLOUD_KV_FIELD_DEVICE, TYPE_INDEX<std::string>, false, true},
504             {CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, TYPE_INDEX<std::string>, false, true},
505             {CloudDbConstant::CLOUD_KV_FIELD_VALUE, TYPE_INDEX<std::string>, false, true},
506             {CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME, TYPE_INDEX<int64_t>, false, true}
507         };
508         if (actualTable.name != expectTableName || actualTable.sharedTableName != expectSharedTableName ||
509             actualTable.fields.size() != expectFields.size()) {
510             LOGE("[SqliteCloudKvStore] check table failed.");
511             return false;
512         }
513         for (uint32_t i = 0; i < actualTable.fields.size(); i++) {
514             Field actualField = actualTable.fields[i];
515             if (std::find(expectFields.begin(), expectFields.end(), actualField) == expectFields.end()) {
516                 LOGE("[SqliteCloudKvStore] check fields failed.");
517                 return false;
518             }
519         }
520     }
521     return true;
522 }
523 
SetCloudSyncConfig(const CloudSyncConfig & config)524 void SqliteCloudKvStore::SetCloudSyncConfig(const CloudSyncConfig &config)
525 {
526     std::lock_guard<std::mutex> autoLock(configMutex_);
527     config_ = config;
528 }
529 
GetCloudSyncConfig() const530 CloudSyncConfig SqliteCloudKvStore::GetCloudSyncConfig() const
531 {
532     std::lock_guard<std::mutex> autoLock(configMutex_);
533     return config_;
534 }
535 
GetDataBaseSchemas()536 std::map<std::string, DataBaseSchema> SqliteCloudKvStore::GetDataBaseSchemas()
537 {
538     std::lock_guard<std::mutex> autoLock(schemaMutex_);
539     return schema_;
540 }
541 
ReleaseUploadRecord(const std::string & tableName,const CloudWaterType & type,Timestamp localMark)542 void SqliteCloudKvStore::ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type,
543     Timestamp localMark)
544 {
545     recorder_.ReleaseUploadRecord(tableName, type, localMark);
546 }
547 
IsTagCloudUpdateLocal(const LogInfo & localInfo,const LogInfo & cloudInfo,SingleVerConflictResolvePolicy policy)548 bool SqliteCloudKvStore::IsTagCloudUpdateLocal(const LogInfo &localInfo, const LogInfo &cloudInfo,
549     SingleVerConflictResolvePolicy policy)
550 {
551     std::string cloudInfoDev;
552     auto decodeCloudInfoDev = DBBase64Utils::Decode(cloudInfo.device);
553     if (!decodeCloudInfoDev.empty()) {
554         cloudInfoDev = std::string(decodeCloudInfoDev.begin(), decodeCloudInfoDev.end());
555     }
556     if (policy == SingleVerConflictResolvePolicy::DENY_OTHER_DEV_AMEND_CUR_DEV_DATA &&
557         !localInfo.originDev.empty() && localInfo.originDev == cloudInfoDev) {
558         return true;
559     }
560     std::string device;
561     if (RuntimeContext::GetInstance()->GetLocalIdentity(device) != E_OK) {
562         LOGE("[SqliteCloudKvStore] GetLocalIdentity device failed.");
563         return false;
564     }
565     device = DBCommon::TransferHashString(device);
566     std::string localInfoDev = localInfo.device;
567     if (localInfoDev.empty()) {
568         return false;
569     }
570     bool isLocal = (localInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL)) ==
571         static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
572     if (cloudInfoDev.empty()) {
573         return !isLocal;
574     }
575     return localInfoDev == cloudInfoDev && localInfoDev != device;
576 }
577 
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users)578 int SqliteCloudKvStore::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery,
579     std::vector<std::string> &users)
580 {
581     std::shared_ptr<DataBaseSchema> cloudSchema;
582     (void)GetCloudDbSchema(cloudSchema);
583     if (cloudSchema == nullptr) {
584         return -E_INVALID_SCHEMA;
585     }
586     if (cloudSchema->tables.empty()) {
587         return E_OK;
588     }
589     int ret = StartTransaction(TransactType::DEFERRED);
590     if (ret != E_OK) {
591         return ret;
592     }
593     sqlite3 *db = nullptr;
594     (void)transactionHandle_->GetDbHandle(db);
595     for (const auto &table: cloudSchema->tables) {
596         std::vector<VBucket> syncDataPk;
597         std::vector<VBucket> syncDataUserId;
598         int errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncData(db, transactionHandle_->IsMemory(),
599             syncDataPk, syncDataUserId);
600         if (errCode != E_OK) {
601             LOGW("[SqliteCloudKvStore] Get wait compensated sync date failed, continue! errCode=%d", errCode);
602             continue;
603         }
604         if (syncDataPk.empty()) {
605             continue;
606         }
607         QuerySyncObject syncObject;
608         errCode = CloudStorageUtils::GetSyncQueryByPk(table.name, syncDataPk, true, syncObject);
609         if (errCode != E_OK) {
610             LOGW("[SqliteCloudKvStore] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
611             continue;
612         }
613         syncQuery.push_back(syncObject);
614         for (auto &oneRow : syncDataUserId) {
615             std::string user;
616             errCode = CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_USERID, oneRow, user);
617             if (errCode != E_OK) {
618                 LOGW("[SqliteCloudKvStore] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
619                 continue;
620             }
621             users.push_back(user);
622         }
623     }
624     return Commit();
625 }
626 }