1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_relational_store.h"
17
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_dump_helper.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "db_types.h"
25 #include "sqlite_log_table_manager.h"
26 #include "sqlite_relational_store_connection.h"
27 #include "storage_engine_manager.h"
28 #include "cloud_sync_utils.h"
29
30 namespace DistributedDB {
31 namespace {
32 constexpr const char *DISTRIBUTED_TABLE_MODE = "distributed_table_mode";
33 }
34
~SQLiteRelationalStore()35 SQLiteRelationalStore::~SQLiteRelationalStore()
36 {
37 sqliteStorageEngine_ = nullptr;
38 }
39
40 // Called when a new connection created.
IncreaseConnectionCounter()41 void SQLiteRelationalStore::IncreaseConnectionCounter()
42 {
43 connectionCount_.fetch_add(1, std::memory_order_seq_cst);
44 if (connectionCount_.load() > 0) {
45 sqliteStorageEngine_->SetConnectionFlag(true);
46 }
47 }
48
GetDBConnection(int & errCode)49 RelationalStoreConnection *SQLiteRelationalStore::GetDBConnection(int &errCode)
50 {
51 std::lock_guard<std::mutex> lock(connectMutex_);
52 RelationalStoreConnection *connection = new (std::nothrow) SQLiteRelationalStoreConnection(this);
53 if (connection == nullptr) {
54 errCode = -E_OUT_OF_MEMORY;
55 return nullptr;
56 }
57 IncObjRef(this);
58 IncreaseConnectionCounter();
59 return connection;
60 }
61
InitDataBaseOption(const RelationalDBProperties & properties,OpenDbProperties & option)62 static void InitDataBaseOption(const RelationalDBProperties &properties, OpenDbProperties &option)
63 {
64 option.uri = properties.GetStringProp(DBProperties::DATA_DIR, "");
65 option.createIfNecessary = properties.GetBoolProp(DBProperties::CREATE_IF_NECESSARY, false);
66 if (properties.IsEncrypted()) {
67 option.cipherType = properties.GetCipherType();
68 option.passwd = properties.GetPasswd();
69 option.iterTimes = properties.GetIterTimes();
70 }
71 }
72
InitStorageEngine(const RelationalDBProperties & properties)73 int SQLiteRelationalStore::InitStorageEngine(const RelationalDBProperties &properties)
74 {
75 OpenDbProperties option;
76 InitDataBaseOption(properties, option);
77 std::string identifier = properties.GetStringProp(DBProperties::IDENTIFIER_DATA, "");
78
79 StorageEngineAttr poolSize = { 1, 1, 0, 16 }; // at most 1 write 16 read.
80 int errCode = sqliteStorageEngine_->InitSQLiteStorageEngine(poolSize, option, identifier);
81 if (errCode != E_OK) {
82 LOGE("Init the sqlite storage engine failed:%d", errCode);
83 }
84 return errCode;
85 }
86
ReleaseResources()87 void SQLiteRelationalStore::ReleaseResources()
88 {
89 if (sqliteStorageEngine_ != nullptr) {
90 sqliteStorageEngine_->ClearEnginePasswd();
91 sqliteStorageEngine_ = nullptr;
92 }
93 if (cloudSyncer_ != nullptr) {
94 cloudSyncer_->Close();
95 RefObject::KillAndDecObjRef(cloudSyncer_);
96 cloudSyncer_ = nullptr;
97 }
98 RefObject::DecObjRef(storageEngine_);
99 }
100
CheckDBMode()101 int SQLiteRelationalStore::CheckDBMode()
102 {
103 int errCode = E_OK;
104 auto *handle = GetHandle(true, errCode);
105 if (handle == nullptr) {
106 return errCode;
107 }
108 errCode = handle->CheckDBModeForRelational();
109 if (errCode != E_OK) {
110 LOGE("check relational DB mode failed. %d", errCode);
111 }
112
113 ReleaseHandle(handle);
114 return errCode;
115 }
116
GetSchemaFromMeta(RelationalSchemaObject & schema)117 int SQLiteRelationalStore::GetSchemaFromMeta(RelationalSchemaObject &schema)
118 {
119 Key schemaKey;
120 DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
121 Value schemaVal;
122 int errCode = storageEngine_->GetMetaData(schemaKey, schemaVal);
123 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
124 LOGE("Get relational schema from meta table failed. %d", errCode);
125 return errCode;
126 } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
127 LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
128 return -E_NOT_FOUND;
129 }
130
131 std::string schemaStr;
132 DBCommon::VectorToString(schemaVal, schemaStr);
133 errCode = schema.ParseFromSchemaString(schemaStr);
134 if (errCode != E_OK) {
135 LOGE("Parse schema string from meta table failed.");
136 return errCode;
137 }
138
139 sqliteStorageEngine_->SetSchema(schema);
140 return E_OK;
141 }
142
CheckTableModeFromMeta(DistributedTableMode mode,bool isUnSet)143 int SQLiteRelationalStore::CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet)
144 {
145 const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
146 Value modeVal;
147 int errCode = storageEngine_->GetMetaData(modeKey, modeVal);
148 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
149 LOGE("Get distributed table mode from meta table failed. errCode=%d", errCode);
150 return errCode;
151 }
152
153 DistributedTableMode orgMode = DistributedTableMode::SPLIT_BY_DEVICE;
154 if (!modeVal.empty()) {
155 std::string value(modeVal.begin(), modeVal.end());
156 orgMode = static_cast<DistributedTableMode>(strtoll(value.c_str(), nullptr, 10)); // 10: decimal
157 } else if (isUnSet) {
158 return E_OK; // First set table mode.
159 }
160
161 if (orgMode != mode) {
162 LOGE("Check distributed table mode mismatch, orgMode=%d, openMode=%d", orgMode, mode);
163 return -E_INVALID_ARGS;
164 }
165 return E_OK;
166 }
167
CheckProperties(RelationalDBProperties properties)168 int SQLiteRelationalStore::CheckProperties(RelationalDBProperties properties)
169 {
170 RelationalSchemaObject schema;
171 int errCode = GetSchemaFromMeta(schema);
172 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
173 LOGE("Get relational schema from meta failed. errcode=%d", errCode);
174 return errCode;
175 }
176 int ret = InitTrackerSchemaFromMeta();
177 if (ret != E_OK) {
178 LOGE("Init tracker schema from meta failed. errcode=%d", ret);
179 return ret;
180 }
181 properties.SetSchema(schema);
182
183 // Empty schema means no distributed table has been used, we may set DB to any table mode
184 // If there is a schema but no table mode, it is the 'SPLIT_BY_DEVICE' mode of old version
185 bool isSchemaEmpty = (errCode == -E_NOT_FOUND);
186 auto mode = static_cast<DistributedTableMode>(
187 properties.GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
188 errCode = CheckTableModeFromMeta(mode, isSchemaEmpty);
189 if (errCode != E_OK) {
190 LOGE("Get distributed table mode from meta failed. errcode=%d", errCode);
191 return errCode;
192 }
193 if (!isSchemaEmpty) {
194 return errCode;
195 }
196
197 errCode = SaveTableModeToMeta(mode);
198 if (errCode != E_OK) {
199 LOGE("Save table mode to meta failed. errCode=%d", errCode);
200 return errCode;
201 }
202
203 return E_OK;
204 }
205
SaveSchemaToMeta()206 int SQLiteRelationalStore::SaveSchemaToMeta()
207 {
208 Key schemaKey;
209 DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
210 Value schemaVal;
211 DBCommon::StringToVector(sqliteStorageEngine_->GetSchema().ToSchemaString(), schemaVal);
212 int errCode = storageEngine_->PutMetaData(schemaKey, schemaVal);
213 if (errCode != E_OK) {
214 LOGE("Save relational schema to meta table failed. %d", errCode);
215 }
216 return errCode;
217 }
218
SaveTableModeToMeta(DistributedTableMode mode)219 int SQLiteRelationalStore::SaveTableModeToMeta(DistributedTableMode mode)
220 {
221 const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
222 Value modeVal;
223 DBCommon::StringToVector(std::to_string(mode), modeVal);
224 int errCode = storageEngine_->PutMetaData(modeKey, modeVal);
225 if (errCode != E_OK) {
226 LOGE("Save relational schema to meta table failed. %d", errCode);
227 }
228 return errCode;
229 }
230
SaveLogTableVersionToMeta()231 int SQLiteRelationalStore::SaveLogTableVersionToMeta()
232 {
233 LOGD("save log table version to meta table, version: %s", DBConstant::LOG_TABLE_VERSION_CURRENT);
234 const Key logVersionKey(DBConstant::LOG_TABLE_VERSION_KEY.begin(), DBConstant::LOG_TABLE_VERSION_KEY.end());
235 Value logVersion;
236 int errCode = storageEngine_->GetMetaData(logVersionKey, logVersion);
237 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
238 LOGE("Get log version from meta table failed. %d", errCode);
239 return errCode;
240 }
241 std::string versionStr(DBConstant::LOG_TABLE_VERSION_CURRENT);
242 Value logVersionVal(versionStr.begin(), versionStr.end());
243 // log version is same, no need to update
244 if (errCode == E_OK && !logVersion.empty() && logVersionVal == logVersion) {
245 return errCode;
246 }
247 // If the log version does not exist or is different, update the log version
248 errCode = storageEngine_->PutMetaData(logVersionKey, logVersionVal);
249 if (errCode != E_OK) {
250 LOGE("save log table version to meta table failed. %d", errCode);
251 }
252 return errCode;
253 }
254
CleanDistributedDeviceTable()255 int SQLiteRelationalStore::CleanDistributedDeviceTable()
256 {
257 std::vector<std::string> missingTables;
258 int errCode = sqliteStorageEngine_->CleanDistributedDeviceTable(missingTables);
259 if (errCode != E_OK) {
260 LOGE("Clean distributed device table failed. %d", errCode);
261 }
262 for (const auto &deviceTableName : missingTables) {
263 std::string deviceHash;
264 std::string tableName;
265 DBCommon::GetDeviceFromName(deviceTableName, deviceHash, tableName);
266 syncAbleEngine_->EraseDeviceWaterMark(deviceHash, false, tableName);
267 if (errCode != E_OK) {
268 LOGE("Erase water mark failed:%d", errCode);
269 return errCode;
270 }
271 }
272 return errCode;
273 }
274
Open(const RelationalDBProperties & properties)275 int SQLiteRelationalStore::Open(const RelationalDBProperties &properties)
276 {
277 std::lock_guard<std::mutex> lock(initalMutex_);
278 if (isInitialized_) {
279 LOGD("[RelationalStore][Open] relational db was already initialized.");
280 return E_OK;
281 }
282 int errCode = InitSQLiteStorageEngine(properties);
283 if (errCode != E_OK) {
284 return errCode;
285 }
286
287 do {
288 errCode = InitStorageEngine(properties);
289 if (errCode != E_OK) {
290 LOGE("[RelationalStore][Open] Init database context fail! errCode = [%d]", errCode);
291 break;
292 }
293
294 storageEngine_ = new (std::nothrow) RelationalSyncAbleStorage(sqliteStorageEngine_);
295 if (storageEngine_ == nullptr) {
296 LOGE("[RelationalStore][Open] Create syncable storage failed");
297 errCode = -E_OUT_OF_MEMORY;
298 break;
299 }
300
301 syncAbleEngine_ = std::make_shared<SyncAbleEngine>(storageEngine_);
302 // to guarantee the life cycle of sync module and syncAbleEngine_ are the same, then the sync module will not
303 // be destructed when close store
304 storageEngine_->SetSyncAbleEngine(syncAbleEngine_);
305 cloudSyncer_ = new (std::nothrow) CloudSyncer(StorageProxy::GetCloudDb(storageEngine_), false);
306
307 errCode = CheckDBMode();
308 if (errCode != E_OK) {
309 break;
310 }
311
312 errCode = CheckProperties(properties);
313 if (errCode != E_OK) {
314 break;
315 }
316
317 errCode = SaveLogTableVersionToMeta();
318 if (errCode != E_OK) {
319 break;
320 }
321
322 errCode = CleanDistributedDeviceTable();
323 if (errCode != E_OK) {
324 break;
325 }
326
327 isInitialized_ = true;
328 return E_OK;
329 } while (false);
330
331 ReleaseResources();
332 return errCode;
333 }
334
OnClose(const std::function<void (void)> & notifier)335 void SQLiteRelationalStore::OnClose(const std::function<void(void)> ¬ifier)
336 {
337 AutoLock lockGuard(this);
338 if (notifier) {
339 closeNotifiers_.push_back(notifier);
340 } else {
341 LOGW("Register 'Close()' notifier failed, notifier is null.");
342 }
343 }
344
GetHandle(bool isWrite,int & errCode) const345 SQLiteSingleVerRelationalStorageExecutor *SQLiteRelationalStore::GetHandle(bool isWrite, int &errCode) const
346 {
347 if (sqliteStorageEngine_ == nullptr) {
348 errCode = -E_INVALID_DB;
349 return nullptr;
350 }
351
352 return static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
353 sqliteStorageEngine_->FindExecutor(isWrite, OperatePerm::NORMAL_PERM, errCode));
354 }
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const355 void SQLiteRelationalStore::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
356 {
357 if (handle == nullptr) {
358 return;
359 }
360
361 if (sqliteStorageEngine_ != nullptr) {
362 StorageExecutor *databaseHandle = handle;
363 sqliteStorageEngine_->Recycle(databaseHandle);
364 handle = nullptr;
365 }
366 }
367
Sync(const ISyncer::SyncParma & syncParam,uint64_t connectionId)368 int SQLiteRelationalStore::Sync(const ISyncer::SyncParma &syncParam, uint64_t connectionId)
369 {
370 return syncAbleEngine_->Sync(syncParam, connectionId);
371 }
372
373 // Called when a connection released.
DecreaseConnectionCounter(uint64_t connectionId)374 void SQLiteRelationalStore::DecreaseConnectionCounter(uint64_t connectionId)
375 {
376 int count = connectionCount_.fetch_sub(1, std::memory_order_seq_cst);
377 if (count <= 0) {
378 LOGF("Decrease db connection counter failed, count <= 0.");
379 return;
380 }
381 if (storageEngine_ != nullptr) {
382 storageEngine_->EraseDataChangeCallback(connectionId);
383 }
384 if (count != 1) {
385 return;
386 }
387
388 LockObj();
389 auto notifiers = std::move(closeNotifiers_);
390 UnlockObj();
391 for (const auto ¬ifier : notifiers) {
392 if (notifier) {
393 notifier();
394 }
395 }
396
397 // Sync Close
398 syncAbleEngine_->Close();
399
400 if (cloudSyncer_ != nullptr) {
401 cloudSyncer_->Close();
402 RefObject::KillAndDecObjRef(cloudSyncer_);
403 cloudSyncer_ = nullptr;
404 }
405
406 if (sqliteStorageEngine_ != nullptr) {
407 sqliteStorageEngine_ = nullptr;
408 }
409 {
410 if (storageEngine_ != nullptr) {
411 storageEngine_->RegisterHeartBeatListener(nullptr);
412 }
413 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
414 StopLifeCycleTimer();
415 lifeCycleNotifier_ = nullptr;
416 }
417 // close will dec sync ref of storageEngine_
418 DecObjRef(storageEngine_);
419 }
420
ReleaseDBConnection(uint64_t connectionId,RelationalStoreConnection * connection)421 void SQLiteRelationalStore::ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection)
422 {
423 if (connectionCount_.load() == 1) {
424 sqliteStorageEngine_->SetConnectionFlag(false);
425 }
426
427 connectMutex_.lock();
428 if (connection != nullptr) {
429 KillAndDecObjRef(connection);
430 DecreaseConnectionCounter(connectionId);
431 connectMutex_.unlock();
432 KillAndDecObjRef(this);
433 } else {
434 connectMutex_.unlock();
435 }
436 }
437
WakeUpSyncer()438 void SQLiteRelationalStore::WakeUpSyncer()
439 {
440 syncAbleEngine_->WakeUpSyncer();
441 }
442
CreateDistributedTable(const std::string & tableName,TableSyncType syncType,bool trackerSchemaChanged)443 int SQLiteRelationalStore::CreateDistributedTable(const std::string &tableName, TableSyncType syncType,
444 bool trackerSchemaChanged)
445 {
446 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
447 TableInfo tableInfo = localSchema.GetTable(tableName);
448 if (!tableInfo.Empty()) {
449 bool isSharedTable = tableInfo.GetSharedTableMark();
450 if (isSharedTable && !trackerSchemaChanged) {
451 return E_OK; // shared table will create distributed table when use SetCloudDbSchema
452 }
453 }
454
455 auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
456 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
457
458 std::string localIdentity; // collaboration mode need local identify
459 if (mode == DistributedTableMode::COLLABORATION) {
460 int errCode = syncAbleEngine_->GetLocalIdentity(localIdentity);
461 if (errCode != E_OK || localIdentity.empty()) {
462 LOGD("Get local identity failed, can not create.");
463 return -E_NOT_SUPPORT;
464 }
465 }
466
467 bool schemaChanged = false;
468 int errCode = sqliteStorageEngine_->CreateDistributedTable(tableName, DBCommon::TransferStringToHex(localIdentity),
469 schemaChanged, syncType, trackerSchemaChanged);
470 if (errCode != E_OK) {
471 LOGE("Create distributed table failed. %d", errCode);
472 }
473 if (schemaChanged) {
474 LOGD("Notify schema changed.");
475 storageEngine_->NotifySchemaChanged();
476 }
477 return errCode;
478 }
479
GetCloudSyncTaskCount()480 int32_t SQLiteRelationalStore::GetCloudSyncTaskCount()
481 {
482 if (cloudSyncer_ == nullptr) {
483 LOGE("[RelationalStore] cloudSyncer was not initialized when get cloud sync task count.");
484 return -1;
485 }
486 return cloudSyncer_->GetCloudSyncTaskCount();
487 }
488
CleanCloudData(ClearMode mode)489 int SQLiteRelationalStore::CleanCloudData(ClearMode mode)
490 {
491 auto tableMode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
492 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
493 if (tableMode == DistributedTableMode::COLLABORATION) {
494 LOGE("Not support remove device data in collaboration mode.");
495 return -E_NOT_SUPPORT;
496 }
497 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
498 TableInfoMap tables = localSchema.GetTables();
499 std::vector<std::string> cloudTableNameList;
500 for (const auto &tableInfo : tables) {
501 bool isSharedTable = tableInfo.second.GetSharedTableMark();
502 if ((mode == CLEAR_SHARED_TABLE && !isSharedTable) || (mode != CLEAR_SHARED_TABLE && isSharedTable)) {
503 continue;
504 }
505 if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION) {
506 cloudTableNameList.push_back(tableInfo.first);
507 }
508 }
509 if (cloudTableNameList.empty()) {
510 LOGI("[RelationalStore] device doesn't has cloud table, clean cloud data finished.");
511 return E_OK;
512 }
513 if (cloudSyncer_ == nullptr) {
514 LOGE("[RelationalStore] cloudSyncer was not initialized when clean cloud data");
515 return -E_INVALID_DB;
516 }
517 int errCode = cloudSyncer_->CleanCloudData(mode, cloudTableNameList, localSchema);
518 if (errCode != E_OK) {
519 LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode);
520 }
521
522 return errCode;
523 }
524
RemoveDeviceData()525 int SQLiteRelationalStore::RemoveDeviceData()
526 {
527 auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
528 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
529 if (mode == DistributedTableMode::COLLABORATION) {
530 LOGE("Not support remove device data in collaboration mode.");
531 return -E_NOT_SUPPORT;
532 }
533
534 std::vector<std::string> tableNameList = GetAllDistributedTableName();
535 if (tableNameList.empty()) {
536 return E_OK;
537 }
538 // erase watermark first
539 int errCode = EraseAllDeviceWatermark(tableNameList);
540 if (errCode != E_OK) {
541 LOGE("remove watermark failed %d", errCode);
542 return errCode;
543 }
544 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
545 errCode = GetHandleAndStartTransaction(handle);
546 if (handle == nullptr) {
547 return errCode;
548 }
549
550 for (const auto &table : tableNameList) {
551 errCode = handle->DeleteDistributedDeviceTable("", table);
552 if (errCode != E_OK) {
553 LOGE("delete device data failed. %d", errCode);
554 break;
555 }
556
557 errCode = handle->DeleteDistributedAllDeviceTableLog(table);
558 if (errCode != E_OK) {
559 LOGE("delete device data failed. %d", errCode);
560 break;
561 }
562 }
563
564 if (errCode != E_OK) {
565 (void)handle->Rollback();
566 ReleaseHandle(handle);
567 return errCode;
568 }
569
570 errCode = handle->Commit();
571 ReleaseHandle(handle);
572 storageEngine_->NotifySchemaChanged();
573 return errCode;
574 }
575
RemoveDeviceData(const std::string & device,const std::string & tableName)576 int SQLiteRelationalStore::RemoveDeviceData(const std::string &device, const std::string &tableName)
577 {
578 auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
579 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
580 if (mode == DistributedTableMode::COLLABORATION) {
581 LOGE("Not support remove device data in collaboration mode.");
582 return -E_NOT_SUPPORT;
583 }
584
585 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
586 auto iter = tables.find(tableName);
587 if (tables.empty() || (!tableName.empty() && iter == tables.end())) {
588 LOGE("Remove device data with table name which is not a distributed table or no distributed table found.");
589 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
590 }
591 // cloud mode is not permit
592 if (iter != tables.end() && iter->second.GetTableSyncType() == CLOUD_COOPERATION) {
593 LOGE("Remove device data with cloud sync table name.");
594 return -E_NOT_SUPPORT;
595 }
596 bool isNeedHash = false;
597 std::string hashDeviceId;
598 int errCode = syncAbleEngine_->GetHashDeviceId(device, hashDeviceId);
599 if (errCode == -E_NOT_SUPPORT) {
600 isNeedHash = true;
601 hashDeviceId = device;
602 errCode = E_OK;
603 }
604 if (errCode != E_OK) {
605 return errCode;
606 }
607 if (isNeedHash) {
608 // check device is uuid in meta
609 std::set<std::string> hashDevices;
610 errCode = GetExistDevices(hashDevices);
611 if (errCode != E_OK) {
612 return errCode;
613 }
614 if (hashDevices.find(DBCommon::TransferHashString(device)) == hashDevices.end()) {
615 LOGD("[SQLiteRelationalStore] not match device, just return");
616 return E_OK;
617 }
618 }
619 return RemoveDeviceDataInner(hashDeviceId, device, tableName, isNeedHash);
620 }
621
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)622 int SQLiteRelationalStore::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
623 const RelationalObserverAction &action)
624 {
625 return storageEngine_->RegisterObserverAction(connectionId, observer, action);
626 }
627
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)628 int SQLiteRelationalStore::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
629 {
630 return storageEngine_->UnRegisterObserverAction(connectionId, observer);
631 }
632
StopLifeCycleTimer()633 int SQLiteRelationalStore::StopLifeCycleTimer()
634 {
635 auto runtimeCxt = RuntimeContext::GetInstance();
636 if (runtimeCxt == nullptr) {
637 return -E_INVALID_ARGS;
638 }
639 if (lifeTimerId_ != 0) {
640 TimerId timerId = lifeTimerId_;
641 lifeTimerId_ = 0;
642 runtimeCxt->RemoveTimer(timerId, false);
643 }
644 return E_OK;
645 }
646
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier)647 int SQLiteRelationalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier ¬ifier)
648 {
649 auto runtimeCxt = RuntimeContext::GetInstance();
650 if (runtimeCxt == nullptr) {
651 return -E_INVALID_ARGS;
652 }
653 RefObject::IncObjRef(this);
654 TimerId timerId = 0;
655 int errCode = runtimeCxt->SetTimer(
656 DBConstant::DEF_LIFE_CYCLE_TIME,
657 [this](TimerId id) -> int {
658 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
659 if (lifeCycleNotifier_) {
660 // normal identifier mode
661 std::string identifier;
662 if (sqliteStorageEngine_->GetProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
663 identifier = sqliteStorageEngine_->GetProperties().GetStringProp(
664 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
665 } else {
666 identifier = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
667 }
668 auto userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
669 lifeCycleNotifier_(identifier, userId);
670 }
671 return 0;
672 },
673 [this]() {
674 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
675 if (ret != E_OK) {
676 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
677 }
678 },
679 timerId);
680 if (errCode != E_OK) {
681 lifeTimerId_ = 0;
682 LOGE("SetTimer failed:%d", errCode);
683 RefObject::DecObjRef(this);
684 return errCode;
685 }
686
687 lifeCycleNotifier_ = notifier;
688 lifeTimerId_ = timerId;
689 return E_OK;
690 }
691
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)692 int SQLiteRelationalStore::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier ¬ifier)
693 {
694 int errCode;
695 {
696 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
697 if (lifeTimerId_ != 0) {
698 errCode = StopLifeCycleTimer();
699 if (errCode != E_OK) {
700 LOGE("Stop the life cycle timer failed:%d", errCode);
701 return errCode;
702 }
703 }
704
705 if (!notifier) {
706 return E_OK;
707 }
708 errCode = StartLifeCycleTimer(notifier);
709 if (errCode != E_OK) {
710 LOGE("Register life cycle timer failed:%d", errCode);
711 return errCode;
712 }
713 }
714 auto listener = [this] { HeartBeat(); };
715 storageEngine_->RegisterHeartBeatListener(listener);
716 return errCode;
717 }
718
HeartBeat()719 void SQLiteRelationalStore::HeartBeat()
720 {
721 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
722 int errCode = ResetLifeCycleTimer();
723 if (errCode != E_OK) {
724 LOGE("Heart beat for life cycle failed:%d", errCode);
725 }
726 }
727
ResetLifeCycleTimer()728 int SQLiteRelationalStore::ResetLifeCycleTimer()
729 {
730 if (lifeTimerId_ == 0) {
731 return E_OK;
732 }
733 auto lifeNotifier = lifeCycleNotifier_;
734 lifeCycleNotifier_ = nullptr;
735 int errCode = StopLifeCycleTimer();
736 if (errCode != E_OK) {
737 LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
738 }
739 return StartLifeCycleTimer(lifeNotifier);
740 }
741
GetStorePath() const742 std::string SQLiteRelationalStore::GetStorePath() const
743 {
744 return sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
745 }
746
GetProperties() const747 RelationalDBProperties SQLiteRelationalStore::GetProperties() const
748 {
749 return sqliteStorageEngine_->GetProperties();
750 }
751
StopSync(uint64_t connectionId)752 void SQLiteRelationalStore::StopSync(uint64_t connectionId)
753 {
754 return syncAbleEngine_->StopSync(connectionId);
755 }
756
Dump(int fd)757 void SQLiteRelationalStore::Dump(int fd)
758 {
759 std::string userId = "";
760 std::string appId = "";
761 std::string storeId = "";
762 std::string label = "";
763 if (sqliteStorageEngine_ != nullptr) {
764 userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
765 appId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, "");
766 storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
767 label = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
768 }
769 label = DBCommon::TransferStringToHex(label);
770 DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n", userId.c_str(), appId.c_str(),
771 storeId.c_str(), label.c_str());
772 if (syncAbleEngine_ != nullptr) {
773 syncAbleEngine_->Dump(fd);
774 }
775 }
776
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)777 int SQLiteRelationalStore::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
778 uint64_t connectionId, std::shared_ptr<ResultSet> &result)
779 {
780 if (sqliteStorageEngine_ == nullptr) {
781 return -E_INVALID_DB;
782 }
783 if (condition.sql.size() > DBConstant::REMOTE_QUERY_MAX_SQL_LEN) {
784 LOGE("remote query sql len is larger than %" PRIu32, DBConstant::REMOTE_QUERY_MAX_SQL_LEN);
785 return -E_MAX_LIMITS;
786 }
787
788 if (!sqliteStorageEngine_->GetSchema().IsSchemaValid()) {
789 LOGW("not a distributed relational store.");
790 return -E_NOT_SUPPORT;
791 }
792 const auto &properties = sqliteStorageEngine_->GetProperties();
793 int tableMode =
794 properties.GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE);
795 if (tableMode != DistributedTableMode::SPLIT_BY_DEVICE) {
796 LOGW("only support split mode.");
797 return -E_NOT_SUPPORT;
798 }
799
800 // Check whether to be able to operate the db.
801 int errCode = E_OK;
802 auto *handle = GetHandle(false, errCode);
803 if (handle == nullptr) {
804 return errCode;
805 }
806 errCode = handle->CheckEncryptedOrCorrupted();
807 ReleaseHandle(handle);
808 if (errCode != E_OK) {
809 return errCode;
810 }
811
812 return syncAbleEngine_->RemoteQuery(device, condition, timeout, connectionId, result);
813 }
814
EraseAllDeviceWatermark(const std::vector<std::string> & tableNameList)815 int SQLiteRelationalStore::EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList)
816 {
817 std::set<std::string> devices;
818 int errCode = GetExistDevices(devices);
819 if (errCode != E_OK) {
820 return errCode;
821 }
822 for (const auto &tableName : tableNameList) {
823 for (const auto &device : devices) {
824 errCode = syncAbleEngine_->EraseDeviceWaterMark(device, false, tableName);
825 if (errCode != E_OK) {
826 return errCode;
827 }
828 }
829 }
830 return E_OK;
831 }
832
GetDevTableName(const std::string & device,const std::string & hashDev) const833 std::string SQLiteRelationalStore::GetDevTableName(const std::string &device, const std::string &hashDev) const
834 {
835 std::string devTableName;
836 StoreInfo info = { sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
837 sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
838 sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "") };
839 if (RuntimeContext::GetInstance()->TranslateDeviceId(device, info, devTableName) != E_OK) {
840 devTableName = hashDev;
841 }
842 return devTableName;
843 }
844
GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor * & handle) const845 int SQLiteRelationalStore::GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const
846 {
847 int errCode = E_OK;
848 handle = GetHandle(true, errCode);
849 if (handle == nullptr) {
850 LOGE("get handle failed %d", errCode);
851 return errCode;
852 }
853
854 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
855 if (errCode != E_OK) {
856 LOGE("start transaction failed %d", errCode);
857 ReleaseHandle(handle);
858 }
859 return errCode;
860 }
861
RemoveDeviceDataInner(const std::string & mappingDev,const std::string & device,const std::string & tableName,bool isNeedHash)862 int SQLiteRelationalStore::RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device,
863 const std::string &tableName, bool isNeedHash)
864 {
865 std::string hashHexDev;
866 std::string hashDev;
867 std::string devTableName;
868 if (!isNeedHash) {
869 // if is not need hash mappingDev mean hash(uuid) device is param device
870 hashHexDev = DBCommon::TransferStringToHex(mappingDev);
871 hashDev = mappingDev;
872 devTableName = device;
873 } else {
874 // if is need hash mappingDev mean uuid
875 hashDev = DBCommon::TransferHashString(mappingDev);
876 hashHexDev = DBCommon::TransferStringToHex(hashDev);
877 devTableName = GetDevTableName(mappingDev, hashHexDev);
878 }
879 // erase watermark first
880 int errCode = syncAbleEngine_->EraseDeviceWaterMark(hashDev, false, tableName);
881 if (errCode != E_OK) {
882 LOGE("erase watermark failed %d", errCode);
883 return errCode;
884 }
885 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
886 errCode = GetHandleAndStartTransaction(handle);
887 if (handle == nullptr) {
888 return errCode;
889 }
890
891 errCode = handle->DeleteDistributedDeviceTable(devTableName, tableName);
892 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
893 if (errCode != E_OK) {
894 LOGE("delete device data failed. %d", errCode);
895 tables.clear();
896 }
897
898 for (const auto &it : tables) {
899 if (tableName.empty() || it.second.GetTableName() == tableName) {
900 errCode = handle->DeleteDistributedDeviceTableLog(hashHexDev, it.second.GetTableName());
901 if (errCode != E_OK) {
902 LOGE("delete device data failed. %d", errCode);
903 break;
904 }
905 }
906 }
907
908 if (errCode != E_OK) {
909 (void)handle->Rollback();
910 ReleaseHandle(handle);
911 return errCode;
912 }
913 errCode = handle->Commit();
914 ReleaseHandle(handle);
915 storageEngine_->NotifySchemaChanged();
916 return errCode;
917 }
918
GetExistDevices(std::set<std::string> & hashDevices) const919 int SQLiteRelationalStore::GetExistDevices(std::set<std::string> &hashDevices) const
920 {
921 int errCode = E_OK;
922 auto *handle = GetHandle(true, errCode);
923 if (handle == nullptr) {
924 LOGE("[SingleVerRDBStore] GetExistsDeviceList get handle failed:%d", errCode);
925 return errCode;
926 }
927 errCode = handle->GetExistsDeviceList(hashDevices);
928 if (errCode != E_OK) {
929 LOGE("[SingleVerRDBStore] Get remove device list from meta failed. err=%d", errCode);
930 }
931 ReleaseHandle(handle);
932 return errCode;
933 }
934
GetAllDistributedTableName()935 std::vector<std::string> SQLiteRelationalStore::GetAllDistributedTableName()
936 {
937 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
938 std::vector<std::string> tableNames;
939 for (const auto &table : tables) {
940 if (table.second.GetTableSyncType() == TableSyncType::CLOUD_COOPERATION) {
941 continue;
942 }
943 tableNames.push_back(table.second.GetTableName());
944 }
945 return tableNames;
946 }
947
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)948 int SQLiteRelationalStore::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
949 {
950 if (cloudSyncer_ == nullptr) {
951 LOGE("[RelationalStore][SetCloudDB] cloudSyncer was not initialized");
952 return -E_INVALID_DB;
953 }
954 cloudSyncer_->SetCloudDB(cloudDb);
955 return E_OK;
956 }
957
AddFields(const std::vector<Field> & newFields,const std::set<std::string> & equalFields,std::vector<Field> & addFields)958 void SQLiteRelationalStore::AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields,
959 std::vector<Field> &addFields)
960 {
961 for (const auto &newField : newFields) {
962 if (equalFields.find(newField.colName) == equalFields.end()) {
963 addFields.push_back(newField);
964 }
965 }
966 }
967
CheckFields(const std::vector<Field> & newFields,const TableInfo & tableInfo,std::vector<Field> & addFields)968 bool SQLiteRelationalStore::CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo,
969 std::vector<Field> &addFields)
970 {
971 std::vector<FieldInfo> oldFields = tableInfo.GetFieldInfos();
972 if (newFields.size() < oldFields.size()) {
973 return false;
974 }
975 std::set<std::string> equalFields;
976 for (const auto &oldField : oldFields) {
977 bool isFieldExist = false;
978 for (const auto &newField : newFields) {
979 if (newField.colName != oldField.GetFieldName()) {
980 continue;
981 }
982 isFieldExist = true;
983 int32_t type = newField.type;
984 // Field type need to match storage type
985 // Field type : Nil, int64_t, double, std::string, bool, Bytes, Asset, Assets
986 // Storage type : NONE, NULL, INTEGER, REAL, TEXT, BLOB
987 if (type >= TYPE_INDEX<Nil> && type <= TYPE_INDEX<std::string>) {
988 type++; // storage type - field type = 1
989 } else if (type == TYPE_INDEX<bool>) {
990 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_NULL);
991 } else if (type >= TYPE_INDEX<Asset> && type <= TYPE_INDEX<Assets>) {
992 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_BLOB);
993 }
994 auto primaryKeyMap = tableInfo.GetPrimaryKey();
995 auto it = std::find_if(primaryKeyMap.begin(), primaryKeyMap.end(),
996 [&newField](const std::map<int, std::string>::value_type &pair) {
997 return pair.second == newField.colName;
998 });
999 if (type != static_cast<int32_t>(oldField.GetStorageType()) ||
1000 newField.primary != (it != primaryKeyMap.end()) || newField.nullable == oldField.IsNotNull()) {
1001 return false;
1002 }
1003 equalFields.insert(newField.colName);
1004 }
1005 if (!isFieldExist) {
1006 return false;
1007 }
1008 }
1009 AddFields(newFields, equalFields, addFields);
1010 return true;
1011 }
1012
PrepareSharedTable(const DataBaseSchema & schema,std::vector<std::string> & deleteTableNames,std::map<std::string,std::vector<Field>> & updateTableNames,std::map<std::string,std::string> & alterTableNames)1013 bool SQLiteRelationalStore::PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames,
1014 std::map<std::string, std::vector<Field>> &updateTableNames, std::map<std::string, std::string> &alterTableNames)
1015 {
1016 std::set<std::string> tableNames;
1017 std::map<std::string, std::string> sharedTableNamesMap;
1018 std::map<std::string, std::vector<Field>> fieldsMap;
1019 for (const auto &table : schema.tables) {
1020 tableNames.insert(table.name);
1021 sharedTableNamesMap[table.name] = table.sharedTableName;
1022 std::vector<Field> fields = table.fields;
1023 bool hasPrimaryKey = DBCommon::HasPrimaryKey(fields);
1024 Field ownerField = { CloudDbConstant::CLOUD_OWNER, TYPE_INDEX<std::string>, hasPrimaryKey };
1025 Field privilegeField = { CloudDbConstant::CLOUD_PRIVILEGE, TYPE_INDEX<std::string> };
1026 fields.insert(fields.begin(), privilegeField);
1027 fields.insert(fields.begin(), ownerField);
1028 fieldsMap[table.name] = fields;
1029 }
1030
1031 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1032 TableInfoMap tableList = localSchema.GetTables();
1033 for (const auto &tableInfo : tableList) {
1034 if (!tableInfo.second.GetSharedTableMark()) {
1035 continue;
1036 }
1037 std::string oldSharedTableName = tableInfo.second.GetTableName();
1038 std::string oldOriginTableName = tableInfo.second.GetOriginTableName();
1039 std::vector<Field> addFields;
1040 if (tableNames.find(oldOriginTableName) == tableNames.end()) {
1041 deleteTableNames.push_back(oldSharedTableName);
1042 } else if (sharedTableNamesMap[oldOriginTableName].empty()) {
1043 deleteTableNames.push_back(oldSharedTableName);
1044 } else if (CheckFields(fieldsMap[oldOriginTableName], tableInfo.second, addFields)) {
1045 if (!addFields.empty()) {
1046 updateTableNames[oldSharedTableName] = addFields;
1047 }
1048 if (oldSharedTableName != sharedTableNamesMap[oldOriginTableName]) {
1049 alterTableNames[oldSharedTableName] = sharedTableNamesMap[oldOriginTableName];
1050 }
1051 } else {
1052 return false;
1053 }
1054 }
1055 return true;
1056 }
1057
PrepareAndSetCloudDbSchema(const DataBaseSchema & schema)1058 int SQLiteRelationalStore::PrepareAndSetCloudDbSchema(const DataBaseSchema &schema)
1059 {
1060 if (storageEngine_ == nullptr) {
1061 LOGE("[RelationalStore][PrepareAndSetCloudDbSchema] storageEngine was not initialized");
1062 return -E_INVALID_DB;
1063 }
1064 int errCode = CheckCloudSchema(schema);
1065 if (errCode != E_OK) {
1066 return errCode;
1067 }
1068 // delete, update and create shared table and its distributed table
1069 errCode = ExecuteCreateSharedTable(schema);
1070 if (errCode != E_OK) {
1071 LOGE("[RelationalStore] prepare shared table failed:%d", errCode);
1072 return errCode;
1073 }
1074 return storageEngine_->SetCloudDbSchema(schema);
1075 }
1076
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1077 int SQLiteRelationalStore::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1078 {
1079 if (cloudSyncer_ == nullptr) {
1080 LOGE("[RelationalStore][SetIAssetLoader] cloudSyncer was not initialized");
1081 return -E_INVALID_DB;
1082 }
1083 cloudSyncer_->SetIAssetLoader(loader);
1084 return E_OK;
1085 }
1086
ChkSchema(const TableName & tableName)1087 int SQLiteRelationalStore::ChkSchema(const TableName &tableName)
1088 {
1089 // check schema is ok
1090 int errCode = E_OK;
1091 auto *handle = GetHandle(false, errCode);
1092 if (handle == nullptr) {
1093 LOGE("[SQLiteRelationalStore][ChkSchema] handle is nullptr");
1094 return errCode;
1095 }
1096 errCode = handle->CompareSchemaTableColumns(tableName);
1097 ReleaseHandle(handle);
1098 if (errCode != E_OK) {
1099 LOGE("[SQLiteRelationalStore][ChkSchema] local schema info incompatible %d.", errCode);
1100 return errCode;
1101 }
1102 if (storageEngine_ == nullptr) {
1103 LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1104 return -E_INVALID_DB;
1105 }
1106 return storageEngine_->ChkSchema(tableName);
1107 }
1108
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)1109 int SQLiteRelationalStore::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId)
1110 {
1111 if (storageEngine_ == nullptr) {
1112 LOGE("[RelationalStore][Sync] storageEngine was not initialized");
1113 return -E_INVALID_DB;
1114 }
1115 int errCode = CheckBeforeSync(option);
1116 if (errCode != E_OK) {
1117 return errCode;
1118 }
1119 LOGI("sync mode:%d, pri:%d, comp:%d", option.mode, option.priorityTask, option.compensatedSyncOnly);
1120 if (option.compensatedSyncOnly) {
1121 CloudSyncer::CloudTaskInfo info = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
1122 info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1123 cloudSyncer_->GenerateCompensatedSync(info);
1124 return E_OK;
1125 }
1126 CloudSyncer::CloudTaskInfo info;
1127 FillSyncInfo(option, onProcess, info);
1128 auto [table, ret] = sqliteStorageEngine_->CalTableRef(info.table, storageEngine_->GetSharedTableOriginNames());
1129 if (ret != E_OK) {
1130 return ret;
1131 }
1132 ret = ReFillSyncInfoTable(table, info);
1133 if (ret != E_OK) {
1134 return ret;
1135 }
1136 info.taskId = taskId;
1137 errCode = cloudSyncer_->Sync(info);
1138 return errCode;
1139 }
1140
CheckBeforeSync(const CloudSyncOption & option)1141 int SQLiteRelationalStore::CheckBeforeSync(const CloudSyncOption &option)
1142 {
1143 if (cloudSyncer_ == nullptr) {
1144 LOGE("[RelationalStore] cloudSyncer was not initialized when sync");
1145 return -E_INVALID_DB;
1146 }
1147 if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
1148 return -E_INVALID_ARGS;
1149 }
1150 int errCode = CheckQueryValid(option);
1151 if (errCode != E_OK) {
1152 return errCode;
1153 }
1154 SecurityOption securityOption;
1155 errCode = storageEngine_->GetSecurityOption(securityOption);
1156 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1157 return -E_SECURITY_OPTION_CHECK_ERROR;
1158 }
1159 if (errCode == E_OK && securityOption.securityLabel == S4) {
1160 return -E_SECURITY_OPTION_CHECK_ERROR;
1161 }
1162 return E_OK;
1163 }
1164
CheckQueryValid(const CloudSyncOption & option)1165 int SQLiteRelationalStore::CheckQueryValid(const CloudSyncOption &option)
1166 {
1167 if (option.compensatedSyncOnly) {
1168 return E_OK;
1169 }
1170 QuerySyncObject syncObject(option.query);
1171 int errCode = syncObject.GetValidStatus();
1172 if (errCode != E_OK) {
1173 LOGE("[RelationalStore] query is invalid or not support %d", errCode);
1174 return errCode;
1175 }
1176 std::vector<QuerySyncObject> object = QuerySyncObject::GetQuerySyncObject(option.query);
1177 bool isFromTable = object.empty();
1178 if (!option.priorityTask && !isFromTable) {
1179 LOGE("[RelationalStore] not support normal sync with query");
1180 return -E_NOT_SUPPORT;
1181 }
1182 const auto tableNames = syncObject.GetRelationTableNames();
1183 for (const auto &tableName : tableNames) {
1184 QuerySyncObject querySyncObject;
1185 querySyncObject.SetTableName(tableName);
1186 object.push_back(querySyncObject);
1187 }
1188 std::vector<std::string> syncTableNames;
1189 for (const auto &item : object) {
1190 std::string tableName = item.GetRelationTableName();
1191 syncTableNames.emplace_back(tableName);
1192 }
1193 errCode = CheckTableName(syncTableNames);
1194 if (errCode != E_OK) {
1195 return errCode;
1196 }
1197 return CheckObjectValid(option.priorityTask, object, isFromTable);
1198 }
1199
CheckObjectValid(bool priorityTask,const std::vector<QuerySyncObject> & object,bool isFromTable)1200 int SQLiteRelationalStore::CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object,
1201 bool isFromTable)
1202 {
1203 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1204 for (const auto &item : object) {
1205 if (priorityTask && !item.IsContainQueryNodes() && !isFromTable) {
1206 LOGE("[RelationalStore] not support priority sync with full table");
1207 return -E_INVALID_ARGS;
1208 }
1209 int errCode = storageEngine_->CheckQueryValid(item);
1210 if (errCode != E_OK) {
1211 return errCode;
1212 }
1213 if (!priorityTask || isFromTable) {
1214 continue;
1215 }
1216 if (!item.IsInValueOutOfLimit()) {
1217 LOGE("[RelationalStore] not support priority sync in count out of limit");
1218 return -E_MAX_LIMITS;
1219 }
1220 std::string tableName = item.GetRelationTableName();
1221 TableInfo tableInfo = localSchema.GetTable(tableName);
1222 if (!tableInfo.Empty()) {
1223 const std::map<int, FieldName> &primaryKeyMap = tableInfo.GetPrimaryKey();
1224 errCode = item.CheckPrimaryKey(primaryKeyMap);
1225 if (errCode != E_OK) {
1226 return errCode;
1227 }
1228 }
1229 }
1230 return E_OK;
1231 }
1232
CheckTableName(const std::vector<std::string> & tableNames)1233 int SQLiteRelationalStore::CheckTableName(const std::vector<std::string> &tableNames)
1234 {
1235 if (tableNames.empty()) {
1236 LOGE("[RelationalStore] sync with empty table");
1237 return -E_INVALID_ARGS;
1238 }
1239 for (const auto &table : tableNames) {
1240 int errCode = ChkSchema(table);
1241 if (errCode != E_OK) {
1242 LOGE("[RelationalStore] schema check failed when sync");
1243 return errCode;
1244 }
1245 }
1246 return E_OK;
1247 }
1248
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)1249 void SQLiteRelationalStore::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
1250 CloudSyncer::CloudTaskInfo &info)
1251 {
1252 auto syncObject = QuerySyncObject::GetQuerySyncObject(option.query);
1253 if (syncObject.empty()) {
1254 QuerySyncObject querySyncObject(option.query);
1255 info.table = querySyncObject.GetRelationTableNames();
1256 for (const auto &item : info.table) {
1257 QuerySyncObject object(Query::Select());
1258 object.SetTableName(item);
1259 info.queryList.push_back(object);
1260 }
1261 } else {
1262 for (auto &item : syncObject) {
1263 info.table.push_back(item.GetRelationTableName());
1264 info.queryList.push_back(std::move(item));
1265 }
1266 }
1267 info.devices = option.devices;
1268 info.mode = option.mode;
1269 info.callback = onProcess;
1270 info.timeout = option.waitTime;
1271 info.priorityTask = option.priorityTask;
1272 info.compensatedTask = option.compensatedSyncOnly;
1273 info.users.push_back("");
1274 info.lockAction = option.lockAction;
1275 info.merge = option.merge;
1276 info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1277 info.prepareTraceId = option.prepareTraceId;
1278 }
1279
SetTrackerTable(const TrackerSchema & trackerSchema)1280 int SQLiteRelationalStore::SetTrackerTable(const TrackerSchema &trackerSchema)
1281 {
1282 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1283 TableInfo tableInfo = localSchema.GetTable(trackerSchema.tableName);
1284 if (tableInfo.Empty()) {
1285 return sqliteStorageEngine_->SetTrackerTable(trackerSchema);
1286 }
1287 bool isFirstCreate = false;
1288 int errCode = sqliteStorageEngine_->CheckAndCacheTrackerSchema(trackerSchema, tableInfo, isFirstCreate);
1289 if (errCode != E_OK) {
1290 return errCode == -E_IGNORE_DATA ? E_OK : errCode;
1291 }
1292 errCode = CreateDistributedTable(trackerSchema.tableName, tableInfo.GetTableSyncType(), true);
1293 if (errCode != E_OK) {
1294 return errCode;
1295 }
1296 return sqliteStorageEngine_->SaveTrackerSchema(trackerSchema.tableName, isFirstCreate);
1297 }
1298
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)1299 int SQLiteRelationalStore::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
1300 {
1301 if (condition.sql.empty()) {
1302 LOGE("[RelationalStore] execute sql is empty.");
1303 return -E_INVALID_ARGS;
1304 }
1305 return sqliteStorageEngine_->ExecuteSql(condition, records);
1306 }
1307
CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor * & handle,std::set<std::string> & clearWaterMarkTable)1308 int SQLiteRelationalStore::CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor *&handle,
1309 std::set<std::string> &clearWaterMarkTable)
1310 {
1311 int errCode = E_OK;
1312 for (const auto &tableName : clearWaterMarkTable) {
1313 std::string cloudWaterMark;
1314 Value blobMetaVal;
1315 errCode = DBCommon::SerializeWaterMark(0, cloudWaterMark, blobMetaVal);
1316 if (errCode != E_OK) {
1317 LOGE("[SQLiteRelationalStore] SerializeWaterMark failed, errCode = %d", errCode);
1318 return errCode;
1319 }
1320 errCode = storageEngine_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal, true);
1321 if (errCode != E_OK) {
1322 LOGE("[SQLiteRelationalStore] put meta data failed, errCode = %d", errCode);
1323 return errCode;
1324 }
1325 errCode = handle->CleanUploadFinishedFlag(tableName);
1326 if (errCode != E_OK) {
1327 LOGE("[SQLiteRelationalStore] clean upload finished flag failed, errCode = %d", errCode);
1328 return errCode;
1329 }
1330 }
1331 errCode = cloudSyncer_->CleanWaterMarkInMemory(clearWaterMarkTable);
1332 if (errCode != E_OK) {
1333 LOGE("[SQLiteRelationalStore] CleanWaterMarkInMemory failed, errCode = %d", errCode);
1334 }
1335 return errCode;
1336 }
1337
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)1338 int SQLiteRelationalStore::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
1339 {
1340 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
1341 int errCode = GetHandleAndStartTransaction(handle);
1342 if (errCode != E_OK) {
1343 LOGE("[SQLiteRelationalStore] SetReference start transaction failed, errCode = %d", errCode);
1344 return errCode;
1345 }
1346 std::set<std::string> clearWaterMarkTables;
1347 RelationalSchemaObject schema;
1348 errCode = sqliteStorageEngine_->SetReference(tableReferenceProperty, handle, clearWaterMarkTables, schema);
1349 if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1350 LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1351 (void)handle->Rollback();
1352 ReleaseHandle(handle);
1353 return errCode;
1354 }
1355
1356 if (!clearWaterMarkTables.empty()) {
1357 storageEngine_->SetReusedHandle(handle);
1358 int ret = CleanWaterMark(handle, clearWaterMarkTables);
1359 if (ret != E_OK) {
1360 LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", ret);
1361 storageEngine_->SetReusedHandle(nullptr);
1362 (void)handle->Rollback();
1363 ReleaseHandle(handle);
1364 return ret;
1365 }
1366 storageEngine_->SetReusedHandle(nullptr);
1367 LOGI("[SQLiteRelationalStore] SetReference clear water mark success");
1368 }
1369
1370 int ret = handle->Commit();
1371 ReleaseHandle(handle);
1372 if (ret == E_OK) {
1373 sqliteStorageEngine_->SetSchema(schema);
1374 return errCode;
1375 }
1376 LOGE("[SQLiteRelationalStore] SetReference commit transaction failed, errCode = %d", ret);
1377 return ret;
1378 }
1379
InitTrackerSchemaFromMeta()1380 int SQLiteRelationalStore::InitTrackerSchemaFromMeta()
1381 {
1382 int errCode = sqliteStorageEngine_->GetOrInitTrackerSchemaFromMeta();
1383 return errCode == -E_NOT_FOUND ? E_OK : errCode;
1384 }
1385
CleanTrackerData(const std::string & tableName,int64_t cursor)1386 int SQLiteRelationalStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
1387 {
1388 if (tableName.empty()) {
1389 return -E_INVALID_ARGS;
1390 }
1391 return sqliteStorageEngine_->CleanTrackerData(tableName, cursor);
1392 }
1393
ExecuteCreateSharedTable(const DataBaseSchema & schema)1394 int SQLiteRelationalStore::ExecuteCreateSharedTable(const DataBaseSchema &schema)
1395 {
1396 if (sqliteStorageEngine_ == nullptr) {
1397 LOGE("[RelationalStore][ExecuteCreateSharedTable] sqliteStorageEngine was not initialized");
1398 return -E_INVALID_DB;
1399 }
1400 std::vector<std::string> deleteTableNames;
1401 std::map<std::string, std::vector<Field>> updateTableNames;
1402 std::map<std::string, std::string> alterTableNames;
1403 if (!PrepareSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames)) {
1404 LOGE("[RelationalStore][ExecuteCreateSharedTable] table fields are invalid.");
1405 return -E_INVALID_ARGS;
1406 }
1407 LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table start");
1408 // upgrade contains delete, alter, update and create
1409 int errCode = sqliteStorageEngine_->UpgradeSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames);
1410 if (errCode != E_OK) {
1411 LOGE("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table failed. %d", errCode);
1412 } else {
1413 LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table end");
1414 }
1415 return errCode;
1416 }
1417
ReFillSyncInfoTable(const std::vector<std::string> & actualTable,CloudSyncer::CloudTaskInfo & info)1418 int SQLiteRelationalStore::ReFillSyncInfoTable(const std::vector<std::string> &actualTable,
1419 CloudSyncer::CloudTaskInfo &info)
1420 {
1421 if (info.priorityTask && actualTable.size() != info.table.size()) {
1422 LOGE("[RelationalStore] Not support regenerate table with priority task");
1423 return -E_NOT_SUPPORT;
1424 }
1425 if (actualTable.size() == info.table.size()) {
1426 return E_OK;
1427 }
1428 LOGD("[RelationalStore] Fill tables from %zu to %zu", info.table.size(), actualTable.size());
1429 info.table = actualTable;
1430 info.queryList.clear();
1431 for (const auto &item : info.table) {
1432 QuerySyncObject object(Query::Select());
1433 object.SetTableName(item);
1434 info.queryList.push_back(object);
1435 }
1436 return E_OK;
1437 }
1438
Pragma(PragmaCmd cmd,PragmaData & pragmaData)1439 int SQLiteRelationalStore::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
1440 {
1441 if (cmd != LOGIC_DELETE_SYNC_DATA) {
1442 return -E_NOT_SUPPORT;
1443 }
1444 if (pragmaData == nullptr) {
1445 return -E_INVALID_ARGS;
1446 }
1447 auto logicDelete = *(static_cast<bool *>(pragmaData));
1448 if (storageEngine_ == nullptr) {
1449 LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1450 return -E_INVALID_DB;
1451 }
1452 storageEngine_->SetLogicDelete(logicDelete);
1453 return E_OK;
1454 }
1455
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1456 int SQLiteRelationalStore::UpsertData(RecordStatus status, const std::string &tableName,
1457 const std::vector<VBucket> &records)
1458 {
1459 if (storageEngine_ == nullptr) {
1460 LOGE("[RelationalStore][UpsertData] sqliteStorageEngine was not initialized");
1461 return -E_INVALID_DB;
1462 }
1463 int errCode = CheckParamForUpsertData(status, tableName, records);
1464 if (errCode != E_OK) {
1465 return errCode;
1466 }
1467 return storageEngine_->UpsertData(status, tableName, records);
1468 }
1469
CheckParamForUpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1470 int SQLiteRelationalStore::CheckParamForUpsertData(RecordStatus status, const std::string &tableName,
1471 const std::vector<VBucket> &records)
1472 {
1473 if (status != RecordStatus::WAIT_COMPENSATED_SYNC) {
1474 LOGE("[RelationalStore][CheckParamForUpsertData] invalid status %" PRId64, static_cast<int64_t>(status));
1475 return -E_INVALID_ARGS;
1476 }
1477 if (records.empty()) {
1478 LOGE("[RelationalStore][CheckParamForUpsertData] records is empty");
1479 return -E_INVALID_ARGS;
1480 }
1481 size_t recordSize = records.size();
1482 if (recordSize > DBConstant::MAX_BATCH_SIZE) {
1483 LOGE("[RelationalStore][CheckParamForUpsertData] records size over limit, size %zu", recordSize);
1484 return -E_MAX_LIMITS;
1485 }
1486 return CheckSchemaForUpsertData(tableName, records);
1487 }
1488
ChkTable(const TableInfo & table)1489 static int ChkTable(const TableInfo &table)
1490 {
1491 if (table.IsNoPkTable() || table.GetSharedTableMark()) {
1492 LOGE("[RelationalStore][ChkTable] not support table without pk or with tablemark");
1493 return -E_NOT_SUPPORT;
1494 }
1495 if (table.GetTableName().empty() || (table.GetTableSyncType() != TableSyncType::CLOUD_COOPERATION)) {
1496 return -E_NOT_FOUND;
1497 }
1498 return E_OK;
1499 }
1500
CheckSchemaForUpsertData(const std::string & tableName,const std::vector<VBucket> & records)1501 int SQLiteRelationalStore::CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records)
1502 {
1503 if (tableName.empty()) {
1504 return -E_INVALID_ARGS;
1505 }
1506 auto schema = storageEngine_->GetSchemaInfo();
1507 auto table = schema.GetTable(tableName);
1508 int errCode = ChkTable(table);
1509 if (errCode != E_OK) {
1510 return errCode;
1511 }
1512 TableSchema cloudTableSchema;
1513 errCode = storageEngine_->GetCloudTableSchema(tableName, cloudTableSchema);
1514 if (errCode != E_OK) {
1515 LOGE("Get cloud schema failed when check upsert data, %d", errCode);
1516 return errCode;
1517 }
1518 errCode = ChkSchema(tableName);
1519 if (errCode != E_OK) {
1520 return errCode;
1521 }
1522 std::set<std::string> dbPkFields;
1523 for (auto &field : table.GetIdentifyKey()) {
1524 dbPkFields.insert(field);
1525 }
1526 std::set<std::string> schemaFields;
1527 for (auto &fieldInfo : table.GetFieldInfos()) {
1528 schemaFields.insert(fieldInfo.GetFieldName());
1529 }
1530 for (const auto &record : records) {
1531 std::set<std::string> recordPkFields;
1532 for (const auto &item : record) {
1533 if (schemaFields.find(item.first) == schemaFields.end()) {
1534 LOGE("[RelationalStore][CheckSchemaForUpsertData] invalid field not exist in schema");
1535 return -E_INVALID_ARGS;
1536 }
1537 if (dbPkFields.find(item.first) == dbPkFields.end()) {
1538 continue;
1539 }
1540 recordPkFields.insert(item.first);
1541 }
1542 if (recordPkFields.size() != dbPkFields.size()) {
1543 LOGE("[RelationalStore][CheckSchemaForUpsertData] pk size not equal param %zu schema %zu",
1544 recordPkFields.size(), dbPkFields.size());
1545 return -E_INVALID_ARGS;
1546 }
1547 }
1548 return errCode;
1549 }
1550
InitSQLiteStorageEngine(const RelationalDBProperties & properties)1551 int SQLiteRelationalStore::InitSQLiteStorageEngine(const RelationalDBProperties &properties)
1552 {
1553 auto engine = new(std::nothrow) SQLiteSingleRelationalStorageEngine(properties);
1554 if (engine == nullptr) {
1555 LOGE("[RelationalStore][Open] Create storage engine failed");
1556 return -E_OUT_OF_MEMORY;
1557 }
1558 sqliteStorageEngine_ = std::shared_ptr<SQLiteSingleRelationalStorageEngine>(engine,
1559 [](SQLiteSingleRelationalStorageEngine *releaseEngine) {
1560 RefObject::KillAndDecObjRef(releaseEngine);
1561 });
1562 return E_OK;
1563 }
1564
CheckCloudSchema(const DataBaseSchema & schema)1565 int SQLiteRelationalStore::CheckCloudSchema(const DataBaseSchema &schema)
1566 {
1567 if (storageEngine_ == nullptr) {
1568 LOGE("[RelationalStore][CheckCloudSchema] storageEngine was not initialized");
1569 return -E_INVALID_DB;
1570 }
1571 std::shared_ptr<DataBaseSchema> cloudSchema;
1572 (void) storageEngine_->GetCloudDbSchema(cloudSchema);
1573 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1574 for (const auto &tableSchema : schema.tables) {
1575 TableInfo tableInfo = localSchema.GetTable(tableSchema.name);
1576 if (tableInfo.Empty()) {
1577 continue;
1578 }
1579 if (tableInfo.GetSharedTableMark()) {
1580 LOGE("[RelationalStore][CheckCloudSchema] Table name is existent shared table's name.");
1581 return -E_INVALID_ARGS;
1582 }
1583 }
1584 for (const auto &tableSchema : schema.tables) {
1585 if (cloudSchema == nullptr) {
1586 continue;
1587 }
1588 for (const auto &oldSchema : cloudSchema->tables) {
1589 if (!CloudStorageUtils::CheckCloudSchemaFields(tableSchema, oldSchema)) {
1590 LOGE("[RelationalStore][CheckCloudSchema] Schema fields are invalid.");
1591 return -E_INVALID_ARGS;
1592 }
1593 }
1594 }
1595 return E_OK;
1596 }
1597
SetCloudSyncConfig(const CloudSyncConfig & config)1598 int SQLiteRelationalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
1599 {
1600 if (storageEngine_ == nullptr) {
1601 LOGE("[RelationalStore][SetCloudSyncConfig] sqliteStorageEngine was not initialized");
1602 return -E_INVALID_DB;
1603 }
1604 storageEngine_->SetCloudSyncConfig(config);
1605 return E_OK;
1606 }
1607
GetCloudTaskStatus(uint64_t taskId)1608 SyncProcess SQLiteRelationalStore::GetCloudTaskStatus(uint64_t taskId)
1609 {
1610 return cloudSyncer_->GetCloudTaskStatus(taskId);
1611 }
1612 } //namespace DistributedDB
1613 #endif
1614