1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_executor.h"
17
18 #include <algorithm>
19
20 #include "log_print.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "parcel.h"
25 #include "runtime_context.h"
26 #include "sqlite_single_ver_storage_executor_sql.h"
27
28 namespace DistributedDB {
PrepareForSavingCacheData(SingleVerDataType type)29 int SQLiteSingleVerStorageExecutor::PrepareForSavingCacheData(SingleVerDataType type)
30 {
31 int errCode = -E_NOT_SUPPORT;
32 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
33 std::string insertLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
34 INSERT_LOCAL_SQL_FROM_CACHEHANDLE : INSERT_CACHE_LOCAL_SQL);
35 std::string updateLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
36 UPDATE_LOCAL_SQL_FROM_CACHEHANDLE : UPDATE_CACHE_LOCAL_SQL);
37 errCode = PrepareForSavingData(SELECT_CACHE_LOCAL_HASH_SQL, insertLocalSql, updateLocalSql,
38 saveLocalStatements_);
39 } else if (type == SingleVerDataType::SYNC_TYPE) {
40 std::string insertSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
41 INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL);
42 std::string updateSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
43 UPDATE_CACHE_SYNC_SQL_FROM_MAINHANDLE : UPDATE_CACHE_SYNC_SQL);
44 std::string selectSyncHashSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
45 SELECT_CACHE_SYNC_HASH_SQL_FROM_MAINHANDLE : SELECT_CACHE_SYNC_HASH_SQL);
46 errCode = PrepareForSavingData(selectSyncHashSql, insertSyncSql, updateSyncSql, saveSyncStatements_);
47 }
48 if (errCode != E_OK) {
49 LOGE("Prepare to save sync cache data failed:%d", errCode);
50 }
51 return CheckCorruptedStatus(errCode);
52 }
53
ResetForSavingCacheData(SingleVerDataType type)54 int SQLiteSingleVerStorageExecutor::ResetForSavingCacheData(SingleVerDataType type)
55 {
56 int errCode = E_OK;
57 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
58 SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
59 SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
60 SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
61 } else if (type == SingleVerDataType::SYNC_TYPE) {
62 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
63 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
64 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
65 }
66
67 return CheckCorruptedStatus(errCode);
68 }
69
ResetForMigrateCacheData()70 int SQLiteSingleVerStorageExecutor::ResetForMigrateCacheData()
71 {
72 int errCode = E_OK;
73 SQLiteUtils::ResetStatement(migrateSyncStatements_.insertStatement, false, errCode);
74 SQLiteUtils::ResetStatement(migrateSyncStatements_.updateStatement, false, errCode);
75 SQLiteUtils::ResetStatement(migrateSyncStatements_.queryStatement, false, errCode);
76
77 return CheckCorruptedStatus(errCode);
78 }
79
RemoveDeviceDataInCacheMode(const std::string & hashDev,bool isNeedNotify,uint64_t recordVersion) const80 int SQLiteSingleVerStorageExecutor::RemoveDeviceDataInCacheMode(const std::string &hashDev,
81 bool isNeedNotify, uint64_t recordVersion) const
82 {
83 // device name always hash string.
84 std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
85
86 Key hashKey;
87 int errCode = DBCommon::CalcValueHash(REMOVE_DEVICE_DATA_KEY, hashKey);
88 if (errCode != E_OK) {
89 return errCode;
90 }
91
92 DataItem dataItem;
93 dataItem.key = REMOVE_DEVICE_DATA_KEY;
94 dataItem.value = devVect;
95 if (isNeedNotify) {
96 dataItem.flag = DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG;
97 } else {
98 dataItem.flag = DataItem::REMOVE_DEVICE_DATA_FLAG;
99 }
100
101 sqlite3_stmt *statement = nullptr;
102 std::string sql = (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
103 INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL;
104 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
105 if (errCode != E_OK) {
106 goto ERROR;
107 }
108
109 errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
110 if (errCode != E_OK) {
111 goto ERROR;
112 }
113
114 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
115 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
116 LOGE("Failed to execute rm the device synced data:%d", errCode);
117 } else {
118 errCode = E_OK;
119 }
120
121 ERROR:
122 SQLiteUtils::ResetStatement(statement, true, errCode);
123 return CheckCorruptedStatus(errCode);
124 }
125
GetMinVersionCacheData(std::vector<DataItem> & dataItems,uint64_t & minVerIncurCacheDb) const126 int SQLiteSingleVerStorageExecutor::GetMinVersionCacheData(
127 std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const
128 {
129 std::string sql;
130 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
131 sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_MAINHANDLE;
132 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
133 sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_CACHEHANDLE;
134 } else {
135 return -E_INVALID_ARGS;
136 }
137
138 sqlite3_stmt *statement = nullptr;
139 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
140 if (errCode != E_OK) {
141 LOGE("GetStatement fail when get min version cache data! errCode = [%d]", errCode);
142 return CheckCorruptedStatus(errCode);
143 }
144
145 errCode = GetAllDataItems(statement, dataItems, minVerIncurCacheDb, true);
146 if (errCode != E_OK) {
147 LOGE("Failed to get all the data items by the min version:[%d]", errCode);
148 }
149
150 SQLiteUtils::ResetStatement(statement, true, errCode);
151 return CheckCorruptedStatus(errCode);
152 }
153
MigrateRmDevData(const DataItem & dataItem) const154 int SQLiteSingleVerStorageExecutor::MigrateRmDevData(const DataItem &dataItem) const
155 {
156 if (dataItem.key != REMOVE_DEVICE_DATA_KEY) {
157 LOGE("This item not means remove devices data, can not continue exe!");
158 return -E_INVALID_ARGS;
159 }
160
161 std::string sql;
162 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
163 sql = dataItem.value.empty() ? REMOVE_ALL_DEV_DATA_SQL : REMOVE_DEV_DATA_SQL;
164 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
165 sql = dataItem.value.empty() ? REMOVE_ALL_DEV_DATA_SQL_FROM_CACHEHANDLE: REMOVE_DEV_DATA_SQL_FROM_CACHEHANDLE;
166 } else {
167 return -E_INVALID_ARGS;
168 }
169
170 sqlite3_stmt *statement = nullptr;
171 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
172 if (errCode != E_OK) {
173 LOGE("GetStatement fail when remove device data migrating-data to main! errCode = [%d]", errCode);
174 return CheckCorruptedStatus(errCode);
175 }
176
177 if (!dataItem.value.empty()) {
178 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, dataItem.value, true);
179 if (errCode != E_OK) {
180 LOGE("[singerVerExecutor][MiRmData] Bind dev for sync data failed:%d", errCode);
181 goto END;
182 }
183 }
184
185 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
186 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
187 errCode = E_OK;
188 }
189 END:
190 SQLiteUtils::ResetStatement(statement, true, errCode);
191 return CheckCorruptedStatus(errCode);
192 }
193
AttachMainDbAndCacheDb(CipherType type,const CipherPassword & passwd,const std::string & attachDbAbsPath,EngineState engineState)194 int SQLiteSingleVerStorageExecutor::AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
195 const std::string &attachDbAbsPath, EngineState engineState)
196 {
197 std::string attachAsName;
198 if (engineState == EngineState::MAINDB) {
199 attachAsName = "cache";
200 } else if (engineState == EngineState::CACHEDB) {
201 attachAsName = "maindb";
202 } else if (engineState == EngineState::ATTACHING) {
203 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
204 return E_OK;
205 } else {
206 return -E_INVALID_ARGS;
207 }
208
209 int errCode = SQLiteUtils::AttachNewDatabase(dbHandle_, type, passwd, attachDbAbsPath, attachAsName);
210 if (errCode != E_OK) {
211 LOGE("handle attach to [%s] fail! errCode = [%d]", attachAsName.c_str(), errCode);
212 return CheckCorruptedStatus(errCode);
213 }
214
215 if (engineState == EngineState::MAINDB) {
216 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
217 } else if (engineState == EngineState::CACHEDB) {
218 executorState_ = ExecutorState::CACHE_ATTACH_MAIN;
219 } else {
220 return -E_INVALID_ARGS;
221 }
222 LOGD("[singleVerExecutor][attachDb] current engineState[%u], executorState[%u]", static_cast<unsigned>(engineState),
223 static_cast<unsigned>(executorState_));
224 return errCode;
225 }
226
GetMaxVersionInCacheDb(uint64_t & maxVersion) const227 int SQLiteSingleVerStorageExecutor::GetMaxVersionInCacheDb(uint64_t &maxVersion) const
228 {
229 sqlite3_stmt *statement = nullptr;
230 std::string sql;
231 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
232 sql = GET_MAX_VER_CACHEDATA_FROM_MAINHANDLE;
233 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
234 sql = GET_MAX_VER_CACHEDATA_FROM_CACHEHANDLE;
235 } else {
236 return -E_INVALID_ARGS;
237 }
238
239 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
240 if (errCode != E_OK) {
241 LOGE("GetStatement fail when get max version in cache db");
242 return CheckCorruptedStatus(errCode);
243 }
244
245 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
246 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
247 maxVersion = static_cast<uint64_t>(sqlite3_column_int64(statement, 0));
248 errCode = E_OK;
249 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
250 maxVersion = 0;
251 errCode = E_OK;
252 }
253 SQLiteUtils::ResetStatement(statement, true, errCode);
254 return CheckCorruptedStatus(errCode);
255 }
256
MigrateDataItem(DataItem & dataItem,const NotifyMigrateSyncData & syncData)257 int SQLiteSingleVerStorageExecutor::MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData)
258 {
259 // Put or delete. Prepare notify data here.
260 NotifyConflictAndObserverData notify;
261 notify.committedData = syncData.committedData;
262 int errCode = PutIntoConflictAndCommitForMigrateCache(dataItem, {dataItem.dev.empty(), dataItem.dev}, notify,
263 syncData.isPermitForceWrite);
264 if (errCode != E_OK) {
265 ResetForMigrateCacheData();
266 LOGE("PutIntoConflictAndCommitForMigrateCache failed, errCode = %d", errCode);
267 return errCode;
268 }
269 // after solving conflict, the item should not be saved into mainDB
270 if (notify.dataStatus.isDefeated) {
271 LOGE("Data status is defeated:%d", errCode);
272 return errCode;
273 }
274 bool isUpdate = notify.dataStatus.preStatus != DataStatus::NOEXISTED;
275 sqlite3_stmt *statement = migrateSyncStatements_.GetDataSaveStatement(isUpdate);
276 if (statement == nullptr) {
277 LOGE("GetStatement fail when put migrating-data to main! ");
278 return -E_INVALID_ARGS;
279 }
280
281 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
282 errCode = EraseSyncData(dataItem.key);
283 goto END;
284 }
285
286 errCode = BindSavedSyncData(statement, dataItem, dataItem.hashKey, { dataItem.origDev, dataItem.dev }, isUpdate);
287 if (errCode != E_OK) {
288 goto END;
289 }
290
291 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
292 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
293 errCode = E_OK;
294 } else {
295 LOGD("StepWithRetry fail when put migrating-data to main!");
296 }
297 END:
298 ResetForMigrateCacheData();
299 return errCode;
300 }
301
CheckDataWithQuery(std::vector<DataItem> & dataItems)302 int SQLiteSingleVerStorageExecutor::CheckDataWithQuery(std::vector<DataItem> &dataItems)
303 {
304 int errCode = E_OK;
305 sqlite3_stmt *stmt = nullptr;
306 for (auto &item : dataItems) {
307 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) {
308 continue;
309 }
310 std::string sql;
311 DBCommon::VectorToString(item.value, sql);
312 if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
313 static const std::string SYNC_DATA_TABLE = "sync_data";
314 static const std::string SYNC_DATA_TABLE_MAIN = "maindb.sync_data";
315 std::string::size_type startPos = sql.find(SYNC_DATA_TABLE);
316 if (startPos != std::string::npos) {
317 sql.replace(startPos, SYNC_DATA_TABLE.length(), SYNC_DATA_TABLE_MAIN);
318 }
319 }
320 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
321 if (errCode != E_OK) {
322 LOGE("Get Check miss query data statement failed. %d", errCode);
323 return errCode;
324 }
325
326 errCode = CheckMissQueryDataItem(stmt, item.dev, item);
327 if (errCode != E_OK) {
328 LOGE("Check miss query data item failed. %d", errCode);
329 break;
330 }
331 SQLiteUtils::ResetStatement(stmt, true, errCode);
332 }
333 SQLiteUtils::ResetStatement(stmt, true, errCode);
334 return CheckCorruptedStatus(errCode);
335 }
336
MigrateDataItems(std::vector<DataItem> & dataItems,NotifyMigrateSyncData & syncData)337 int SQLiteSingleVerStorageExecutor::MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData)
338 {
339 syncData.isRemote = ((dataItems[0].flag & DataItem::LOCAL_FLAG) == 0);
340 syncData.isRemoveDeviceData = (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_FLAG) != 0 ||
341 (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) != 0;
342
343 int errCode = CheckDataWithQuery(dataItems);
344 if (errCode != E_OK) {
345 LOGE("Check migrate data with query failed! errCode = [%d]", errCode);
346 goto END;
347 }
348
349 for (auto &item : dataItems) {
350 // Remove device data owns one version itself.
351 // Get entry here. Prepare notify data in storageEngine.
352 if (syncData.isRemoveDeviceData) {
353 errCode = GetEntriesForNotifyRemoveDevData(item, syncData.entries);
354 if (errCode != E_OK) {
355 LOGE("Failed to get remove devices data");
356 return errCode;
357 }
358 errCode = MigrateRmDevData(item);
359 LOGI("[PutMigratingDataToMain]Execute remove devices data! errCode = [%d]", errCode);
360 if (errCode != E_OK) {
361 break;
362 }
363 continue;
364 }
365
366 if (item.neglect) { // Do not save this record if it is neglected
367 continue;
368 }
369
370 errCode = MigrateDataItem(item, syncData);
371 if (errCode != E_OK) {
372 LOGE("Migrate data item to main db failed! errCode = [%d]", errCode);
373 break;
374 }
375 }
376 END:
377 ResetForMigrateCacheData();
378 return CheckCorruptedStatus(errCode);
379 }
380
MigrateSyncDataByVersion(uint64_t recordVer,NotifyMigrateSyncData & syncData,std::vector<DataItem> & dataItems)381 int SQLiteSingleVerStorageExecutor::MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
382 std::vector<DataItem> &dataItems)
383 {
384 int errCode = StartTransaction(TransactType::IMMEDIATE);
385 if (errCode != E_OK) {
386 return errCode;
387 }
388
389 // Init migrate data.
390 errCode = InitMigrateData();
391 if (errCode != E_OK) {
392 LOGE("Init migrate data failed, errCode = [%d]", errCode);
393 goto END;
394 }
395
396 // fix dataItem timestamp for migrate
397 errCode = ProcessTimestampForSyncDataInCacheDB(dataItems);
398 if (errCode != E_OK) {
399 LOGE("Change the time stamp for migrate failed! errCode = [%d]", errCode);
400 goto END;
401 }
402
403 errCode = MigrateDataItems(dataItems, syncData);
404 if (errCode != E_OK) {
405 goto END;
406 }
407
408 // delete recordVersion data
409 errCode = DelCacheDbDataByVersion(recordVer);
410 if (errCode != E_OK) {
411 LOGE("Delete the migrated data in cacheDb! errCode = [%d]", errCode);
412 goto END;
413 }
414
415 errCode = Commit();
416 if (errCode != E_OK) {
417 LOGE("Commit data error and rollback, errCode = [%d]", errCode);
418 goto END;
419 }
420 return E_OK;
421 END:
422 Rollback();
423 return errCode;
424 }
425
DelCacheDbDataByVersion(uint64_t version) const426 int SQLiteSingleVerStorageExecutor::DelCacheDbDataByVersion(uint64_t version) const
427 {
428 std::string sql;
429 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
430 sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_MAINHANDLE;
431 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
432 sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_CACHEHANDLE;
433 } else {
434 return -E_INVALID_ARGS;
435 }
436
437 sqlite3_stmt *statement = nullptr;
438 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
439 if (errCode != E_OK) {
440 LOGE("GetStatement fail when delete cache data by version! errCode = [%d]", errCode);
441 return errCode;
442 }
443
444 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, static_cast<int64_t>(version));
445 if (errCode != E_OK) {
446 LOGE("[SingleVerExe] Bind destDbNickName error:[%d]", errCode);
447 goto END;
448 }
449
450 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
451 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
452 errCode = E_OK;
453 }
454
455 END:
456 SQLiteUtils::ResetStatement(statement, true, errCode);
457 return CheckCorruptedStatus(errCode);
458 }
459
VacuumLocalData() const460 int SQLiteSingleVerStorageExecutor::VacuumLocalData() const
461 {
462 std::string sql;
463 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
464 sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_MAINHANDLE;
465 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
466 sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_CACHEHANDLE;
467 } else {
468 return -E_INVALID_ARGS;
469 }
470
471 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
472 if (errCode != E_OK) {
473 LOGE("[SingleVerExe] vaccum local data failed: %d", errCode);
474 }
475
476 return CheckCorruptedStatus(errCode);
477 }
478
479 // The local table data is only for local reading and writing, which can be sensed by itself.
480 // The current migration process does not provide callback subscription function.
MigrateLocalData()481 int SQLiteSingleVerStorageExecutor::MigrateLocalData()
482 {
483 // Nick name "main" represent current database(dbhande) in sqlite grammar
484 std::string migrateLocaldataSql;
485 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
486 migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_MAINHANDLE;
487 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
488 migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_CACHEHANDLE;
489 } else {
490 return -E_INVALID_ARGS;
491 }
492
493 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, migrateLocaldataSql);
494 if (errCode != E_OK) {
495 LOGW("Failed to migrate the local data:%d", errCode);
496 return CheckCorruptedStatus(errCode);
497 }
498
499 return VacuumLocalData();
500 }
501
BindSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const502 int SQLiteSingleVerStorageExecutor::BindSyncDataInCacheMode(sqlite3_stmt *statement,
503 const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const
504 {
505 int errCode = BindPrimaryKeySyncDataInCacheMode(statement, hashKey, recordVersion);
506 if (errCode != E_OK) {
507 LOGE("Bind saved sync data primary key failed:%d", errCode);
508 return errCode;
509 }
510
511 // if delete flag is set, just use the hash key instead of the key
512 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
513 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_SYNC_KEY_INDEX, -1));
514 } else {
515 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_KEY_INDEX, dataItem.key, false);
516 }
517
518 if (errCode != E_OK) {
519 LOGE("Bind saved sync data key failed:%d", errCode);
520 return errCode;
521 }
522
523 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_VAL_INDEX, dataItem.value, true);
524 if (errCode != E_OK) {
525 LOGE("Bind saved sync data value failed:%d", errCode);
526 return errCode;
527 }
528
529 LOGD("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 ", version:%" PRIu64,
530 dataItem.writeTimestamp, dataItem.timestamp, dataItem.flag, recordVersion);
531 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_FLAG_INDEX,
532 static_cast<int64_t>(dataItem.flag));
533 if (errCode != E_OK) {
534 LOGE("Bind saved sync data flag failed:%d", errCode);
535 return errCode;
536 }
537 errCode = BindTimestampSyncDataInCacheMode(statement, dataItem);
538 if (errCode != E_OK) {
539 LOGE("Bind saved sync data time stamp failed:%d", errCode);
540 return errCode;
541 }
542 return BindDevSyncDataInCacheMode(statement, dataItem.origDev, dataItem.dev);
543 }
544
BindPrimaryKeySyncDataInCacheMode(sqlite3_stmt * statement,const Key & hashKey,uint64_t recordVersion) const545 int SQLiteSingleVerStorageExecutor::BindPrimaryKeySyncDataInCacheMode(
546 sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const
547 {
548 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_HASH_KEY_INDEX, hashKey, false);
549 if (errCode != E_OK) {
550 LOGE("Bind saved sync data hash key failed:%d", errCode);
551 return errCode;
552 }
553 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_VERSION_INDEX, recordVersion);
554 if (errCode != E_OK) {
555 LOGE("Bind saved sync data version failed:%d", errCode);
556 }
557 return errCode;
558 }
559
BindTimestampSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem) const560 int SQLiteSingleVerStorageExecutor::BindTimestampSyncDataInCacheMode(
561 sqlite3_stmt *statement, const DataItem &dataItem) const
562 {
563 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_STAMP_INDEX, dataItem.timestamp);
564 if (errCode != E_OK) {
565 LOGE("Bind saved sync data stamp failed:%d", errCode);
566 return errCode;
567 }
568
569 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_W_TIME_INDEX, dataItem.writeTimestamp);
570 if (errCode != E_OK) {
571 LOGE("Bind saved sync data write stamp failed:%d", errCode);
572 }
573 return errCode;
574 }
575
BindDevSyncDataInCacheMode(sqlite3_stmt * statement,const std::string & origDev,const std::string & deviceName) const576 int SQLiteSingleVerStorageExecutor::BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
577 const std::string &origDev, const std::string &deviceName) const
578 {
579 std::string devName = DBCommon::TransferHashString(deviceName);
580 std::vector<uint8_t> devVect(devName.begin(), devName.end());
581 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_DEV_INDEX, devVect, true);
582 if (errCode != E_OK) {
583 LOGE("Bind dev for sync data failed:%d", errCode);
584 return errCode;
585 }
586
587 std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
588 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_ORI_DEV_INDEX, origDevVect, true);
589 if (errCode != E_OK) {
590 LOGE("Bind orig dev for sync data failed:%d", errCode);
591 }
592 return errCode;
593 }
594
GetExpandedCheckSql(QueryObject query,DataItem & dataItem)595 int SQLiteSingleVerStorageExecutor::GetExpandedCheckSql(QueryObject query, DataItem &dataItem)
596 {
597 int errCode = E_OK;
598 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
599
600 std::string sql;
601 std::string expandedSql;
602 errCode = helper.GetSyncDataCheckSql(sql);
603 if (errCode != E_OK) {
604 LOGE("Get sync data check sql failed");
605 return errCode;
606 }
607 sqlite3_stmt *stmt = nullptr;
608 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
609 if (errCode != E_OK) {
610 LOGE("Get statement fail. %d", errCode);
611 return -E_INVALID_QUERY_FORMAT;
612 }
613
614 errCode = helper.BindSyncDataCheckStmt(stmt, dataItem.key);
615 if (errCode != E_OK) {
616 goto END;
617 }
618
619 errCode = SQLiteUtils::ExpandedSql(stmt, expandedSql);
620 if (errCode != E_OK) {
621 LOGE("Get expand sql fail. %d", errCode);
622 }
623 DBCommon::StringToVector(expandedSql, dataItem.value);
624 END:
625 SQLiteUtils::ResetStatement(stmt, true, errCode);
626 return errCode;
627 }
628
SaveSyncDataItemInCacheMode(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,uint64_t recordVersion,const QueryObject & query)629 int SQLiteSingleVerStorageExecutor::SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo,
630 Timestamp &maxStamp, uint64_t recordVersion, const QueryObject &query)
631 {
632 Key hashKey;
633 int errCode = E_OK;
634 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
635 hashKey = dataItem.key;
636 } else {
637 errCode = DBCommon::CalcValueHash(dataItem.key, hashKey);
638 if (errCode != E_OK) {
639 return errCode;
640 }
641 }
642
643 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
644 errCode = GetExpandedCheckSql(query, dataItem); // record check sql in value for miss query data
645 if (errCode != E_OK) {
646 LOGE("Get sync data check sql failed. %d", errCode);
647 return errCode;
648 }
649 }
650
651 std::string origDev = dataItem.origDev;
652 if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
653 origDev.clear();
654 }
655 dataItem.dev = deviceInfo.deviceName;
656 dataItem.origDev = origDev;
657 errCode = SaveSyncDataToCacheDatabase(dataItem, hashKey, recordVersion);
658 if (errCode == E_OK) {
659 maxStamp = std::max(dataItem.timestamp, maxStamp);
660 } else {
661 LOGE("Save sync data to db failed:%d", errCode);
662 }
663 return ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
664 }
665
SaveSyncDataToCacheDatabase(const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const666 int SQLiteSingleVerStorageExecutor::SaveSyncDataToCacheDatabase(const DataItem &dataItem,
667 const Key &hashKey, uint64_t recordVersion) const
668 {
669 auto statement = saveSyncStatements_.GetDataSaveStatement(false);
670 if (statement == nullptr) {
671 return -E_INVALID_ARGS;
672 }
673 int errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
674 if (errCode != E_OK) {
675 return errCode;
676 }
677
678 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
679 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
680 errCode = E_OK;
681 }
682 return errCode;
683 }
684
PutLocalDataToCacheDB(const LocalDataItem & dataItem) const685 int SQLiteSingleVerStorageExecutor::PutLocalDataToCacheDB(const LocalDataItem &dataItem) const
686 {
687 sqlite3_stmt *statement = nullptr;
688 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_CACHE_LOCAL_SQL, statement);
689 if (errCode != E_OK) {
690 goto ERROR;
691 }
692
693 errCode = BindLocalDataInCacheMode(statement, dataItem);
694 if (errCode != E_OK) {
695 goto ERROR;
696 }
697
698 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
699 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
700 errCode = E_OK;
701 }
702
703 ERROR:
704 SQLiteUtils::ResetStatement(statement, true, errCode);
705 return CheckCorruptedStatus(errCode);
706 }
707
BindLocalDataInCacheMode(sqlite3_stmt * statement,const LocalDataItem & dataItem) const708 int SQLiteSingleVerStorageExecutor::BindLocalDataInCacheMode(sqlite3_stmt *statement,
709 const LocalDataItem &dataItem) const
710 {
711 int errCode = SQLiteUtils::BindBlobToStatement(statement,
712 BIND_CACHE_LOCAL_HASH_KEY_INDEX, dataItem.hashKey, false);
713 if (errCode != E_OK) {
714 LOGE("[SingleVerExe][BindLocalData]Bind hash key error:%d", errCode);
715 return errCode;
716 }
717
718 // if delete flag is set, just use the hash key instead of the key
719 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
720 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_LOCAL_KEY_INDEX, -1));
721 } else {
722 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_KEY_INDEX, dataItem.key, false);
723 }
724
725 if (errCode != E_OK) {
726 LOGE("Bind saved sync data key failed:%d", errCode);
727 return errCode;
728 }
729
730 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_VAL_INDEX, dataItem.value, true);
731 if (errCode != E_OK) {
732 LOGE("[SingleVerExe][BindLocalData]Bind value error:%d", errCode);
733 return errCode;
734 }
735
736 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_TIMESTAMP_INDEX, dataItem.timestamp);
737 if (errCode != E_OK) {
738 LOGE("[SingleVerExe][BindLocalData]Bind timestamp error:%d", errCode);
739 return errCode;
740 }
741
742 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_FLAG_INDEX,
743 static_cast<int64_t>(dataItem.flag));
744 if (errCode != E_OK) {
745 LOGE("[SingleVerExe][BindLocalData]Bind local data flag failed:%d", errCode);
746 return errCode;
747 }
748
749 return E_OK;
750 }
751
PutIntoConflictAndCommitForMigrateCache(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)752 int SQLiteSingleVerStorageExecutor::PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem,
753 const DeviceInfo &deviceInfo, NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite)
754 {
755 int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
756 if (errCode != E_OK) {
757 errCode = (errCode == -E_NOT_FOUND ? E_OK : errCode);
758 if (errCode == -E_IGNORE_DATA) {
759 notify.dataStatus.isDefeated = true;
760 errCode = E_OK;
761 }
762 return errCode;
763 }
764
765 // If delete data, the key is empty.
766 if (isSyncMigrating_ && dataItem.key.empty()) {
767 dataItem.key = notify.getData.key;
768 }
769
770 PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, notify.committedData);
771 if (notify.dataStatus.isDefeated) {
772 LOGE("Data status is defeated:%d", errCode);
773 return ResetForMigrateCacheData();
774 }
775
776 PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, notify.committedData);
777 return ResetForMigrateCacheData();
778 }
779
GetMinTimestampInCacheDB(Timestamp & minStamp) const780 int SQLiteSingleVerStorageExecutor::GetMinTimestampInCacheDB(Timestamp &minStamp) const
781 {
782 if (dbHandle_ == nullptr) {
783 return E_OK;
784 }
785 std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
786 SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL :
787 SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL_FROM_MAINHANDLE);
788 sqlite3_stmt *statement = nullptr;
789 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
790 if (errCode != E_OK) {
791 goto ERROR;
792 }
793
794 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
795 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
796 minStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
797 LOGD("Min time stamp in cacheDB is %" PRIu64, minStamp);
798 errCode = E_OK;
799 } else {
800 LOGE("GetMinTimestampInCacheDB failed, errCode = %d.", errCode);
801 }
802
803 ERROR:
804 SQLiteUtils::ResetStatement(statement, true, errCode);
805 return errCode;
806 }
807
InitMigrateTimestampOffset()808 int SQLiteSingleVerStorageExecutor::InitMigrateTimestampOffset()
809 {
810 // Not first migrate, migrateTimeOffset_ has been set.
811 if (migrateTimeOffset_ != 0) {
812 return E_OK;
813 }
814
815 // Get min timestamp of local data in sync_data, cacheDB.
816 Timestamp minTimeInCache = 0;
817 int errCode = GetMinTimestampInCacheDB(minTimeInCache);
818 if (errCode != E_OK) {
819 return errCode;
820 }
821
822 // There is no native data in cacheDB, cannot get accurate migrateTimeOffset_ now.
823 if (minTimeInCache == 0) {
824 migrateTimeOffset_ = -1;
825 LOGI("Time offset during migrating is -1.");
826 return E_OK;
827 }
828
829 // Get max timestamp in mainDB.
830 Timestamp maxTimeInMain = 0;
831 InitCurrentMaxStamp(maxTimeInMain);
832
833 // Get timestamp offset between mainDB and cacheDB.
834 // The purpose of -1 is to ensure that the first data record in the original cacheDB is 1 greater than
835 // the last data record in the original mainDB after the migration.
836 migrateTimeOffset_ = minTimeInCache - maxTimeInMain - 1;
837 LOGI("Min timestamp in cacheDB is %" PRIu64 ", max timestamp in mainDB is %" PRIu64 ". Time offset during migrating"
838 " is %" PRId64 ".", minTimeInCache, maxTimeInMain, migrateTimeOffset_);
839 return E_OK;
840 }
841
ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> & dataItems)842 int SQLiteSingleVerStorageExecutor::ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems)
843 {
844 if (dataItems.empty()) {
845 LOGE("[SQLiteSingleVerStorageExecutor::ProcessTimestampForCacheDB] Invalid parameter : dataItems.");
846 return -E_INVALID_ARGS;
847 }
848
849 // Get the offset between the min timestamp in dataitems and max timestamp in mainDB.
850 int errCode = InitMigrateTimestampOffset();
851 if (errCode != E_OK) {
852 return errCode;
853 }
854
855 // Set real timestamp for DataItem in dataItems and get the max timestamp in these dataitems.
856 Timestamp maxTimeInDataItems = 0;
857 for (auto &item : dataItems) {
858 item.timestamp -= migrateTimeOffset_;
859 maxTimeInDataItems = std::max(maxTimeInDataItems, item.timestamp);
860 }
861
862 // Update max timestamp in mainDB.
863 maxTimestampInMainDB_ = maxTimeInDataItems;
864 return E_OK;
865 }
866
GetEntriesForNotifyRemoveDevData(const DataItem & item,std::vector<Entry> & entries) const867 int SQLiteSingleVerStorageExecutor::GetEntriesForNotifyRemoveDevData(const DataItem &item,
868 std::vector<Entry> &entries) const
869 {
870 // When removing device data, key is 'remove', value is device name.
871 if (item.key != REMOVE_DEVICE_DATA_KEY) {
872 LOGE("Invalid key. Can not notify remove device data.");
873 return -E_INVALID_ARGS;
874 }
875 if ((item.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == 0) {
876 LOGI("No need to notify remove device data.");
877 return E_OK;
878 }
879 entries.clear();
880 std::string dev;
881 DBCommon::VectorToString(item.value, dev);
882 return GetAllSyncedEntries(dev, entries);
883 }
884
InitMigrateData()885 int SQLiteSingleVerStorageExecutor::InitMigrateData()
886 {
887 // Sync_data already in migrating. Need not to init data.
888 if (isSyncMigrating_) {
889 return E_OK;
890 }
891 ClearMigrateData();
892 std::string querySQL;
893 std::string insertSQL;
894 std::string updateSQL;
895 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
896 querySQL = SELECT_SYNC_HASH_SQL;
897 insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_MAINHANDLE;
898 updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_MAINHANDLE;
899 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
900 querySQL = SELECT_MAIN_SYNC_HASH_SQL_FROM_CACHEHANDLE;
901 insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_CACHEHANDLE;
902 updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_CACHEHANDLE;
903 } else {
904 LOGE("[InitMigrateData] executor in an error state[%u]!", static_cast<unsigned>(executorState_));
905 return -E_INVALID_DB;
906 }
907 int errCode = PrepareForSavingData(querySQL, insertSQL, updateSQL, migrateSyncStatements_);
908 if (errCode != E_OK) {
909 LOGE("Prepare migrateSyncStatements_ fail, errCode = %d", errCode);
910 return errCode;
911 }
912 isSyncMigrating_ = true;
913 return errCode;
914 }
915
ClearMigrateData()916 void SQLiteSingleVerStorageExecutor::ClearMigrateData()
917 {
918 // Reset data.
919 migrateTimeOffset_ = 0;
920 maxTimestampInMainDB_ = 0;
921
922 // Reset statement.
923 int errCode = migrateSyncStatements_.ResetStatement();
924 if (errCode != E_OK) {
925 LOGE("Reset migrateSync Statements failed, errCode = %d", errCode);
926 }
927
928 isSyncMigrating_ = false;
929 }
930
GetMaxTimestampDuringMigrating(Timestamp & maxTimestamp) const931 int SQLiteSingleVerStorageExecutor::GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const
932 {
933 if (maxTimestampInMainDB_ == 0) {
934 return -E_NOT_INIT;
935 }
936 maxTimestamp = maxTimestampInMainDB_;
937 return E_OK;
938 }
939
DeleteMetaData(const std::vector<Key> & keys)940 int SQLiteSingleVerStorageExecutor::DeleteMetaData(const std::vector<Key> &keys)
941 {
942 sqlite3_stmt *statement = nullptr;
943 const std::string sql = attachMetaMode_ ? REMOVE_ATTACH_META_VALUE_SQL : REMOVE_META_VALUE_SQL;
944 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
945 if (errCode != E_OK) {
946 return errCode;
947 }
948
949 for (const auto &key : keys) {
950 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
951 if (errCode != E_OK) {
952 break;
953 }
954
955 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
956 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
957 break;
958 }
959 errCode = E_OK;
960 SQLiteUtils::ResetStatement(statement, false, errCode);
961 }
962
963 SQLiteUtils::ResetStatement(statement, true, errCode);
964 return CheckCorruptedStatus(errCode);
965 }
966
DeleteMetaDataByPrefixKey(const Key & keyPrefix)967 int SQLiteSingleVerStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix)
968 {
969 sqlite3_stmt *statement = nullptr;
970 const std::string sql = attachMetaMode_ ?
971 REMOVE_ATTACH_META_VALUE_BY_KEY_PREFIX_SQL : REMOVE_META_VALUE_BY_KEY_PREFIX_SQL;
972
973 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
974 if (errCode != E_OK) {
975 return errCode;
976 }
977
978 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
979 if (errCode == E_OK) {
980 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
981 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
982 errCode = E_OK;
983 }
984 }
985
986 SQLiteUtils::ResetStatement(statement, true, errCode);
987 return CheckCorruptedStatus(errCode);
988 }
989 } // namespace DistributedDB
990