1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_relational_store.h"
17 
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_dump_helper.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "db_types.h"
25 #include "sqlite_log_table_manager.h"
26 #include "sqlite_relational_store_connection.h"
27 #include "storage_engine_manager.h"
28 #include "cloud_sync_utils.h"
29 
30 namespace DistributedDB {
31 namespace {
32 constexpr const char *DISTRIBUTED_TABLE_MODE = "distributed_table_mode";
33 }
34 
~SQLiteRelationalStore()35 SQLiteRelationalStore::~SQLiteRelationalStore()
36 {
37     sqliteStorageEngine_ = nullptr;
38 }
39 
40 // Called when a new connection created.
IncreaseConnectionCounter()41 void SQLiteRelationalStore::IncreaseConnectionCounter()
42 {
43     connectionCount_.fetch_add(1, std::memory_order_seq_cst);
44     if (connectionCount_.load() > 0) {
45         sqliteStorageEngine_->SetConnectionFlag(true);
46     }
47 }
48 
GetDBConnection(int & errCode)49 RelationalStoreConnection *SQLiteRelationalStore::GetDBConnection(int &errCode)
50 {
51     std::lock_guard<std::mutex> lock(connectMutex_);
52     RelationalStoreConnection *connection = new (std::nothrow) SQLiteRelationalStoreConnection(this);
53     if (connection == nullptr) {
54         errCode = -E_OUT_OF_MEMORY;
55         return nullptr;
56     }
57     IncObjRef(this);
58     IncreaseConnectionCounter();
59     return connection;
60 }
61 
InitDataBaseOption(const RelationalDBProperties & properties,OpenDbProperties & option)62 static void InitDataBaseOption(const RelationalDBProperties &properties, OpenDbProperties &option)
63 {
64     option.uri = properties.GetStringProp(DBProperties::DATA_DIR, "");
65     option.createIfNecessary = properties.GetBoolProp(DBProperties::CREATE_IF_NECESSARY, false);
66     if (properties.IsEncrypted()) {
67         option.cipherType = properties.GetCipherType();
68         option.passwd = properties.GetPasswd();
69         option.iterTimes = properties.GetIterTimes();
70     }
71 }
72 
InitStorageEngine(const RelationalDBProperties & properties)73 int SQLiteRelationalStore::InitStorageEngine(const RelationalDBProperties &properties)
74 {
75     OpenDbProperties option;
76     InitDataBaseOption(properties, option);
77     std::string identifier = properties.GetStringProp(DBProperties::IDENTIFIER_DATA, "");
78 
79     StorageEngineAttr poolSize = { 1, 1, 0, 16 }; // at most 1 write 16 read.
80     int errCode = sqliteStorageEngine_->InitSQLiteStorageEngine(poolSize, option, identifier);
81     if (errCode != E_OK) {
82         LOGE("Init the sqlite storage engine failed:%d", errCode);
83     }
84     return errCode;
85 }
86 
ReleaseResources()87 void SQLiteRelationalStore::ReleaseResources()
88 {
89     if (sqliteStorageEngine_ != nullptr) {
90         sqliteStorageEngine_->ClearEnginePasswd();
91         sqliteStorageEngine_ = nullptr;
92     }
93     if (cloudSyncer_ != nullptr) {
94         cloudSyncer_->Close();
95         RefObject::KillAndDecObjRef(cloudSyncer_);
96         cloudSyncer_ = nullptr;
97     }
98     RefObject::DecObjRef(storageEngine_);
99 }
100 
CheckDBMode()101 int SQLiteRelationalStore::CheckDBMode()
102 {
103     int errCode = E_OK;
104     auto *handle = GetHandle(true, errCode);
105     if (handle == nullptr) {
106         return errCode;
107     }
108     errCode = handle->CheckDBModeForRelational();
109     if (errCode != E_OK) {
110         LOGE("check relational DB mode failed. %d", errCode);
111     }
112 
113     ReleaseHandle(handle);
114     return errCode;
115 }
116 
GetSchemaFromMeta(RelationalSchemaObject & schema)117 int SQLiteRelationalStore::GetSchemaFromMeta(RelationalSchemaObject &schema)
118 {
119     Key schemaKey;
120     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
121     Value schemaVal;
122     int errCode = storageEngine_->GetMetaData(schemaKey, schemaVal);
123     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
124         LOGE("Get relational schema from meta table failed. %d", errCode);
125         return errCode;
126     } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
127         LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
128         return -E_NOT_FOUND;
129     }
130 
131     std::string schemaStr;
132     DBCommon::VectorToString(schemaVal, schemaStr);
133     errCode = schema.ParseFromSchemaString(schemaStr);
134     if (errCode != E_OK) {
135         LOGE("Parse schema string from meta table failed.");
136         return errCode;
137     }
138 
139     sqliteStorageEngine_->SetSchema(schema);
140     return E_OK;
141 }
142 
CheckTableModeFromMeta(DistributedTableMode mode,bool isUnSet)143 int SQLiteRelationalStore::CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet)
144 {
145     const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
146     Value modeVal;
147     int errCode = storageEngine_->GetMetaData(modeKey, modeVal);
148     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
149         LOGE("Get distributed table mode from meta table failed. errCode=%d", errCode);
150         return errCode;
151     }
152 
153     DistributedTableMode orgMode = DistributedTableMode::SPLIT_BY_DEVICE;
154     if (!modeVal.empty()) {
155         std::string value(modeVal.begin(), modeVal.end());
156         orgMode = static_cast<DistributedTableMode>(strtoll(value.c_str(), nullptr, 10)); // 10: decimal
157     } else if (isUnSet) {
158         return E_OK; // First set table mode.
159     }
160 
161     if (orgMode != mode) {
162         LOGE("Check distributed table mode mismatch, orgMode=%d, openMode=%d", orgMode, mode);
163         return -E_INVALID_ARGS;
164     }
165     return E_OK;
166 }
167 
CheckProperties(RelationalDBProperties properties)168 int SQLiteRelationalStore::CheckProperties(RelationalDBProperties properties)
169 {
170     RelationalSchemaObject schema;
171     int errCode = GetSchemaFromMeta(schema);
172     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
173         LOGE("Get relational schema from meta failed. errcode=%d", errCode);
174         return errCode;
175     }
176     int ret = InitTrackerSchemaFromMeta();
177     if (ret != E_OK) {
178         LOGE("Init tracker schema from meta failed. errcode=%d", ret);
179         return ret;
180     }
181     properties.SetSchema(schema);
182 
183     // Empty schema means no distributed table has been used, we may set DB to any table mode
184     // If there is a schema but no table mode, it is the 'SPLIT_BY_DEVICE' mode of old version
185     bool isSchemaEmpty = (errCode == -E_NOT_FOUND);
186     auto mode = static_cast<DistributedTableMode>(
187         properties.GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
188     errCode = CheckTableModeFromMeta(mode, isSchemaEmpty);
189     if (errCode != E_OK) {
190         LOGE("Get distributed table mode from meta failed. errcode=%d", errCode);
191         return errCode;
192     }
193     if (!isSchemaEmpty) {
194         return errCode;
195     }
196 
197     errCode = SaveTableModeToMeta(mode);
198     if (errCode != E_OK) {
199         LOGE("Save table mode to meta failed. errCode=%d", errCode);
200         return errCode;
201     }
202 
203     return E_OK;
204 }
205 
SaveSchemaToMeta()206 int SQLiteRelationalStore::SaveSchemaToMeta()
207 {
208     Key schemaKey;
209     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
210     Value schemaVal;
211     DBCommon::StringToVector(sqliteStorageEngine_->GetSchema().ToSchemaString(), schemaVal);
212     int errCode = storageEngine_->PutMetaData(schemaKey, schemaVal);
213     if (errCode != E_OK) {
214         LOGE("Save relational schema to meta table failed. %d", errCode);
215     }
216     return errCode;
217 }
218 
SaveTableModeToMeta(DistributedTableMode mode)219 int SQLiteRelationalStore::SaveTableModeToMeta(DistributedTableMode mode)
220 {
221     const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
222     Value modeVal;
223     DBCommon::StringToVector(std::to_string(mode), modeVal);
224     int errCode = storageEngine_->PutMetaData(modeKey, modeVal);
225     if (errCode != E_OK) {
226         LOGE("Save relational schema to meta table failed. %d", errCode);
227     }
228     return errCode;
229 }
230 
SaveLogTableVersionToMeta()231 int SQLiteRelationalStore::SaveLogTableVersionToMeta()
232 {
233     LOGD("save log table version to meta table, version: %s", DBConstant::LOG_TABLE_VERSION_CURRENT);
234     const Key logVersionKey(DBConstant::LOG_TABLE_VERSION_KEY.begin(), DBConstant::LOG_TABLE_VERSION_KEY.end());
235     Value logVersion;
236     int errCode = storageEngine_->GetMetaData(logVersionKey, logVersion);
237     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
238         LOGE("Get log version from meta table failed. %d", errCode);
239         return errCode;
240     }
241     std::string versionStr(DBConstant::LOG_TABLE_VERSION_CURRENT);
242     Value logVersionVal(versionStr.begin(), versionStr.end());
243     // log version is same, no need to update
244     if (errCode == E_OK && !logVersion.empty() && logVersionVal == logVersion) {
245         return errCode;
246     }
247     // If the log version does not exist or is different, update the log version
248     errCode = storageEngine_->PutMetaData(logVersionKey, logVersionVal);
249     if (errCode != E_OK) {
250         LOGE("save log table version to meta table failed. %d", errCode);
251     }
252     return errCode;
253 }
254 
CleanDistributedDeviceTable()255 int SQLiteRelationalStore::CleanDistributedDeviceTable()
256 {
257     std::vector<std::string> missingTables;
258     int errCode = sqliteStorageEngine_->CleanDistributedDeviceTable(missingTables);
259     if (errCode != E_OK) {
260         LOGE("Clean distributed device table failed. %d", errCode);
261     }
262     for (const auto &deviceTableName : missingTables) {
263         std::string deviceHash;
264         std::string tableName;
265         DBCommon::GetDeviceFromName(deviceTableName, deviceHash, tableName);
266         syncAbleEngine_->EraseDeviceWaterMark(deviceHash, false, tableName);
267         if (errCode != E_OK) {
268             LOGE("Erase water mark failed:%d", errCode);
269             return errCode;
270         }
271     }
272     return errCode;
273 }
274 
Open(const RelationalDBProperties & properties)275 int SQLiteRelationalStore::Open(const RelationalDBProperties &properties)
276 {
277     std::lock_guard<std::mutex> lock(initalMutex_);
278     if (isInitialized_) {
279         LOGD("[RelationalStore][Open] relational db was already initialized.");
280         return E_OK;
281     }
282     int errCode = InitSQLiteStorageEngine(properties);
283     if (errCode != E_OK) {
284         return errCode;
285     }
286 
287     do {
288         errCode = InitStorageEngine(properties);
289         if (errCode != E_OK) {
290             LOGE("[RelationalStore][Open] Init database context fail! errCode = [%d]", errCode);
291             break;
292         }
293 
294         storageEngine_ = new (std::nothrow) RelationalSyncAbleStorage(sqliteStorageEngine_);
295         if (storageEngine_ == nullptr) {
296             LOGE("[RelationalStore][Open] Create syncable storage failed");
297             errCode = -E_OUT_OF_MEMORY;
298             break;
299         }
300 
301         syncAbleEngine_ = std::make_shared<SyncAbleEngine>(storageEngine_);
302         // to guarantee the life cycle of sync module and syncAbleEngine_ are the same, then the sync module will not
303         // be destructed when close store
304         storageEngine_->SetSyncAbleEngine(syncAbleEngine_);
305         cloudSyncer_ = new (std::nothrow) CloudSyncer(StorageProxy::GetCloudDb(storageEngine_), false);
306 
307         errCode = CheckDBMode();
308         if (errCode != E_OK) {
309             break;
310         }
311 
312         errCode = CheckProperties(properties);
313         if (errCode != E_OK) {
314             break;
315         }
316 
317         errCode = SaveLogTableVersionToMeta();
318         if (errCode != E_OK) {
319             break;
320         }
321 
322         errCode = CleanDistributedDeviceTable();
323         if (errCode != E_OK) {
324             break;
325         }
326 
327         isInitialized_ = true;
328         return E_OK;
329     } while (false);
330 
331     ReleaseResources();
332     return errCode;
333 }
334 
OnClose(const std::function<void (void)> & notifier)335 void SQLiteRelationalStore::OnClose(const std::function<void(void)> &notifier)
336 {
337     AutoLock lockGuard(this);
338     if (notifier) {
339         closeNotifiers_.push_back(notifier);
340     } else {
341         LOGW("Register 'Close()' notifier failed, notifier is null.");
342     }
343 }
344 
GetHandle(bool isWrite,int & errCode) const345 SQLiteSingleVerRelationalStorageExecutor *SQLiteRelationalStore::GetHandle(bool isWrite, int &errCode) const
346 {
347     if (sqliteStorageEngine_ == nullptr) {
348         errCode = -E_INVALID_DB;
349         return nullptr;
350     }
351 
352     return static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
353         sqliteStorageEngine_->FindExecutor(isWrite, OperatePerm::NORMAL_PERM, errCode));
354 }
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const355 void SQLiteRelationalStore::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
356 {
357     if (handle == nullptr) {
358         return;
359     }
360 
361     if (sqliteStorageEngine_ != nullptr) {
362         StorageExecutor *databaseHandle = handle;
363         sqliteStorageEngine_->Recycle(databaseHandle);
364         handle = nullptr;
365     }
366 }
367 
Sync(const ISyncer::SyncParma & syncParam,uint64_t connectionId)368 int SQLiteRelationalStore::Sync(const ISyncer::SyncParma &syncParam, uint64_t connectionId)
369 {
370     return syncAbleEngine_->Sync(syncParam, connectionId);
371 }
372 
373 // Called when a connection released.
DecreaseConnectionCounter(uint64_t connectionId)374 void SQLiteRelationalStore::DecreaseConnectionCounter(uint64_t connectionId)
375 {
376     int count = connectionCount_.fetch_sub(1, std::memory_order_seq_cst);
377     if (count <= 0) {
378         LOGF("Decrease db connection counter failed, count <= 0.");
379         return;
380     }
381     if (storageEngine_ != nullptr) {
382         storageEngine_->EraseDataChangeCallback(connectionId);
383     }
384     if (count != 1) {
385         return;
386     }
387 
388     LockObj();
389     auto notifiers = std::move(closeNotifiers_);
390     UnlockObj();
391     for (const auto &notifier : notifiers) {
392         if (notifier) {
393             notifier();
394         }
395     }
396 
397     // Sync Close
398     syncAbleEngine_->Close();
399 
400     if (cloudSyncer_ != nullptr) {
401         cloudSyncer_->Close();
402         RefObject::KillAndDecObjRef(cloudSyncer_);
403         cloudSyncer_ = nullptr;
404     }
405 
406     if (sqliteStorageEngine_ != nullptr) {
407         sqliteStorageEngine_ = nullptr;
408     }
409     {
410         if (storageEngine_ != nullptr) {
411             storageEngine_->RegisterHeartBeatListener(nullptr);
412         }
413         std::lock_guard<std::mutex> lock(lifeCycleMutex_);
414         StopLifeCycleTimer();
415         lifeCycleNotifier_ = nullptr;
416     }
417     // close will dec sync ref of storageEngine_
418     DecObjRef(storageEngine_);
419 }
420 
ReleaseDBConnection(uint64_t connectionId,RelationalStoreConnection * connection)421 void SQLiteRelationalStore::ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection)
422 {
423     if (connectionCount_.load() == 1) {
424         sqliteStorageEngine_->SetConnectionFlag(false);
425     }
426 
427     connectMutex_.lock();
428     if (connection != nullptr) {
429         KillAndDecObjRef(connection);
430         DecreaseConnectionCounter(connectionId);
431         connectMutex_.unlock();
432         KillAndDecObjRef(this);
433     } else {
434         connectMutex_.unlock();
435     }
436 }
437 
WakeUpSyncer()438 void SQLiteRelationalStore::WakeUpSyncer()
439 {
440     syncAbleEngine_->WakeUpSyncer();
441 }
442 
CreateDistributedTable(const std::string & tableName,TableSyncType syncType,bool trackerSchemaChanged)443 int SQLiteRelationalStore::CreateDistributedTable(const std::string &tableName, TableSyncType syncType,
444     bool trackerSchemaChanged)
445 {
446     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
447     TableInfo tableInfo = localSchema.GetTable(tableName);
448     if (!tableInfo.Empty()) {
449         bool isSharedTable = tableInfo.GetSharedTableMark();
450         if (isSharedTable && !trackerSchemaChanged) {
451             return E_OK; // shared table will create distributed table when use SetCloudDbSchema
452         }
453     }
454 
455     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
456         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
457 
458     std::string localIdentity; // collaboration mode need local identify
459     if (mode == DistributedTableMode::COLLABORATION) {
460         int errCode = syncAbleEngine_->GetLocalIdentity(localIdentity);
461         if (errCode != E_OK || localIdentity.empty()) {
462             LOGD("Get local identity failed, can not create.");
463             return -E_NOT_SUPPORT;
464         }
465     }
466 
467     bool schemaChanged = false;
468     int errCode = sqliteStorageEngine_->CreateDistributedTable(tableName, DBCommon::TransferStringToHex(localIdentity),
469         schemaChanged, syncType, trackerSchemaChanged);
470     if (errCode != E_OK) {
471         LOGE("Create distributed table failed. %d", errCode);
472     }
473     if (schemaChanged) {
474         LOGD("Notify schema changed.");
475         storageEngine_->NotifySchemaChanged();
476     }
477     return errCode;
478 }
479 
GetCloudSyncTaskCount()480 int32_t SQLiteRelationalStore::GetCloudSyncTaskCount()
481 {
482     if (cloudSyncer_ == nullptr) {
483         LOGE("[RelationalStore] cloudSyncer was not initialized when get cloud sync task count.");
484         return -1;
485     }
486     return cloudSyncer_->GetCloudSyncTaskCount();
487 }
488 
CleanCloudData(ClearMode mode)489 int SQLiteRelationalStore::CleanCloudData(ClearMode mode)
490 {
491     auto tableMode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
492         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
493     if (tableMode == DistributedTableMode::COLLABORATION) {
494         LOGE("Not support remove device data in collaboration mode.");
495         return -E_NOT_SUPPORT;
496     }
497     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
498     TableInfoMap tables = localSchema.GetTables();
499     std::vector<std::string> cloudTableNameList;
500     for (const auto &tableInfo : tables) {
501         bool isSharedTable = tableInfo.second.GetSharedTableMark();
502         if ((mode == CLEAR_SHARED_TABLE && !isSharedTable) || (mode != CLEAR_SHARED_TABLE && isSharedTable)) {
503             continue;
504         }
505         if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION) {
506             cloudTableNameList.push_back(tableInfo.first);
507         }
508     }
509     if (cloudTableNameList.empty()) {
510         LOGI("[RelationalStore] device doesn't has cloud table, clean cloud data finished.");
511         return E_OK;
512     }
513     if (cloudSyncer_ == nullptr) {
514         LOGE("[RelationalStore] cloudSyncer was not initialized when clean cloud data");
515         return -E_INVALID_DB;
516     }
517     int errCode = cloudSyncer_->CleanCloudData(mode, cloudTableNameList, localSchema);
518     if (errCode != E_OK) {
519         LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode);
520     }
521 
522     return errCode;
523 }
524 
RemoveDeviceData()525 int SQLiteRelationalStore::RemoveDeviceData()
526 {
527     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
528         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
529     if (mode == DistributedTableMode::COLLABORATION) {
530         LOGE("Not support remove device data in collaboration mode.");
531         return -E_NOT_SUPPORT;
532     }
533 
534     std::vector<std::string> tableNameList = GetAllDistributedTableName();
535     if (tableNameList.empty()) {
536         return E_OK;
537     }
538     // erase watermark first
539     int errCode = EraseAllDeviceWatermark(tableNameList);
540     if (errCode != E_OK) {
541         LOGE("remove watermark failed %d", errCode);
542         return errCode;
543     }
544     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
545     errCode = GetHandleAndStartTransaction(handle);
546     if (handle == nullptr) {
547         return errCode;
548     }
549 
550     for (const auto &table : tableNameList) {
551         errCode = handle->DeleteDistributedDeviceTable("", table);
552         if (errCode != E_OK) {
553             LOGE("delete device data failed. %d", errCode);
554             break;
555         }
556 
557         errCode = handle->DeleteDistributedAllDeviceTableLog(table);
558         if (errCode != E_OK) {
559             LOGE("delete device data failed. %d", errCode);
560             break;
561         }
562     }
563 
564     if (errCode != E_OK) {
565         (void)handle->Rollback();
566         ReleaseHandle(handle);
567         return errCode;
568     }
569 
570     errCode = handle->Commit();
571     ReleaseHandle(handle);
572     storageEngine_->NotifySchemaChanged();
573     return errCode;
574 }
575 
RemoveDeviceData(const std::string & device,const std::string & tableName)576 int SQLiteRelationalStore::RemoveDeviceData(const std::string &device, const std::string &tableName)
577 {
578     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
579         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
580     if (mode == DistributedTableMode::COLLABORATION) {
581         LOGE("Not support remove device data in collaboration mode.");
582         return -E_NOT_SUPPORT;
583     }
584 
585     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
586     auto iter = tables.find(tableName);
587     if (tables.empty() || (!tableName.empty() && iter == tables.end())) {
588         LOGE("Remove device data with table name which is not a distributed table or no distributed table found.");
589         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
590     }
591     // cloud mode is not permit
592     if (iter != tables.end() && iter->second.GetTableSyncType() == CLOUD_COOPERATION) {
593         LOGE("Remove device data with cloud sync table name.");
594         return -E_NOT_SUPPORT;
595     }
596     bool isNeedHash = false;
597     std::string hashDeviceId;
598     int errCode = syncAbleEngine_->GetHashDeviceId(device, hashDeviceId);
599     if (errCode == -E_NOT_SUPPORT) {
600         isNeedHash = true;
601         hashDeviceId = device;
602         errCode = E_OK;
603     }
604     if (errCode != E_OK) {
605         return errCode;
606     }
607     if (isNeedHash) {
608         // check device is uuid in meta
609         std::set<std::string> hashDevices;
610         errCode = GetExistDevices(hashDevices);
611         if (errCode != E_OK) {
612             return errCode;
613         }
614         if (hashDevices.find(DBCommon::TransferHashString(device)) == hashDevices.end()) {
615             LOGD("[SQLiteRelationalStore] not match device, just return");
616             return E_OK;
617         }
618     }
619     return RemoveDeviceDataInner(hashDeviceId, device, tableName, isNeedHash);
620 }
621 
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)622 int SQLiteRelationalStore::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
623     const RelationalObserverAction &action)
624 {
625     return storageEngine_->RegisterObserverAction(connectionId, observer, action);
626 }
627 
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)628 int SQLiteRelationalStore::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
629 {
630     return storageEngine_->UnRegisterObserverAction(connectionId, observer);
631 }
632 
StopLifeCycleTimer()633 int SQLiteRelationalStore::StopLifeCycleTimer()
634 {
635     auto runtimeCxt = RuntimeContext::GetInstance();
636     if (runtimeCxt == nullptr) {
637         return -E_INVALID_ARGS;
638     }
639     if (lifeTimerId_ != 0) {
640         TimerId timerId = lifeTimerId_;
641         lifeTimerId_ = 0;
642         runtimeCxt->RemoveTimer(timerId, false);
643     }
644     return E_OK;
645 }
646 
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier)647 int SQLiteRelationalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier)
648 {
649     auto runtimeCxt = RuntimeContext::GetInstance();
650     if (runtimeCxt == nullptr) {
651         return -E_INVALID_ARGS;
652     }
653     RefObject::IncObjRef(this);
654     TimerId timerId = 0;
655     int errCode = runtimeCxt->SetTimer(
656         DBConstant::DEF_LIFE_CYCLE_TIME,
657         [this](TimerId id) -> int {
658             std::lock_guard<std::mutex> lock(lifeCycleMutex_);
659             if (lifeCycleNotifier_) {
660                 // normal identifier mode
661                 std::string identifier;
662                 if (sqliteStorageEngine_->GetProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
663                     identifier = sqliteStorageEngine_->GetProperties().GetStringProp(
664                         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
665                 } else {
666                     identifier = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
667                 }
668                 auto userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
669                 lifeCycleNotifier_(identifier, userId);
670             }
671             return 0;
672         },
673         [this]() {
674             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
675             if (ret != E_OK) {
676                 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
677             }
678         },
679         timerId);
680     if (errCode != E_OK) {
681         lifeTimerId_ = 0;
682         LOGE("SetTimer failed:%d", errCode);
683         RefObject::DecObjRef(this);
684         return errCode;
685     }
686 
687     lifeCycleNotifier_ = notifier;
688     lifeTimerId_ = timerId;
689     return E_OK;
690 }
691 
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)692 int SQLiteRelationalStore::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier)
693 {
694     int errCode;
695     {
696         std::lock_guard<std::mutex> lock(lifeCycleMutex_);
697         if (lifeTimerId_ != 0) {
698             errCode = StopLifeCycleTimer();
699             if (errCode != E_OK) {
700                 LOGE("Stop the life cycle timer failed:%d", errCode);
701                 return errCode;
702             }
703         }
704 
705         if (!notifier) {
706             return E_OK;
707         }
708         errCode = StartLifeCycleTimer(notifier);
709         if (errCode != E_OK) {
710             LOGE("Register life cycle timer failed:%d", errCode);
711             return errCode;
712         }
713     }
714     auto listener = [this] { HeartBeat(); };
715     storageEngine_->RegisterHeartBeatListener(listener);
716     return errCode;
717 }
718 
HeartBeat()719 void SQLiteRelationalStore::HeartBeat()
720 {
721     std::lock_guard<std::mutex> lock(lifeCycleMutex_);
722     int errCode = ResetLifeCycleTimer();
723     if (errCode != E_OK) {
724         LOGE("Heart beat for life cycle failed:%d", errCode);
725     }
726 }
727 
ResetLifeCycleTimer()728 int SQLiteRelationalStore::ResetLifeCycleTimer()
729 {
730     if (lifeTimerId_ == 0) {
731         return E_OK;
732     }
733     auto lifeNotifier = lifeCycleNotifier_;
734     lifeCycleNotifier_ = nullptr;
735     int errCode = StopLifeCycleTimer();
736     if (errCode != E_OK) {
737         LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
738     }
739     return StartLifeCycleTimer(lifeNotifier);
740 }
741 
GetStorePath() const742 std::string SQLiteRelationalStore::GetStorePath() const
743 {
744     return sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
745 }
746 
GetProperties() const747 RelationalDBProperties SQLiteRelationalStore::GetProperties() const
748 {
749     return sqliteStorageEngine_->GetProperties();
750 }
751 
StopSync(uint64_t connectionId)752 void SQLiteRelationalStore::StopSync(uint64_t connectionId)
753 {
754     return syncAbleEngine_->StopSync(connectionId);
755 }
756 
Dump(int fd)757 void SQLiteRelationalStore::Dump(int fd)
758 {
759     std::string userId = "";
760     std::string appId = "";
761     std::string storeId = "";
762     std::string label = "";
763     if (sqliteStorageEngine_ != nullptr) {
764         userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
765         appId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, "");
766         storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
767         label = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
768     }
769     label = DBCommon::TransferStringToHex(label);
770     DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n", userId.c_str(), appId.c_str(),
771         storeId.c_str(), label.c_str());
772     if (syncAbleEngine_ != nullptr) {
773         syncAbleEngine_->Dump(fd);
774     }
775 }
776 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)777 int SQLiteRelationalStore::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
778     uint64_t connectionId, std::shared_ptr<ResultSet> &result)
779 {
780     if (sqliteStorageEngine_ == nullptr) {
781         return -E_INVALID_DB;
782     }
783     if (condition.sql.size() > DBConstant::REMOTE_QUERY_MAX_SQL_LEN) {
784         LOGE("remote query sql len is larger than %" PRIu32, DBConstant::REMOTE_QUERY_MAX_SQL_LEN);
785         return -E_MAX_LIMITS;
786     }
787 
788     if (!sqliteStorageEngine_->GetSchema().IsSchemaValid()) {
789         LOGW("not a distributed relational store.");
790         return -E_NOT_SUPPORT;
791     }
792     const auto &properties = sqliteStorageEngine_->GetProperties();
793     int tableMode =
794         properties.GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE);
795     if (tableMode != DistributedTableMode::SPLIT_BY_DEVICE) {
796         LOGW("only support split mode.");
797         return -E_NOT_SUPPORT;
798     }
799 
800     // Check whether to be able to operate the db.
801     int errCode = E_OK;
802     auto *handle = GetHandle(false, errCode);
803     if (handle == nullptr) {
804         return errCode;
805     }
806     errCode = handle->CheckEncryptedOrCorrupted();
807     ReleaseHandle(handle);
808     if (errCode != E_OK) {
809         return errCode;
810     }
811 
812     return syncAbleEngine_->RemoteQuery(device, condition, timeout, connectionId, result);
813 }
814 
EraseAllDeviceWatermark(const std::vector<std::string> & tableNameList)815 int SQLiteRelationalStore::EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList)
816 {
817     std::set<std::string> devices;
818     int errCode = GetExistDevices(devices);
819     if (errCode != E_OK) {
820         return errCode;
821     }
822     for (const auto &tableName : tableNameList) {
823         for (const auto &device : devices) {
824             errCode = syncAbleEngine_->EraseDeviceWaterMark(device, false, tableName);
825             if (errCode != E_OK) {
826                 return errCode;
827             }
828         }
829     }
830     return E_OK;
831 }
832 
GetDevTableName(const std::string & device,const std::string & hashDev) const833 std::string SQLiteRelationalStore::GetDevTableName(const std::string &device, const std::string &hashDev) const
834 {
835     std::string devTableName;
836     StoreInfo info = { sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
837         sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
838         sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "") };
839     if (RuntimeContext::GetInstance()->TranslateDeviceId(device, info, devTableName) != E_OK) {
840         devTableName = hashDev;
841     }
842     return devTableName;
843 }
844 
GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor * & handle) const845 int SQLiteRelationalStore::GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const
846 {
847     int errCode = E_OK;
848     handle = GetHandle(true, errCode);
849     if (handle == nullptr) {
850         LOGE("get handle failed %d", errCode);
851         return errCode;
852     }
853 
854     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
855     if (errCode != E_OK) {
856         LOGE("start transaction failed %d", errCode);
857         ReleaseHandle(handle);
858     }
859     return errCode;
860 }
861 
RemoveDeviceDataInner(const std::string & mappingDev,const std::string & device,const std::string & tableName,bool isNeedHash)862 int SQLiteRelationalStore::RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device,
863     const std::string &tableName, bool isNeedHash)
864 {
865     std::string hashHexDev;
866     std::string hashDev;
867     std::string devTableName;
868     if (!isNeedHash) {
869         // if is not need hash mappingDev mean hash(uuid) device is param device
870         hashHexDev = DBCommon::TransferStringToHex(mappingDev);
871         hashDev = mappingDev;
872         devTableName = device;
873     } else {
874         // if is need hash mappingDev mean uuid
875         hashDev = DBCommon::TransferHashString(mappingDev);
876         hashHexDev = DBCommon::TransferStringToHex(hashDev);
877         devTableName = GetDevTableName(mappingDev, hashHexDev);
878     }
879     // erase watermark first
880     int errCode = syncAbleEngine_->EraseDeviceWaterMark(hashDev, false, tableName);
881     if (errCode != E_OK) {
882         LOGE("erase watermark failed %d", errCode);
883         return errCode;
884     }
885     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
886     errCode = GetHandleAndStartTransaction(handle);
887     if (handle == nullptr) {
888         return errCode;
889     }
890 
891     errCode = handle->DeleteDistributedDeviceTable(devTableName, tableName);
892     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
893     if (errCode != E_OK) {
894         LOGE("delete device data failed. %d", errCode);
895         tables.clear();
896     }
897 
898     for (const auto &it : tables) {
899         if (tableName.empty() || it.second.GetTableName() == tableName) {
900             errCode = handle->DeleteDistributedDeviceTableLog(hashHexDev, it.second.GetTableName());
901             if (errCode != E_OK) {
902                 LOGE("delete device data failed. %d", errCode);
903                 break;
904             }
905         }
906     }
907 
908     if (errCode != E_OK) {
909         (void)handle->Rollback();
910         ReleaseHandle(handle);
911         return errCode;
912     }
913     errCode = handle->Commit();
914     ReleaseHandle(handle);
915     storageEngine_->NotifySchemaChanged();
916     return errCode;
917 }
918 
GetExistDevices(std::set<std::string> & hashDevices) const919 int SQLiteRelationalStore::GetExistDevices(std::set<std::string> &hashDevices) const
920 {
921     int errCode = E_OK;
922     auto *handle = GetHandle(true, errCode);
923     if (handle == nullptr) {
924         LOGE("[SingleVerRDBStore] GetExistsDeviceList get handle failed:%d", errCode);
925         return errCode;
926     }
927     errCode = handle->GetExistsDeviceList(hashDevices);
928     if (errCode != E_OK) {
929         LOGE("[SingleVerRDBStore] Get remove device list from meta failed. err=%d", errCode);
930     }
931     ReleaseHandle(handle);
932     return errCode;
933 }
934 
GetAllDistributedTableName()935 std::vector<std::string> SQLiteRelationalStore::GetAllDistributedTableName()
936 {
937     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
938     std::vector<std::string> tableNames;
939     for (const auto &table : tables) {
940         if (table.second.GetTableSyncType() == TableSyncType::CLOUD_COOPERATION) {
941             continue;
942         }
943         tableNames.push_back(table.second.GetTableName());
944     }
945     return tableNames;
946 }
947 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)948 int SQLiteRelationalStore::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
949 {
950     if (cloudSyncer_ == nullptr) {
951         LOGE("[RelationalStore][SetCloudDB] cloudSyncer was not initialized");
952         return -E_INVALID_DB;
953     }
954     cloudSyncer_->SetCloudDB(cloudDb);
955     return E_OK;
956 }
957 
AddFields(const std::vector<Field> & newFields,const std::set<std::string> & equalFields,std::vector<Field> & addFields)958 void SQLiteRelationalStore::AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields,
959     std::vector<Field> &addFields)
960 {
961     for (const auto &newField : newFields) {
962         if (equalFields.find(newField.colName) == equalFields.end()) {
963             addFields.push_back(newField);
964         }
965     }
966 }
967 
CheckFields(const std::vector<Field> & newFields,const TableInfo & tableInfo,std::vector<Field> & addFields)968 bool SQLiteRelationalStore::CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo,
969     std::vector<Field> &addFields)
970 {
971     std::vector<FieldInfo> oldFields = tableInfo.GetFieldInfos();
972     if (newFields.size() < oldFields.size()) {
973         return false;
974     }
975     std::set<std::string> equalFields;
976     for (const auto &oldField : oldFields) {
977         bool isFieldExist = false;
978         for (const auto &newField : newFields) {
979             if (newField.colName != oldField.GetFieldName()) {
980                 continue;
981             }
982             isFieldExist = true;
983             int32_t type = newField.type;
984             // Field type need to match storage type
985             // Field type : Nil, int64_t, double, std::string, bool, Bytes, Asset, Assets
986             // Storage type : NONE, NULL, INTEGER, REAL, TEXT, BLOB
987             if (type >= TYPE_INDEX<Nil> && type <= TYPE_INDEX<std::string>) {
988                 type++; // storage type - field type = 1
989             } else if (type == TYPE_INDEX<bool>) {
990                 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_NULL);
991             } else if (type >= TYPE_INDEX<Asset> && type <= TYPE_INDEX<Assets>) {
992                 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_BLOB);
993             }
994             auto primaryKeyMap = tableInfo.GetPrimaryKey();
995             auto it = std::find_if(primaryKeyMap.begin(), primaryKeyMap.end(),
996                 [&newField](const std::map<int, std::string>::value_type &pair) {
997                     return pair.second == newField.colName;
998                 });
999             if (type != static_cast<int32_t>(oldField.GetStorageType()) ||
1000                 newField.primary != (it != primaryKeyMap.end()) || newField.nullable == oldField.IsNotNull()) {
1001                 return false;
1002             }
1003             equalFields.insert(newField.colName);
1004         }
1005         if (!isFieldExist) {
1006             return false;
1007         }
1008     }
1009     AddFields(newFields, equalFields, addFields);
1010     return true;
1011 }
1012 
PrepareSharedTable(const DataBaseSchema & schema,std::vector<std::string> & deleteTableNames,std::map<std::string,std::vector<Field>> & updateTableNames,std::map<std::string,std::string> & alterTableNames)1013 bool SQLiteRelationalStore::PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames,
1014     std::map<std::string, std::vector<Field>> &updateTableNames, std::map<std::string, std::string> &alterTableNames)
1015 {
1016     std::set<std::string> tableNames;
1017     std::map<std::string, std::string> sharedTableNamesMap;
1018     std::map<std::string, std::vector<Field>> fieldsMap;
1019     for (const auto &table : schema.tables) {
1020         tableNames.insert(table.name);
1021         sharedTableNamesMap[table.name] = table.sharedTableName;
1022         std::vector<Field> fields = table.fields;
1023         bool hasPrimaryKey = DBCommon::HasPrimaryKey(fields);
1024         Field ownerField = { CloudDbConstant::CLOUD_OWNER, TYPE_INDEX<std::string>, hasPrimaryKey };
1025         Field privilegeField = { CloudDbConstant::CLOUD_PRIVILEGE, TYPE_INDEX<std::string> };
1026         fields.insert(fields.begin(), privilegeField);
1027         fields.insert(fields.begin(), ownerField);
1028         fieldsMap[table.name] = fields;
1029     }
1030 
1031     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1032     TableInfoMap tableList = localSchema.GetTables();
1033     for (const auto &tableInfo : tableList) {
1034         if (!tableInfo.second.GetSharedTableMark()) {
1035             continue;
1036         }
1037         std::string oldSharedTableName = tableInfo.second.GetTableName();
1038         std::string oldOriginTableName = tableInfo.second.GetOriginTableName();
1039         std::vector<Field> addFields;
1040         if (tableNames.find(oldOriginTableName) == tableNames.end()) {
1041             deleteTableNames.push_back(oldSharedTableName);
1042         } else if (sharedTableNamesMap[oldOriginTableName].empty()) {
1043             deleteTableNames.push_back(oldSharedTableName);
1044         } else if (CheckFields(fieldsMap[oldOriginTableName], tableInfo.second, addFields)) {
1045             if (!addFields.empty()) {
1046                 updateTableNames[oldSharedTableName] = addFields;
1047             }
1048             if (oldSharedTableName != sharedTableNamesMap[oldOriginTableName]) {
1049                 alterTableNames[oldSharedTableName] = sharedTableNamesMap[oldOriginTableName];
1050             }
1051         } else {
1052             return false;
1053         }
1054     }
1055     return true;
1056 }
1057 
PrepareAndSetCloudDbSchema(const DataBaseSchema & schema)1058 int SQLiteRelationalStore::PrepareAndSetCloudDbSchema(const DataBaseSchema &schema)
1059 {
1060     if (storageEngine_ == nullptr) {
1061         LOGE("[RelationalStore][PrepareAndSetCloudDbSchema] storageEngine was not initialized");
1062         return -E_INVALID_DB;
1063     }
1064     int errCode = CheckCloudSchema(schema);
1065     if (errCode != E_OK) {
1066         return errCode;
1067     }
1068     // delete, update and create shared table and its distributed table
1069     errCode = ExecuteCreateSharedTable(schema);
1070     if (errCode != E_OK) {
1071         LOGE("[RelationalStore] prepare shared table failed:%d", errCode);
1072         return errCode;
1073     }
1074     return storageEngine_->SetCloudDbSchema(schema);
1075 }
1076 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1077 int SQLiteRelationalStore::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1078 {
1079     if (cloudSyncer_ == nullptr) {
1080         LOGE("[RelationalStore][SetIAssetLoader] cloudSyncer was not initialized");
1081         return -E_INVALID_DB;
1082     }
1083     cloudSyncer_->SetIAssetLoader(loader);
1084     return E_OK;
1085 }
1086 
ChkSchema(const TableName & tableName)1087 int SQLiteRelationalStore::ChkSchema(const TableName &tableName)
1088 {
1089     // check schema is ok
1090     int errCode = E_OK;
1091     auto *handle = GetHandle(false, errCode);
1092     if (handle == nullptr) {
1093         LOGE("[SQLiteRelationalStore][ChkSchema] handle is nullptr");
1094         return errCode;
1095     }
1096     errCode = handle->CompareSchemaTableColumns(tableName);
1097     ReleaseHandle(handle);
1098     if (errCode != E_OK) {
1099         LOGE("[SQLiteRelationalStore][ChkSchema] local schema info incompatible %d.", errCode);
1100         return errCode;
1101     }
1102     if (storageEngine_ == nullptr) {
1103         LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1104         return -E_INVALID_DB;
1105     }
1106     return storageEngine_->ChkSchema(tableName);
1107 }
1108 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)1109 int SQLiteRelationalStore::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId)
1110 {
1111     if (storageEngine_ == nullptr) {
1112         LOGE("[RelationalStore][Sync] storageEngine was not initialized");
1113         return -E_INVALID_DB;
1114     }
1115     int errCode = CheckBeforeSync(option);
1116     if (errCode != E_OK) {
1117         return errCode;
1118     }
1119     LOGI("sync mode:%d, pri:%d, comp:%d", option.mode, option.priorityTask, option.compensatedSyncOnly);
1120     if (option.compensatedSyncOnly) {
1121         CloudSyncer::CloudTaskInfo info = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
1122         info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1123         cloudSyncer_->GenerateCompensatedSync(info);
1124         return E_OK;
1125     }
1126     CloudSyncer::CloudTaskInfo info;
1127     FillSyncInfo(option, onProcess, info);
1128     auto [table, ret] = sqliteStorageEngine_->CalTableRef(info.table, storageEngine_->GetSharedTableOriginNames());
1129     if (ret != E_OK) {
1130         return ret;
1131     }
1132     ret = ReFillSyncInfoTable(table, info);
1133     if (ret != E_OK) {
1134         return ret;
1135     }
1136     info.taskId = taskId;
1137     errCode = cloudSyncer_->Sync(info);
1138     return errCode;
1139 }
1140 
CheckBeforeSync(const CloudSyncOption & option)1141 int SQLiteRelationalStore::CheckBeforeSync(const CloudSyncOption &option)
1142 {
1143     if (cloudSyncer_ == nullptr) {
1144         LOGE("[RelationalStore] cloudSyncer was not initialized when sync");
1145         return -E_INVALID_DB;
1146     }
1147     if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
1148         return -E_INVALID_ARGS;
1149     }
1150     int errCode = CheckQueryValid(option);
1151     if (errCode != E_OK) {
1152         return errCode;
1153     }
1154     SecurityOption securityOption;
1155     errCode = storageEngine_->GetSecurityOption(securityOption);
1156     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1157         return -E_SECURITY_OPTION_CHECK_ERROR;
1158     }
1159     if (errCode == E_OK && securityOption.securityLabel == S4) {
1160         return -E_SECURITY_OPTION_CHECK_ERROR;
1161     }
1162     return E_OK;
1163 }
1164 
CheckQueryValid(const CloudSyncOption & option)1165 int SQLiteRelationalStore::CheckQueryValid(const CloudSyncOption &option)
1166 {
1167     if (option.compensatedSyncOnly) {
1168         return E_OK;
1169     }
1170     QuerySyncObject syncObject(option.query);
1171     int errCode = syncObject.GetValidStatus();
1172     if (errCode != E_OK) {
1173         LOGE("[RelationalStore] query is invalid or not support %d", errCode);
1174         return errCode;
1175     }
1176     std::vector<QuerySyncObject> object = QuerySyncObject::GetQuerySyncObject(option.query);
1177     bool isFromTable = object.empty();
1178     if (!option.priorityTask && !isFromTable) {
1179         LOGE("[RelationalStore] not support normal sync with query");
1180         return -E_NOT_SUPPORT;
1181     }
1182     const auto tableNames = syncObject.GetRelationTableNames();
1183     for (const auto &tableName : tableNames) {
1184         QuerySyncObject querySyncObject;
1185         querySyncObject.SetTableName(tableName);
1186         object.push_back(querySyncObject);
1187     }
1188     std::vector<std::string> syncTableNames;
1189     for (const auto &item : object) {
1190         std::string tableName = item.GetRelationTableName();
1191         syncTableNames.emplace_back(tableName);
1192     }
1193     errCode = CheckTableName(syncTableNames);
1194     if (errCode != E_OK) {
1195         return errCode;
1196     }
1197     return CheckObjectValid(option.priorityTask, object, isFromTable);
1198 }
1199 
CheckObjectValid(bool priorityTask,const std::vector<QuerySyncObject> & object,bool isFromTable)1200 int SQLiteRelationalStore::CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object,
1201     bool isFromTable)
1202 {
1203     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1204     for (const auto &item : object) {
1205         if (priorityTask && !item.IsContainQueryNodes() && !isFromTable) {
1206             LOGE("[RelationalStore] not support priority sync with full table");
1207             return -E_INVALID_ARGS;
1208         }
1209         int errCode = storageEngine_->CheckQueryValid(item);
1210         if (errCode != E_OK) {
1211             return errCode;
1212         }
1213         if (!priorityTask || isFromTable) {
1214             continue;
1215         }
1216         if (!item.IsInValueOutOfLimit()) {
1217             LOGE("[RelationalStore] not support priority sync in count out of limit");
1218             return -E_MAX_LIMITS;
1219         }
1220         std::string tableName = item.GetRelationTableName();
1221         TableInfo tableInfo = localSchema.GetTable(tableName);
1222         if (!tableInfo.Empty()) {
1223             const std::map<int, FieldName> &primaryKeyMap = tableInfo.GetPrimaryKey();
1224             errCode = item.CheckPrimaryKey(primaryKeyMap);
1225             if (errCode != E_OK) {
1226                 return errCode;
1227             }
1228         }
1229     }
1230     return E_OK;
1231 }
1232 
CheckTableName(const std::vector<std::string> & tableNames)1233 int SQLiteRelationalStore::CheckTableName(const std::vector<std::string> &tableNames)
1234 {
1235     if (tableNames.empty()) {
1236         LOGE("[RelationalStore] sync with empty table");
1237         return -E_INVALID_ARGS;
1238     }
1239     for (const auto &table : tableNames) {
1240         int errCode = ChkSchema(table);
1241         if (errCode != E_OK) {
1242             LOGE("[RelationalStore] schema check failed when sync");
1243             return errCode;
1244         }
1245     }
1246     return E_OK;
1247 }
1248 
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)1249 void SQLiteRelationalStore::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
1250     CloudSyncer::CloudTaskInfo &info)
1251 {
1252     auto syncObject = QuerySyncObject::GetQuerySyncObject(option.query);
1253     if (syncObject.empty()) {
1254         QuerySyncObject querySyncObject(option.query);
1255         info.table = querySyncObject.GetRelationTableNames();
1256         for (const auto &item : info.table) {
1257             QuerySyncObject object(Query::Select());
1258             object.SetTableName(item);
1259             info.queryList.push_back(object);
1260         }
1261     } else {
1262         for (auto &item : syncObject) {
1263             info.table.push_back(item.GetRelationTableName());
1264             info.queryList.push_back(std::move(item));
1265         }
1266     }
1267     info.devices = option.devices;
1268     info.mode = option.mode;
1269     info.callback = onProcess;
1270     info.timeout = option.waitTime;
1271     info.priorityTask = option.priorityTask;
1272     info.compensatedTask = option.compensatedSyncOnly;
1273     info.users.push_back("");
1274     info.lockAction = option.lockAction;
1275     info.merge = option.merge;
1276     info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1277     info.prepareTraceId = option.prepareTraceId;
1278 }
1279 
SetTrackerTable(const TrackerSchema & trackerSchema)1280 int SQLiteRelationalStore::SetTrackerTable(const TrackerSchema &trackerSchema)
1281 {
1282     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1283     TableInfo tableInfo = localSchema.GetTable(trackerSchema.tableName);
1284     if (tableInfo.Empty()) {
1285         return sqliteStorageEngine_->SetTrackerTable(trackerSchema);
1286     }
1287     bool isFirstCreate = false;
1288     int errCode = sqliteStorageEngine_->CheckAndCacheTrackerSchema(trackerSchema, tableInfo, isFirstCreate);
1289     if (errCode != E_OK) {
1290         return errCode == -E_IGNORE_DATA ? E_OK : errCode;
1291     }
1292     errCode = CreateDistributedTable(trackerSchema.tableName, tableInfo.GetTableSyncType(), true);
1293     if (errCode != E_OK) {
1294         return errCode;
1295     }
1296     return sqliteStorageEngine_->SaveTrackerSchema(trackerSchema.tableName, isFirstCreate);
1297 }
1298 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)1299 int SQLiteRelationalStore::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
1300 {
1301     if (condition.sql.empty()) {
1302         LOGE("[RelationalStore] execute sql is empty.");
1303         return -E_INVALID_ARGS;
1304     }
1305     return sqliteStorageEngine_->ExecuteSql(condition, records);
1306 }
1307 
CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor * & handle,std::set<std::string> & clearWaterMarkTable)1308 int SQLiteRelationalStore::CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor *&handle,
1309     std::set<std::string> &clearWaterMarkTable)
1310 {
1311     int errCode = E_OK;
1312     for (const auto &tableName : clearWaterMarkTable) {
1313         std::string cloudWaterMark;
1314         Value blobMetaVal;
1315         errCode = DBCommon::SerializeWaterMark(0, cloudWaterMark, blobMetaVal);
1316         if (errCode != E_OK) {
1317             LOGE("[SQLiteRelationalStore] SerializeWaterMark failed, errCode = %d", errCode);
1318             return errCode;
1319         }
1320         errCode = storageEngine_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal, true);
1321         if (errCode != E_OK) {
1322             LOGE("[SQLiteRelationalStore] put meta data failed, errCode = %d", errCode);
1323             return errCode;
1324         }
1325         errCode = handle->CleanUploadFinishedFlag(tableName);
1326         if (errCode != E_OK) {
1327             LOGE("[SQLiteRelationalStore] clean upload finished flag failed, errCode = %d", errCode);
1328             return errCode;
1329         }
1330     }
1331     errCode = cloudSyncer_->CleanWaterMarkInMemory(clearWaterMarkTable);
1332     if (errCode != E_OK) {
1333         LOGE("[SQLiteRelationalStore] CleanWaterMarkInMemory failed, errCode = %d", errCode);
1334     }
1335     return errCode;
1336 }
1337 
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)1338 int SQLiteRelationalStore::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
1339 {
1340     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
1341     int errCode = GetHandleAndStartTransaction(handle);
1342     if (errCode != E_OK) {
1343         LOGE("[SQLiteRelationalStore] SetReference start transaction failed, errCode = %d", errCode);
1344         return errCode;
1345     }
1346     std::set<std::string> clearWaterMarkTables;
1347     RelationalSchemaObject schema;
1348     errCode = sqliteStorageEngine_->SetReference(tableReferenceProperty, handle, clearWaterMarkTables, schema);
1349     if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1350         LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1351         (void)handle->Rollback();
1352         ReleaseHandle(handle);
1353         return errCode;
1354     }
1355 
1356     if (!clearWaterMarkTables.empty()) {
1357         storageEngine_->SetReusedHandle(handle);
1358         int ret = CleanWaterMark(handle, clearWaterMarkTables);
1359         if (ret != E_OK) {
1360             LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", ret);
1361             storageEngine_->SetReusedHandle(nullptr);
1362             (void)handle->Rollback();
1363             ReleaseHandle(handle);
1364             return ret;
1365         }
1366         storageEngine_->SetReusedHandle(nullptr);
1367         LOGI("[SQLiteRelationalStore] SetReference clear water mark success");
1368     }
1369 
1370     int ret = handle->Commit();
1371     ReleaseHandle(handle);
1372     if (ret == E_OK) {
1373         sqliteStorageEngine_->SetSchema(schema);
1374         return errCode;
1375     }
1376     LOGE("[SQLiteRelationalStore] SetReference commit transaction failed, errCode = %d", ret);
1377     return ret;
1378 }
1379 
InitTrackerSchemaFromMeta()1380 int SQLiteRelationalStore::InitTrackerSchemaFromMeta()
1381 {
1382     int errCode = sqliteStorageEngine_->GetOrInitTrackerSchemaFromMeta();
1383     return errCode == -E_NOT_FOUND ? E_OK : errCode;
1384 }
1385 
CleanTrackerData(const std::string & tableName,int64_t cursor)1386 int SQLiteRelationalStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
1387 {
1388     if (tableName.empty()) {
1389         return -E_INVALID_ARGS;
1390     }
1391     return sqliteStorageEngine_->CleanTrackerData(tableName, cursor);
1392 }
1393 
ExecuteCreateSharedTable(const DataBaseSchema & schema)1394 int SQLiteRelationalStore::ExecuteCreateSharedTable(const DataBaseSchema &schema)
1395 {
1396     if (sqliteStorageEngine_ == nullptr) {
1397         LOGE("[RelationalStore][ExecuteCreateSharedTable] sqliteStorageEngine was not initialized");
1398         return -E_INVALID_DB;
1399     }
1400     std::vector<std::string> deleteTableNames;
1401     std::map<std::string, std::vector<Field>> updateTableNames;
1402     std::map<std::string, std::string> alterTableNames;
1403     if (!PrepareSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames)) {
1404         LOGE("[RelationalStore][ExecuteCreateSharedTable] table fields are invalid.");
1405         return -E_INVALID_ARGS;
1406     }
1407     LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table start");
1408     // upgrade contains delete, alter, update and create
1409     int errCode = sqliteStorageEngine_->UpgradeSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames);
1410     if (errCode != E_OK) {
1411         LOGE("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table failed. %d", errCode);
1412     } else {
1413         LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table end");
1414     }
1415     return errCode;
1416 }
1417 
ReFillSyncInfoTable(const std::vector<std::string> & actualTable,CloudSyncer::CloudTaskInfo & info)1418 int SQLiteRelationalStore::ReFillSyncInfoTable(const std::vector<std::string> &actualTable,
1419     CloudSyncer::CloudTaskInfo &info)
1420 {
1421     if (info.priorityTask && actualTable.size() != info.table.size()) {
1422         LOGE("[RelationalStore] Not support regenerate table with priority task");
1423         return -E_NOT_SUPPORT;
1424     }
1425     if (actualTable.size() == info.table.size()) {
1426         return E_OK;
1427     }
1428     LOGD("[RelationalStore] Fill tables from %zu to %zu", info.table.size(), actualTable.size());
1429     info.table = actualTable;
1430     info.queryList.clear();
1431     for (const auto &item : info.table) {
1432         QuerySyncObject object(Query::Select());
1433         object.SetTableName(item);
1434         info.queryList.push_back(object);
1435     }
1436     return E_OK;
1437 }
1438 
Pragma(PragmaCmd cmd,PragmaData & pragmaData)1439 int SQLiteRelationalStore::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
1440 {
1441     if (cmd != LOGIC_DELETE_SYNC_DATA) {
1442         return -E_NOT_SUPPORT;
1443     }
1444     if (pragmaData == nullptr) {
1445         return -E_INVALID_ARGS;
1446     }
1447     auto logicDelete = *(static_cast<bool *>(pragmaData));
1448     if (storageEngine_ == nullptr) {
1449         LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1450         return -E_INVALID_DB;
1451     }
1452     storageEngine_->SetLogicDelete(logicDelete);
1453     return E_OK;
1454 }
1455 
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1456 int SQLiteRelationalStore::UpsertData(RecordStatus status, const std::string &tableName,
1457     const std::vector<VBucket> &records)
1458 {
1459     if (storageEngine_ == nullptr) {
1460         LOGE("[RelationalStore][UpsertData] sqliteStorageEngine was not initialized");
1461         return -E_INVALID_DB;
1462     }
1463     int errCode = CheckParamForUpsertData(status, tableName, records);
1464     if (errCode != E_OK) {
1465         return errCode;
1466     }
1467     return storageEngine_->UpsertData(status, tableName, records);
1468 }
1469 
CheckParamForUpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1470 int SQLiteRelationalStore::CheckParamForUpsertData(RecordStatus status, const std::string &tableName,
1471     const std::vector<VBucket> &records)
1472 {
1473     if (status != RecordStatus::WAIT_COMPENSATED_SYNC) {
1474         LOGE("[RelationalStore][CheckParamForUpsertData] invalid status %" PRId64, static_cast<int64_t>(status));
1475         return -E_INVALID_ARGS;
1476     }
1477     if (records.empty()) {
1478         LOGE("[RelationalStore][CheckParamForUpsertData] records is empty");
1479         return -E_INVALID_ARGS;
1480     }
1481     size_t recordSize = records.size();
1482     if (recordSize > DBConstant::MAX_BATCH_SIZE) {
1483         LOGE("[RelationalStore][CheckParamForUpsertData] records size over limit, size %zu", recordSize);
1484         return -E_MAX_LIMITS;
1485     }
1486     return CheckSchemaForUpsertData(tableName, records);
1487 }
1488 
ChkTable(const TableInfo & table)1489 static int ChkTable(const TableInfo &table)
1490 {
1491     if (table.IsNoPkTable() || table.GetSharedTableMark()) {
1492         LOGE("[RelationalStore][ChkTable] not support table without pk or with tablemark");
1493         return -E_NOT_SUPPORT;
1494     }
1495     if (table.GetTableName().empty() || (table.GetTableSyncType() != TableSyncType::CLOUD_COOPERATION)) {
1496         return -E_NOT_FOUND;
1497     }
1498     return E_OK;
1499 }
1500 
CheckSchemaForUpsertData(const std::string & tableName,const std::vector<VBucket> & records)1501 int SQLiteRelationalStore::CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records)
1502 {
1503     if (tableName.empty()) {
1504         return -E_INVALID_ARGS;
1505     }
1506     auto schema = storageEngine_->GetSchemaInfo();
1507     auto table = schema.GetTable(tableName);
1508     int errCode = ChkTable(table);
1509     if (errCode != E_OK) {
1510         return errCode;
1511     }
1512     TableSchema cloudTableSchema;
1513     errCode = storageEngine_->GetCloudTableSchema(tableName, cloudTableSchema);
1514     if (errCode != E_OK) {
1515         LOGE("Get cloud schema failed when check upsert data, %d", errCode);
1516         return errCode;
1517     }
1518     errCode = ChkSchema(tableName);
1519     if (errCode != E_OK) {
1520         return errCode;
1521     }
1522     std::set<std::string> dbPkFields;
1523     for (auto &field : table.GetIdentifyKey()) {
1524         dbPkFields.insert(field);
1525     }
1526     std::set<std::string> schemaFields;
1527     for (auto &fieldInfo : table.GetFieldInfos()) {
1528         schemaFields.insert(fieldInfo.GetFieldName());
1529     }
1530     for (const auto &record : records) {
1531         std::set<std::string> recordPkFields;
1532         for (const auto &item : record) {
1533             if (schemaFields.find(item.first) == schemaFields.end()) {
1534                 LOGE("[RelationalStore][CheckSchemaForUpsertData] invalid field not exist in schema");
1535                 return -E_INVALID_ARGS;
1536             }
1537             if (dbPkFields.find(item.first) == dbPkFields.end()) {
1538                 continue;
1539             }
1540             recordPkFields.insert(item.first);
1541         }
1542         if (recordPkFields.size() != dbPkFields.size()) {
1543             LOGE("[RelationalStore][CheckSchemaForUpsertData] pk size not equal param %zu schema %zu",
1544                 recordPkFields.size(), dbPkFields.size());
1545             return -E_INVALID_ARGS;
1546         }
1547     }
1548     return errCode;
1549 }
1550 
InitSQLiteStorageEngine(const RelationalDBProperties & properties)1551 int SQLiteRelationalStore::InitSQLiteStorageEngine(const RelationalDBProperties &properties)
1552 {
1553     auto engine = new(std::nothrow) SQLiteSingleRelationalStorageEngine(properties);
1554     if (engine == nullptr) {
1555         LOGE("[RelationalStore][Open] Create storage engine failed");
1556         return -E_OUT_OF_MEMORY;
1557     }
1558     sqliteStorageEngine_ = std::shared_ptr<SQLiteSingleRelationalStorageEngine>(engine,
1559         [](SQLiteSingleRelationalStorageEngine *releaseEngine) {
1560         RefObject::KillAndDecObjRef(releaseEngine);
1561     });
1562     return E_OK;
1563 }
1564 
CheckCloudSchema(const DataBaseSchema & schema)1565 int SQLiteRelationalStore::CheckCloudSchema(const DataBaseSchema &schema)
1566 {
1567     if (storageEngine_ == nullptr) {
1568         LOGE("[RelationalStore][CheckCloudSchema] storageEngine was not initialized");
1569         return -E_INVALID_DB;
1570     }
1571     std::shared_ptr<DataBaseSchema> cloudSchema;
1572     (void) storageEngine_->GetCloudDbSchema(cloudSchema);
1573     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1574     for (const auto &tableSchema : schema.tables) {
1575         TableInfo tableInfo = localSchema.GetTable(tableSchema.name);
1576         if (tableInfo.Empty()) {
1577             continue;
1578         }
1579         if (tableInfo.GetSharedTableMark()) {
1580             LOGE("[RelationalStore][CheckCloudSchema] Table name is existent shared table's name.");
1581             return -E_INVALID_ARGS;
1582         }
1583     }
1584     for (const auto &tableSchema : schema.tables) {
1585         if (cloudSchema == nullptr) {
1586             continue;
1587         }
1588         for (const auto &oldSchema : cloudSchema->tables) {
1589             if (!CloudStorageUtils::CheckCloudSchemaFields(tableSchema, oldSchema)) {
1590                 LOGE("[RelationalStore][CheckCloudSchema] Schema fields are invalid.");
1591                 return -E_INVALID_ARGS;
1592             }
1593         }
1594     }
1595     return E_OK;
1596 }
1597 
SetCloudSyncConfig(const CloudSyncConfig & config)1598 int SQLiteRelationalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
1599 {
1600     if (storageEngine_ == nullptr) {
1601         LOGE("[RelationalStore][SetCloudSyncConfig] sqliteStorageEngine was not initialized");
1602         return -E_INVALID_DB;
1603     }
1604     storageEngine_->SetCloudSyncConfig(config);
1605     return E_OK;
1606 }
1607 
GetCloudTaskStatus(uint64_t taskId)1608 SyncProcess SQLiteRelationalStore::GetCloudTaskStatus(uint64_t taskId)
1609 {
1610     return cloudSyncer_->GetCloudTaskStatus(taskId);
1611 }
1612 } //namespace DistributedDB
1613 #endif
1614