1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_engine.h"
17 
18 #include <memory>
19 
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "kvdb_manager.h"
24 #include "log_print.h"
25 #include "param_check_utils.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "single_ver_utils.h"
29 #include "sqlite_single_ver_database_upgrader.h"
30 #include "sqlite_single_ver_natural_store.h"
31 #include "sqlite_single_ver_schema_database_upgrader.h"
32 
33 namespace DistributedDB {
SQLiteSingleVerStorageEngine()34 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
35     : executorState_(ExecutorState::INVALID),
36       cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
37       isCorrupted_(false),
38       isNeedUpdateSecOpt_(false)
39 {}
40 
~SQLiteSingleVerStorageEngine()41 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
42 {
43 }
44 
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const45 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
46 {
47     return handle->MigrateLocalData();
48 }
49 
EraseDeviceWaterMark(const std::set<std::string> & removeDevices,bool isNeedHash)50 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(const std::set<std::string> &removeDevices, bool isNeedHash)
51 {
52     auto kvdbManager = KvDBManager::GetInstance();
53     if (kvdbManager == nullptr) { // LCOV_EXCL_BR_LINE
54         return -E_INVALID_DB;
55     }
56     auto identifier = GetIdentifier();
57     auto kvdb = kvdbManager->FindKvDB(identifier);
58     if (kvdb == nullptr) { // LCOV_EXCL_BR_LINE
59         LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
60         return -E_INVALID_DB;
61     }
62 
63     auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
64     for (const auto &devId : removeDevices) {
65         int errCode = kvStore->EraseDeviceWaterMark(devId, isNeedHash);
66         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
67             RefObject::DecObjRef(kvdb);
68             return errCode;
69         }
70     }
71 
72     RefObject::DecObjRef(kvdb);
73     return E_OK;
74 }
75 
GetRemoveDataDevices(SQLiteSingleVerStorageExecutor * handle,const DataItem & item,std::set<std::string> & removeDevices,bool & isNeedHash) const76 int SQLiteSingleVerStorageEngine::GetRemoveDataDevices(SQLiteSingleVerStorageExecutor *handle, const DataItem &item,
77     std::set<std::string> &removeDevices, bool &isNeedHash) const
78 {
79     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
80         return -E_INVALID_DB;
81     }
82     if (item.value.empty()) { // Device ID has been set to value in cache db
83         // Empty means remove all device data, get device id from meta key
84         // LCOV_EXCL_BR_LINE
85         int errCode = handle->GetExistsDevicesFromMeta(removeDevices);
86         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
87             LOGE("Get remove devices list from meta failed. err=%d", errCode);
88             return errCode;
89         }
90         isNeedHash = false;
91     } else {
92         std::string deviceName;
93         DBCommon::VectorToString(item.value, deviceName);
94         removeDevices.insert(deviceName);
95     }
96     return E_OK;
97 }
98 
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)99 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
100     const std::vector<DataItem> &dataItems)
101 {
102     int errCode = E_OK;
103     for (const auto &dataItem : dataItems) {
104         if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
105             (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
106             bool isNeedHash = true;
107             std::set<std::string> removeDevices;
108             errCode = GetRemoveDataDevices(handle, dataItem, removeDevices, isNeedHash);
109             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
110                 LOGE("Get remove device id failed. err=%d", errCode);
111                 return errCode;
112             }
113 
114             // sync module will use handle to fix watermark, if fix fail then migrate fail, not need hold write handle
115             errCode = ReleaseExecutor(handle);
116             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
117                 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
118                 return errCode;
119             }
120 
121             errCode = EraseDeviceWaterMark(removeDevices, isNeedHash);
122             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
123                 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
124                 return errCode;
125             }
126 
127             handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
128                 errCode));
129             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
130                 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
131                 return errCode;
132             }
133         }
134     }
135     return errCode;
136 }
137 
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)138 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
139     NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
140 {
141     if (syncData.committedData == nullptr) {
142         syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
143         if (syncData.committedData == nullptr) {
144             LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
145             return -E_OUT_OF_MEMORY;
146         }
147     }
148     InitConflictNotifiedFlag(syncData.committedData);
149 
150     std::vector<DataItem> dataItems;
151     uint64_t minVerIncurCacheDb = 0;
152     int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
153     if (errCode != E_OK) {
154         LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
155         return errCode;
156     }
157 
158     if (minVerIncurCacheDb == 0) { // min version in cache db is 1
159         ++curMigrateVer;
160         return E_OK;
161     }
162 
163     if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
164         curMigrateVer = minVerIncurCacheDb;
165     }
166 
167     // Call the syncer module to erase the water mark.
168     errCode = EraseDeviceWaterMark(handle, dataItems);
169     if (errCode != E_OK) {
170         LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
171         return errCode;
172     }
173 
174     // next version need process
175     LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
176         curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
177     errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
178     if (errCode != E_OK) {
179         LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
180         return errCode;
181     }
182 
183     errCode = ReleaseHandleTransiently(handle, 2ULL, syncData); // temporary release handle 2ms
184     if (errCode != E_OK) {
185         return errCode;
186     }
187 
188     return E_OK;
189 }
190 
191 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime,NotifyMigrateSyncData & syncData)192 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime,
193     NotifyMigrateSyncData &syncData)
194 {
195     int errCode = ReleaseExecutor(handle);
196     if (errCode != E_OK) {
197         LOGE("release executor for reopen database! errCode = [%d]", errCode);
198         return errCode;
199     }
200 
201     CommitNotifyForMigrateCache(syncData); // Trigger sync after release handle
202 
203     std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
204     handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
205     if (errCode != E_OK) {
206         LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
207         return errCode;
208     }
209     return errCode;
210 }
211 
AddSubscribeToMainDBInMigrate()212 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
213 {
214     LOGD("Add subscribe to mainDB from cache. %d", GetEngineState());
215     std::lock_guard<std::mutex> lock(subscribeMutex_);
216     if (subscribeQuery_.empty()) { // LCOV_EXCL_BR_LINE
217         return E_OK;
218     }
219     int errCode = E_OK;
220     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
221     if (errCode != E_OK || handle == nullptr) { // LCOV_EXCL_BR_LINE
222         LOGE("Get available executor for add subscribe failed. %d", errCode);
223         return errCode;
224     }
225     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
226     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
227         goto END;
228     }
229     for (auto item : subscribeQuery_) {
230         errCode = handle->AddSubscribeTrigger(item.second, item.first);
231         if (errCode != E_OK) {
232             LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
233         }
234     }
235     subscribeQuery_.clear();
236     // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
237     (void)handle->Commit();
238 END:
239     ReleaseExecutor(handle);
240     return errCode;
241 }
242 
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)243 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
244 {
245     int errCode = E_OK;
246     if (handle == nullptr) {
247         handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
248         if (errCode != E_OK) {
249             LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
250             return errCode;
251         }
252     }
253 
254     LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion());
255     uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
256     NotifyMigrateSyncData syncData;
257     auto kvdbManager = KvDBManager::GetInstance();
258     if (kvdbManager != nullptr) {
259         auto identifier = GetIdentifier();
260         auto kvdb = kvdbManager->FindKvDB(identifier);
261         if (kvdb != nullptr) {
262             auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
263             syncData.isPermitForceWrite =
264                 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
265             RefObject::DecObjRef(kvdb);
266         } else {
267             LOGE("[SingleVerEngine] kvdb is null.");
268         }
269     }
270     // cache atomic version represents version of cacheDb input next time
271     while (curMigrateVer < GetCacheRecordVersion()) {
272         errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
273         if (errCode != E_OK) {
274             LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
275             break;
276         }
277         if (!syncData.isRemote) {
278             isNeedTriggerSync = true;
279         }
280     }
281     if (syncData.committedData != nullptr) {
282         RefObject::DecObjRef(syncData.committedData);
283         syncData.committedData = nullptr;
284     }
285     // When finished Migrating sync data, will fix engine state
286     return errCode;
287 }
288 
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)289 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
290     EngineState stateBeforeMigrate)
291 {
292     LOGD("Begin attach main db and cache db by executor!");
293     // Judge the file corresponding to db by the engine status and attach it to another file
294     int errCode = E_OK;
295     std::string attachAbsPath;
296     if (stateBeforeMigrate == EngineState::MAINDB) {
297         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
298             DBConstant::DB_EXTENSION;
299         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
300     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
301         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
302         DBConstant::DB_EXTENSION;
303         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
304     } else {
305         return -E_NOT_SUPPORT;
306     }
307     if (errCode != E_OK) {
308         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
309         return errCode;
310     }
311 
312     uint64_t maxVersion = 0;
313     errCode = handle->GetMaxVersionInCacheDb(maxVersion);
314     if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
315         maxVersion = CACHE_RECORD_DEFAULT_VERSION;
316     }
317 
318     (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
319     return errCode;
320 }
321 
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const322 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
323 {
324     LOGD("Begin attach main db and cache db by sqlite handle!");
325     // Judge the file corresponding to db by the engine status and attach it to another file
326     int errCode = E_OK;
327     std::string attachAbsPath;
328     if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
329         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
330             DBConstant::DB_EXTENSION;
331         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
332     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
333         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
334             DBConstant::DB_EXTENSION;
335         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
336     } else {
337         return -E_NOT_SUPPORT;
338     }
339     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
340         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
341         return errCode;
342     }
343 
344     return errCode;
345 }
346 
ReInit()347 int SQLiteSingleVerStorageEngine::ReInit()
348 {
349     return Init();
350 }
351 
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)352 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
353 {
354     if (handle == nullptr) {
355         return E_OK;
356     }
357     StorageExecutor *databaseHandle = handle;
358     isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
359     Recycle(databaseHandle);
360     handle = nullptr;
361     if (isCorrupted_) {
362         LOGE("Database is corrupted or invalid passwd!");
363         return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
364     }
365     return E_OK;
366 }
367 
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)368 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
369     EngineState stateBeforeMigrate)
370 {
371     LOGI("Begin to finish migrate and reinit db state!");
372     int errCode;
373     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
374         return -E_INVALID_ARGS;
375     }
376 
377     if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
378         sqlite3 *dbHandle = nullptr;
379         errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
380         if (errCode != E_OK) {
381             LOGE("Get Db handle failed! errCode = [%d]", errCode);
382             return errCode;
383         }
384 
385         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
386         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
387             LOGE("Execute the SQLite detach failed:%d", errCode);
388             return errCode;
389         }
390         // delete cachedb
391         errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
392         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
393             LOGE("Remove files of cache database after detach:%d", errCode);
394         }
395 
396         SetEngineState(EngineState::MAINDB);
397         return errCode;
398     }
399 
400     errCode = ReleaseExecutor(handle);
401     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
402         LOGE("Release executor for reopen database! errCode = [%d]", errCode);
403         return errCode;
404     }
405 
406     // close db for reinit this engine
407     Release();
408 
409     // delete cache db
410     errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
411     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
412         LOGE("Remove files of cache database after release current db:%d", errCode);
413         return errCode;
414     }
415 
416     // reInit, it will reset engine state
417     errCode = ReInit();
418     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
419         LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
420         return errCode;
421     }
422 
423     return E_OK;
424 }
425 
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)426 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
427     EngineState preMigrateState)
428 {
429     // after attach main and cache need change operate data sql, changing state forbid operate database
430     SetEngineState(EngineState::MIGRATING);
431 
432     int errCode = E_OK;
433     // check if has been attach and attach cache and main for migrate
434     if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
435         errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
436         if (errCode != E_OK) {
437             LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
438             // For lock state open db, can not attach main and cache
439             return errCode;
440         }
441     } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
442         // Has been attach, maybe ever crashed, need update version
443         executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
444         uint64_t maxVersion = 0;
445         errCode = handle->GetMaxVersionInCacheDb(maxVersion);
446         if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
447             maxVersion = CACHE_RECORD_DEFAULT_VERSION;
448         }
449         (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
450     } else {
451         return -E_UNEXPECTED_DATA;
452     }
453 
454     return errCode;
455 }
456 
ExecuteMigrate()457 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
458 {
459     EngineState preState = GetEngineState();
460     std::lock_guard<std::mutex> lock(migrateLock_);
461     if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
462         !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
463         DBConstant::DB_EXTENSION)) {
464         LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
465         return E_OK;
466     }
467 
468     // Get write executor for migrate
469     int errCode = E_OK;
470     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
471     if (errCode != E_OK) {
472         LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
473         return errCode;
474     }
475 
476     isMigrating_.store(true);
477     LOGD("Migrate start.");
478     bool isNeedTriggerSync = false;
479     errCode = InitExecuteMigrate(handle, preState);
480     if (errCode != E_OK) {
481         LOGE("Init migrate data fail, errCode = [%d]", errCode);
482         goto END;
483     }
484 
485     LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
486         static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
487     // has been attached, Mark start of migration and it can migrate data
488     errCode = MigrateLocalData(handle);
489     if (errCode != E_OK) {
490         LOGE("Migrate local data fail, errCode = [%d]", errCode);
491         goto END;
492     }
493 
494     errCode = MigrateSyncData(handle, isNeedTriggerSync);
495     if (errCode != E_OK) {
496         LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
497         goto END;
498     }
499 
500     SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
501 
502     // detach database and delete cachedb
503     errCode = FinishMigrateData(handle, preState);
504     if (errCode != E_OK) {
505         LOGE("Finish migrating data fail, errCode = [%d]", errCode);
506         goto END;
507     }
508 
509 END: // after FinishMigrateData, it will reset engine state
510     // there is no need cover the errCode
511     EndMigrate(handle, preState, errCode, isNeedTriggerSync);
512     isMigrating_.store(false);
513     LOGD("Migrate stop.");
514     return errCode;
515 }
516 
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)517 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
518     int errCode, bool isNeedTriggerSync)
519 {
520     LOGD("Finish migrating data! errCode = [%d]", errCode);
521     if (errCode != E_OK) {
522         SetEngineState(stateBeforeMigrate);
523     }
524     if (handle != nullptr) {
525         handle->ClearMigrateData();
526     }
527     errCode = ReleaseExecutor(handle);
528     if (errCode != E_OK) {
529         LOGE("release executor after migrating! errCode = [%d]", errCode);
530     }
531 
532     errCode = AddSubscribeToMainDBInMigrate();
533     if (errCode != E_OK) {
534         LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
535     }
536 
537     // Notify max timestamp offset for SyncEngine.
538     // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
539     RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
540     if (isNeedTriggerSync) {
541         commitNotifyFunc_(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT),
542             nullptr);
543     }
544     return;
545 }
546 
IsEngineCorrupted() const547 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
548 {
549     return isCorrupted_;
550 }
551 
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)552 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
553 {
554     auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
555     if (executor == nullptr) {
556         return executor;
557     }
558     executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
559     return executor;
560 }
561 
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)562 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
563 {
564     // Only could get the main database handle in the uninitialized and the main status.
565     if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
566         LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
567         return -E_EKEYREVOKED;
568     }
569 
570     if (!option_.isMemDb) {
571         option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
572             DBConstant::DB_EXTENSION;
573     }
574 
575     OpenDbProperties optionTemp = option_;
576     if (!isWrite) {
577         optionTemp.createIfNecessary = false;
578     }
579 
580     int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
581     if (errCode != E_OK) {
582         if (errno == EKEYREVOKED) {
583             LOGI("Failed to open the main database for key revoked[%d]", errCode);
584             errCode = -E_EKEYREVOKED;
585         }
586         return errCode;
587     }
588 
589     executorState_ = ExecutorState::MAINDB;
590     // Set the engine state to main status for that the main database is valid.
591     SetEngineState(EngineState::MAINDB);
592 
593     if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
594         DBConstant::DB_EXTENSION)) {
595         // In status cacheDb crash
596         errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
597         if (errCode != E_OK) {
598             LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
599             return E_OK; // not care err to return, only use for print log
600         }
601         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
602         // cache and main existed together, can not read data, must execute migrate first
603         SetEngineState(EngineState::ATTACHING);
604     }
605 
606     return errCode;
607 }
608 
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)609 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
610 {
611     int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
612     LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]",  isWrite,
613         secOpt.securityLabel, secOpt.securityFlag, hashIdentifier_.c_str(), errCode);
614     if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
615         return errCode;
616     }
617     std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
618         DBConstant::DB_EXTENSION;
619     if (!isWrite || GetEngineState() != EngineState::INVALID ||
620         OS::CheckPathExistence(cacheDbPath)) {
621         LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
622             isWrite, GetEngineState());
623         return -E_EKEYREVOKED;
624     }
625 
626     errCode = GetCacheDbHandle(dbHandle);
627     if (errCode != E_OK) {
628         LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
629         return errCode;
630     }
631     SetEngineState(EngineState::CACHEDB);
632     executorState_ = ExecutorState::CACHEDB;
633 
634     ResetCacheRecordVersion();
635     // Get handle means maindb file ekeyevoked, not need attach to
636     return errCode;
637 }
638 
// SQL statements used to create the tables of the cache database.
// Adjacent string literals are concatenated by the compiler, so no line continuation is required.
namespace CacheDbSqls {
// Table holding local data in the cache database; hash_key is the primary key.
const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS local_data("
    "key     BLOB   NOT NULL,"
    "value  BLOB,"
    "timestamp  INT,"
    "hash_key   BLOB   PRIMARY KEY   NOT NULL,"
    "flag  INT  NOT NULL);";

// Table holding sync data in the cache database; the primary key is (version, hash_key).
const std::string CREATE_CACHE_SYNC_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS sync_data("
    "key         BLOB NOT NULL,"
    "value       BLOB,"
    "timestamp   INT  NOT NULL,"
    "flag        INT  NOT NULL,"
    "device      BLOB,"
    "ori_device  BLOB,"
    "hash_key    BLOB  NOT NULL,"
    "w_timestamp INT,"
    "version     INT  NOT NULL,"
    "PRIMARY Key(version, hash_key));";
} // namespace CacheDbSqls
661 
662 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
663 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)664 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
665 {
666     option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
667         DBConstant::DB_EXTENSION;
668     // creatTable
669     option_.sqls = {
670         CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
671         CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
672     };
673 
674     if (!option_.createIfNecessary) {
675         std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
676             DBConstant::DB_EXTENSION;
677         if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
678             return -E_INVALID_DB;
679         }
680     }
681 
682     OpenDbProperties option = option_; // copy for no change it
683     option.createIfNecessary = true;
684     int errCode = SQLiteUtils::OpenDatabase(option, db);
685     if (errCode != E_OK) {
686         LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
687         return errCode;
688     }
689     return errCode;
690 }
691 
CheckDatabaseSecOpt(const SecurityOption & secOption) const692 void SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
693 {
694     if (!(secOption == option_.securityOpt) && (secOption.securityLabel > option_.securityOpt.securityLabel) &&
695         secOption.securityLabel != SecurityLabel::NOT_SET &&
696         option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
697         LOGW("[SQLiteSingleVerStorageEngine] SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]",
698             secOption.securityLabel, secOption.securityFlag, option_.securityOpt.securityLabel,
699             option_.securityOpt.securityFlag);
700     }
701 }
702 
CreateNewDirsAndSetSecOpt() const703 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
704 {
705     LOGD("[SQLiteSingleVerStorageEngine] Begin to create new dirs and set security option");
706     return CreateNewDirsAndSetSecOption(option_);
707 }
708 
GetExistedSecOption(SecurityOption & secOption) const709 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
710 {
711     LOGD("[SQLiteSingleVerStorageEngine] Try to get existed sec option");
712     return GetExistedSecOpt(option_, secOption);
713 }
714 
ClearCorruptedFlag()715 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
716 {
717     isCorrupted_ = false;
718 }
719 
PreCreateExecutor(bool isWrite,SecurityOption & existedSecOpt)720 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite, SecurityOption &existedSecOpt)
721 {
722     // Assume that create the write executor firstly and the write one we will not be released.
723     // If the write one would be released in the future, should take care the pass through.
724     if (!isWrite) {
725         return E_OK;
726     }
727 
728     if (option_.isMemDb) {
729         return E_OK;
730     }
731 
732     // check sqlite open ok
733     int errCode = CheckStoreStatus(option_);
734     if (errCode != E_OK) {
735         return errCode;
736     }
737 
738     // Get the existed database secure option.
739     errCode = GetExistedSecOption(existedSecOpt);
740     if (errCode != E_OK) {
741         return errCode;
742     }
743 
744     CheckDatabaseSecOpt(existedSecOpt);
745 
746     // Judge whether need update the security option of the engine.
747     // Should update the security in the import or rekey scene(inner) or exist is not set.
748     if (IsUseExistedSecOption(existedSecOpt, option_.securityOpt)) {
749         option_.securityOpt = existedSecOpt;
750     } else {
751         isNeedUpdateSecOpt_ = true;
752     }
753 
754     errCode = CreateNewDirsAndSetSecOpt();
755     if (errCode != E_OK) {
756         return errCode;
757     }
758 
759     if (!isUpdated_) {
760         errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
761         if (errCode != E_OK) {
762             LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
763             return errCode;
764         }
765     }
766 
767     return E_OK;
768 }
769 
EndCreateExecutor(sqlite3 * db,bool isWrite,bool isDetachMeta)770 int SQLiteSingleVerStorageEngine::EndCreateExecutor(sqlite3 *db, bool isWrite, bool isDetachMeta)
771 {
772     if (option_.isMemDb || !isWrite) {
773         return E_OK;
774     }
775 
776     int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt,
777         isNeedUpdateSecOpt_);
778     if (errCode != E_OK) {
779         if (errCode == -E_NOT_SUPPORT) {
780             option_.securityOpt = SecurityOption();
781             errCode = E_OK;
782         }
783         LOGE("SetSecOption failed:%d", errCode);
784         return errCode;
785     }
786 
787     // after setting secOption, the database file operation ends
788     // database create completed, delete the token
789     if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
790         OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
791         LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
792         return -E_SYSTEM_API_FAIL;
793     }
794     if (isDetachMeta) {
795         errCode = SQLiteUtils::ExecuteRawSQL(db, "DETACH 'meta'");
796         if (errCode != E_OK) {
797             LOGE("Detach meta db failed %d", errCode);
798         } else {
799             LOGI("Detach meta db success");
800         }
801     }
802     return errCode;
803 }
804 
TryAttachMetaDb(const SecurityOption & existedSecOpt,sqlite3 * & dbHandle,bool & isAttachMeta,bool & isNeedDetachMeta)805 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(const SecurityOption &existedSecOpt, sqlite3 *&dbHandle,
806     bool &isAttachMeta, bool &isNeedDetachMeta)
807 {
808     bool isCurrentSESECE = ParamCheckUtils::IsS3SECEOpt(existedSecOpt);
809     bool isOpenSESECE = ParamCheckUtils::IsS3SECEOpt(option_.securityOpt);
810     // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
811     if ((!option_.isMemDb) && (isOpenSESECE || (isNeedUpdateSecOpt_ && isCurrentSESECE))) {
812         int errCode = AttachMetaDatabase(dbHandle, option_);
813         if (errCode != E_OK) {
814             (void)sqlite3_close_v2(dbHandle);
815             dbHandle = nullptr;
816             return errCode;
817         }
818         isAttachMeta = isOpenSESECE; // only open with S3 SECE need in attach mode
819         isNeedDetachMeta = !isOpenSESECE && isCurrentSESECE; // NOT S3 SECE no need meta.db
820     }
821     return E_OK;
822 }
823 
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)824 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
825 {
826     SecurityOption existedSecOpt;
827     int errCode = PreCreateExecutor(isWrite, existedSecOpt);
828     if (errCode != E_OK) {
829         return errCode;
830     }
831 
832     sqlite3 *dbHandle = nullptr;
833     errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
834     if (errCode != E_OK) {
835         return errCode;
836     }
837 
838     bool isAttachMeta = false;
839     bool isDetachMeta = false;
840     errCode = TryAttachMetaDb(existedSecOpt, dbHandle, isAttachMeta, isDetachMeta);
841     if (errCode != E_OK) {
842         return errCode;
843     }
844 
845     RegisterFunctionIfNeed(dbHandle);
846     errCode = Upgrade(dbHandle);
847     if (errCode != E_OK) {
848         (void)sqlite3_close_v2(dbHandle);
849         dbHandle = nullptr;
850         return errCode;
851     }
852 
853     errCode = EndCreateExecutor(dbHandle, isWrite, isDetachMeta);
854     if (errCode != E_OK) {
855         LOGE("After create executor, set security option incomplete!");
856         (void)sqlite3_close_v2(dbHandle);
857         dbHandle = nullptr;
858         return errCode;
859     }
860 
861     handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
862     if (handle == nullptr) {
863         LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
864         (void)sqlite3_close_v2(dbHandle);
865         dbHandle = nullptr;
866         return -E_OUT_OF_MEMORY;
867     }
868     if (isAttachMeta) {
869         SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
870         singleVerHandle->SetAttachMetaMode(isAttachMeta);
871     }
872     return E_OK;
873 }
874 
Upgrade(sqlite3 * db)875 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
876 {
877     if (isUpdated_ || GetEngineState() == EngineState::CACHEDB) {
878         LOGI("Storage engine [%.6s] is in cache status or has been upgraded[%d]!",
879             DBCommon::TransferStringToHex(identifier_).c_str(), isUpdated_);
880         return E_OK;
881     }
882 
883     std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
884     LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
885     if (option_.schema.empty()) {
886         upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
887     } else {
888         SchemaObject schema;
889         int errCode = schema.ParseFromSchemaString(option_.schema);
890         if (errCode != E_OK) {
891             LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
892             return errCode;
893         }
894         upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
895             option_.securityOpt, option_.isMemDb);
896     }
897 
898     std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
899     std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::DB_EXTENSION;
900     SecurityOption secOpt = option_.securityOpt;
901     int errCode = E_OK;
902     if (isNeedUpdateSecOpt_) {
903         errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
904         if (errCode != E_OK) {
905             LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
906             if (errCode != -E_NOT_SUPPORT) {
907                 return errCode;
908             }
909             secOpt = SecurityOption();
910         }
911     }
912 
913     upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
914     upgrader->SetSubdir(option_.subdir);
915     errCode = upgrader->Upgrade();
916     if (errCode != E_OK) {
917         LOGE("Single ver database upgrade failed:%d", errCode);
918         return errCode;
919     }
920 
921     LOGD("Finish upgrade single ver database!");
922     isUpdated_ = true; // Identification to avoid repeated upgrades
923     std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
924     isSchemaChanged_ = upgrader->IsValueNeedUpgrade();
925     return errCode;
926 }
927 
928 // Attention: This function should be called before "Upgrade".
929 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const930 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
931 {
932     // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
933     // not a newly created database, the meta-Table should exist and can be accessed.
934     std::string schemaStr = option_.schema;
935     if (schemaStr.empty()) {
936         // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
937         int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
938         if (errCode != E_OK) {
939             LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
940         }
941     }
942     if (!schemaStr.empty()) {
943         // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
944         int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
945         if (errCode != E_OK) { // Not very likely
946             // Just warning, if no index had been or need to be created, then put or kv-get can still use.
947             LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
948         }
949     }
950 
951     // This function is used to update meta_data in triggers when it's attached to mainDB
952     int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
953     if (errCode != E_OK) {
954         LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
955     }
956 }
957 
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const958 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
959 {
960     int errCode;
961     LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
962     std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
963         DBConstant::SINGLE_VER_META_STORE + DBConstant::DB_EXTENSION;
964     // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
965     if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
966         errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
967         if (errCode != E_OK) {
968             return errCode;
969         }
970     }
971     CipherPassword passwd;
972     errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
973     if (errCode != E_OK) {
974         LOGE("AttachNewDatabase fail, errCode = %d", errCode);
975     }
976     return errCode;
977 }
978 
ResetCacheRecordVersion()979 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
980 {
981     (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
982 }
983 
IncreaseCacheRecordVersion()984 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
985 {
986     (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
987 }
988 
GetAndIncreaseCacheRecordVersion()989 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
990 {
991     return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
992 }
993 
GetCacheRecordVersion() const994 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
995 {
996     return cacheRecordVersion_.load(std::memory_order_seq_cst);
997 }
998 
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const999 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1000     int eventType) const
1001 {
1002     std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1003     if (commitNotifyFunc_ == nullptr) {
1004         LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1005         RefObject::DecObjRef(committedData);
1006         committedData = nullptr;
1007         return;
1008     }
1009     commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1010     committedData = nullptr;
1011 }
1012 
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1013 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1014 {
1015     if (committedData == nullptr) {
1016         LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1017         return;
1018     }
1019     auto identifier = GetIdentifier();
1020     auto kvDBManager = KvDBManager::GetInstance();
1021     if (kvDBManager == nullptr) {
1022         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvDBManager is null.");
1023         return;
1024     }
1025     auto kvdb = kvDBManager->FindKvDB(identifier);
1026     if (kvdb == nullptr) {
1027         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1028         return;
1029     }
1030     unsigned int conflictFlag = 0;
1031     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1032         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1033         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1034     }
1035     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1036         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1037         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1038     }
1039     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1040         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1041         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_NATIVE_ALL);
1042     }
1043     RefObject::DecObjRef(kvdb);
1044     LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1045     committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1046 }
1047 
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1048 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1049 {
1050     const auto &isRemote = syncData.isRemote;
1051     const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1052     auto &committedData = syncData.committedData;
1053     auto &entries = syncData.entries;
1054 
1055     // Put data. Including insert, update and delete.
1056     if (!isRemoveDeviceData) { // LCOV_EXCL_BR_LINE
1057         if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1058             int eventType = static_cast<int>(isRemote ?
1059                 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT :
1060                 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT);
1061             CommitAndReleaseNotifyData(committedData, eventType);
1062         }
1063         return;
1064     }
1065 
1066     // Remove device data.
1067     if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) { // LCOV_EXCL_BR_LINE
1068         return;
1069     }
1070     size_t totalSize = 0;
1071     for (auto iter = entries.begin(); iter != entries.end();) {
1072         auto &entry = *iter;
1073         if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1074             committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1075             if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1076                 LOGE("Alloc committed notify data failed.");
1077                 return;
1078             }
1079         }
1080         if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() >
1081             DBConstant::MAX_VALUE_SIZE) { // LCOV_EXCL_BR_LINE
1082             iter++;
1083             continue;
1084         }
1085         if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) { // LCOV_EXCL_BR_LINE
1086             CommitAndReleaseNotifyData(committedData,
1087                 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1088             totalSize = 0;
1089             continue;
1090         }
1091         totalSize += (entry.key.size() + entry.value.size());
1092         committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1093         iter++;
1094     }
1095     if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1096         CommitAndReleaseNotifyData(committedData,
1097             static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1098     }
1099 }
1100 
1101 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1102 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1103 {
1104     std::lock_guard<std::mutex> lock(subscribeMutex_);
1105     subscribeQuery_[subscribeId] = query;
1106 }
1107 
IsUseExistedSecOption(const SecurityOption & existedSecOpt,const SecurityOption & openSecOpt)1108 bool SQLiteSingleVerStorageEngine::IsUseExistedSecOption(const SecurityOption &existedSecOpt,
1109     const SecurityOption &openSecOpt)
1110 {
1111     if (isNeedUpdateSecOpt_) {
1112         return false;
1113     }
1114     if (existedSecOpt.securityLabel != openSecOpt.securityLabel) {
1115         return false;
1116     }
1117     return true;
1118 }
1119 
UpgradeLocalMetaData()1120 int SQLiteSingleVerStorageEngine::UpgradeLocalMetaData()
1121 {
1122     std::function<int(void)> schemaChangedFunc = nullptr;
1123     {
1124         std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
1125         if (isSchemaChanged_) {
1126             schemaChangedFunc = schemaChangedFunc_;
1127             isSchemaChanged_ = false;
1128         }
1129     }
1130     if (schemaChangedFunc != nullptr) {
1131         return schemaChangedFunc();
1132     }
1133     return E_OK;
1134 }
1135 }
1136