1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_executor.h"
17 
18 #include <algorithm>
19 
20 #include "log_print.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "parcel.h"
25 #include "runtime_context.h"
26 #include "sqlite_single_ver_storage_executor_sql.h"
27 
28 namespace DistributedDB {
PrepareForSavingCacheData(SingleVerDataType type)29 int SQLiteSingleVerStorageExecutor::PrepareForSavingCacheData(SingleVerDataType type)
30 {
31     int errCode = -E_NOT_SUPPORT;
32     if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
33         std::string insertLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
34             INSERT_LOCAL_SQL_FROM_CACHEHANDLE : INSERT_CACHE_LOCAL_SQL);
35         std::string updateLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
36             UPDATE_LOCAL_SQL_FROM_CACHEHANDLE : UPDATE_CACHE_LOCAL_SQL);
37         errCode = PrepareForSavingData(SELECT_CACHE_LOCAL_HASH_SQL, insertLocalSql, updateLocalSql,
38             saveLocalStatements_);
39     } else if (type == SingleVerDataType::SYNC_TYPE) {
40         std::string insertSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
41             INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL);
42         std::string updateSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
43             UPDATE_CACHE_SYNC_SQL_FROM_MAINHANDLE : UPDATE_CACHE_SYNC_SQL);
44         std::string selectSyncHashSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
45             SELECT_CACHE_SYNC_HASH_SQL_FROM_MAINHANDLE : SELECT_CACHE_SYNC_HASH_SQL);
46         errCode = PrepareForSavingData(selectSyncHashSql, insertSyncSql, updateSyncSql, saveSyncStatements_);
47     }
48     if (errCode != E_OK) {
49         LOGE("Prepare to save sync cache data failed:%d", errCode);
50     }
51     return CheckCorruptedStatus(errCode);
52 }
53 
ResetForSavingCacheData(SingleVerDataType type)54 int SQLiteSingleVerStorageExecutor::ResetForSavingCacheData(SingleVerDataType type)
55 {
56     int errCode = E_OK;
57     if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
58         SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
59         SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
60         SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
61     } else if (type == SingleVerDataType::SYNC_TYPE) {
62         SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
63         SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
64         SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
65     }
66 
67     return CheckCorruptedStatus(errCode);
68 }
69 
ResetForMigrateCacheData()70 int SQLiteSingleVerStorageExecutor::ResetForMigrateCacheData()
71 {
72     int errCode = E_OK;
73     SQLiteUtils::ResetStatement(migrateSyncStatements_.insertStatement, false, errCode);
74     SQLiteUtils::ResetStatement(migrateSyncStatements_.updateStatement, false, errCode);
75     SQLiteUtils::ResetStatement(migrateSyncStatements_.queryStatement, false, errCode);
76 
77     return CheckCorruptedStatus(errCode);
78 }
79 
RemoveDeviceDataInCacheMode(const std::string & hashDev,bool isNeedNotify,uint64_t recordVersion) const80 int SQLiteSingleVerStorageExecutor::RemoveDeviceDataInCacheMode(const std::string &hashDev,
81     bool isNeedNotify, uint64_t recordVersion) const
82 {
83     // device name always hash string.
84     std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
85 
86     Key hashKey;
87     int errCode = DBCommon::CalcValueHash(REMOVE_DEVICE_DATA_KEY, hashKey);
88     if (errCode != E_OK) {
89         return errCode;
90     }
91 
92     DataItem dataItem;
93     dataItem.key = REMOVE_DEVICE_DATA_KEY;
94     dataItem.value = devVect;
95     if (isNeedNotify) {
96         dataItem.flag = DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG;
97     } else {
98         dataItem.flag = DataItem::REMOVE_DEVICE_DATA_FLAG;
99     }
100 
101     sqlite3_stmt *statement = nullptr;
102     std::string sql = (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
103         INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL;
104     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
105     if (errCode != E_OK) {
106         goto ERROR;
107     }
108 
109     errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
110     if (errCode != E_OK) {
111         goto ERROR;
112     }
113 
114     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
115     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
116         LOGE("Failed to execute rm the device synced data:%d", errCode);
117     } else {
118         errCode = E_OK;
119     }
120 
121 ERROR:
122     SQLiteUtils::ResetStatement(statement, true, errCode);
123     return CheckCorruptedStatus(errCode);
124 }
125 
GetMinVersionCacheData(std::vector<DataItem> & dataItems,uint64_t & minVerIncurCacheDb) const126 int SQLiteSingleVerStorageExecutor::GetMinVersionCacheData(
127     std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const
128 {
129     std::string sql;
130     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
131         sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_MAINHANDLE;
132     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
133         sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_CACHEHANDLE;
134     } else {
135         return -E_INVALID_ARGS;
136     }
137 
138     sqlite3_stmt *statement = nullptr;
139     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
140     if (errCode != E_OK) {
141         LOGE("GetStatement fail when get min version cache data! errCode = [%d]", errCode);
142         return CheckCorruptedStatus(errCode);
143     }
144 
145     errCode = GetAllDataItems(statement, dataItems, minVerIncurCacheDb, true);
146     if (errCode != E_OK) {
147         LOGE("Failed to get all the data items by the min version:[%d]", errCode);
148     }
149 
150     SQLiteUtils::ResetStatement(statement, true, errCode);
151     return CheckCorruptedStatus(errCode);
152 }
153 
MigrateRmDevData(const DataItem & dataItem) const154 int SQLiteSingleVerStorageExecutor::MigrateRmDevData(const DataItem &dataItem) const
155 {
156     if (dataItem.key != REMOVE_DEVICE_DATA_KEY) {
157         LOGE("This item not means remove devices data, can not continue exe!");
158         return -E_INVALID_ARGS;
159     }
160 
161     std::string sql;
162     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
163         sql = dataItem.value.empty() ? REMOVE_ALL_DEV_DATA_SQL : REMOVE_DEV_DATA_SQL;
164     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
165         sql = dataItem.value.empty() ? REMOVE_ALL_DEV_DATA_SQL_FROM_CACHEHANDLE: REMOVE_DEV_DATA_SQL_FROM_CACHEHANDLE;
166     } else {
167         return -E_INVALID_ARGS;
168     }
169 
170     sqlite3_stmt *statement = nullptr;
171     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
172     if (errCode != E_OK) {
173         LOGE("GetStatement fail when remove device data migrating-data to main! errCode = [%d]", errCode);
174         return CheckCorruptedStatus(errCode);
175     }
176 
177     if (!dataItem.value.empty()) {
178         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, dataItem.value, true);
179         if (errCode != E_OK) {
180             LOGE("[singerVerExecutor][MiRmData] Bind dev for sync data failed:%d", errCode);
181             goto END;
182         }
183     }
184 
185     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
186     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
187         errCode = E_OK;
188     }
189 END:
190     SQLiteUtils::ResetStatement(statement, true, errCode);
191     return CheckCorruptedStatus(errCode);
192 }
193 
AttachMainDbAndCacheDb(CipherType type,const CipherPassword & passwd,const std::string & attachDbAbsPath,EngineState engineState)194 int SQLiteSingleVerStorageExecutor::AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
195     const std::string &attachDbAbsPath, EngineState engineState)
196 {
197     std::string attachAsName;
198     if (engineState == EngineState::MAINDB) {
199         attachAsName = "cache";
200     } else if (engineState == EngineState::CACHEDB)  {
201         attachAsName = "maindb";
202     } else if (engineState == EngineState::ATTACHING) {
203         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
204         return E_OK;
205     } else {
206         return -E_INVALID_ARGS;
207     }
208 
209     int errCode = SQLiteUtils::AttachNewDatabase(dbHandle_, type, passwd, attachDbAbsPath, attachAsName);
210     if (errCode != E_OK) {
211         LOGE("handle attach to [%s] fail! errCode = [%d]", attachAsName.c_str(), errCode);
212         return CheckCorruptedStatus(errCode);
213     }
214 
215     if (engineState == EngineState::MAINDB) {
216         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
217     } else if (engineState == EngineState::CACHEDB)  {
218         executorState_ = ExecutorState::CACHE_ATTACH_MAIN;
219     } else {
220         return -E_INVALID_ARGS;
221     }
222     LOGD("[singleVerExecutor][attachDb] current engineState[%u], executorState[%u]", static_cast<unsigned>(engineState),
223         static_cast<unsigned>(executorState_));
224     return errCode;
225 }
226 
GetMaxVersionInCacheDb(uint64_t & maxVersion) const227 int SQLiteSingleVerStorageExecutor::GetMaxVersionInCacheDb(uint64_t &maxVersion) const
228 {
229     sqlite3_stmt *statement = nullptr;
230     std::string sql;
231     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
232         sql = GET_MAX_VER_CACHEDATA_FROM_MAINHANDLE;
233     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
234         sql = GET_MAX_VER_CACHEDATA_FROM_CACHEHANDLE;
235     } else {
236         return -E_INVALID_ARGS;
237     }
238 
239     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
240     if (errCode != E_OK) {
241         LOGE("GetStatement fail when get max version in cache db");
242         return CheckCorruptedStatus(errCode);
243     }
244 
245     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
246     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
247         maxVersion = static_cast<uint64_t>(sqlite3_column_int64(statement, 0));
248         errCode = E_OK;
249     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
250         maxVersion = 0;
251         errCode = E_OK;
252     }
253     SQLiteUtils::ResetStatement(statement, true, errCode);
254     return CheckCorruptedStatus(errCode);
255 }
256 
MigrateDataItem(DataItem & dataItem,const NotifyMigrateSyncData & syncData)257 int SQLiteSingleVerStorageExecutor::MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData)
258 {
259     // Put or delete. Prepare notify data here.
260     NotifyConflictAndObserverData notify;
261     notify.committedData = syncData.committedData;
262     int errCode = PutIntoConflictAndCommitForMigrateCache(dataItem, {dataItem.dev.empty(), dataItem.dev}, notify,
263         syncData.isPermitForceWrite);
264     if (errCode != E_OK) {
265         ResetForMigrateCacheData();
266         LOGE("PutIntoConflictAndCommitForMigrateCache failed, errCode = %d", errCode);
267         return errCode;
268     }
269     // after solving conflict, the item should not be saved into mainDB
270     if (notify.dataStatus.isDefeated) {
271         LOGE("Data status is defeated:%d", errCode);
272         return errCode;
273     }
274     bool isUpdate = notify.dataStatus.preStatus != DataStatus::NOEXISTED;
275     sqlite3_stmt *statement = migrateSyncStatements_.GetDataSaveStatement(isUpdate);
276     if (statement == nullptr) {
277         LOGE("GetStatement fail when put migrating-data to main! ");
278         return -E_INVALID_ARGS;
279     }
280 
281     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
282         errCode = EraseSyncData(dataItem.key);
283         goto END;
284     }
285 
286     errCode = BindSavedSyncData(statement, dataItem, dataItem.hashKey, { dataItem.origDev, dataItem.dev }, isUpdate);
287     if (errCode != E_OK) {
288         goto END;
289     }
290 
291     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
292     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
293         errCode = E_OK;
294     } else {
295         LOGD("StepWithRetry fail when put migrating-data to main!");
296     }
297 END:
298     ResetForMigrateCacheData();
299     return errCode;
300 }
301 
CheckDataWithQuery(std::vector<DataItem> & dataItems)302 int SQLiteSingleVerStorageExecutor::CheckDataWithQuery(std::vector<DataItem> &dataItems)
303 {
304     int errCode = E_OK;
305     sqlite3_stmt *stmt = nullptr;
306     for (auto &item : dataItems) {
307         if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) {
308             continue;
309         }
310         std::string sql;
311         DBCommon::VectorToString(item.value, sql);
312         if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
313             static const std::string SYNC_DATA_TABLE = "sync_data";
314             static const std::string SYNC_DATA_TABLE_MAIN = "maindb.sync_data";
315             std::string::size_type startPos = sql.find(SYNC_DATA_TABLE);
316             if (startPos != std::string::npos) {
317                 sql.replace(startPos, SYNC_DATA_TABLE.length(), SYNC_DATA_TABLE_MAIN);
318             }
319         }
320         errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
321         if (errCode != E_OK) {
322             LOGE("Get Check miss query data statement failed. %d", errCode);
323             return errCode;
324         }
325 
326         errCode = CheckMissQueryDataItem(stmt, item.dev, item);
327         if (errCode != E_OK) {
328             LOGE("Check miss query data item failed. %d", errCode);
329             break;
330         }
331         SQLiteUtils::ResetStatement(stmt, true, errCode);
332     }
333     SQLiteUtils::ResetStatement(stmt, true, errCode);
334     return CheckCorruptedStatus(errCode);
335 }
336 
MigrateDataItems(std::vector<DataItem> & dataItems,NotifyMigrateSyncData & syncData)337 int SQLiteSingleVerStorageExecutor::MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData)
338 {
339     syncData.isRemote = ((dataItems[0].flag & DataItem::LOCAL_FLAG) == 0);
340     syncData.isRemoveDeviceData = (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_FLAG) != 0 ||
341         (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) != 0;
342 
343     int errCode = CheckDataWithQuery(dataItems);
344     if (errCode != E_OK) {
345         LOGE("Check migrate data with query failed! errCode = [%d]", errCode);
346         goto END;
347     }
348 
349     for (auto &item : dataItems) {
350         // Remove device data owns one version itself.
351         // Get entry here. Prepare notify data in storageEngine.
352         if (syncData.isRemoveDeviceData) {
353             errCode = GetEntriesForNotifyRemoveDevData(item, syncData.entries);
354             if (errCode != E_OK) {
355                 LOGE("Failed to get remove devices data");
356                 return errCode;
357             }
358             errCode = MigrateRmDevData(item);
359             LOGI("[PutMigratingDataToMain]Execute remove devices data! errCode = [%d]", errCode);
360             if (errCode != E_OK) {
361                 break;
362             }
363             continue;
364         }
365 
366         if (item.neglect) { // Do not save this record if it is neglected
367             continue;
368         }
369 
370         errCode = MigrateDataItem(item, syncData);
371         if (errCode != E_OK) {
372             LOGE("Migrate data item to main db failed! errCode = [%d]", errCode);
373             break;
374         }
375     }
376 END:
377     ResetForMigrateCacheData();
378     return CheckCorruptedStatus(errCode);
379 }
380 
MigrateSyncDataByVersion(uint64_t recordVer,NotifyMigrateSyncData & syncData,std::vector<DataItem> & dataItems)381 int SQLiteSingleVerStorageExecutor::MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
382     std::vector<DataItem> &dataItems)
383 {
384     int errCode = StartTransaction(TransactType::IMMEDIATE);
385     if (errCode != E_OK) {
386         return errCode;
387     }
388 
389     // Init migrate data.
390     errCode = InitMigrateData();
391     if (errCode != E_OK) {
392         LOGE("Init migrate data failed, errCode = [%d]", errCode);
393         goto END;
394     }
395 
396     // fix dataItem timestamp for migrate
397     errCode = ProcessTimestampForSyncDataInCacheDB(dataItems);
398     if (errCode != E_OK) {
399         LOGE("Change the time stamp for migrate failed! errCode = [%d]", errCode);
400         goto END;
401     }
402 
403     errCode = MigrateDataItems(dataItems, syncData);
404     if (errCode != E_OK) {
405         goto END;
406     }
407 
408     // delete recordVersion data
409     errCode = DelCacheDbDataByVersion(recordVer);
410     if (errCode != E_OK) {
411         LOGE("Delete the migrated data in cacheDb! errCode = [%d]", errCode);
412         goto END;
413     }
414 
415     errCode = Commit();
416     if (errCode != E_OK) {
417         LOGE("Commit data error and rollback, errCode = [%d]", errCode);
418         goto END;
419     }
420     return E_OK;
421 END:
422     Rollback();
423     return errCode;
424 }
425 
DelCacheDbDataByVersion(uint64_t version) const426 int SQLiteSingleVerStorageExecutor::DelCacheDbDataByVersion(uint64_t version) const
427 {
428     std::string sql;
429     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
430         sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_MAINHANDLE;
431     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
432         sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_CACHEHANDLE;
433     } else {
434         return -E_INVALID_ARGS;
435     }
436 
437     sqlite3_stmt *statement = nullptr;
438     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
439     if (errCode != E_OK) {
440         LOGE("GetStatement fail when delete cache data by version! errCode = [%d]", errCode);
441         return errCode;
442     }
443 
444     errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, static_cast<int64_t>(version));
445     if (errCode != E_OK) {
446         LOGE("[SingleVerExe] Bind destDbNickName error:[%d]", errCode);
447         goto END;
448     }
449 
450     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
451     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
452         errCode = E_OK;
453     }
454 
455 END:
456     SQLiteUtils::ResetStatement(statement, true, errCode);
457     return CheckCorruptedStatus(errCode);
458 }
459 
VacuumLocalData() const460 int SQLiteSingleVerStorageExecutor::VacuumLocalData() const
461 {
462     std::string sql;
463     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
464         sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_MAINHANDLE;
465     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
466         sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_CACHEHANDLE;
467     } else {
468         return -E_INVALID_ARGS;
469     }
470 
471     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
472     if (errCode != E_OK) {
473         LOGE("[SingleVerExe] vaccum local data failed: %d", errCode);
474     }
475 
476     return CheckCorruptedStatus(errCode);
477 }
478 
479 // The local table data is only for local reading and writing, which can be sensed by itself.
480 // The current migration process does not provide callback subscription function.
MigrateLocalData()481 int SQLiteSingleVerStorageExecutor::MigrateLocalData()
482 {
483     // Nick name "main" represent current database(dbhande) in sqlite grammar
484     std::string migrateLocaldataSql;
485     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
486         migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_MAINHANDLE;
487     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
488         migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_CACHEHANDLE;
489     } else {
490         return -E_INVALID_ARGS;
491     }
492 
493     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, migrateLocaldataSql);
494     if (errCode != E_OK) {
495         LOGW("Failed to migrate the local data:%d", errCode);
496         return CheckCorruptedStatus(errCode);
497     }
498 
499     return VacuumLocalData();
500 }
501 
BindSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const502 int SQLiteSingleVerStorageExecutor::BindSyncDataInCacheMode(sqlite3_stmt *statement,
503     const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const
504 {
505     int errCode = BindPrimaryKeySyncDataInCacheMode(statement, hashKey, recordVersion);
506     if (errCode != E_OK) {
507         LOGE("Bind saved sync data primary key failed:%d", errCode);
508         return errCode;
509     }
510 
511     // if delete flag is set, just use the hash key instead of the key
512     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
513         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_SYNC_KEY_INDEX, -1));
514     } else {
515         errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_KEY_INDEX, dataItem.key, false);
516     }
517 
518     if (errCode != E_OK) {
519         LOGE("Bind saved sync data key failed:%d", errCode);
520         return errCode;
521     }
522 
523     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_VAL_INDEX, dataItem.value, true);
524     if (errCode != E_OK) {
525         LOGE("Bind saved sync data value failed:%d", errCode);
526         return errCode;
527     }
528 
529     LOGD("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 ", version:%" PRIu64,
530         dataItem.writeTimestamp, dataItem.timestamp, dataItem.flag, recordVersion);
531     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_FLAG_INDEX,
532         static_cast<int64_t>(dataItem.flag));
533     if (errCode != E_OK) {
534         LOGE("Bind saved sync data flag failed:%d", errCode);
535         return errCode;
536     }
537     errCode = BindTimestampSyncDataInCacheMode(statement, dataItem);
538     if (errCode != E_OK) {
539         LOGE("Bind saved sync data time stamp failed:%d", errCode);
540         return errCode;
541     }
542     return BindDevSyncDataInCacheMode(statement, dataItem.origDev, dataItem.dev);
543 }
544 
BindPrimaryKeySyncDataInCacheMode(sqlite3_stmt * statement,const Key & hashKey,uint64_t recordVersion) const545 int SQLiteSingleVerStorageExecutor::BindPrimaryKeySyncDataInCacheMode(
546     sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const
547 {
548     int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_HASH_KEY_INDEX, hashKey, false);
549     if (errCode != E_OK) {
550         LOGE("Bind saved sync data hash key failed:%d", errCode);
551         return errCode;
552     }
553     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_VERSION_INDEX, recordVersion);
554     if (errCode != E_OK) {
555         LOGE("Bind saved sync data version failed:%d", errCode);
556     }
557     return errCode;
558 }
559 
BindTimestampSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem) const560 int SQLiteSingleVerStorageExecutor::BindTimestampSyncDataInCacheMode(
561     sqlite3_stmt *statement, const DataItem &dataItem) const
562 {
563     int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_STAMP_INDEX, dataItem.timestamp);
564     if (errCode != E_OK) {
565         LOGE("Bind saved sync data stamp failed:%d", errCode);
566         return errCode;
567     }
568 
569     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_W_TIME_INDEX, dataItem.writeTimestamp);
570     if (errCode != E_OK) {
571         LOGE("Bind saved sync data write stamp failed:%d", errCode);
572     }
573     return errCode;
574 }
575 
BindDevSyncDataInCacheMode(sqlite3_stmt * statement,const std::string & origDev,const std::string & deviceName) const576 int SQLiteSingleVerStorageExecutor::BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
577     const std::string &origDev, const std::string &deviceName) const
578 {
579     std::string devName = DBCommon::TransferHashString(deviceName);
580     std::vector<uint8_t> devVect(devName.begin(), devName.end());
581     int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_DEV_INDEX, devVect, true);
582     if (errCode != E_OK) {
583         LOGE("Bind dev for sync data failed:%d", errCode);
584         return errCode;
585     }
586 
587     std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
588     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_ORI_DEV_INDEX, origDevVect, true);
589     if (errCode != E_OK) {
590         LOGE("Bind orig dev for sync data failed:%d", errCode);
591     }
592     return errCode;
593 }
594 
GetExpandedCheckSql(QueryObject query,DataItem & dataItem)595 int SQLiteSingleVerStorageExecutor::GetExpandedCheckSql(QueryObject query, DataItem &dataItem)
596 {
597     int errCode = E_OK;
598     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
599 
600     std::string sql;
601     std::string expandedSql;
602     errCode = helper.GetSyncDataCheckSql(sql);
603     if (errCode != E_OK) {
604         LOGE("Get sync data check sql failed");
605         return errCode;
606     }
607     sqlite3_stmt *stmt = nullptr;
608     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
609     if (errCode != E_OK) {
610         LOGE("Get statement fail. %d", errCode);
611         return -E_INVALID_QUERY_FORMAT;
612     }
613 
614     errCode = helper.BindSyncDataCheckStmt(stmt, dataItem.key);
615     if (errCode != E_OK) {
616         goto END;
617     }
618 
619     errCode = SQLiteUtils::ExpandedSql(stmt, expandedSql);
620     if (errCode != E_OK) {
621         LOGE("Get expand sql fail. %d", errCode);
622     }
623     DBCommon::StringToVector(expandedSql, dataItem.value);
624 END:
625     SQLiteUtils::ResetStatement(stmt, true, errCode);
626     return errCode;
627 }
628 
SaveSyncDataItemInCacheMode(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,uint64_t recordVersion,const QueryObject & query)629 int SQLiteSingleVerStorageExecutor::SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo,
630     Timestamp &maxStamp, uint64_t recordVersion, const QueryObject &query)
631 {
632     Key hashKey;
633     int errCode = E_OK;
634     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
635         hashKey = dataItem.key;
636     } else {
637         errCode = DBCommon::CalcValueHash(dataItem.key, hashKey);
638         if (errCode != E_OK) {
639             return errCode;
640         }
641     }
642 
643     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
644         errCode = GetExpandedCheckSql(query, dataItem); // record check sql in value for miss query data
645         if (errCode != E_OK) {
646             LOGE("Get sync data check sql failed. %d", errCode);
647             return errCode;
648         }
649     }
650 
651     std::string origDev = dataItem.origDev;
652     if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
653         origDev.clear();
654     }
655     dataItem.dev = deviceInfo.deviceName;
656     dataItem.origDev = origDev;
657     errCode = SaveSyncDataToCacheDatabase(dataItem, hashKey, recordVersion);
658     if (errCode == E_OK) {
659         maxStamp = std::max(dataItem.timestamp, maxStamp);
660     } else {
661         LOGE("Save sync data to db failed:%d", errCode);
662     }
663     return ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
664 }
665 
SaveSyncDataToCacheDatabase(const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const666 int SQLiteSingleVerStorageExecutor::SaveSyncDataToCacheDatabase(const DataItem &dataItem,
667     const Key &hashKey, uint64_t recordVersion) const
668 {
669     auto statement = saveSyncStatements_.GetDataSaveStatement(false);
670     if (statement == nullptr) {
671         return -E_INVALID_ARGS;
672     }
673     int errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
674     if (errCode != E_OK) {
675         return errCode;
676     }
677 
678     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
679     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
680         errCode = E_OK;
681     }
682     return errCode;
683 }
684 
PutLocalDataToCacheDB(const LocalDataItem & dataItem) const685 int SQLiteSingleVerStorageExecutor::PutLocalDataToCacheDB(const LocalDataItem &dataItem) const
686 {
687     sqlite3_stmt *statement = nullptr;
688     int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_CACHE_LOCAL_SQL, statement);
689     if (errCode != E_OK) {
690         goto ERROR;
691     }
692 
693     errCode = BindLocalDataInCacheMode(statement, dataItem);
694     if (errCode != E_OK) {
695         goto ERROR;
696     }
697 
698     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
699     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
700         errCode = E_OK;
701     }
702 
703 ERROR:
704     SQLiteUtils::ResetStatement(statement, true, errCode);
705     return CheckCorruptedStatus(errCode);
706 }
707 
BindLocalDataInCacheMode(sqlite3_stmt * statement,const LocalDataItem & dataItem) const708 int SQLiteSingleVerStorageExecutor::BindLocalDataInCacheMode(sqlite3_stmt *statement,
709     const LocalDataItem &dataItem) const
710 {
711     int errCode = SQLiteUtils::BindBlobToStatement(statement,
712         BIND_CACHE_LOCAL_HASH_KEY_INDEX, dataItem.hashKey, false);
713     if (errCode != E_OK) {
714         LOGE("[SingleVerExe][BindLocalData]Bind hash key error:%d", errCode);
715         return errCode;
716     }
717 
718     // if delete flag is set, just use the hash key instead of the key
719     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
720         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_LOCAL_KEY_INDEX, -1));
721     } else {
722         errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_KEY_INDEX, dataItem.key, false);
723     }
724 
725     if (errCode != E_OK) {
726         LOGE("Bind saved sync data key failed:%d", errCode);
727         return errCode;
728     }
729 
730     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_VAL_INDEX, dataItem.value, true);
731     if (errCode != E_OK) {
732         LOGE("[SingleVerExe][BindLocalData]Bind value error:%d", errCode);
733         return errCode;
734     }
735 
736     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_TIMESTAMP_INDEX, dataItem.timestamp);
737     if (errCode != E_OK) {
738         LOGE("[SingleVerExe][BindLocalData]Bind timestamp error:%d", errCode);
739         return errCode;
740     }
741 
742     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_FLAG_INDEX,
743         static_cast<int64_t>(dataItem.flag));
744     if (errCode != E_OK) {
745         LOGE("[SingleVerExe][BindLocalData]Bind local data flag failed:%d", errCode);
746         return errCode;
747     }
748 
749     return E_OK;
750 }
751 
PutIntoConflictAndCommitForMigrateCache(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)752 int SQLiteSingleVerStorageExecutor::PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem,
753     const DeviceInfo &deviceInfo, NotifyConflictAndObserverData &notify, bool isPermitForceWrite)
754 {
755     int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
756     if (errCode != E_OK) {
757         errCode = (errCode == -E_NOT_FOUND ? E_OK : errCode);
758         if (errCode == -E_IGNORE_DATA) {
759             notify.dataStatus.isDefeated = true;
760             errCode = E_OK;
761         }
762         return errCode;
763     }
764 
765     // If delete data, the key is empty.
766     if (isSyncMigrating_ && dataItem.key.empty()) {
767         dataItem.key = notify.getData.key;
768     }
769 
770     PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, notify.committedData);
771     if (notify.dataStatus.isDefeated) {
772         LOGE("Data status is defeated:%d", errCode);
773         return ResetForMigrateCacheData();
774     }
775 
776     PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, notify.committedData);
777     return ResetForMigrateCacheData();
778 }
779 
GetMinTimestampInCacheDB(Timestamp & minStamp) const780 int SQLiteSingleVerStorageExecutor::GetMinTimestampInCacheDB(Timestamp &minStamp) const
781 {
782     if (dbHandle_ == nullptr) {
783         return E_OK;
784     }
785     std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
786         SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL :
787         SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL_FROM_MAINHANDLE);
788     sqlite3_stmt *statement = nullptr;
789     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
790     if (errCode != E_OK) {
791         goto ERROR;
792     }
793 
794     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
795     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
796         minStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
797         LOGD("Min time stamp in cacheDB is %" PRIu64, minStamp);
798         errCode = E_OK;
799     } else {
800         LOGE("GetMinTimestampInCacheDB failed, errCode = %d.", errCode);
801     }
802 
803 ERROR:
804     SQLiteUtils::ResetStatement(statement, true, errCode);
805     return errCode;
806 }
807 
InitMigrateTimestampOffset()808 int SQLiteSingleVerStorageExecutor::InitMigrateTimestampOffset()
809 {
810     // Not first migrate, migrateTimeOffset_ has been set.
811     if (migrateTimeOffset_ != 0) {
812         return E_OK;
813     }
814 
815     // Get min timestamp of local data in sync_data, cacheDB.
816     Timestamp minTimeInCache = 0;
817     int errCode = GetMinTimestampInCacheDB(minTimeInCache);
818     if (errCode != E_OK) {
819         return errCode;
820     }
821 
822     // There is no native data in cacheDB, cannot get accurate migrateTimeOffset_ now.
823     if (minTimeInCache == 0) {
824         migrateTimeOffset_ = -1;
825         LOGI("Time offset during migrating is -1.");
826         return E_OK;
827     }
828 
829     // Get max timestamp in mainDB.
830     Timestamp maxTimeInMain = 0;
831     InitCurrentMaxStamp(maxTimeInMain);
832 
833     // Get timestamp offset between mainDB and cacheDB.
834     // The purpose of -1 is to ensure that the first data record in the original cacheDB is 1 greater than
835     // the last data record in the original mainDB after the migration.
836     migrateTimeOffset_ = minTimeInCache - maxTimeInMain - 1;
837     LOGI("Min timestamp in cacheDB is %" PRIu64 ", max timestamp in mainDB is %" PRIu64 ". Time offset during migrating"
838         " is %" PRId64 ".", minTimeInCache, maxTimeInMain, migrateTimeOffset_);
839     return E_OK;
840 }
841 
ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> & dataItems)842 int SQLiteSingleVerStorageExecutor::ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems)
843 {
844     if (dataItems.empty()) {
845         LOGE("[SQLiteSingleVerStorageExecutor::ProcessTimestampForCacheDB] Invalid parameter : dataItems.");
846         return -E_INVALID_ARGS;
847     }
848 
849     // Get the offset between the min timestamp in dataitems and max timestamp in mainDB.
850     int errCode = InitMigrateTimestampOffset();
851     if (errCode != E_OK) {
852         return errCode;
853     }
854 
855     // Set real timestamp for DataItem in dataItems and get the max timestamp in these dataitems.
856     Timestamp maxTimeInDataItems = 0;
857     for (auto &item : dataItems) {
858         item.timestamp -= migrateTimeOffset_;
859         maxTimeInDataItems = std::max(maxTimeInDataItems, item.timestamp);
860     }
861 
862     // Update max timestamp in mainDB.
863     maxTimestampInMainDB_ = maxTimeInDataItems;
864     return E_OK;
865 }
866 
GetEntriesForNotifyRemoveDevData(const DataItem & item,std::vector<Entry> & entries) const867 int SQLiteSingleVerStorageExecutor::GetEntriesForNotifyRemoveDevData(const DataItem &item,
868     std::vector<Entry> &entries) const
869 {
870     // When removing device data, key is 'remove', value is device name.
871     if (item.key != REMOVE_DEVICE_DATA_KEY) {
872         LOGE("Invalid key. Can not notify remove device data.");
873         return -E_INVALID_ARGS;
874     }
875     if ((item.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == 0) {
876         LOGI("No need to notify remove device data.");
877         return E_OK;
878     }
879     entries.clear();
880     std::string dev;
881     DBCommon::VectorToString(item.value, dev);
882     return GetAllSyncedEntries(dev, entries);
883 }
884 
InitMigrateData()885 int SQLiteSingleVerStorageExecutor::InitMigrateData()
886 {
887     // Sync_data already in migrating. Need not to init data.
888     if (isSyncMigrating_) {
889         return E_OK;
890     }
891     ClearMigrateData();
892     std::string querySQL;
893     std::string insertSQL;
894     std::string updateSQL;
895     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
896         querySQL = SELECT_SYNC_HASH_SQL;
897         insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_MAINHANDLE;
898         updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_MAINHANDLE;
899     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
900         querySQL = SELECT_MAIN_SYNC_HASH_SQL_FROM_CACHEHANDLE;
901         insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_CACHEHANDLE;
902         updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_CACHEHANDLE;
903     } else {
904         LOGE("[InitMigrateData] executor in an error state[%u]!", static_cast<unsigned>(executorState_));
905         return -E_INVALID_DB;
906     }
907     int errCode = PrepareForSavingData(querySQL, insertSQL, updateSQL, migrateSyncStatements_);
908     if (errCode != E_OK) {
909         LOGE("Prepare migrateSyncStatements_ fail, errCode = %d", errCode);
910         return errCode;
911     }
912     isSyncMigrating_ = true;
913     return errCode;
914 }
915 
ClearMigrateData()916 void SQLiteSingleVerStorageExecutor::ClearMigrateData()
917 {
918     // Reset data.
919     migrateTimeOffset_ = 0;
920     maxTimestampInMainDB_ = 0;
921 
922     // Reset statement.
923     int errCode = migrateSyncStatements_.ResetStatement();
924     if (errCode != E_OK) {
925         LOGE("Reset migrateSync Statements failed, errCode = %d", errCode);
926     }
927 
928     isSyncMigrating_ = false;
929 }
930 
GetMaxTimestampDuringMigrating(Timestamp & maxTimestamp) const931 int SQLiteSingleVerStorageExecutor::GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const
932 {
933     if (maxTimestampInMainDB_ == 0) {
934         return -E_NOT_INIT;
935     }
936     maxTimestamp = maxTimestampInMainDB_;
937     return E_OK;
938 }
939 
DeleteMetaData(const std::vector<Key> & keys)940 int SQLiteSingleVerStorageExecutor::DeleteMetaData(const std::vector<Key> &keys)
941 {
942     sqlite3_stmt *statement = nullptr;
943     const std::string sql = attachMetaMode_ ? REMOVE_ATTACH_META_VALUE_SQL : REMOVE_META_VALUE_SQL;
944     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
945     if (errCode != E_OK) {
946         return errCode;
947     }
948 
949     for (const auto &key : keys) {
950         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
951         if (errCode != E_OK) {
952             break;
953         }
954 
955         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
956         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
957             break;
958         }
959         errCode = E_OK;
960         SQLiteUtils::ResetStatement(statement, false, errCode);
961     }
962 
963     SQLiteUtils::ResetStatement(statement, true, errCode);
964     return CheckCorruptedStatus(errCode);
965 }
966 
DeleteMetaDataByPrefixKey(const Key & keyPrefix)967 int SQLiteSingleVerStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix)
968 {
969     sqlite3_stmt *statement = nullptr;
970     const std::string sql = attachMetaMode_ ?
971         REMOVE_ATTACH_META_VALUE_BY_KEY_PREFIX_SQL : REMOVE_META_VALUE_BY_KEY_PREFIX_SQL;
972 
973     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
974     if (errCode != E_OK) {
975         return errCode;
976     }
977 
978     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
979     if (errCode == E_OK) {
980         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
981         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
982             errCode = E_OK;
983         }
984     }
985 
986     SQLiteUtils::ResetStatement(statement, true, errCode);
987     return CheckCorruptedStatus(errCode);
988 }
989 } // namespace DistributedDB
990