1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_executor.h"
17
18 #include <algorithm>
19
20 #include "cloud/cloud_store_types.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "log_print.h"
25 #include "log_table_manager_factory.h"
26 #include "parcel.h"
27 #include "platform_specific.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_single_ver_storage_executor_sql.h"
31
32 namespace DistributedDB {
33 namespace {
34
ResetOrRegetStmt(sqlite3 * db,sqlite3_stmt * & stmt,const std::string & sql)35 int ResetOrRegetStmt(sqlite3 *db, sqlite3_stmt *&stmt, const std::string &sql)
36 {
37 int errCode = E_OK;
38 SQLiteUtils::ResetStatement(stmt, false, errCode);
39 if (errCode != E_OK) {
40 LOGE("[ResetOrRegetStmt] reset stmt failed:%d.", errCode);
41 // Finish current statement and remade one
42 SQLiteUtils::ResetStatement(stmt, true, errCode);
43 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
44 if (errCode != E_OK) {
45 LOGE("[ResetOrRegetStmt] reget failed:%d.", errCode);
46 }
47 }
48 return errCode;
49 }
50
GetEntryFromStatement(bool isGetValue,sqlite3_stmt * statement,std::vector<Entry> & entries)51 int GetEntryFromStatement(bool isGetValue, sqlite3_stmt *statement, std::vector<Entry> &entries)
52 {
53 Entry entry;
54 int errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, entry.key);
55 if (errCode != E_OK) {
56 return errCode;
57 }
58 if (isGetValue) {
59 errCode = SQLiteUtils::GetColumnBlobValue(statement, 1, entry.value);
60 if (errCode != E_OK) {
61 return errCode;
62 }
63 }
64
65 entries.push_back(std::move(entry));
66 return errCode;
67 }
68 }
69
SQLiteSingleVerStorageExecutor(sqlite3 * dbHandle,bool writable,bool isMemDb)70 SQLiteSingleVerStorageExecutor::SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb)
71 : SQLiteStorageExecutor(dbHandle, writable, isMemDb),
72 getSyncStatement_(nullptr),
73 getResultRowIdStatement_(nullptr),
74 getResultEntryStatement_(nullptr),
75 isTransactionOpen_(false),
76 attachMetaMode_(false),
77 executorState_(ExecutorState::INVALID),
78 maxTimestampInMainDB_(0),
79 migrateTimeOffset_(0),
80 isSyncMigrating_(false),
81 conflictResolvePolicy_(DEFAULT_LAST_WIN)
82 {}
83
SQLiteSingleVerStorageExecutor(sqlite3 * dbHandle,bool writable,bool isMemDb,ExecutorState executorState)84 SQLiteSingleVerStorageExecutor::SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb,
85 ExecutorState executorState)
86 : SQLiteStorageExecutor(dbHandle, writable, isMemDb),
87 getSyncStatement_(nullptr),
88 getResultRowIdStatement_(nullptr),
89 getResultEntryStatement_(nullptr),
90 isTransactionOpen_(false),
91 attachMetaMode_(false),
92 executorState_(executorState),
93 maxTimestampInMainDB_(0),
94 migrateTimeOffset_(0),
95 isSyncMigrating_(false),
96 conflictResolvePolicy_(DEFAULT_LAST_WIN)
97 {}
98
~SQLiteSingleVerStorageExecutor()99 SQLiteSingleVerStorageExecutor::~SQLiteSingleVerStorageExecutor()
100 {
101 if (isTransactionOpen_) {
102 Rollback();
103 }
104 FinalizeAllStatements();
105 }
106
GetKvData(SingleVerDataType type,const Key & key,Value & value,Timestamp & timestamp) const107 int SQLiteSingleVerStorageExecutor::GetKvData(SingleVerDataType type, const Key &key, Value &value,
108 Timestamp ×tamp) const
109 {
110 std::string sql;
111 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
112 sql = SELECT_LOCAL_VALUE_TIMESTAMP_SQL;
113 } else if (type == SingleVerDataType::SYNC_TYPE) {
114 sql = SELECT_SYNC_VALUE_WTIMESTAMP_SQL;
115 } else if (type == SingleVerDataType::META_TYPE) {
116 if (attachMetaMode_) {
117 sql = SELECT_ATTACH_META_VALUE_SQL;
118 } else {
119 sql = SELECT_META_VALUE_SQL;
120 }
121 } else {
122 return -E_INVALID_ARGS;
123 }
124
125 sqlite3_stmt *statement = nullptr;
126 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
127 if (errCode != E_OK) {
128 goto END;
129 }
130
131 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
132 if (errCode != E_OK) {
133 goto END;
134 }
135
136 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
137 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
138 errCode = -E_NOT_FOUND;
139 goto END;
140 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
141 goto END;
142 }
143
144 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
145
146 // get timestamp
147 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
148 timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, GET_KV_RES_LOCAL_TIME_INDEX));
149 } else if (type == SingleVerDataType::SYNC_TYPE) {
150 timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, GET_KV_RES_SYNC_TIME_INDEX));
151 }
152
153 END:
154 SQLiteUtils::ResetStatement(statement, true, errCode);
155 return CheckCorruptedStatus(errCode);
156 }
157
BindPutKvData(sqlite3_stmt * statement,const Key & key,const Value & value,Timestamp timestamp,SingleVerDataType type)158 int SQLiteSingleVerStorageExecutor::BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value,
159 Timestamp timestamp, SingleVerDataType type)
160 {
161 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_KEY_INDEX, key, false);
162 if (errCode != E_OK) {
163 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
164 return errCode;
165 }
166
167 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_VAL_INDEX, value, true);
168 if (errCode != E_OK) {
169 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
170 return errCode;
171 }
172
173 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
174 Key hashKey;
175 errCode = DBCommon::CalcValueHash(key, hashKey);
176 if (errCode != E_OK) {
177 return errCode;
178 }
179
180 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_LOCAL_HASH_KEY_INDEX, hashKey, false);
181 if (errCode != E_OK) {
182 LOGE("[SingleVerExe][BindPutKv]Bind hash key error:%d", errCode);
183 return errCode;
184 }
185
186 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_LOCAL_TIMESTAMP_INDEX, timestamp);
187 if (errCode != E_OK) {
188 LOGE("[SingleVerExe][BindPutKv]Bind timestamp error:%d", errCode);
189 return errCode;
190 }
191 }
192 return E_OK;
193 }
194
GetKvDataByHashKey(const Key & hashKey,SingleVerRecord & result) const195 int SQLiteSingleVerStorageExecutor::GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const
196 {
197 sqlite3_stmt *statement = nullptr;
198 std::vector<uint8_t> devVect;
199 std::vector<uint8_t> origDevVect;
200 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_HASH_SQL, statement);
201 if (errCode != E_OK) {
202 goto END;
203 }
204
205 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, hashKey, false); // bind the first arg hashkey.
206 if (errCode != E_OK) {
207 goto END;
208 }
209
210 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
211 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
212 result.hashKey = hashKey;
213 result.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
214 result.writeTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
215 result.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
216 // get key
217 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, result.key);
218 if (errCode != E_OK) {
219 goto END;
220 }
221 // get value
222 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, result.value);
223 if (errCode != E_OK) {
224 goto END;
225 }
226 // get device
227 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
228 if (errCode != E_OK) {
229 goto END;
230 }
231 result.device = std::string(devVect.begin(), devVect.end());
232 // get original device
233 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, origDevVect);
234 if (errCode != E_OK) {
235 goto END;
236 }
237 result.origDevice = std::string(origDevVect.begin(), origDevVect.end());
238 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
239 errCode = -E_NOT_FOUND;
240 goto END;
241 }
242
243 END:
244 SQLiteUtils::ResetStatement(statement, true, errCode);
245 return CheckCorruptedStatus(errCode);
246 }
247
SaveKvData(SingleVerDataType type,const Key & key,const Value & value,Timestamp timestamp)248 int SQLiteSingleVerStorageExecutor::SaveKvData(SingleVerDataType type, const Key &key, const Value &value,
249 Timestamp timestamp)
250 {
251 sqlite3_stmt *statement = nullptr;
252 std::string sql;
253 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
254 sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ? INSERT_LOCAL_SQL_FROM_CACHEHANDLE :
255 INSERT_LOCAL_SQL);
256 } else {
257 sql = (attachMetaMode_ ? INSERT_ATTACH_META_SQL : INSERT_META_SQL);
258 }
259 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
260 if (errCode != E_OK) {
261 goto ERROR;
262 }
263
264 errCode = BindPutKvData(statement, key, value, timestamp, type);
265 if (errCode != E_OK) {
266 goto ERROR;
267 }
268
269 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
270 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
271 errCode = E_OK;
272 }
273
274 ERROR:
275 SQLiteUtils::ResetStatement(statement, true, errCode);
276 return CheckCorruptedStatus(errCode);
277 }
278
PutKvData(SingleVerDataType type,const Key & key,const Value & value,Timestamp timestamp,SingleVerNaturalStoreCommitNotifyData * committedData)279 int SQLiteSingleVerStorageExecutor::PutKvData(SingleVerDataType type, const Key &key, const Value &value,
280 Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData)
281 {
282 if (type != SingleVerDataType::LOCAL_TYPE_SQLITE && type != SingleVerDataType::META_TYPE) {
283 return -E_INVALID_ARGS;
284 }
285 // committedData is only for local data, not for meta data.
286 bool isLocal = (SingleVerDataType::LOCAL_TYPE_SQLITE == type);
287 Timestamp localTimestamp = 0;
288 Value readValue;
289 bool isExisted = CheckIfKeyExisted(key, isLocal, readValue, localTimestamp);
290 if (isLocal && committedData != nullptr) {
291 ExistStatus existedStatus = isExisted ? ExistStatus::EXIST : ExistStatus::NONE;
292 Key hashKey;
293 int innerErrCode = DBCommon::CalcValueHash(key, hashKey);
294 if (innerErrCode != E_OK) {
295 return innerErrCode;
296 }
297 committedData->InitKeyPropRecord(hashKey, existedStatus);
298 }
299 int errCode = SaveKvData(type, key, value, timestamp);
300 if (errCode != E_OK) {
301 return errCode;
302 }
303
304 if (isLocal && committedData != nullptr) {
305 Entry entry = {key, value};
306 committedData->InsertCommittedData(std::move(entry), isExisted ? DataType::UPDATE : DataType::INSERT, true);
307 }
308 return E_OK;
309 }
310
GetEntries(bool isGetValue,SingleVerDataType type,const Key & keyPrefix,std::vector<Entry> & entries) const311 int SQLiteSingleVerStorageExecutor::GetEntries(bool isGetValue, SingleVerDataType type, const Key &keyPrefix,
312 std::vector<Entry> &entries) const
313 {
314 if ((type != SingleVerDataType::LOCAL_TYPE_SQLITE) && (type != SingleVerDataType::SYNC_TYPE)) {
315 return -E_INVALID_ARGS;
316 }
317
318 std::string sql;
319 if (type == SingleVerDataType::SYNC_TYPE) {
320 sql = isGetValue ? SELECT_SYNC_PREFIX_SQL : SELECT_SYNC_KEY_PREFIX_SQL;
321 } else {
322 sql = SELECT_LOCAL_PREFIX_SQL;
323 }
324 sqlite3_stmt *statement = nullptr;
325 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
326 if (errCode != E_OK) {
327 goto END;
328 }
329
330 // bind the prefix key for the first and second args.
331 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // first argument is key
332 if (errCode != E_OK) {
333 goto END;
334 }
335
336 errCode = StepForResultEntries(isGetValue, statement, entries);
337
338 END:
339 SQLiteUtils::ResetStatement(statement, true, errCode);
340 return CheckCorruptedStatus(errCode);
341 }
342
GetEntries(QueryObject & queryObj,std::vector<Entry> & entries) const343 int SQLiteSingleVerStorageExecutor::GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const
344 {
345 int errCode = E_OK;
346 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
347 if (errCode != E_OK) {
348 return errCode;
349 }
350
351 sqlite3_stmt *statement = nullptr;
352 errCode = helper.GetQuerySqlStatement(dbHandle_, false, statement);
353 if (errCode == E_OK) {
354 errCode = StepForResultEntries(true, statement, entries);
355 }
356
357 SQLiteUtils::ResetStatement(statement, true, errCode);
358 return CheckCorruptedStatus(errCode);
359 }
360
GetCount(QueryObject & queryObj,int & count) const361 int SQLiteSingleVerStorageExecutor::GetCount(QueryObject &queryObj, int &count) const
362 {
363 if (dbHandle_ == nullptr) {
364 return -E_INVALID_DB;
365 }
366
367 int errCode = E_OK;
368 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
369 if (errCode != E_OK) {
370 return errCode;
371 }
372
373 if (!queryObj.IsCountValid()) {
374 LOGE("GetCount no need limit or orderby");
375 return -E_INVALID_QUERY_FORMAT;
376 }
377
378 std::string countSql;
379 errCode = helper.GetCountQuerySql(countSql);
380 if (errCode != E_OK) {
381 return errCode;
382 }
383
384 sqlite3_stmt *countStatement = nullptr;
385 // get statement for count
386 errCode = helper.GetQuerySqlStatement(dbHandle_, countSql, countStatement);
387 if (errCode != E_OK) {
388 LOGE("Get count bind statement error:%d", errCode);
389 goto END;
390 }
391 // get count value
392 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
393 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
394 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
395 if (readCount > INT32_MAX) {
396 LOGW("total count is beyond the max count");
397 count = 0;
398 errCode = -E_UNEXPECTED_DATA;
399 } else {
400 count = static_cast<int>(readCount);
401 errCode = E_OK;
402 }
403 LOGD("Entry count in this result set is %d", count);
404 } else {
405 errCode = -E_UNEXPECTED_DATA;
406 }
407
408 END:
409 SQLiteUtils::ResetStatement(countStatement, true, errCode);
410 return CheckCorruptedStatus(errCode);
411 }
412
InitCurrentMaxStamp(Timestamp & maxStamp)413 void SQLiteSingleVerStorageExecutor::InitCurrentMaxStamp(Timestamp &maxStamp)
414 {
415 if (dbHandle_ == nullptr) {
416 return;
417 }
418 std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
419 SELECT_MAX_TIMESTAMP_SQL_FROM_CACHEHANDLE : SELECT_MAX_TIMESTAMP_SQL);
420 sqlite3_stmt *statement = nullptr;
421 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
422 if (errCode != E_OK) {
423 return;
424 }
425
426 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
427 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
428 maxStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
429 }
430 SQLiteUtils::ResetStatement(statement, true, errCode);
431 }
432
PrepareForSyncDataByTime(Timestamp begin,Timestamp end,sqlite3_stmt * & statement,bool getDeletedData) const433 int SQLiteSingleVerStorageExecutor::PrepareForSyncDataByTime(Timestamp begin, Timestamp end,
434 sqlite3_stmt *&statement, bool getDeletedData) const
435 {
436 if (dbHandle_ == nullptr) {
437 return -E_INVALID_DB;
438 }
439
440 const std::string sql = (getDeletedData ? SELECT_SYNC_DELETED_ENTRIES_SQL : SELECT_SYNC_ENTRIES_SQL);
441 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
442 if (errCode != E_OK) {
443 LOGE("Prepare the sync entries statement error:%d", errCode);
444 return errCode;
445 }
446
447 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_BEGIN_STAMP_INDEX, begin);
448 if (errCode != E_OK) {
449 goto ERROR;
450 }
451
452 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_END_STAMP_INDEX, end);
453
454 ERROR:
455 if (errCode != E_OK) {
456 LOGE("Bind the timestamp for getting sync data error:%d", errCode);
457 SQLiteUtils::ResetStatement(statement, true, errCode);
458 }
459
460 return CheckCorruptedStatus(errCode);
461 }
462
ReleaseContinueStatement()463 void SQLiteSingleVerStorageExecutor::ReleaseContinueStatement()
464 {
465 if (getSyncStatement_ != nullptr) {
466 int errCode = E_OK;
467 SQLiteUtils::ResetStatement(getSyncStatement_, true, errCode);
468 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
469 SetCorruptedStatus();
470 }
471 }
472 }
473
474 namespace {
GetDataItemForSync(sqlite3_stmt * statement,DataItem & dataItem)475 int GetDataItemForSync(sqlite3_stmt *statement, DataItem &dataItem)
476 {
477 dataItem.timestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
478 dataItem.writeTimestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
479 dataItem.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
480 dataItem.flag &= (~DataItem::LOCAL_FLAG);
481 std::vector<uint8_t> devVect;
482 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, devVect);
483 if (errCode != E_OK) {
484 return errCode;
485 }
486 dataItem.origDev = std::string(devVect.begin(), devVect.end());
487 int keyIndex = SYNC_RES_KEY_INDEX;
488 // If the data has been deleted, just use the hash key for sync.
489 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
490 keyIndex = SYNC_RES_HASH_KEY_INDEX;
491 }
492 errCode = SQLiteUtils::GetColumnBlobValue(statement, keyIndex, dataItem.key);
493 if (errCode != E_OK) {
494 return errCode;
495 }
496 return SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, dataItem.value);
497 }
498 }
499
GetSyncDataItems(std::vector<DataItem> & dataItems,sqlite3_stmt * statement,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo) const500 int SQLiteSingleVerStorageExecutor::GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement,
501 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const
502 {
503 int errCode;
504 size_t dataTotalSize = 0;
505 do {
506 DataItem dataItem;
507 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
508 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
509 errCode = GetDataItemForSync(statement, dataItem);
510 if (errCode != E_OK) {
511 LOGE("GetDataItemForSync failed:%d", errCode);
512 return errCode;
513 }
514 } else {
515 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
516 LOGD("Get sync data finished, size of packet:%zu, number of item:%zu", dataTotalSize, dataItems.size());
517 errCode = -E_FINISHED;
518 } else {
519 LOGE("Get sync data error:%d", errCode);
520 }
521 break;
522 }
523
524 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
525 dataTotalSize += GetDataItemSerialSize(dataItem, appendLength);
526 if ((dataTotalSize > dataSizeInfo.blockSize && !dataItems.empty()) ||
527 dataItems.size() >= dataSizeInfo.packetSize) {
528 errCode = -E_UNFINISHED;
529 break;
530 } else {
531 dataItems.push_back(std::move(dataItem));
532 }
533 } while (true);
534 return errCode;
535 }
536
GetSyncDataByTimestamp(std::vector<DataItem> & dataItems,size_t appendLength,Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo) const537 int SQLiteSingleVerStorageExecutor::GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength,
538 Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const
539 {
540 sqlite3_stmt *statement = nullptr;
541 int errCode = PrepareForSyncDataByTime(begin, end, statement);
542 if (errCode != E_OK) {
543 return errCode;
544 }
545
546 errCode = GetSyncDataItems(dataItems, statement, appendLength, dataSizeInfo);
547 SQLiteUtils::ResetStatement(statement, true, errCode);
548 return CheckCorruptedStatus(errCode);
549 }
550
GetDeletedSyncDataByTimestamp(std::vector<DataItem> & dataItems,size_t appendLength,Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo) const551 int SQLiteSingleVerStorageExecutor::GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength,
552 Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const
553 {
554 sqlite3_stmt *statement = nullptr;
555 int errCode = PrepareForSyncDataByTime(begin, end, statement, true);
556 if (errCode != E_OK) {
557 return errCode;
558 }
559
560 errCode = GetSyncDataItems(dataItems, statement, appendLength, dataSizeInfo);
561 SQLiteUtils::ResetStatement(statement, true, errCode);
562 return CheckCorruptedStatus(errCode);
563 }
564
565 namespace {
AppendDataItem(std::vector<DataItem> & dataItems,const DataItem & item,size_t & dataTotalSize,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo)566 int AppendDataItem(std::vector<DataItem> &dataItems, const DataItem &item, size_t &dataTotalSize, size_t appendLength,
567 const DataSizeSpecInfo &dataSizeInfo)
568 {
569 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
570 size_t appendSize = dataTotalSize + SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(item, appendLength);
571 if ((appendSize > dataSizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= dataSizeInfo.packetSize) {
572 return -E_UNFINISHED;
573 }
574 dataItems.push_back(item);
575 dataTotalSize = appendSize;
576 return E_OK;
577 }
578
GetFullDataStatement(sqlite3 * db,const std::pair<Timestamp,Timestamp> & timeRange,sqlite3_stmt * & stmt)579 int GetFullDataStatement(sqlite3 *db, const std::pair<Timestamp, Timestamp> &timeRange, sqlite3_stmt *&stmt)
580 {
581 int errCode = SQLiteUtils::GetStatement(db, SELECT_SYNC_MODIFY_SQL, stmt);
582 if (errCode != E_OK) {
583 LOGE("Get statement failed. %d", errCode);
584 return errCode;
585 }
586 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, timeRange.first); // 1 : Bind time rang index start
587 if (errCode != E_OK) {
588 LOGE("Bind time range to statement failed. %d", errCode);
589 goto ERR;
590 }
591 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, timeRange.second); // 2 : Bind time rang index end
592 if (errCode != E_OK) {
593 LOGE("Bind time range to statement failed. %d", errCode);
594 goto ERR;
595 }
596 return E_OK; // do not release statement when success
597 ERR:
598 SQLiteUtils::ResetStatement(stmt, true, errCode);
599 return errCode;
600 }
601
GetQueryDataStatement(sqlite3 * db,QueryObject query,const std::pair<Timestamp,Timestamp> & timeRange,sqlite3_stmt * & stmt)602 int GetQueryDataStatement(sqlite3 *db, QueryObject query, const std::pair<Timestamp, Timestamp> &timeRange,
603 sqlite3_stmt *&stmt)
604 {
605 int errCode = E_OK;
606 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
607 if (errCode != E_OK) {
608 return errCode;
609 }
610 return helper.GetQuerySyncStatement(db, timeRange.first, timeRange.second, stmt);
611 }
612
GetNextDataItem(sqlite3_stmt * stmt,bool isMemDB,DataItem & item)613 int GetNextDataItem(sqlite3_stmt *stmt, bool isMemDB, DataItem &item)
614 {
615 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
616 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
617 errCode = GetDataItemForSync(stmt, item);
618 }
619 return errCode;
620 }
621 }
622
GetSyncDataWithQuery(const QueryObject & query,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo,const std::pair<Timestamp,Timestamp> & timeRange,std::vector<DataItem> & dataItems) const623 int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(const QueryObject &query, size_t appendLength,
624 const DataSizeSpecInfo &dataSizeInfo, const std::pair<Timestamp, Timestamp> &timeRange,
625 std::vector<DataItem> &dataItems) const
626 {
627 sqlite3_stmt *fullStmt = nullptr; // statement for get all modified data in the time range
628 sqlite3_stmt *queryStmt = nullptr; // statement for get modified data which is matched query in the time range
629 int errCode = GetQueryDataStatement(dbHandle_, query, timeRange, queryStmt);
630 if (errCode != E_OK) {
631 LOGE("Get query matched data statement failed. %d", errCode);
632 goto END;
633 }
634 if (query.IsQueryOnlyByKey()) {
635 // Query sync by prefixKey only should not deal with REMOTE_DEVICE_DATA_MISS_QUERY. Get the data directly.
636 errCode = GetSyncDataItems(dataItems, queryStmt, appendLength, dataSizeInfo);
637 goto END;
638 }
639 errCode = GetFullDataStatement(dbHandle_, timeRange, fullStmt);
640 if (errCode != E_OK) {
641 LOGE("Get full changed data statement failed. %d", errCode);
642 goto END;
643 }
644 errCode = GetSyncDataWithQuery(fullStmt, queryStmt, appendLength, dataSizeInfo, dataItems);
645 if (errCode != E_OK && errCode != -E_UNFINISHED && errCode != -E_FINISHED) {
646 LOGE("Get sync data with query failed. %d", errCode);
647 }
648 END:
649 SQLiteUtils::ResetStatement(fullStmt, true, errCode);
650 SQLiteUtils::ResetStatement(queryStmt, true, errCode);
651 return CheckCorruptedStatus(errCode);
652 }
653
GetSyncDataWithQuery(sqlite3_stmt * fullStmt,sqlite3_stmt * queryStmt,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo,std::vector<DataItem> & dataItems) const654 int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt,
655 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const
656 {
657 int errCode = E_OK;
658 size_t dataTotalSize = 0;
659 DataItem fullItem;
660 DataItem matchItem;
661 bool isFullItemFinished = false;
662 bool isMatchItemFinished = false;
663 while (!isFullItemFinished || !isMatchItemFinished) {
664 errCode = GetNextDataItem(queryStmt, isMemDb_, matchItem);
665 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // query finished
666 isMatchItemFinished = true;
667 } else if (errCode != E_OK) { // step failed or get data failed
668 LOGE("Get next query matched data failed. %d", errCode);
669 return errCode;
670 }
671 while (!isFullItemFinished) {
672 errCode = GetNextDataItem(fullStmt, isMemDb_, fullItem);
673 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // queryStmt is a subset of fullStmt
674 isFullItemFinished = true;
675 break;
676 } else if (errCode != E_OK) { // step failed or get data failed
677 LOGE("Get next changed data failed. %d", errCode);
678 return errCode;
679 }
680 bool matchData = true;
681 if (isMatchItemFinished || matchItem.key != fullItem.key) {
682 matchData = false; // got miss query data
683 DBCommon::CalcValueHash(fullItem.key, fullItem.key); // set and send key with hash_key
684 Value().swap(fullItem.value); // not send value when data miss query
685 fullItem.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY; // mark with miss query flag
686 }
687 errCode = AppendDataItem(dataItems, fullItem, dataTotalSize, appendLength, dataSizeInfo);
688 if (errCode == -E_UNFINISHED) {
689 goto END;
690 }
691 if (matchData) {
692 break; // step to next match data
693 }
694 }
695 }
696 END:
697 LOGD("Get sync data finished, size of packet:%zu, number of item:%zu", dataTotalSize, dataItems.size());
698 return (isFullItemFinished && isMatchItemFinished) ? -E_FINISHED : errCode;
699 }
700
OpenResultSet(const Key & keyPrefix,int & count)701 int SQLiteSingleVerStorageExecutor::OpenResultSet(const Key &keyPrefix, int &count)
702 {
703 sqlite3_stmt *countStatement = nullptr;
704 if (InitResultSet(keyPrefix, countStatement) != E_OK) {
705 LOGE("Initialize result set stat failed.");
706 return -E_INVALID_DB;
707 }
708
709 int errCode = StartTransaction(TransactType::DEFERRED);
710 if (errCode != E_OK) {
711 goto END;
712 }
713
714 // get count value
715 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
716 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
717 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
718 if (readCount > INT32_MAX) {
719 LOGW("total count is beyond the max count");
720 count = 0;
721 errCode = -E_UNEXPECTED_DATA;
722 } else {
723 count = static_cast<int>(readCount);
724 errCode = E_OK;
725 }
726 LOGD("Entry count in this result set is %d", count);
727 } else {
728 errCode = -E_UNEXPECTED_DATA;
729 }
730
731 END:
732 SQLiteUtils::ResetStatement(countStatement, true, errCode);
733 if (errCode != E_OK) {
734 CloseResultSet();
735 }
736 return CheckCorruptedStatus(errCode);
737 }
738
OpenResultSet(QueryObject & queryObj,int & count)739 int SQLiteSingleVerStorageExecutor::OpenResultSet(QueryObject &queryObj, int &count)
740 {
741 sqlite3_stmt *countStatement = nullptr;
742 int errCode = InitResultSet(queryObj, countStatement);
743 if (errCode != E_OK) {
744 LOGE("Initialize result set stat failed.");
745 return errCode;
746 }
747
748 errCode = StartTransaction(TransactType::DEFERRED);
749 if (errCode != E_OK) {
750 goto END;
751 }
752
753 // get count value
754 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
755 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
756 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
757 if (queryObj.HasLimit()) {
758 int limit = 0;
759 int offset = 0;
760 queryObj.GetLimitVal(limit, offset);
761 offset = (offset < 0) ? 0 : offset;
762 limit = (limit < 0) ? 0 : limit;
763 if (readCount <= static_cast<uint64_t>(offset)) {
764 readCount = 0;
765 } else {
766 readCount = std::min(readCount - offset, static_cast<uint64_t>(limit));
767 }
768 }
769
770 if (readCount > INT32_MAX) {
771 LOGW("total count is beyond the max count");
772 count = 0;
773 errCode = -E_UNEXPECTED_DATA;
774 } else {
775 count = static_cast<int>(readCount);
776 errCode = E_OK;
777 }
778 LOGD("Entry count in this result set is %d", count);
779 } else {
780 errCode = -E_UNEXPECTED_DATA;
781 }
782
783 END:
784 SQLiteUtils::ResetStatement(countStatement, true, errCode);
785 if (errCode != E_OK) {
786 CloseResultSet();
787 }
788 return CheckCorruptedStatus(errCode);
789 }
790
OpenResultSetForCacheRowIdMode(const Key & keyPrefix,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)791 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdMode(const Key &keyPrefix,
792 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count)
793 {
794 if (dbHandle_ == nullptr) {
795 return -E_INVALID_DB;
796 }
797 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_ROWID_PREFIX_SQL, getResultRowIdStatement_);
798 if (errCode != E_OK) {
799 LOGE("[SqlSinExe][OpenResSetRowId][PrefixKey] Get rowId stmt fail, errCode=%d", errCode);
800 return CheckCorruptedStatus(errCode);
801 }
802 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument index is 1
803 if (errCode != E_OK) {
804 LOGE("[SqlSinExe][OpenResSetRowId][PrefixKey] Bind rowid stmt fail, errCode=%d", errCode);
805 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
806 return CheckCorruptedStatus(errCode);
807 }
808 errCode = OpenResultSetForCacheRowIdModeCommon(rowIdCache, cacheLimit, count);
809 if (errCode != E_OK) {
810 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
811 }
812 return errCode;
813 }
814
OpenResultSetForCacheRowIdMode(QueryObject & queryObj,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)815 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdMode(QueryObject &queryObj,
816 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count)
817 {
818 if (dbHandle_ == nullptr) {
819 return -E_INVALID_DB;
820 }
821
822 int errCode = E_OK;
823 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
824 if (errCode != E_OK) {
825 return errCode;
826 }
827
828 if (!queryObj.IsValid()) {
829 LOGE("[SqlSinExe][OpenResSetRowId][Query] query object not Valid");
830 return -E_INVALID_QUERY_FORMAT;
831 }
832
833 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
834 if (errCode != E_OK) {
835 LOGE("[SqlSinExe][OpenResSetRowId][Query] Get Stmt fail, errCode=%d", errCode);
836 // The GetQuerySqlStatement does not self rollback(BAD...), so we have to reset the stmt here.
837 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
838 return errCode;
839 }
840 errCode = OpenResultSetForCacheRowIdModeCommon(rowIdCache, cacheLimit, count);
841 if (errCode != E_OK) {
842 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
843 }
844 return errCode;
845 }
846
ReloadResultSet(const Key & keyPrefix)847 int SQLiteSingleVerStorageExecutor::ReloadResultSet(const Key &keyPrefix)
848 {
849 int errCode = ResetOrRegetStmt(dbHandle_, getResultRowIdStatement_, SELECT_SYNC_ROWID_PREFIX_SQL);
850 if (errCode != E_OK) {
851 return CheckCorruptedStatus(errCode);
852 }
853
854 // No need to reset getResultEntryStatement_. Because the binding of it will be cleared in each get operation
855 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument is key
856 if (errCode != E_OK) {
857 LOGE("Rebind result set rowid statement of keyPrefix error:%d", errCode);
858 return CheckCorruptedStatus(errCode);
859 }
860 return E_OK;
861 }
862
ReloadResultSet(QueryObject & queryObj)863 int SQLiteSingleVerStorageExecutor::ReloadResultSet(QueryObject &queryObj)
864 {
865 int errCode = E_OK;
866 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
867 if (errCode != E_OK) {
868 return errCode;
869 }
870
871 if (!queryObj.IsValid()) {
872 return -E_INVALID_QUERY_FORMAT;
873 }
874
875 std::string sql;
876 errCode = helper.GetQuerySql(sql, true); // only rowid sql
877 if (errCode != E_OK) {
878 return errCode;
879 }
880
881 errCode = ResetOrRegetStmt(dbHandle_, getResultRowIdStatement_, sql);
882 if (errCode != E_OK) {
883 return CheckCorruptedStatus(errCode);
884 }
885
886 // No need to reset getResultEntryStatement_. Because the binding of it will be cleared in each get operation
887 // GetQuerySqlStatement will not alter getResultRowIdStatement_ if it is not null
888 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
889 if (errCode != E_OK) {
890 LOGE("Rebind result set rowid statement of query error:%d", errCode);
891 return CheckCorruptedStatus(errCode);
892 }
893 return E_OK;
894 }
895
ReloadResultSetForCacheRowIdMode(const Key & keyPrefix,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos)896 int SQLiteSingleVerStorageExecutor::ReloadResultSetForCacheRowIdMode(const Key &keyPrefix,
897 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, uint32_t cacheStartPos)
898 {
899 int errCode = ReloadResultSet(keyPrefix); // Reuse this function(A convenience)
900 if (errCode != E_OK) {
901 return errCode;
902 }
903 int count = 0; // Ignored
904 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, cacheStartPos, count);
905 if (errCode != E_OK) {
906 LOGE("[SqlSinExe][ReloadResSet][KeyPrefix] Load fail, errCode=%d", errCode);
907 }
908 // We can just return, no need to reset the statement
909 return errCode;
910 }
911
ReloadResultSetForCacheRowIdMode(QueryObject & queryObj,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos)912 int SQLiteSingleVerStorageExecutor::ReloadResultSetForCacheRowIdMode(QueryObject &queryObj,
913 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, uint32_t cacheStartPos)
914 {
915 int errCode = ReloadResultSet(queryObj); // Reuse this function(A convenience)
916 if (errCode != E_OK) {
917 return errCode;
918 }
919 int count = 0; // Ignored
920 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, cacheStartPos, count);
921 if (errCode != E_OK) {
922 LOGE("[SqlSinExe][ReloadResSet][Query] Load fail, errCode=%d", errCode);
923 }
924 // We can just return, no need to reset the statement
925 return errCode;
926 }
927
GetNextEntryFromResultSet(Key & key,Value & value,bool isCopy)928 int SQLiteSingleVerStorageExecutor::GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy)
929 {
930 if (getResultRowIdStatement_ == nullptr || getResultEntryStatement_ == nullptr) {
931 return -E_RESULT_SET_STATUS_INVALID;
932 }
933
934 int errCode = SQLiteUtils::StepWithRetry(getResultRowIdStatement_, isMemDb_);
935 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
936 if (!isCopy) {
937 return E_OK;
938 }
939 int64_t rowId = sqlite3_column_int64(getResultRowIdStatement_, 0);
940 errCode = E_OK;
941 SQLiteUtils::ResetStatement(getResultEntryStatement_, false, errCode);
942 if (errCode != E_OK) {
943 LOGE("[SqlSinExe][GetNext] Reset result set entry statement fail, errCode=%d.", errCode);
944 return CheckCorruptedStatus(errCode);
945 }
946
947 SQLiteUtils::BindInt64ToStatement(getResultEntryStatement_, 1, rowId);
948 errCode = SQLiteUtils::StepWithRetry(getResultEntryStatement_, isMemDb_);
949 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
950 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 0, key);
951 if (errCode != E_OK) {
952 LOGE("[SqlSinExe][GetNext] Get key failed:%d", errCode);
953 return CheckCorruptedStatus(errCode);
954 }
955 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 1, value);
956 if (errCode != E_OK) {
957 LOGE("[SqlSinExe][GetNext] Get value failed:%d", errCode);
958 return CheckCorruptedStatus(errCode);
959 }
960 return E_OK;
961 } else {
962 return -E_UNEXPECTED_DATA;
963 }
964 }
965 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
966 return -E_FINISHED;
967 }
968
969 LOGE("SQLite step failed:%d", errCode);
970 return CheckCorruptedStatus(errCode);
971 }
972
GetEntryByRowId(int64_t rowId,Entry & entry)973 int SQLiteSingleVerStorageExecutor::GetEntryByRowId(int64_t rowId, Entry &entry)
974 {
975 if (getResultEntryStatement_ == nullptr) {
976 return -E_RESULT_SET_STATUS_INVALID;
977 }
978 int errCode = E_OK;
979 SQLiteUtils::ResetStatement(getResultEntryStatement_, false, errCode);
980 if (errCode != E_OK) {
981 LOGE("[SqlSinExe][GetEntryByRowid] Reset result set entry statement fail, errCode=%d.", errCode);
982 return CheckCorruptedStatus(errCode);
983 }
984 SQLiteUtils::BindInt64ToStatement(getResultEntryStatement_, 1, rowId);
985 errCode = SQLiteUtils::StepWithRetry(getResultEntryStatement_, isMemDb_);
986 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
987 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 0, entry.key);
988 if (errCode != E_OK) {
989 LOGE("[SqlSinExe][GetEntryByRowid] Get key failed, errCode=%d.", errCode);
990 return CheckCorruptedStatus(errCode);
991 }
992 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 1, entry.value);
993 if (errCode != E_OK) {
994 LOGE("[SqlSinExe][GetEntryByRowid] Get value failed, errCode=%d.", errCode);
995 return CheckCorruptedStatus(errCode);
996 }
997 return E_OK;
998 } else {
999 LOGE("[SqlSinExe][GetEntryByRowid] Step failed, errCode=%d.", errCode);
1000 return -E_UNEXPECTED_DATA;
1001 }
1002 }
1003
CloseResultSet()1004 void SQLiteSingleVerStorageExecutor::CloseResultSet()
1005 {
1006 int errCode = E_OK;
1007 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1008 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
1009 SetCorruptedStatus();
1010 }
1011 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
1012 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
1013 SetCorruptedStatus();
1014 }
1015 if (isTransactionOpen_) {
1016 SQLiteUtils::RollbackTransaction(dbHandle_);
1017 isTransactionOpen_ = false;
1018 }
1019 }
1020
StartTransaction(TransactType type)1021 int SQLiteSingleVerStorageExecutor::StartTransaction(TransactType type)
1022 {
1023 if (dbHandle_ == nullptr) {
1024 LOGE("Begin transaction failed, dbHandle is null.");
1025 return -E_INVALID_DB;
1026 }
1027 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
1028 if (errCode == E_OK) {
1029 isTransactionOpen_ = true;
1030 } else {
1031 LOGE("Begin transaction failed, errCode = %d", errCode);
1032 }
1033 return CheckCorruptedStatus(errCode);
1034 }
1035
Commit()1036 int SQLiteSingleVerStorageExecutor::Commit()
1037 {
1038 if (dbHandle_ == nullptr) {
1039 return -E_INVALID_DB;
1040 }
1041 int errCode = SQLiteUtils::CommitTransaction(dbHandle_);
1042 if (errCode != E_OK) {
1043 return CheckCorruptedStatus(errCode);
1044 }
1045 isTransactionOpen_ = false;
1046 return E_OK;
1047 }
1048
Rollback()1049 int SQLiteSingleVerStorageExecutor::Rollback()
1050 {
1051 if (dbHandle_ == nullptr) {
1052 return -E_INVALID_DB;
1053 }
1054 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
1055 if (errCode != E_OK) {
1056 LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
1057 return CheckCorruptedStatus(errCode);
1058 }
1059 isTransactionOpen_ = false;
1060 return E_OK;
1061 }
1062
CheckIfKeyExisted(const Key & key,bool isLocal,Value & value,Timestamp & timestamp) const1063 bool SQLiteSingleVerStorageExecutor::CheckIfKeyExisted(const Key &key, bool isLocal,
1064 Value &value, Timestamp ×tamp) const
1065 {
1066 // not local value, no need to get the value.
1067 if (!isLocal) {
1068 return false;
1069 }
1070
1071 int errCode = GetKvData(SingleVerDataType::LOCAL_TYPE_SQLITE, key, value, timestamp);
1072 if (errCode != E_OK) {
1073 return false;
1074 }
1075 return true;
1076 }
1077
GetDeviceIdentifier(PragmaEntryDeviceIdentifier * identifier)1078 int SQLiteSingleVerStorageExecutor::GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier)
1079 {
1080 if (identifier == nullptr) {
1081 return -E_INVALID_ARGS;
1082 }
1083
1084 if (dbHandle_ == nullptr) {
1085 return -E_INVALID_DB;
1086 }
1087
1088 sqlite3_stmt *statement = nullptr;
1089 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ENTRY_DEVICE, statement);
1090 if (errCode != E_OK) {
1091 return errCode;
1092 }
1093
1094 int keyIndex = identifier->origDevice ? BIND_ORI_DEVICE_ID : BIND_PRE_DEVICE_ID;
1095 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_KEY_INDEX, identifier->key, false);
1096 if (errCode != E_OK) {
1097 goto END;
1098 }
1099
1100 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1101 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1102 std::vector<uint8_t> deviceId;
1103 errCode = SQLiteUtils::GetColumnBlobValue(statement, keyIndex, deviceId);
1104 identifier->deviceIdentifier.assign(deviceId.begin(), deviceId.end());
1105 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1106 errCode = -E_NOT_FOUND;
1107 }
1108
1109 END:
1110 SQLiteUtils::ResetStatement(statement, true, errCode);
1111 return CheckCorruptedStatus(errCode);
1112 }
1113
PutIntoCommittedData(const DataItem & itemPut,const DataItem & itemGet,const DataOperStatus & status,SingleVerNaturalStoreCommitNotifyData * committedData)1114 void SQLiteSingleVerStorageExecutor::PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet,
1115 const DataOperStatus &status, SingleVerNaturalStoreCommitNotifyData *committedData)
1116 {
1117 if (committedData == nullptr) {
1118 return;
1119 }
1120
1121 Entry entry;
1122 int errCode;
1123 if (!status.isDeleted) {
1124 entry.key = itemPut.key;
1125 entry.value = itemPut.value;
1126 DataType dataType = (status.preStatus == DataStatus::EXISTED) ? DataType::UPDATE : DataType::INSERT;
1127 errCode = committedData->InsertCommittedData(std::move(entry), dataType, true);
1128 } else {
1129 entry.key = itemGet.key;
1130 entry.value = itemGet.value;
1131 errCode = committedData->InsertCommittedData(std::move(entry), DataType::DELETE, true);
1132 }
1133
1134 if (errCode != E_OK) {
1135 LOGE("[SingleVerExe][PutCommitData]Insert failed:%d", errCode);
1136 }
1137 }
1138
PrepareForSavingData(const std::string & readSql,const std::string & insertSql,const std::string & updateSql,SaveRecordStatements & statements) const1139 int SQLiteSingleVerStorageExecutor::PrepareForSavingData(const std::string &readSql, const std::string &insertSql,
1140 const std::string &updateSql, SaveRecordStatements &statements) const
1141 {
1142 int errCode = SQLiteUtils::GetStatement(dbHandle_, readSql, statements.queryStatement);
1143 if (errCode != E_OK) {
1144 LOGE("Get query statement failed. errCode = [%d]", errCode);
1145 goto ERR;
1146 }
1147
1148 errCode = SQLiteUtils::GetStatement(dbHandle_, insertSql, statements.insertStatement);
1149 if (errCode != E_OK) {
1150 LOGE("Get insert statement failed. errCode = [%d]", errCode);
1151 goto ERR;
1152 }
1153
1154 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, statements.updateStatement);
1155 if (errCode != E_OK) {
1156 LOGE("Get update statement failed. errCode = [%d]", errCode);
1157 goto ERR;
1158 }
1159 return E_OK;
1160 ERR:
1161 (void)statements.ResetStatement();
1162 return errCode;
1163 }
1164
PrepareForSavingData(SingleVerDataType type)1165 int SQLiteSingleVerStorageExecutor::PrepareForSavingData(SingleVerDataType type)
1166 {
1167 int errCode = -E_NOT_SUPPORT;
1168 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
1169 // currently, Local type has not been optimized, so pass updateSql parameter with INSERT_LOCAL_SQL
1170 errCode = PrepareForSavingData(SELECT_LOCAL_HASH_SQL, INSERT_LOCAL_SQL, INSERT_LOCAL_SQL, saveLocalStatements_);
1171 } else if (type == SingleVerDataType::SYNC_TYPE) {
1172 errCode = PrepareForSavingData(SELECT_SYNC_HASH_SQL, INSERT_SYNC_SQL, UPDATE_SYNC_SQL, saveSyncStatements_);
1173 }
1174 return CheckCorruptedStatus(errCode);
1175 }
1176
ResetForSavingData(SingleVerDataType type)1177 int SQLiteSingleVerStorageExecutor::ResetForSavingData(SingleVerDataType type)
1178 {
1179 int errCode = E_OK;
1180 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
1181 SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
1182 SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
1183 SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
1184 } else if (type == SingleVerDataType::SYNC_TYPE) {
1185 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
1186 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
1187 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
1188 }
1189 return CheckCorruptedStatus(errCode);
1190 }
1191
GetOriginDevName(const DataItem & dataItem,const std::string & origDevGet)1192 std::string SQLiteSingleVerStorageExecutor::GetOriginDevName(const DataItem &dataItem,
1193 const std::string &origDevGet)
1194 {
1195 if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
1196 return origDevGet;
1197 }
1198 return dataItem.origDev;
1199 }
1200
SaveSyncDataToDatabase(const DataItem & dataItem,const Key & hashKey,const std::string & origDev,const std::string & deviceName,bool isUpdate)1201 int SQLiteSingleVerStorageExecutor::SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey,
1202 const std::string &origDev, const std::string &deviceName, bool isUpdate)
1203 {
1204 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1205 LOGD("Find query data missing, erase local data.");
1206 return EraseSyncData(hashKey);
1207 }
1208 auto statement = saveSyncStatements_.GetDataSaveStatement(isUpdate);
1209 if (statement == nullptr) {
1210 return -E_INVALID_ARGS;
1211 }
1212
1213 std::string devName = DBCommon::TransferHashString(deviceName);
1214 int errCode = BindSavedSyncData(statement, dataItem, hashKey, {origDev, devName}, isUpdate);
1215 if (errCode != E_OK) {
1216 return errCode;
1217 }
1218
1219 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1220 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1221 errCode = E_OK;
1222 }
1223 if (errCode == E_OK) {
1224 errCode = RemoveCloudUploadFlag(hashKey);
1225 }
1226 return errCode;
1227 }
1228
JudgeSyncSaveType(DataItem & dataItem,const DataItem & itemGet,const DeviceInfo & deviceInfo,bool isHashKeyExisted,bool isPermitForceWrite)1229 DataOperStatus SQLiteSingleVerStorageExecutor::JudgeSyncSaveType(DataItem &dataItem,
1230 const DataItem &itemGet, const DeviceInfo &deviceInfo, bool isHashKeyExisted, bool isPermitForceWrite)
1231 {
1232 DataOperStatus status;
1233 status.isDeleted = ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1234 (dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY);
1235 if (isHashKeyExisted) {
1236 if ((itemGet.flag & DataItem::DELETE_FLAG) != 0) {
1237 status.preStatus = DataStatus::DELETED;
1238 } else {
1239 status.preStatus = DataStatus::EXISTED;
1240 }
1241 std::string deviceName = DBCommon::TransferHashString(deviceInfo.deviceName);
1242 if (itemGet.writeTimestamp >= dataItem.writeTimestamp) {
1243 // for multi user mode, no permit to forcewrite
1244 if (((!deviceName.empty()) && IsFromDataOwner(itemGet, deviceName) && isPermitForceWrite) ||
1245 deviceInfo.isLocal) {
1246 LOGI("Force overwrite the data:%" PRIu64 " vs %" PRIu64 " isLocal %d",
1247 itemGet.writeTimestamp, dataItem.writeTimestamp, static_cast<int>(deviceInfo.isLocal));
1248 status.isDefeated = false;
1249 dataItem.writeTimestamp = itemGet.writeTimestamp + 1;
1250 dataItem.timestamp = itemGet.timestamp;
1251 } else {
1252 status.isDefeated = true;
1253 }
1254 }
1255 }
1256 return status;
1257 }
1258
GetSyncDataItemExt(const DataItem & dataItem,DataItem & itemGet,const DataOperStatus & dataStatus) const1259 int SQLiteSingleVerStorageExecutor::GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet,
1260 const DataOperStatus &dataStatus) const
1261 {
1262 if (dataStatus.preStatus != DataStatus::EXISTED) {
1263 return E_OK;
1264 }
1265 auto statement = isSyncMigrating_ ? migrateSyncStatements_.queryStatement : saveSyncStatements_.queryStatement;
1266 // only deleted item need origin value.
1267 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, itemGet.key);
1268 if (errCode != E_OK) {
1269 return errCode;
1270 }
1271
1272 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, itemGet.value);
1273 if (errCode != E_OK) {
1274 LOGE("Get column value data failed:%d", errCode);
1275 }
1276
1277 return errCode;
1278 }
1279
ResetSaveSyncStatements(int errCode)1280 int SQLiteSingleVerStorageExecutor::ResetSaveSyncStatements(int errCode)
1281 {
1282 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
1283 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
1284 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
1285 return CheckCorruptedStatus(errCode);
1286 }
1287
1288 namespace {
IsNeedIgnoredData(const DataItem & itemPut,const DataItem & itemGet,const DeviceInfo & devInfo,bool isHashKeyExisted,int policy)1289 inline bool IsNeedIgnoredData(const DataItem &itemPut, const DataItem &itemGet,
1290 const DeviceInfo &devInfo, bool isHashKeyExisted, int policy)
1291 {
1292 // deny the data synced from other dev which the origin dev is current or the existed value is current dev data.
1293 return (((itemGet.origDev.empty() && isHashKeyExisted) || itemPut.origDev.empty()) &&
1294 (!devInfo.isLocal && policy == DENY_OTHER_DEV_AMEND_CUR_DEV_DATA));
1295 }
1296 }
1297
PrepareForNotifyConflictAndObserver(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)1298 int SQLiteSingleVerStorageExecutor::PrepareForNotifyConflictAndObserver(DataItem &dataItem,
1299 const DeviceInfo &deviceInfo, NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite)
1300 {
1301 // Check sava data existed info
1302 int errCode = GetSyncDataItemPre(dataItem, notify.getData, notify.hashKey);
1303 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1304 LOGD("[SingleVerExe][PrepareForNotifyConflictAndObserver] failed:%d", errCode);
1305 if (isSyncMigrating_) {
1306 ResetForMigrateCacheData();
1307 return errCode;
1308 }
1309 return ResetSaveSyncStatements(errCode);
1310 }
1311
1312 bool isHashKeyExisted = (errCode != -E_NOT_FOUND);
1313 if (IsNeedIgnoredData(dataItem, notify.getData, deviceInfo, isHashKeyExisted, conflictResolvePolicy_)) {
1314 LOGD("[SingleVerExe] Ignore the sync data.");
1315 if (isSyncMigrating_) {
1316 ResetForMigrateCacheData();
1317 return -E_IGNORE_DATA;
1318 }
1319 return ResetSaveSyncStatements(-E_IGNORE_DATA);
1320 }
1321
1322 notify.dataStatus = JudgeSyncSaveType(dataItem, notify.getData, deviceInfo, isHashKeyExisted,
1323 isPermitForceWrite);
1324 InitCommitNotifyDataKeyStatus(notify.committedData, notify.hashKey, notify.dataStatus);
1325
1326 // Nonexistent data, but deleted by local.
1327 if ((notify.dataStatus.preStatus == DataStatus::DELETED || notify.dataStatus.preStatus == DataStatus::NOEXISTED) &&
1328 (dataItem.flag & DataItem::DELETE_FLAG) != 0 &&
1329 (dataItem.flag & DataItem::LOCAL_FLAG) != 0) {
1330 // For delete item in cacheDB, which not in mainDB. Cannot notify, but this is not error.
1331 errCode = -E_NOT_FOUND;
1332 LOGD("Nonexistent data, but deleted by local");
1333 if (isSyncMigrating_) {
1334 ResetForMigrateCacheData();
1335 return errCode;
1336 }
1337 return ResetSaveSyncStatements(errCode);
1338 }
1339
1340 // get key and value from ori database
1341 errCode = GetSyncDataItemExt(dataItem, notify.getData, notify.dataStatus);
1342 if (errCode != E_OK) {
1343 LOGD("GetSyncDataItemExt failed:%d", errCode);
1344 if (isSyncMigrating_) {
1345 ResetForMigrateCacheData();
1346 return errCode;
1347 }
1348 return ResetSaveSyncStatements(errCode);
1349 }
1350
1351 return E_OK;
1352 }
1353
SaveSyncDataItem(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,SingleVerNaturalStoreCommitNotifyData * committedData,bool isPermitForceWrite)1354 int SQLiteSingleVerStorageExecutor::SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo,
1355 Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite)
1356 {
1357 NotifyConflictAndObserverData notify = {
1358 .committedData = committedData
1359 };
1360
1361 int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
1362 if (errCode != E_OK) {
1363 if (errCode == -E_IGNORE_DATA) {
1364 errCode = E_OK;
1365 }
1366 return errCode;
1367 }
1368
1369 PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, committedData);
1370 if (notify.dataStatus.isDefeated) {
1371 LOGE("Data status is defeated:%d", errCode);
1372 return ResetSaveSyncStatements(errCode);
1373 }
1374
1375 bool isUpdate = (notify.dataStatus.preStatus != DataStatus::NOEXISTED);
1376 std::string origDev = GetOriginDevName(dataItem, notify.getData.origDev);
1377 errCode = SaveSyncDataToDatabase(dataItem, notify.hashKey, origDev, deviceInfo.deviceName, isUpdate);
1378 if (errCode == E_OK) {
1379 PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, committedData);
1380 maxStamp = std::max(dataItem.timestamp, maxStamp);
1381 } else {
1382 LOGE("Save sync data to db failed:%d", errCode);
1383 }
1384 return ResetSaveSyncStatements(errCode);
1385 }
1386
GetAllMetaKeys(std::vector<Key> & keys) const1387 int SQLiteSingleVerStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
1388 {
1389 sqlite3_stmt *statement = nullptr;
1390 const std::string &sqlStr = (attachMetaMode_ ? SELECT_ATTACH_ALL_META_KEYS : SELECT_ALL_META_KEYS);
1391 int errCode = SQLiteUtils::GetStatement(dbHandle_, sqlStr, statement);
1392 if (errCode != E_OK) {
1393 LOGE("[SingleVerExe][GetAllKey] Get statement failed:%d", errCode);
1394 return errCode;
1395 }
1396
1397 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
1398 SQLiteUtils::ResetStatement(statement, true, errCode);
1399 return errCode;
1400 }
1401
GetAllSyncedEntries(const std::string & hashDev,std::vector<Entry> & entries) const1402 int SQLiteSingleVerStorageExecutor::GetAllSyncedEntries(const std::string &hashDev,
1403 std::vector<Entry> &entries) const
1404 {
1405 int errCode = E_OK;
1406 sqlite3_stmt *statement = nullptr;
1407 if (hashDev.empty()) {
1408 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ?
1409 SELECT_ALL_SYNC_ENTRIES_FROM_CACHEHANDLE : SELECT_ALL_SYNC_ENTRIES);
1410 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1411 if (errCode != E_OK) {
1412 LOGE("Get all entries statement failed:%d", errCode);
1413 return errCode;
1414 }
1415 } else {
1416 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ?
1417 SELECT_ALL_SYNC_ENTRIES_BY_DEV_FROM_CACHEHANDLE : SELECT_ALL_SYNC_ENTRIES_BY_DEV);
1418 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1419 if (errCode != E_OK) {
1420 LOGE("Get all entries statement failed:%d", errCode);
1421 return errCode;
1422 }
1423
1424 // deviceName always hash string
1425 std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
1426 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, devVect, true); // bind the 1st to device.
1427 if (errCode != E_OK) {
1428 LOGE("Failed to bind the synced device for all entries:%d", errCode);
1429 goto END;
1430 }
1431 }
1432
1433 errCode = GetAllEntries(statement, entries);
1434 END:
1435 SQLiteUtils::ResetStatement(statement, true, errCode);
1436 return errCode;
1437 }
1438
GetAllEntries(sqlite3_stmt * statement,std::vector<Entry> & entries) const1439 int SQLiteSingleVerStorageExecutor::GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const
1440 {
1441 if (statement == nullptr) {
1442 return -E_INVALID_DB;
1443 }
1444 int errCode;
1445 do {
1446 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1447 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1448 Entry entry;
1449 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, entry.key); // No.0 is the key
1450 if (errCode != E_OK) {
1451 break;
1452 }
1453 errCode = SQLiteUtils::GetColumnBlobValue(statement, 1, entry.value); // No.1 is the value
1454 if (errCode != E_OK) {
1455 break;
1456 }
1457
1458 entries.push_back(std::move(entry));
1459 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1460 errCode = E_OK;
1461 break;
1462 } else {
1463 LOGE("SQLite step for all entries failed:%d", errCode);
1464 break;
1465 }
1466 } while (true);
1467
1468 return errCode;
1469 }
1470
BindSavedSyncData(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,const SyncDataDevices & devices,bool isUpdate)1471 int SQLiteSingleVerStorageExecutor::BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem,
1472 const Key &hashKey, const SyncDataDevices &devices, bool isUpdate)
1473 {
1474 const int hashKeyIndex = isUpdate ? BIND_SYNC_UPDATE_HASH_KEY_INDEX : BIND_SYNC_HASH_KEY_INDEX;
1475 int errCode = SQLiteUtils::BindBlobToStatement(statement, hashKeyIndex, hashKey, false);
1476 if (errCode != E_OK) {
1477 LOGE("Bind saved sync data hash key failed:%d", errCode);
1478 return errCode;
1479 }
1480
1481 // if delete flag is set, just use the hash key instead of the key
1482 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1483 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_SYNC_KEY_INDEX, -1));
1484 } else {
1485 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_KEY_INDEX, dataItem.key, false);
1486 }
1487
1488 if (errCode != E_OK) {
1489 LOGE("Bind saved sync data key failed:%d", errCode);
1490 return errCode;
1491 }
1492
1493 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_VAL_INDEX, dataItem.value, true);
1494 if (errCode != E_OK) {
1495 LOGE("Bind saved sync data value failed:%d", errCode);
1496 return errCode;
1497 }
1498
1499 errCode = BindSyncDataTime(statement, dataItem, isUpdate);
1500 if (errCode != E_OK) {
1501 return errCode;
1502 }
1503 return BindDevForSavedSyncData(statement, dataItem, devices.origDev, devices.dev);
1504 }
1505
PutConflictData(const DataItem & itemPut,const DataItem & itemGet,const DeviceInfo & deviceInfo,const DataOperStatus & dataStatus,SingleVerNaturalStoreCommitNotifyData * commitData)1506 void SQLiteSingleVerStorageExecutor::PutConflictData(const DataItem &itemPut, const DataItem &itemGet,
1507 const DeviceInfo &deviceInfo, const DataOperStatus &dataStatus,
1508 SingleVerNaturalStoreCommitNotifyData *commitData)
1509 {
1510 if (commitData == nullptr) {
1511 return;
1512 }
1513
1514 bool conflictNotifyMatch = commitData->IsConflictedNotifyMatched(itemPut, itemGet);
1515 if (!conflictNotifyMatch) {
1516 return;
1517 }
1518
1519 if (dataStatus.preStatus == DataStatus::NOEXISTED ||
1520 ((dataStatus.preStatus == DataStatus::DELETED) && dataStatus.isDeleted)) {
1521 return;
1522 }
1523
1524 Key origKey;
1525 if ((itemPut.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1526 (itemPut.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1527 origKey = itemGet.key;
1528 } else {
1529 origKey = itemPut.key;
1530 }
1531
1532 // insert db original entry
1533 std::vector<uint8_t> getDevVect(itemGet.dev.begin(), itemGet.dev.end());
1534 DataItemInfo orgItemInfo = {itemGet, true, getDevVect};
1535 orgItemInfo.dataItem.key = origKey;
1536 commitData->InsertConflictedItem(orgItemInfo, true);
1537
1538 // insert conflict entry
1539 std::string putDeviceName = DBCommon::TransferHashString(deviceInfo.deviceName);
1540 std::vector<uint8_t> putDevVect(putDeviceName.begin(), putDeviceName.end());
1541
1542 DataItemInfo newItemInfo = {itemPut, deviceInfo.isLocal, putDevVect};
1543 newItemInfo.dataItem.key = origKey;
1544 commitData->InsertConflictedItem(newItemInfo, false);
1545 }
1546
Reset()1547 int SQLiteSingleVerStorageExecutor::Reset()
1548 {
1549 if (isTransactionOpen_) {
1550 Rollback();
1551 }
1552
1553 int errCode = ResetForSavingData(SingleVerDataType::SYNC_TYPE);
1554 if (errCode != E_OK) {
1555 LOGE("Finalize the sync resources for saving sync data failed: %d", errCode);
1556 }
1557
1558 errCode = ResetForSavingData(SingleVerDataType::LOCAL_TYPE_SQLITE);
1559 if (errCode != E_OK) {
1560 LOGE("Finalize the local resources for saving sync data failed: %d", errCode);
1561 }
1562 return SQLiteStorageExecutor::Reset();
1563 }
1564
GetSyncDataItemPre(const DataItem & itemPut,DataItem & itemGet,Key & hashKey) const1565 int SQLiteSingleVerStorageExecutor::GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet,
1566 Key &hashKey) const
1567 {
1568 if (isSyncMigrating_) {
1569 hashKey = itemPut.hashKey;
1570 } else if ((itemPut.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1571 ((itemPut.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY)) {
1572 hashKey = itemPut.key;
1573 } else {
1574 int errCode = DBCommon::CalcValueHash(itemPut.key, hashKey);
1575 if (errCode != E_OK) {
1576 return errCode;
1577 }
1578 }
1579
1580 return GetSyncDataPreByHashKey(hashKey, itemGet);
1581 }
1582
GetSyncDataPreByHashKey(const Key & hashKey,DataItem & itemGet) const1583 int SQLiteSingleVerStorageExecutor::GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const
1584 {
1585 auto statement = isSyncMigrating_ ? migrateSyncStatements_.queryStatement : saveSyncStatements_.queryStatement;
1586 int errCode = SQLiteUtils::BindBlobToStatement(statement, 1, hashKey, false); // 1st arg.
1587 if (errCode != E_OK) {
1588 return errCode;
1589 }
1590
1591 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1592 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // no find the key
1593 errCode = -E_NOT_FOUND;
1594 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1595 itemGet.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
1596 itemGet.writeTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
1597 itemGet.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
1598 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, itemGet.key);
1599 if (errCode != E_OK) {
1600 return errCode;
1601 }
1602 std::vector<uint8_t> devVect;
1603 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
1604 if (errCode != E_OK) {
1605 return errCode;
1606 }
1607
1608 std::vector<uint8_t> origDevVect;
1609 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, origDevVect);
1610 if (errCode != E_OK) {
1611 return errCode;
1612 }
1613 itemGet.dev.assign(devVect.begin(), devVect.end());
1614 itemGet.origDev.assign(origDevVect.begin(), origDevVect.end());
1615 }
1616 return errCode;
1617 }
1618
DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData * committedData,const Key & key,const Value & value)1619 int SQLiteSingleVerStorageExecutor::DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData,
1620 const Key &key, const Value &value)
1621 {
1622 if (committedData != nullptr) {
1623 Key hashKey;
1624 int innerErrCode = DBCommon::CalcValueHash(key, hashKey);
1625 if (innerErrCode != E_OK) {
1626 return innerErrCode;
1627 }
1628 committedData->InitKeyPropRecord(hashKey, ExistStatus::EXIST);
1629 }
1630
1631 std::string sql = DELETE_LOCAL_SQL;
1632 if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
1633 sql = DELETE_LOCAL_SQL_FROM_CACHEHANDLE;
1634 }
1635 sqlite3_stmt *statement = nullptr;
1636 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1637 if (errCode != E_OK) {
1638 goto ERROR;
1639 }
1640
1641 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false);
1642 if (errCode != E_OK) {
1643 LOGE("Bind the key error(%d) when delete kv data.", errCode);
1644 goto ERROR;
1645 }
1646
1647 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1648 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1649 if (sqlite3_changes(dbHandle_) > 0) {
1650 if (committedData != nullptr) {
1651 Entry entry = {key, value};
1652 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, true);
1653 } else {
1654 LOGE("DeleteLocalKvData failed to do commit notify because of OOM.");
1655 }
1656 errCode = E_OK;
1657 }
1658 }
1659
1660 ERROR:
1661 SQLiteUtils::ResetStatement(statement, true, errCode);
1662 return CheckCorruptedStatus(errCode);
1663 }
1664
DeleteLocalKvData(const Key & key,SingleVerNaturalStoreCommitNotifyData * committedData,Value & value,Timestamp & timestamp)1665 int SQLiteSingleVerStorageExecutor::DeleteLocalKvData(const Key &key,
1666 SingleVerNaturalStoreCommitNotifyData *committedData, Value &value, Timestamp ×tamp)
1667 {
1668 int errCode = GetKvData(SingleVerDataType::LOCAL_TYPE_SQLITE, key, value, timestamp);
1669 if (errCode != E_OK) {
1670 return CheckCorruptedStatus(errCode);
1671 }
1672
1673 return DeleteLocalDataInner(committedData, key, value);
1674 }
1675
EraseSyncData(const Key & hashKey)1676 int SQLiteSingleVerStorageExecutor::EraseSyncData(const Key &hashKey)
1677 {
1678 sqlite3_stmt *stmt = nullptr;
1679 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
1680 DELETE_SYNC_DATA_WITH_HASHKEY_FROM_CACHEHANDLE : DELETE_SYNC_DATA_WITH_HASHKEY;
1681 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1682 if (errCode != E_OK) {
1683 LOGE("get erase statement failed:%d", errCode);
1684 return errCode;
1685 }
1686
1687 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey, false);
1688 if (errCode != E_OK) {
1689 LOGE("bind hashKey failed:%d", errCode);
1690 goto END;
1691 }
1692
1693 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1694 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1695 errCode = E_OK;
1696 } else {
1697 LOGE("erase data failed:%d", errCode);
1698 }
1699 END:
1700 SQLiteUtils::ResetStatement(stmt, true, errCode);
1701 return CheckCorruptedStatus(errCode);
1702 }
1703
RemoveDeviceData(const std::string & deviceName)1704 int SQLiteSingleVerStorageExecutor::RemoveDeviceData(const std::string &deviceName)
1705 {
1706 int errCode = E_OK;
1707 bool isCreate = false;
1708 int ret = SQLiteUtils::CheckTableExists(dbHandle_, NATURALBASE_KV_AUX_SYNC_DATA_LOG_TABLE_NAME, isCreate);
1709 bool isTableExists = (ret == E_OK && isCreate);
1710 if (deviceName.empty()) {
1711 if (isTableExists) {
1712 CloudExcuteRemoveOrUpdate(REMOVE_CLOUD_ALL_LOG_DATA_SQL, "", "");
1713 }
1714 errCode = CloudExcuteRemoveOrUpdate(REMOVE_ALL_DEV_DATA_SQL, "", "");
1715 } else {
1716 if (isTableExists) {
1717 CloudExcuteRemoveOrUpdate(REMOVE_CLOUD_LOG_DATA_BY_DEVID_SQL, deviceName, "");
1718 }
1719 errCode = CloudExcuteRemoveOrUpdate(REMOVE_DEV_DATA_SQL, deviceName, "");
1720 }
1721 return CheckCorruptedStatus(errCode);
1722 }
1723
StepForResultEntries(bool isGetValue,sqlite3_stmt * statement,std::vector<Entry> & entries) const1724 int SQLiteSingleVerStorageExecutor::StepForResultEntries(bool isGetValue, sqlite3_stmt *statement,
1725 std::vector<Entry> &entries) const
1726 {
1727 entries.clear();
1728 entries.shrink_to_fit();
1729 int errCode = E_OK;
1730 do {
1731 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1732 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1733 errCode = GetEntryFromStatement(isGetValue, statement, entries);
1734 if (errCode != E_OK) {
1735 return errCode;
1736 }
1737 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1738 errCode = E_OK;
1739 break;
1740 } else {
1741 LOGE("SQLite step failed:%d", errCode);
1742 return errCode;
1743 }
1744 } while (true);
1745
1746 // if select no result, return the -E_NOT_FOUND.
1747 if (entries.empty()) {
1748 errCode = -E_NOT_FOUND;
1749 }
1750
1751 return errCode;
1752 }
1753
BindDevForSavedSyncData(sqlite3_stmt * statement,const DataItem & dataItem,const std::string & origDev,const std::string & deviceName)1754 int SQLiteSingleVerStorageExecutor::BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem,
1755 const std::string &origDev, const std::string &deviceName)
1756 {
1757 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_SYNC_FLAG_INDEX,
1758 static_cast<int64_t>(dataItem.flag));
1759 if (errCode != E_OK) {
1760 LOGE("Bind saved sync data flag failed:%d", errCode);
1761 return errCode;
1762 }
1763
1764 std::vector<uint8_t> devVect(deviceName.begin(), deviceName.end());
1765 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_DEV_INDEX, devVect, true);
1766 if (errCode != E_OK) {
1767 LOGE("Bind dev for sync data failed:%d", errCode);
1768 return errCode;
1769 }
1770
1771 std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
1772 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_ORI_DEV_INDEX, origDevVect, true);
1773 if (errCode != E_OK) {
1774 LOGE("Bind orig dev for sync data failed:%d", errCode);
1775 }
1776
1777 return errCode;
1778 }
1779
GetDataItemSerialSize(const DataItem & item,size_t appendLen)1780 size_t SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(const DataItem &item, size_t appendLen)
1781 {
1782 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
1783 // the size would not be very large.
1784 static const size_t maxOrigDevLength = 40;
1785 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
1786 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
1787 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
1788 return dataSize;
1789 }
1790
InitResultSet(const Key & keyPrefix,sqlite3_stmt * & countStmt)1791 int SQLiteSingleVerStorageExecutor::InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt)
1792 {
1793 if (dbHandle_ == nullptr) {
1794 return -E_INVALID_DB;
1795 }
1796 // bind statement for count
1797 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_COUNT_SYNC_PREFIX_SQL, countStmt);
1798 if (errCode != E_OK) {
1799 LOGE("Get count statement for resultset error:%d", errCode);
1800 return errCode;
1801 }
1802
1803 errCode = SQLiteUtils::BindPrefixKey(countStmt, 1, keyPrefix); // first argument is key
1804 if (errCode != E_OK) {
1805 LOGE("Bind count key error:%d", errCode);
1806 goto ERROR;
1807 }
1808 // bind statement for result set
1809 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_ROWID_PREFIX_SQL, getResultRowIdStatement_);
1810 if (errCode != E_OK) {
1811 LOGE("Get result set rowid statement error:%d", errCode);
1812 goto ERROR;
1813 }
1814
1815 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1816 if (errCode != E_OK) {
1817 LOGE("Get result set entry statement error:%d", errCode);
1818 goto ERROR;
1819 }
1820
1821 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument is key
1822 if (errCode != E_OK) {
1823 LOGE("Bind result set rowid statement error:%d", errCode);
1824 goto ERROR;
1825 }
1826 return E_OK;
1827
1828 ERROR:
1829 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1830 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1831 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
1832 return CheckCorruptedStatus(errCode);
1833 }
1834
InitResultSetCount(QueryObject & queryObj,sqlite3_stmt * & countStmt)1835 int SQLiteSingleVerStorageExecutor::InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt)
1836 {
1837 if (dbHandle_ == nullptr) {
1838 return -E_INVALID_DB;
1839 }
1840
1841 int errCode = E_OK;
1842 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1843 if (errCode != E_OK) {
1844 return errCode;
1845 }
1846
1847 errCode = helper.GetCountSqlStatement(dbHandle_, countStmt);
1848 if (errCode != E_OK) {
1849 LOGE("Get count bind statement error:%d", errCode);
1850 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1851 }
1852 return errCode;
1853 }
1854
InitResultSetContent(QueryObject & queryObj)1855 int SQLiteSingleVerStorageExecutor::InitResultSetContent(QueryObject &queryObj)
1856 {
1857 int errCode = E_OK;
1858 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1859 if (errCode != E_OK) {
1860 return errCode;
1861 }
1862
1863 // bind statement for result set
1864 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
1865 if (errCode != E_OK) {
1866 LOGE("[SqlSinExe][InitResSetContent] Bind result set rowid statement of query error:%d", errCode);
1867 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1868 return errCode;
1869 }
1870 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1871 if (errCode != E_OK) {
1872 LOGE("[SqlSinExe][InitResSetContent] Get result set entry statement of query error:%d", errCode);
1873 return CheckCorruptedStatus(errCode);
1874 }
1875 return errCode;
1876 }
1877
InitResultSet(QueryObject & queryObj,sqlite3_stmt * & countStmt)1878 int SQLiteSingleVerStorageExecutor::InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt)
1879 {
1880 if (dbHandle_ == nullptr) {
1881 return -E_INVALID_DB;
1882 }
1883
1884 int errCode = E_OK;
1885 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1886 if (errCode != E_OK) {
1887 return errCode;
1888 }
1889
1890 if (!queryObj.IsValid()) {
1891 return -E_INVALID_QUERY_FORMAT;
1892 }
1893
1894 errCode = InitResultSetCount(queryObj, countStmt);
1895 if (errCode != E_OK) {
1896 return CheckCorruptedStatus(errCode);
1897 }
1898
1899 errCode = InitResultSetContent(queryObj);
1900 if (errCode != E_OK) {
1901 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1902 }
1903 return CheckCorruptedStatus(errCode);
1904 }
1905
UpdateLocalDataTimestamp(Timestamp timestamp)1906 int SQLiteSingleVerStorageExecutor::UpdateLocalDataTimestamp(Timestamp timestamp)
1907 {
1908 const std::string updateSql = "UPDATE local_data SET timestamp=";
1909 std::string sql = updateSql + std::to_string(timestamp) + " WHERE timestamp=0;";
1910 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1911 return CheckCorruptedStatus(errCode);
1912 }
1913
SetAttachMetaMode(bool attachMetaMode)1914 void SQLiteSingleVerStorageExecutor::SetAttachMetaMode(bool attachMetaMode)
1915 {
1916 attachMetaMode_ = attachMetaMode;
1917 }
1918
GetOneRawDataItem(sqlite3_stmt * statement,DataItem & dataItem,uint64_t & verInCurCacheDb,bool isCacheDb) const1919 int SQLiteSingleVerStorageExecutor::GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem,
1920 uint64_t &verInCurCacheDb, bool isCacheDb) const
1921 {
1922 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, dataItem.key);
1923 if (errCode != E_OK) {
1924 return errCode;
1925 }
1926
1927 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, dataItem.value);
1928 if (errCode != E_OK) {
1929 return errCode;
1930 }
1931
1932 dataItem.timestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
1933 dataItem.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
1934
1935 std::vector<uint8_t> devVect;
1936 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
1937 if (errCode != E_OK) {
1938 return errCode;
1939 }
1940 dataItem.dev = std::string(devVect.begin(), devVect.end());
1941
1942 devVect.clear();
1943 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, devVect);
1944 if (errCode != E_OK) {
1945 return errCode;
1946 }
1947 dataItem.origDev = std::string(devVect.begin(), devVect.end());
1948
1949 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_HASH_KEY_INDEX, dataItem.hashKey);
1950 if (errCode != E_OK) {
1951 return errCode;
1952 }
1953 dataItem.writeTimestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
1954 if (errCode != E_OK) {
1955 return errCode;
1956 }
1957 if (isCacheDb) {
1958 verInCurCacheDb = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_VERSION_INDEX));
1959 }
1960 return E_OK;
1961 }
1962
GetAllDataItems(sqlite3_stmt * statement,std::vector<DataItem> & dataItems,uint64_t & verInCurCacheDb,bool isCacheDb) const1963 int SQLiteSingleVerStorageExecutor::GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems,
1964 uint64_t &verInCurCacheDb, bool isCacheDb) const
1965 {
1966 dataItems.clear();
1967 dataItems.shrink_to_fit();
1968 DataItem dataItem;
1969 int errCode;
1970 do {
1971 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1972 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1973 errCode = GetOneRawDataItem(statement, dataItem, verInCurCacheDb, isCacheDb);
1974 if (errCode != E_OK) {
1975 return errCode;
1976 }
1977 dataItems.push_back(std::move(dataItem));
1978 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1979 errCode = E_OK;
1980 break;
1981 } else {
1982 LOGE("SQLite step failed:%d", errCode);
1983 break;
1984 }
1985 } while (true);
1986
1987 return CheckCorruptedStatus(errCode);
1988 }
1989
OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)1990 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache,
1991 uint32_t cacheLimit, int &count)
1992 {
1993 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1994 if (errCode != E_OK) {
1995 LOGE("[SqlSinExe][OpenResSetRowId][Common] Get entry stmt fail, errCode=%d", errCode);
1996 return CheckCorruptedStatus(errCode);
1997 }
1998 errCode = StartTransaction(TransactType::DEFERRED);
1999 if (errCode != E_OK) {
2000 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2001 return CheckCorruptedStatus(errCode);
2002 }
2003 // Now Ready To Execute
2004 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, 0, count);
2005 if (errCode != E_OK) {
2006 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2007 Rollback();
2008 return CheckCorruptedStatus(errCode);
2009 }
2010 // Consider finalize getResultRowIdStatement_ here if count equal to size of rowIdCache.
2011 return E_OK;
2012 }
2013
ResultSetLoadRowIdCache(std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos,int & count)2014 int SQLiteSingleVerStorageExecutor::ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit,
2015 uint32_t cacheStartPos, int &count)
2016 {
2017 rowIdCache.clear();
2018 count = 0;
2019 while (true) {
2020 int errCode = SQLiteUtils::StepWithRetry(getResultRowIdStatement_, isMemDb_);
2021 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2022 if (count >= static_cast<int>(cacheStartPos) && rowIdCache.size() < cacheLimit) {
2023 // If we can start cache, and, if we can still cache
2024 int64_t rowid = sqlite3_column_int64(getResultRowIdStatement_, 0);
2025 rowIdCache.push_back(rowid);
2026 }
2027 // Always increase the count
2028 count++;
2029 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2030 break;
2031 } else {
2032 LOGE("[SqlSinExe][ResSetLoadCache] Step fail, errCode=%d", errCode);
2033 rowIdCache.clear();
2034 count = 0;
2035 return CheckCorruptedStatus(errCode);
2036 }
2037 }
2038 return E_OK;
2039 }
2040
ResetStatement()2041 int SQLiteSingleVerStorageExecutor::SaveRecordStatements::ResetStatement()
2042 {
2043 int errCode = E_OK;
2044 SQLiteUtils::ResetStatement(insertStatement, true, errCode);
2045 if (errCode != E_OK) {
2046 LOGE("Finalize insert statements failed, error: %d", errCode);
2047 }
2048
2049 SQLiteUtils::ResetStatement(updateStatement, true, errCode);
2050 if (errCode != E_OK) {
2051 LOGE("Finalize update statements failed, error: %d", errCode);
2052 }
2053
2054 SQLiteUtils::ResetStatement(queryStatement, true, errCode);
2055 if (errCode != E_OK) {
2056 LOGE("Finalize query statement failed, error: %d", errCode);
2057 }
2058 return errCode;
2059 }
2060
FinalizeAllStatements()2061 void SQLiteSingleVerStorageExecutor::FinalizeAllStatements()
2062 {
2063 int errCode = saveLocalStatements_.ResetStatement();
2064 if (errCode != E_OK) {
2065 LOGE("Finalize saveLocal statements failed, error: %d", errCode);
2066 }
2067
2068 errCode = saveSyncStatements_.ResetStatement();
2069 if (errCode != E_OK) {
2070 LOGE("Finalize saveSync statement failed, error: %d", errCode);
2071 }
2072
2073 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
2074 if (errCode != E_OK) {
2075 LOGE("Finalize getResultRowIdStatement_ failed, error: %d", errCode);
2076 }
2077
2078 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2079 if (errCode != E_OK) {
2080 LOGE("Finalize getResultEntryStatement_ failed, error: %d", errCode);
2081 }
2082
2083 errCode = migrateSyncStatements_.ResetStatement();
2084 if (errCode != E_OK) {
2085 LOGE("Finalize migrateSync statements failed, error: %d", errCode);
2086 }
2087
2088 ReleaseContinueStatement();
2089 }
2090
SetConflictResolvePolicy(int policy)2091 void SQLiteSingleVerStorageExecutor::SetConflictResolvePolicy(int policy)
2092 {
2093 if (policy == DENY_OTHER_DEV_AMEND_CUR_DEV_DATA || policy == DEFAULT_LAST_WIN) {
2094 conflictResolvePolicy_ = policy;
2095 }
2096 }
2097
CheckIntegrity() const2098 int SQLiteSingleVerStorageExecutor::CheckIntegrity() const
2099 {
2100 if (dbHandle_ == nullptr) {
2101 return -E_INVALID_DB;
2102 }
2103
2104 return SQLiteUtils::CheckIntegrity(dbHandle_, CHECK_DB_INTEGRITY_SQL);
2105 }
2106
ForceCheckPoint() const2107 int SQLiteSingleVerStorageExecutor::ForceCheckPoint() const
2108 {
2109 if (dbHandle_ == nullptr) {
2110 return -E_INVALID_DB;
2111 }
2112 SQLiteUtils::ExecuteCheckPoint(dbHandle_);
2113 return E_OK;
2114 }
2115
GetLogFileSize() const2116 uint64_t SQLiteSingleVerStorageExecutor::GetLogFileSize() const
2117 {
2118 if (isMemDb_) {
2119 return 0;
2120 }
2121
2122 const char *fileName = sqlite3_db_filename(dbHandle_, "main");
2123 if (fileName == nullptr) {
2124 return 0;
2125 }
2126 std::string walName = std::string(fileName) + "-wal";
2127 uint64_t fileSize = 0;
2128 int result = OS::CalFileSize(std::string(walName), fileSize);
2129 if (result != E_OK) {
2130 return 0;
2131 }
2132 return fileSize;
2133 }
2134
GetExistsDevicesFromMeta(std::set<std::string> & devices)2135 int SQLiteSingleVerStorageExecutor::GetExistsDevicesFromMeta(std::set<std::string> &devices)
2136 {
2137 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_,
2138 attachMetaMode_ ? SqliteMetaExecutor::MetaMode::KV_ATTACH : SqliteMetaExecutor::MetaMode::KV,
2139 isMemDb_, devices);
2140 }
2141
UpdateKey(const UpdateKeyCallback & callback)2142 int SQLiteSingleVerStorageExecutor::UpdateKey(const UpdateKeyCallback &callback)
2143 {
2144 if (dbHandle_ == nullptr) {
2145 return -E_INVALID_DB;
2146 }
2147 UpdateContext context;
2148 context.callback = callback;
2149 int errCode = CreateFuncUpdateKey(context, &Translate, &CalHashKey);
2150 if (errCode != E_OK) {
2151 return errCode;
2152 }
2153 int executeErrCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, UPDATE_SYNC_DATA_KEY_SQL);
2154 context.callback = nullptr;
2155 errCode = CreateFuncUpdateKey(context, nullptr, nullptr);
2156 if (context.errCode != E_OK) {
2157 return context.errCode;
2158 }
2159 if (executeErrCode != E_OK) {
2160 return executeErrCode;
2161 }
2162 if (errCode != E_OK) {
2163 return errCode;
2164 }
2165 return E_OK;
2166 }
2167
CreateFuncUpdateKey(UpdateContext & context,void (* translateFunc)(sqlite3_context * ctx,int argc,sqlite3_value ** argv),void (* calHashFunc)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const2168 int SQLiteSingleVerStorageExecutor::CreateFuncUpdateKey(UpdateContext &context,
2169 void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv),
2170 void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
2171 {
2172 int errCode = sqlite3_create_function_v2(dbHandle_, FUNC_NAME_TRANSLATE_KEY, 1, SQLITE_UTF8 | SQLITE_DETERMINISTIC,
2173 &context, translateFunc, nullptr, nullptr, nullptr);
2174 if (errCode != SQLITE_OK) {
2175 LOGE("[SqlSinExe][UpdateKey] Create func=translate_key failed=%d", errCode);
2176 return SQLiteUtils::MapSQLiteErrno(errCode);
2177 }
2178 errCode = sqlite3_create_function_v2(dbHandle_, FUNC_NAME_CAL_HASH_KEY, 1, SQLITE_UTF8 | SQLITE_DETERMINISTIC,
2179 &context, calHashFunc, nullptr, nullptr, nullptr);
2180 if (errCode != SQLITE_OK) {
2181 LOGE("[SqlSinExe][UpdateKey] Create func=translate_key failed=%d", errCode);
2182 return SQLiteUtils::MapSQLiteErrno(errCode);
2183 }
2184 return E_OK;
2185 }
2186
Translate(sqlite3_context * ctx,int argc,sqlite3_value ** argv)2187 void SQLiteSingleVerStorageExecutor::Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv)
2188 {
2189 if (ctx == nullptr || argc != 1 || argv == nullptr) { // i parameters, which are key
2190 LOGW("[SqlSinExe][Translate] invalid param=%d", argc);
2191 return;
2192 }
2193 auto context = static_cast<UpdateContext *>(sqlite3_user_data(ctx));
2194 auto keyBlob = static_cast<const uint8_t *>(sqlite3_value_blob(argv[0]));
2195 int keyBlobLen = sqlite3_value_bytes(argv[0]);
2196 Key oldKey;
2197 if (keyBlob != nullptr && keyBlobLen > 0) {
2198 oldKey = Key(keyBlob, keyBlob + keyBlobLen);
2199 }
2200 Key newKey;
2201 context->callback(oldKey, newKey);
2202 if (newKey.size() >= DBConstant::MAX_KEY_SIZE || newKey.empty()) {
2203 LOGE("[SqlSinExe][Translate] invalid key len=%zu", newKey.size());
2204 context->errCode = -E_INVALID_ARGS;
2205 sqlite3_result_error(ctx, "Update key is invalid", -1);
2206 return;
2207 }
2208 context->newKey = newKey;
2209 sqlite3_result_blob(ctx, newKey.data(), static_cast<int>(newKey.size()), SQLITE_TRANSIENT);
2210 }
2211
CalHashKey(sqlite3_context * ctx,int argc,sqlite3_value ** argv)2212 void SQLiteSingleVerStorageExecutor::CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv)
2213 {
2214 if (ctx == nullptr || argc != 1 || argv == nullptr) {
2215 LOGW("[SqlSinExe][Translate] invalid param=%d", argc);
2216 return;
2217 }
2218 auto context = static_cast<UpdateContext *>(sqlite3_user_data(ctx));
2219 Key hashKey;
2220 DBCommon::CalcValueHash(context->newKey, hashKey);
2221 sqlite3_result_blob(ctx, hashKey.data(), static_cast<int>(hashKey.size()), SQLITE_TRANSIENT);
2222 }
2223
BindSyncDataTime(sqlite3_stmt * statement,const DataItem & dataItem,bool isUpdate)2224 int SQLiteSingleVerStorageExecutor::BindSyncDataTime(sqlite3_stmt *statement, const DataItem &dataItem, bool isUpdate)
2225 {
2226 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_SYNC_STAMP_INDEX, dataItem.timestamp);
2227 if (errCode != E_OK) {
2228 LOGE("Bind saved sync data stamp failed:%d", errCode);
2229 return errCode;
2230 }
2231
2232 const int writeTimeIndex = isUpdate ? BIND_SYNC_UPDATE_W_TIME_INDEX : BIND_SYNC_W_TIME_INDEX;
2233 errCode = SQLiteUtils::BindInt64ToStatement(statement, writeTimeIndex, dataItem.writeTimestamp);
2234 if (errCode != E_OK) {
2235 LOGE("Bind saved sync data write stamp failed:%d", errCode);
2236 return errCode;
2237 }
2238
2239 const int modifyTimeIndex = isUpdate ? BIND_SYNC_UPDATE_MODIFY_TIME_INDEX : BIND_SYNC_MODIFY_TIME_INDEX;
2240 errCode = SQLiteUtils::BindInt64ToStatement(statement, modifyTimeIndex, dataItem.modifyTime);
2241 if (errCode != E_OK) {
2242 LOGE("Bind saved sync data modify time failed:%d", errCode);
2243 return errCode;
2244 }
2245
2246 const int createTimeIndex = isUpdate ? BIND_SYNC_UPDATE_CREATE_TIME_INDEX : BIND_SYNC_CREATE_TIME_INDEX;
2247 errCode = SQLiteUtils::BindInt64ToStatement(statement, createTimeIndex, dataItem.createTime);
2248 if (errCode != E_OK) {
2249 LOGE("Bind saved sync data create time failed:%d", errCode);
2250 return errCode;
2251 }
2252
2253 LOGI("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 " modifyTime:%" PRIu64 " createTime:%"
2254 PRIu64, dataItem.writeTimestamp, dataItem.timestamp, dataItem.flag, dataItem.modifyTime, dataItem.createTime);
2255 return errCode;
2256 }
2257
CreateCloudLogTable()2258 int SQLiteSingleVerStorageExecutor::CreateCloudLogTable()
2259 {
2260 if (dbHandle_ == nullptr) {
2261 return -E_INVALID_DB;
2262 }
2263 return SqliteLogTableManager::CreateKvSyncLogTable(dbHandle_);
2264 }
2265 } // namespace DistributedDB
2266