1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "sqlite_cloud_kv_store.h"
16
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "query_utils.h"
22 #include "runtime_context.h"
23 #include "sqlite_cloud_kv_executor_utils.h"
24 #include "sqlite_single_ver_continue_token.h"
25
26 namespace DistributedDB {
SqliteCloudKvStore(KvStorageHandle * handle)27 SqliteCloudKvStore::SqliteCloudKvStore(KvStorageHandle *handle)
28 : storageHandle_(handle), transactionHandle_(nullptr)
29 {
30 }
31
GetMetaData(const Key & key,Value & value) const32 int SqliteCloudKvStore::GetMetaData(const Key &key, Value &value) const
33 {
34 return storageHandle_->GetMetaData(key, value);
35 }
36
PutMetaData(const Key & key,const Value & value)37 int SqliteCloudKvStore::PutMetaData(const Key &key, const Value &value)
38 {
39 return storageHandle_->PutMetaData(key, value, false);
40 }
41
ChkSchema(const TableName & tableName)42 int SqliteCloudKvStore::ChkSchema(const TableName &tableName)
43 {
44 return E_OK;
45 }
46
SetCloudDbSchema(const DataBaseSchema & schema)47 int SqliteCloudKvStore::SetCloudDbSchema(const DataBaseSchema &schema)
48 {
49 return E_OK;
50 }
51
SetCloudDbSchema(const std::map<std::string,DataBaseSchema> & schema)52 int SqliteCloudKvStore::SetCloudDbSchema(const std::map<std::string, DataBaseSchema> &schema)
53 {
54 std::lock_guard<std::mutex> autoLock(schemaMutex_);
55 if (!CheckSchema(schema)) {
56 return -E_INVALID_SCHEMA;
57 }
58 schema_ = schema;
59 return E_OK;
60 }
61
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)62 int SqliteCloudKvStore::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
63 {
64 std::lock_guard<std::mutex> autoLock(schemaMutex_);
65 cloudSchema = std::make_shared<DataBaseSchema>(schema_[user_]);
66 return E_OK;
67 }
68
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)69 int SqliteCloudKvStore::GetCloudTableSchema(const TableName &tableName,
70 TableSchema &tableSchema)
71 {
72 std::lock_guard<std::mutex> autoLock(schemaMutex_);
73 if (schema_.find(user_) == schema_.end()) {
74 LOGE("[SqliteCloudKvStore] not set cloud schema");
75 return -E_SCHEMA_MISMATCH;
76 }
77 auto it = std::find_if(schema_[user_].tables.begin(), schema_[user_].tables.end(), [&](const auto &table) {
78 return table.name == tableName;
79 });
80 if (it != schema_[user_].tables.end()) {
81 tableSchema = *it;
82 return E_OK;
83 }
84 LOGW("[SqliteCloudKvStore] not found table schema");
85 return -E_NOT_FOUND;
86 }
87
StartTransaction(TransactType type)88 int SqliteCloudKvStore::StartTransaction(TransactType type)
89 {
90 {
91 std::lock_guard<std::mutex> autoLock(transactionMutex_);
92 if (transactionHandle_ != nullptr) {
93 LOGW("[SqliteCloudKvStore] transaction has been started");
94 return E_OK;
95 }
96 }
97 auto [errCode, handle] = storageHandle_->GetStorageExecutor(type == TransactType::IMMEDIATE);
98 if (errCode != E_OK) {
99 return errCode;
100 }
101 if (handle == nullptr) {
102 LOGE("[SqliteCloudKvStore] get handle return null");
103 return -E_INTERNAL_ERROR;
104 }
105 errCode = handle->StartTransaction(type);
106 std::lock_guard<std::mutex> autoLock(transactionMutex_);
107 transactionHandle_ = handle;
108 LOGD("[SqliteCloudKvStore] start transaction!");
109 return errCode;
110 }
111
Commit()112 int SqliteCloudKvStore::Commit()
113 {
114 SQLiteSingleVerStorageExecutor *handle;
115 {
116 std::lock_guard<std::mutex> autoLock(transactionMutex_);
117 if (transactionHandle_ == nullptr) {
118 LOGW("[SqliteCloudKvStore] no need to commit, transaction has not been started");
119 return E_OK;
120 }
121 handle = transactionHandle_;
122 transactionHandle_ = nullptr;
123 }
124 int errCode = handle->Commit();
125 storageHandle_->RecycleStorageExecutor(handle);
126 LOGD("[SqliteCloudKvStore] commit transaction!");
127 return errCode;
128 }
129
Rollback()130 int SqliteCloudKvStore::Rollback()
131 {
132 SQLiteSingleVerStorageExecutor *handle;
133 {
134 std::lock_guard<std::mutex> autoLock(transactionMutex_);
135 if (transactionHandle_ == nullptr) {
136 LOGW("[SqliteCloudKvStore] no need to rollback, transaction has not been started");
137 return E_OK;
138 }
139 handle = transactionHandle_;
140 transactionHandle_ = nullptr;
141 }
142 int errCode = handle->Rollback();
143 storageHandle_->RecycleStorageExecutor(handle);
144 LOGD("[SqliteCloudKvStore] rollback transaction!");
145 return errCode;
146 }
147
148 int SqliteCloudKvStore::GetUploadCount([[gnu::unused]] const QuerySyncObject &query,
149 const Timestamp ×tamp, bool isCloudForcePush, [[gnu::unused]] bool isCompensatedTask,
150 int64_t &count)
151 {
152 auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
153 if (db == nullptr) {
154 LOGE("[SqliteCloudKvStore] get upload count without transaction");
155 return -E_INTERNAL_ERROR;
156 }
157 int errCode = E_OK;
158 std::tie(errCode, count) = SqliteCloudKvExecutorUtils::CountCloudData(db, isMemory, timestamp, user_,
159 isCloudForcePush);
160 return errCode;
161 }
162
163 int SqliteCloudKvStore::GetAllUploadCount(const QuerySyncObject &query,
164 const std::vector<Timestamp> ×tampVec, bool isCloudForcePush, [[gnu::unused]] bool isCompensatedTask,
165 int64_t &count)
166 {
167 auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
168 if (db == nullptr) {
169 LOGE("[SqliteCloudKvStore] get upload count without transaction");
170 return -E_INTERNAL_ERROR;
171 }
172 int errCode = E_OK;
173 QuerySyncObject queryObj = query;
174 std::tie(errCode, count) = SqliteCloudKvExecutorUtils::CountAllCloudData({ db, isMemory }, timestampVec, user_,
175 isCloudForcePush, queryObj);
176 return errCode;
177 }
178
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & object,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)179 int SqliteCloudKvStore::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object,
180 const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
181 {
182 SyncTimeRange timeRange;
183 // memory db use watermark
184 timeRange.beginTime = GetTransactionDbHandleAndMemoryStatus().second ? beginTime : 0;
185 auto token = new (std::nothrow) SQLiteSingleVerContinueToken(timeRange, object);
186 if (token == nullptr) {
187 LOGE("[SqliteCloudKvStore] create token failed");
188 return -E_OUT_OF_MEMORY;
189 }
190 token->SetUser(user_);
191 recorder_.SetUser(user_);
192 cloudDataResult.tableName = CloudDbConstant::CLOUD_KV_TABLE_NAME;
193 continueStmtToken = static_cast<ContinueToken>(token);
194 return GetCloudDataNext(continueStmtToken, cloudDataResult);
195 }
196
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)197 int SqliteCloudKvStore::GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
198 {
199 if (continueStmtToken == nullptr) {
200 LOGE("[SqliteCloudKvStore] token is null");
201 return -E_INVALID_ARGS;
202 }
203 auto token = static_cast<SQLiteSingleVerContinueToken *>(continueStmtToken);
204 if (!token->CheckValid()) {
205 LOGE("[SqliteCloudKvStore] token is invalid");
206 return -E_INVALID_ARGS;
207 }
208 auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
209 if (db == nullptr) {
210 LOGE("[SqliteCloudKvStore] the transaction has not been started, release the token");
211 ReleaseCloudDataToken(continueStmtToken);
212 return -E_INTERNAL_ERROR;
213 }
214 int errCode = SqliteCloudKvExecutorUtils::GetCloudData(GetCloudSyncConfig(), {db, isMemory}, recorder_, *token,
215 cloudDataResult);
216 if (errCode != -E_UNFINISHED) {
217 ReleaseCloudDataToken(continueStmtToken);
218 } else {
219 continueStmtToken = token;
220 }
221 return errCode;
222 }
223
ReleaseCloudDataToken(ContinueToken & continueStmtToken)224 int SqliteCloudKvStore::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
225 {
226 if (continueStmtToken == nullptr) {
227 return E_OK;
228 }
229 auto token = static_cast<SQLiteSingleVerContinueToken *>(continueStmtToken);
230 if (!token->CheckValid()) {
231 return E_OK;
232 }
233 token->ReleaseCloudQueryStmt();
234 delete token;
235 continueStmtToken = nullptr;
236 return E_OK;
237 }
238
239 int SqliteCloudKvStore::GetInfoByPrimaryKeyOrGid([[gnu::unused]] const std::string &tableName, const VBucket &vBucket,
240 DataInfoWithLog &dataInfoWithLog, [[gnu::unused]] VBucket &assetInfo)
241 {
242 auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
243 if (db == nullptr) {
244 LOGE("[SqliteCloudKvStore] the transaction has not been started");
245 return -E_INTERNAL_ERROR;
246 }
247 int errCode = E_OK;
248 std::tie(errCode, dataInfoWithLog) = SqliteCloudKvExecutorUtils::GetLogInfo(db, isMemory, vBucket, user_);
249 return errCode;
250 }
251
252 int SqliteCloudKvStore::PutCloudSyncData([[gnu::unused]] const std::string &tableName, DownloadData &downloadData)
253 {
254 auto [db, isMemory] = GetTransactionDbHandleAndMemoryStatus();
255 if (db == nullptr) {
256 LOGE("[SqliteCloudKvStore] the transaction has not been started");
257 return -E_INTERNAL_ERROR;
258 }
259 downloadData.timeOffset = storageHandle_->GetLocalTimeOffsetForCloud();
260 return SqliteCloudKvExecutorUtils::PutCloudData(db, isMemory, downloadData);
261 }
262
FillCloudLogAndAsset(OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)263 int SqliteCloudKvStore::FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset,
264 bool ignoreEmptyGid)
265 {
266 auto [errCode, handle] = storageHandle_->GetStorageExecutor(true);
267 if (errCode != E_OK) {
268 LOGE("[SqliteCloudKvStore] get handle failed %d when fill log", errCode);
269 return errCode;
270 }
271 if (handle->IsMemory()) {
272 errCode = Commit();
273 if (errCode != E_OK) {
274 LOGE("[SqliteCloudKvStore] commit failed %d before fill log", errCode);
275 storageHandle_->RecycleStorageExecutor(handle);
276 return errCode;
277 }
278 }
279 sqlite3 *db = nullptr;
280 (void)handle->GetDbHandle(db);
281 errCode = SqliteCloudKvExecutorUtils::FillCloudLog({db, ignoreEmptyGid}, opType, data, user_, recorder_);
282 int ret = E_OK;
283 if (handle->IsMemory()) {
284 ret = StartTransaction(TransactType::DEFERRED);
285 if (ret != E_OK) {
286 LOGE("[SqliteCloudKvStore] restart transaction failed %d", ret);
287 }
288 }
289 storageHandle_->RecycleStorageExecutor(handle);
290 return errCode == E_OK ? ret : errCode;
291 }
292
FilterCloudVersionPrefixKey(std::vector<std::vector<Type>> & changeValList)293 void SqliteCloudKvStore::FilterCloudVersionPrefixKey(std::vector<std::vector<Type>> &changeValList)
294 {
295 changeValList.erase(std::remove_if(changeValList.begin(), changeValList.end(),
296 [&](const std::vector<Type> &existPkVal) {
297 bool isFilter = false;
298 for (auto type : existPkVal) {
299 std::string prefixKey;
300 int errCode = CloudStorageUtils::GetValueFromOneField(type, prefixKey);
301 if (errCode != E_OK) {
302 LOGE("[SqliteCloudKvStore] can not get key from changedData, %d", errCode);
303 break;
304 }
305 isFilter = !prefixKey.empty() && prefixKey.find(CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY) == 0;
306 if (isFilter) {
307 break;
308 }
309 }
310 return isFilter;
311 }), changeValList.end());
312 }
313
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)314 void SqliteCloudKvStore::TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData,
315 bool isChangedData)
316 {
317 {
318 std::lock_guard<std::mutex> autoLock(observerMapMutex_);
319 if (cloudObserverMap_.empty()) {
320 return;
321 }
322 }
323 for (auto &changeValList : changedData.primaryData) {
324 FilterCloudVersionPrefixKey(changeValList);
325 }
326 RefObject::IncObjRef(this);
327 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, deviceName, changedData, isChangedData]() {
328 {
329 std::lock_guard<std::mutex> autoLock(observerMapMutex_);
330 for (const auto &item : cloudObserverMap_) {
331 ChangedData observerChangeData = changedData;
332 item.second(deviceName, std::move(observerChangeData), isChangedData);
333 }
334 }
335 RefObject::DecObjRef(this);
336 });
337 if (errCode != E_OK) {
338 LOGW("[SqliteCloudKvStore] Trigger observer action failed %d", errCode);
339 RefObject::DecObjRef(this);
340 }
341 }
342
GetIdentify() const343 std::string SqliteCloudKvStore::GetIdentify() const
344 {
345 return "";
346 }
347
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)348 int SqliteCloudKvStore::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
349 bool isCloudForcePush, bool isCompensatedTask, std::vector<std::string> &cloudGid)
350 {
351 auto[errCode, handle] = storageHandle_->GetStorageExecutor(false);
352 if (errCode != E_OK) {
353 LOGE("[SqliteCloudKvStore] get handle failed %d", errCode);
354 return errCode;
355 }
356 sqlite3 *db = nullptr;
357 (void)handle->GetDbHandle(db);
358 QuerySyncObject query = querySyncObject;
359 errCode = SqliteCloudKvExecutorUtils::QueryCloudGid(db, handle->IsMemory(), user_, query, cloudGid);
360 storageHandle_->RecycleStorageExecutor(handle);
361 if (errCode != E_OK) {
362 LOGE("[SqliteCloudKvStore] Query cloud gid failed %d", errCode);
363 }
364 return errCode;
365 }
366
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)367 int SqliteCloudKvStore::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess)
368 {
369 return E_OK;
370 }
371
SetLogTriggerStatus(bool status)372 int SqliteCloudKvStore::SetLogTriggerStatus(bool status)
373 {
374 return E_OK;
375 }
376
SetCursorIncFlag(bool status)377 int SqliteCloudKvStore::SetCursorIncFlag(bool status)
378 {
379 return E_OK;
380 }
381
CheckQueryValid(const QuerySyncObject & query)382 int SqliteCloudKvStore::CheckQueryValid(const QuerySyncObject &query)
383 {
384 return E_OK;
385 }
386
IsSharedTable(const std::string & tableName)387 bool SqliteCloudKvStore::IsSharedTable(const std::string &tableName)
388 {
389 return false;
390 }
391
SetUser(const std::string & user)392 void SqliteCloudKvStore::SetUser(const std::string &user)
393 {
394 user_ = user;
395 }
396
GetTransactionDbHandleAndMemoryStatus()397 std::pair<sqlite3 *, bool> SqliteCloudKvStore::GetTransactionDbHandleAndMemoryStatus()
398 {
399 std::lock_guard<std::mutex> autoLock(transactionMutex_);
400 if (transactionHandle_ == nullptr) {
401 return {nullptr, false};
402 }
403 sqlite3 *db = nullptr;
404 (void)transactionHandle_->GetDbHandle(db);
405 return {db, transactionHandle_->IsMemory()};
406 }
407
RegisterObserverAction(const KvStoreObserver * observer,const ObserverAction & action)408 void SqliteCloudKvStore::RegisterObserverAction(const KvStoreObserver *observer, const ObserverAction &action)
409 {
410 std::lock_guard<std::mutex> autoLock(observerMapMutex_);
411 cloudObserverMap_[observer] = action;
412 }
413
UnRegisterObserverAction(const KvStoreObserver * observer)414 void SqliteCloudKvStore::UnRegisterObserverAction(const KvStoreObserver *observer)
415 {
416 std::lock_guard<std::mutex> autoLock(observerMapMutex_);
417 cloudObserverMap_.erase(observer);
418 }
419
GetCloudVersion(const std::string & device,std::map<std::string,std::string> & versionMap)420 int SqliteCloudKvStore::GetCloudVersion(const std::string &device, std::map<std::string, std::string> &versionMap)
421 {
422 auto[errCode, handle] = storageHandle_->GetStorageExecutor(false);
423 if (errCode != E_OK) {
424 LOGE("[SqliteCloudKvStore] get handle failed %d", errCode);
425 return errCode;
426 }
427 sqlite3 *db = nullptr;
428 (void)handle->GetDbHandle(db);
429 std::vector<VBucket> dataVector = {};
430 errCode = SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(db, handle->IsMemory(), user_, device, dataVector);
431 storageHandle_->RecycleStorageExecutor(handle);
432 if (errCode != E_OK) {
433 LOGE("[SqliteCloudKvStore] get cloud version record failed %d", errCode);
434 return errCode;
435 }
436 for (VBucket &data : dataVector) {
437 auto [errCodeNext, dataItem] = CloudStorageUtils::GetDataItemFromCloudVersionData(data);
438 if (errCodeNext != E_OK) {
439 LOGE("[SqliteCloudKvStore] get dataItem failed %d", errCodeNext);
440 return errCodeNext;
441 }
442 dataItem.dev = DBBase64Utils::DecodeIfNeed(dataItem.dev);
443 std::vector<uint8_t> blob = dataItem.value;
444 std::string version = std::string(blob.begin(), blob.end());
445 std::pair<std::string, std::string> versionPair = std::pair<std::string, std::string>(dataItem.dev, version);
446 versionMap.insert(versionPair);
447 }
448 return E_OK;
449 }
450
GetLocalCloudVersion()451 std::pair<int, CloudSyncData> SqliteCloudKvStore::GetLocalCloudVersion()
452 {
453 std::pair<int, CloudSyncData> res;
454 auto &[errCode, data] = res;
455 Timestamp currentTime = storageHandle_->GetCurrentTimestamp();
456 TimeOffset timeOffset = storageHandle_->GetLocalTimeOffsetForCloud();
457 Timestamp rawSysTime = static_cast<Timestamp>(static_cast<TimeOffset>(currentTime) - timeOffset);
458 SQLiteSingleVerStorageExecutor *handle = nullptr;
459 std::tie(errCode, handle) = storageHandle_->GetStorageExecutor(false);
460 if (errCode != E_OK) {
461 LOGE("[SqliteCloudKvStore] get handle failed %d when fill log", errCode);
462 return res;
463 }
464 sqlite3 *db = nullptr;
465 (void)handle->GetDbHandle(db);
466 std::tie(errCode, data) = SqliteCloudKvExecutorUtils::GetLocalCloudVersion(db, handle->IsMemory(), user_);
467 data.isCloudVersionRecord = true;
468 storageHandle_->RecycleStorageExecutor(handle);
469 FillTimestamp(rawSysTime, currentTime, data.insData);
470 FillTimestamp(rawSysTime, currentTime, data.updData);
471 data.tableName = CloudDbConstant::CLOUD_KV_TABLE_NAME;
472 return res;
473 }
474
FillTimestamp(Timestamp rawSystemTime,Timestamp virtualTime,CloudSyncBatch & syncBatch)475 void SqliteCloudKvStore::FillTimestamp(Timestamp rawSystemTime, Timestamp virtualTime, CloudSyncBatch &syncBatch)
476 {
477 for (auto &item : syncBatch.extend) {
478 item[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(rawSystemTime);
479 if (item.find(CloudDbConstant::CREATE_FIELD) == item.end()) {
480 item[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(rawSystemTime);
481 item[CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME] = static_cast<int64_t>(virtualTime);
482 }
483 }
484 }
485
CheckSchema(std::map<std::string,DataBaseSchema> schema)486 bool SqliteCloudKvStore::CheckSchema(std::map<std::string, DataBaseSchema> schema)
487 {
488 if (schema.size() == 0) {
489 LOGE("[SqliteCloudKvStore] empty schema.");
490 return false;
491 }
492 for (auto it = schema.begin(); it != schema.end(); it++) {
493 std::vector<TableSchema> tables = it->second.tables;
494 if (tables.size() != 1) {
495 LOGE("[SqliteCloudKvStore] invalid tables num: %zu", tables.size());
496 return false;
497 }
498 TableSchema actualTable = tables[0];
499 std::string expectTableName = CloudDbConstant::CLOUD_KV_TABLE_NAME;
500 std::string expectSharedTableName = "";
501 std::vector<Field> expectFields = {
502 {CloudDbConstant::CLOUD_KV_FIELD_KEY, TYPE_INDEX<std::string>, true, true},
503 {CloudDbConstant::CLOUD_KV_FIELD_DEVICE, TYPE_INDEX<std::string>, false, true},
504 {CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, TYPE_INDEX<std::string>, false, true},
505 {CloudDbConstant::CLOUD_KV_FIELD_VALUE, TYPE_INDEX<std::string>, false, true},
506 {CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME, TYPE_INDEX<int64_t>, false, true}
507 };
508 if (actualTable.name != expectTableName || actualTable.sharedTableName != expectSharedTableName ||
509 actualTable.fields.size() != expectFields.size()) {
510 LOGE("[SqliteCloudKvStore] check table failed.");
511 return false;
512 }
513 for (uint32_t i = 0; i < actualTable.fields.size(); i++) {
514 Field actualField = actualTable.fields[i];
515 if (std::find(expectFields.begin(), expectFields.end(), actualField) == expectFields.end()) {
516 LOGE("[SqliteCloudKvStore] check fields failed.");
517 return false;
518 }
519 }
520 }
521 return true;
522 }
523
SetCloudSyncConfig(const CloudSyncConfig & config)524 void SqliteCloudKvStore::SetCloudSyncConfig(const CloudSyncConfig &config)
525 {
526 std::lock_guard<std::mutex> autoLock(configMutex_);
527 config_ = config;
528 }
529
GetCloudSyncConfig() const530 CloudSyncConfig SqliteCloudKvStore::GetCloudSyncConfig() const
531 {
532 std::lock_guard<std::mutex> autoLock(configMutex_);
533 return config_;
534 }
535
GetDataBaseSchemas()536 std::map<std::string, DataBaseSchema> SqliteCloudKvStore::GetDataBaseSchemas()
537 {
538 std::lock_guard<std::mutex> autoLock(schemaMutex_);
539 return schema_;
540 }
541
ReleaseUploadRecord(const std::string & tableName,const CloudWaterType & type,Timestamp localMark)542 void SqliteCloudKvStore::ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type,
543 Timestamp localMark)
544 {
545 recorder_.ReleaseUploadRecord(tableName, type, localMark);
546 }
547
IsTagCloudUpdateLocal(const LogInfo & localInfo,const LogInfo & cloudInfo,SingleVerConflictResolvePolicy policy)548 bool SqliteCloudKvStore::IsTagCloudUpdateLocal(const LogInfo &localInfo, const LogInfo &cloudInfo,
549 SingleVerConflictResolvePolicy policy)
550 {
551 std::string cloudInfoDev;
552 auto decodeCloudInfoDev = DBBase64Utils::Decode(cloudInfo.device);
553 if (!decodeCloudInfoDev.empty()) {
554 cloudInfoDev = std::string(decodeCloudInfoDev.begin(), decodeCloudInfoDev.end());
555 }
556 if (policy == SingleVerConflictResolvePolicy::DENY_OTHER_DEV_AMEND_CUR_DEV_DATA &&
557 !localInfo.originDev.empty() && localInfo.originDev == cloudInfoDev) {
558 return true;
559 }
560 std::string device;
561 if (RuntimeContext::GetInstance()->GetLocalIdentity(device) != E_OK) {
562 LOGE("[SqliteCloudKvStore] GetLocalIdentity device failed.");
563 return false;
564 }
565 device = DBCommon::TransferHashString(device);
566 std::string localInfoDev = localInfo.device;
567 if (localInfoDev.empty()) {
568 return false;
569 }
570 bool isLocal = (localInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL)) ==
571 static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
572 if (cloudInfoDev.empty()) {
573 return !isLocal;
574 }
575 return localInfoDev == cloudInfoDev && localInfoDev != device;
576 }
577
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users)578 int SqliteCloudKvStore::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery,
579 std::vector<std::string> &users)
580 {
581 std::shared_ptr<DataBaseSchema> cloudSchema;
582 (void)GetCloudDbSchema(cloudSchema);
583 if (cloudSchema == nullptr) {
584 return -E_INVALID_SCHEMA;
585 }
586 if (cloudSchema->tables.empty()) {
587 return E_OK;
588 }
589 int ret = StartTransaction(TransactType::DEFERRED);
590 if (ret != E_OK) {
591 return ret;
592 }
593 sqlite3 *db = nullptr;
594 (void)transactionHandle_->GetDbHandle(db);
595 for (const auto &table: cloudSchema->tables) {
596 std::vector<VBucket> syncDataPk;
597 std::vector<VBucket> syncDataUserId;
598 int errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncData(db, transactionHandle_->IsMemory(),
599 syncDataPk, syncDataUserId);
600 if (errCode != E_OK) {
601 LOGW("[SqliteCloudKvStore] Get wait compensated sync date failed, continue! errCode=%d", errCode);
602 continue;
603 }
604 if (syncDataPk.empty()) {
605 continue;
606 }
607 QuerySyncObject syncObject;
608 errCode = CloudStorageUtils::GetSyncQueryByPk(table.name, syncDataPk, true, syncObject);
609 if (errCode != E_OK) {
610 LOGW("[SqliteCloudKvStore] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
611 continue;
612 }
613 syncQuery.push_back(syncObject);
614 for (auto &oneRow : syncDataUserId) {
615 std::string user;
616 errCode = CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_USERID, oneRow, user);
617 if (errCode != E_OK) {
618 LOGW("[SqliteCloudKvStore] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
619 continue;
620 }
621 users.push_back(user);
622 }
623 }
624 return Commit();
625 }
626 }