1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include <algorithm>
19 #include <optional>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32 
33 namespace DistributedDB {
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)34 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
35     const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
36 {
37     std::string querySql;
38     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
39     std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
40     int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
41     if (errCode != E_OK) {
42         LOGE("Get query log sql fail, %d", errCode);
43         return errCode;
44     }
45     if (!pkSet.empty()) {
46         errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, dataInfoWithLog.logInfo.hashKey, true);
47         if (errCode != E_OK) {
48             LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
49             return errCode;
50         }
51     }
52     sqlite3_stmt *selectStmt = nullptr;
53     errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, dataInfoWithLog.logInfo.hashKey, selectStmt);
54     if (errCode != E_OK) {
55         LOGE("Get query log statement fail, %d", errCode);
56         return errCode;
57     }
58 
59     bool alreadyFound = false;
60     do {
61         errCode = SQLiteUtils::StepWithRetry(selectStmt);
62         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
63             if (alreadyFound) {
64                 LOGE("found more than one records in log table for one primary key or gid.");
65                 errCode = -E_CLOUD_ERROR;
66                 break;
67             }
68             alreadyFound = true;
69             std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
70             errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
71             if (errCode != E_OK) {
72                 LOGE("Get info by statement fail, %d", errCode);
73                 break;
74             }
75         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
76             errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
77             break;
78         } else {
79             LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
80             break;
81         }
82     } while (errCode == E_OK);
83 
84     int ret = E_OK;
85     SQLiteUtils::ResetStatement(selectStmt, true, ret);
86     return errCode != E_OK ? errCode : ret;
87 }
88 
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)89 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
90 {
91     int index = 0;
92     logInfo.dataKey = sqlite3_column_int64(statement, index++);
93     std::vector<uint8_t> device;
94     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, device);    // 1 is device
95     DBCommon::VectorToString(device, logInfo.device);
96     std::vector<uint8_t> originDev;
97     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, originDev); // 2 is originDev
98     DBCommon::VectorToString(originDev, logInfo.originDev);
99     logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 3 is timestamp
100     logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 4 is wtimestamp
101     logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, index++)); // 5 is flag
102     (void)SQLiteUtils::GetColumnBlobValue(statement, index++, logInfo.hashKey); // 6 is hash_key
103     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.cloudGid); // 7 is cloud_gid
104     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.sharingResource); // 8 is sharing_resource
105     logInfo.status = static_cast<uint64_t>(sqlite3_column_int64(statement, index++)); // 9 is status
106     (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.version); // 10 is version
107     return index;
108 }
109 
GetInfoByStatement(sqlite3_stmt * statement,const std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)110 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
111     const std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
112     VBucket &assetInfo)
113 {
114     int index = GetLogInfoByStatement(statement, dataInfoWithLog.logInfo); // start index of assetInfo or primary key
115     int errCode = E_OK;
116     for (const auto &field: assetFields) {
117         Type cloudValue;
118         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
119         if (errCode != E_OK) {
120             break;
121         }
122         errCode = PutVBucketByType(assetInfo, field, cloudValue);
123         if (errCode != E_OK) {
124             break;
125         }
126     }
127     if (errCode != E_OK) {
128         LOGE("set asset field failed, errCode = %d", errCode);
129         return errCode;
130     }
131 
132     // fill primary key
133     for (const auto &item : pkMap) {
134         Type cloudValue;
135         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
136         if (errCode != E_OK) {
137             break;
138         }
139         errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
140         if (errCode != E_OK) {
141             break;
142         }
143     }
144     return errCode;
145 }
146 
GetInsertSqlForCloudSync(const TableSchema & tableSchema)147 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
148 {
149     std::string sql = "insert into " + tableSchema.name + "(";
150     for (const auto &field : tableSchema.fields) {
151         sql += field.colName + ",";
152     }
153     sql.pop_back();
154     sql += ") values(";
155     for (size_t i = 0; i < tableSchema.fields.size(); i++) {
156         sql += "?,";
157     }
158     sql.pop_back();
159     sql += ");";
160     return sql;
161 }
162 
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)163 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
164     const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
165 {
166     int errCode = E_OK;
167     TableInfo localTable = localSchema_.GetTable(tableSchema.name);
168     // table name in cloud schema is in lower case
169     if (!DBCommon::CaseInsensitiveCompare(localTable.GetTableName(), tableSchema.name)) {
170         LOGE("localSchema doesn't contain table from cloud");
171         return -E_INTERNAL_ERROR;
172     }
173 
174     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
175     if (pkMap.size() == 0) {
176         int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
177         std::vector<uint8_t> value;
178         DBCommon::StringToVector(std::to_string(rowid), value);
179         errCode = DBCommon::CalcValueHash(value, hashValue);
180     } else {
181         std::tie(errCode, hashValue) = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(vBucket,
182             tableSchema, localTable, pkMap, allowEmpty);
183     }
184     return errCode;
185 }
186 
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,const Key & hashKey,sqlite3_stmt * & selectStmt)187 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
188     const VBucket &vBucket, const std::string &querySql, const Key &hashKey, sqlite3_stmt *&selectStmt)
189 {
190     int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
191     if (errCode != E_OK) {
192         LOGE("Get select log statement failed, %d", errCode);
193         return errCode;
194     }
195 
196     std::string cloudGid;
197     int ret = E_OK;
198     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
199     if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
200         SQLiteUtils::ResetStatement(selectStmt, true, ret);
201         LOGE("Get cloud gid fail when bind query log statement.");
202         return errCode;
203     }
204 
205     int index = 0;
206     if (!cloudGid.empty()) {
207         index++;
208         errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
209         if (errCode != E_OK) {
210             LOGE("Bind cloud gid to query log statement failed. %d", errCode);
211             SQLiteUtils::ResetStatement(selectStmt, true, errCode);
212             return errCode;
213         }
214     }
215 
216     index++;
217     errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashKey, true);
218     if (errCode != E_OK) {
219         LOGE("Bind hash key to query log statement failed. %d", errCode);
220         SQLiteUtils::ResetStatement(selectStmt, true, ret);
221     }
222     return errCode != E_OK ? errCode : ret;
223 }
224 
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,const std::set<std::string> & pkSet,std::string & querySql)225 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
226     const std::set<std::string> &pkSet, std::string &querySql)
227 {
228     std::string cloudGid;
229     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
230     if (errCode != E_OK) {
231         LOGE("Get cloud gid fail when query log table.");
232         return errCode;
233     }
234 
235     if (pkSet.empty() && cloudGid.empty()) {
236         LOGE("query log table failed because of both primary key and gid are empty.");
237         return -E_CLOUD_ERROR;
238     }
239     std::string sql = "SELECT data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid,"
240         " sharing_resource, status, version FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE ";
241     if (!cloudGid.empty()) {
242         sql += "cloud_gid = ? OR ";
243     }
244     sql += "hash_key = ?";
245 
246     querySql = sql;
247     return E_OK;
248 }
249 
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData,std::map<int,int> & statisticMap)250 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
251     const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData,
252     std::map<int, int> &statisticMap)
253 {
254     int index = 0;
255     int errCode = E_OK;
256     for (OpType op : downloadData.opType) {
257         VBucket &vBucket = downloadData.data[index];
258         switch (op) {
259             case OpType::INSERT:
260                 errCode = InsertCloudData(vBucket, tableSchema, trackerTable, GetLocalDataKey(index, downloadData));
261                 break;
262             case OpType::UPDATE:
263                 errCode = UpdateCloudData(vBucket, tableSchema);
264                 break;
265             case OpType::DELETE:
266                 errCode = DeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
267                 break;
268             case OpType::ONLY_UPDATE_GID:
269             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
270             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
271             case OpType::UPDATE_TIMESTAMP:
272             case OpType::CLEAR_GID:
273             case OpType::LOCKED_NOT_HANDLE:
274                 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
275                 [[fallthrough]];
276             case OpType::NOT_HANDLE:
277                 errCode = errCode == E_OK ? OnlyUpdateAssetId(tableName, tableSchema, vBucket,
278                     GetLocalDataKey(index, downloadData), op) : errCode;
279                 break;
280             default:
281                 errCode = -E_CLOUD_ERROR;
282                 break;
283         }
284         if (errCode != E_OK) {
285             LOGE("put cloud sync data fail: %d", errCode);
286             return errCode;
287         }
288         statisticMap[static_cast<int>(op)]++;
289         index++;
290     }
291     return errCode;
292 }
293 
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets,std::vector<std::string> & notifyTableList)294 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
295     const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
296     std::vector<Asset> &assets, std::vector<std::string> &notifyTableList)
297 {
298     int errCode = SetLogTriggerStatus(false);
299     if (errCode != E_OK) {
300         LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
301         return errCode;
302     }
303     if (mode == FLAG_ONLY) {
304         errCode = DoCleanLogs(tableNameList, localSchema);
305         if (errCode != E_OK) {
306             LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
307             return errCode;
308         }
309         notifyTableList = tableNameList;
310     } else if (mode == FLAG_AND_DATA) {
311         errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
312         if (errCode != E_OK) {
313             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
314             return errCode;
315         }
316         notifyTableList = tableNameList;
317     } else if (mode == CLEAR_SHARED_TABLE) {
318         errCode = DoCleanShareTableDataAndLog(tableNameList);
319         if (errCode != E_OK) {
320             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
321             return errCode;
322         }
323     }
324     errCode = SetLogTriggerStatus(true);
325     if (errCode != E_OK) {
326         LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
327     }
328 
329     return errCode;
330 }
331 
DoCleanLogs(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)332 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList,
333     const RelationalSchemaObject &localSchema)
334 {
335     int errCode = E_OK;
336     int i = 1;
337     for (const auto &tableName: tableNameList) {
338         std::string logTableName = DBCommon::GetLogTableName(tableName);
339         LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
340         errCode = DoCleanAssetId(tableName, localSchema);
341         if (errCode != E_OK) {
342             LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
343             return errCode;
344         }
345         errCode = CleanCloudDataOnLogTable(logTableName, FLAG_ONLY);
346         if (errCode != E_OK) {
347             LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
348             return errCode;
349         }
350         i++;
351     }
352 
353     return errCode;
354 }
355 
UpdateCursor(sqlite3_context * ctx,int argc,sqlite3_value ** argv)356 void SQLiteSingleVerRelationalStorageExecutor::UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv)
357 {
358     if (ctx == nullptr || argc != 0 || argv == nullptr) {
359         LOGW("[SqlSinRDBExe][UpdateCursor] invalid param=%d", argc);
360         return;
361     }
362     auto context = static_cast<UpdateCursorContext *>(sqlite3_user_data(ctx));
363     if (context == nullptr) {
364         LOGW("[SqlSinRDBExe][UpdateCursor] invalid context");
365         return;
366     }
367     context->cursor++;
368     sqlite3_result_int64(ctx, static_cast<sqlite3_int64>(context->cursor));
369 }
370 
CreateFuncUpdateCursor(UpdateCursorContext & context,void (* updateCursor)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const371 int SQLiteSingleVerRelationalStorageExecutor::CreateFuncUpdateCursor(UpdateCursorContext &context,
372     void (*updateCursor)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
373 {
374     std::string sql = "update_cursor";
375     int errCode = sqlite3_create_function_v2(dbHandle_, sql.c_str(), 0, SQLITE_UTF8 | SQLITE_DIRECTONLY,
376         &context, updateCursor, nullptr, nullptr, nullptr);
377     if (errCode != SQLITE_OK) {
378         LOGE("[Storage Executor][UpdateCursor] Create func=updateCursor failed=%d", errCode);
379         return SQLiteUtils::MapSQLiteErrno(errCode);
380     }
381     return E_OK;
382 }
383 
GetCursor(const std::string & tableName)384 int SQLiteSingleVerRelationalStorageExecutor::GetCursor(const std::string &tableName)
385 {
386     int cursor = -1;
387     std::string sql = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX + "metadata where key = ?;";
388     sqlite3_stmt *stmt = nullptr;
389     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
390     if (errCode != E_OK) {
391         LOGE("[Storage Executor]get cursor failed=%d", errCode);
392         return cursor;
393     }
394     ResFinalizer finalizer([stmt]() {
395         sqlite3_stmt *statement = stmt;
396         int ret = E_OK;
397         SQLiteUtils::ResetStatement(statement, true, ret);
398         if (ret != E_OK) {
399             LOGW("Reset stmt failed %d when get cursor", ret);
400         }
401     });
402     Key key;
403     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
404     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, key, false); // first arg.
405     if (errCode != E_OK) {
406         return cursor;
407     }
408     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
409     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
410         cursor = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
411     }
412     return cursor;
413 }
414 
SetCursor(const std::string & tableName,int cursor)415 int SQLiteSingleVerRelationalStorageExecutor::SetCursor(const std::string &tableName, int cursor)
416 {
417     std::string sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + "metadata SET VALUE = ? where KEY = ?;";
418     sqlite3_stmt *stmt = nullptr;
419     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
420     if (errCode != E_OK) {
421         LOGE("Set cursor sql failed=%d", errCode);
422         return cursor;
423     }
424     ResFinalizer finalizer([stmt]() {
425         sqlite3_stmt *statement = stmt;
426         int ret = E_OK;
427         SQLiteUtils::ResetStatement(statement, true, ret);
428         if (ret != E_OK) {
429             LOGW("Reset stmt failed %d when set cursor", ret);
430         }
431     });
432     int index = 1;
433     errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, cursor);
434     if (errCode != E_OK) {
435         LOGE("Bind saved cursor failed:%d", errCode);
436         return errCode;
437     }
438     Key key;
439     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
440     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key, false);
441     if (errCode != E_OK) {
442         return cursor;
443     }
444     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
445     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
446         errCode = E_OK;
447     }
448     return errCode;
449 }
450 
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)451 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
452     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
453 {
454     int errCode = E_OK;
455     for (size_t i = 0; i < tableNameList.size(); i++) {
456         std::string tableName = tableNameList[i];
457         std::string logTableName = DBCommon::GetLogTableName(tableName);
458         std::vector<int64_t> dataKeys;
459         errCode = GetCleanCloudDataKeys(logTableName, dataKeys, true);
460         if (errCode != E_OK) {
461             LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
462             return errCode;
463         }
464 
465         std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
466         errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
467         if (errCode != E_OK) {
468             LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
469             return errCode;
470         }
471         if (isLogicDelete_) {
472             errCode = SetDataOnUserTableWithLogicDelete(tableName, logTableName);
473         } else {
474             errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName, localSchema);
475         }
476         if (errCode != E_OK) {
477             LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
478             return errCode;
479         }
480     }
481 
482     return errCode;
483 }
484 
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)485 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
486     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
487 {
488     int errCode = E_OK;
489     int ret = E_OK;
490     sqlite3_stmt *selectStmt = nullptr;
491     for (const auto &rowId : dataKeys) {
492         std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
493             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
494         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
495         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
496             LOGE("Get select asset statement failed, %d", errCode);
497             return errCode;
498         }
499         errCode = SQLiteUtils::StepWithRetry(selectStmt);
500         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
501             std::vector<uint8_t> blobValue;
502             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
503             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
504                 LOGE("Get column blob value failed, %d", errCode);
505                 goto END;
506             }
507             if (blobValue.empty()) {
508                 SQLiteUtils::ResetStatement(selectStmt, true, ret);
509                 continue;
510             }
511             Asset asset;
512             errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
513             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
514                 LOGE("Transfer blob to asset failed, %d", errCode);
515                 goto END;
516             }
517             assets.push_back(asset);
518         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
519             errCode = E_OK;
520             Asset asset;
521             assets.push_back(asset);
522         }
523         SQLiteUtils::ResetStatement(selectStmt, true, ret);
524     }
525     return errCode != E_OK ? errCode : ret;
526 END:
527     SQLiteUtils::ResetStatement(selectStmt, true, ret);
528     return errCode != E_OK ? errCode : ret;
529 }
530 
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)531 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
532     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
533 {
534     int errCode = E_OK;
535     int ret = E_OK;
536     sqlite3_stmt *selectStmt = nullptr;
537     for (const auto &rowId : dataKeys) {
538         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
539             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
540         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
541         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
542             LOGE("Get select assets statement failed, %d", errCode);
543             goto END;
544         }
545         errCode = SQLiteUtils::StepWithRetry(selectStmt);
546         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
547             std::vector<uint8_t> blobValue;
548             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
549             if (errCode != E_OK) {
550                 goto END;
551             }
552             Assets tmpAssets;
553             errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
554             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
555                 goto END;
556             }
557             for (const auto &asset: tmpAssets) {
558                 assets.push_back(asset);
559             }
560         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
561             errCode = E_OK;
562         }
563         SQLiteUtils::ResetStatement(selectStmt, true, ret);
564     }
565     return errCode != E_OK ? errCode : ret;
566 END:
567     SQLiteUtils::ResetStatement(selectStmt, true, ret);
568     return errCode != E_OK ? errCode : ret;
569 }
570 
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)571 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
572     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
573 {
574     int errCode = E_OK;
575     for (const auto &fieldInfo: fieldInfos) {
576         if (fieldInfo.IsAssetType()) {
577             errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
578             if (errCode != E_OK) {
579                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
580                 return errCode;
581             }
582         } else if (fieldInfo.IsAssetsType()) {
583             errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
584             if (errCode != E_OK) {
585                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
586                 return errCode;
587             }
588         }
589     }
590     return errCode;
591 }
592 
SetCursorIncFlag(bool flag)593 int SQLiteSingleVerRelationalStorageExecutor::SetCursorIncFlag(bool flag)
594 {
595     std::string sql = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX + "metadata" +
596         " VALUES ('cursor_inc_flag', ";
597     if (flag) {
598         sql += "'true'";
599     } else {
600         sql += "'false'";
601     }
602     sql += ");";
603     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
604     if (errCode != E_OK) {
605         LOGE("set cursor inc flag fail, errCode = %d", errCode);
606     }
607     return errCode;
608 }
609 
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData)610 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
611     const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData)
612 {
613     if (downloadData.data.size() != downloadData.opType.size()) {
614         LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
615              downloadData.opType.size());
616         return -E_CLOUD_ERROR;
617     }
618 
619     int errCode = SetLogTriggerStatus(false);
620     if (errCode != E_OK) {
621         LOGE("Fail to set log trigger off, %d", errCode);
622         return errCode;
623     }
624 
625     std::map<int, int> statisticMap = {};
626     errCode = ExecutePutCloudData(tableName, tableSchema, trackerTable, downloadData, statisticMap);
627     int ret = SetLogTriggerStatus(true);
628     if (ret != E_OK) {
629         LOGE("Fail to set log trigger on, %d", ret);
630     }
631     LOGI("save cloud data:%d, ins:%d, upd:%d, del:%d, only gid:%d, flag zero:%d, flag one:%d, upd timestamp:%d,"
632          "clear gid:%d, not handle:%d, lock:%d",
633          errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
634          statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
635          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
636          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
637          statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
638          statisticMap[static_cast<int>(OpType::NOT_HANDLE)], statisticMap[static_cast<int>(OpType::LOCKED_NOT_HANDLE)]);
639     return errCode == E_OK ? ret : errCode;
640 }
641 
InsertCloudData(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,int64_t dataKey)642 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema,
643     const TrackerTable &trackerTable, int64_t dataKey)
644 {
645     int errCode = E_OK;
646     if (dataKey > 0) {
647         errCode = RemoveDataAndLog(tableSchema.name, dataKey);
648         if (errCode != E_OK) {
649             return errCode;
650         }
651     }
652     std::string sql = GetInsertSqlForCloudSync(tableSchema);
653     sqlite3_stmt *insertStmt = nullptr;
654     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
655     if (errCode != E_OK) {
656         LOGE("Get insert statement failed when save cloud data, %d", errCode);
657         return errCode;
658     }
659     if (putDataMode_ == PutDataMode::SYNC) {
660         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
661     }
662     errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
663     if (errCode != E_OK) {
664         SQLiteUtils::ResetStatement(insertStmt, true, errCode);
665         return errCode;
666     }
667     // insert data
668     errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
669     int ret = E_OK;
670     SQLiteUtils::ResetStatement(insertStmt, true, ret);
671     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
672         LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
673         return errCode;
674     }
675 
676     // insert log
677     return InsertLogRecord(tableSchema, trackerTable, vBucket);
678 }
679 
InsertLogRecord(const TableSchema & tableSchema,const TrackerTable & trackerTable,VBucket & vBucket)680 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema,
681     const TrackerTable &trackerTable, VBucket &vBucket)
682 {
683     if (putDataMode_ == PutDataMode::SYNC && !CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
684         // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
685         // so we need to delete the old log record according to the gid first
686         std::string gidStr;
687         int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
688         if (errCode != E_OK || gidStr.empty()) {
689             LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
690             return errCode;
691         }
692         std::string sql = "DELETE FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE cloud_gid = '"
693             + gidStr + "';";
694         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
695         if (errCode != E_OK) {
696             LOGE("delete log record according gid fail, errCode = %d", errCode);
697             return errCode;
698         }
699     }
700 
701     std::string sql = "INSERT OR REPLACE INTO " + DBCommon::GetLogTableName(tableSchema.name) +
702         " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, " + "CASE WHEN (SELECT status FROM " +
703         DBCommon::GetLogTableName(tableSchema.name) + " WHERE hash_key=?) IS NULL THEN 0 ELSE " +
704         "(SELECT status FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE hash_key=?) " + "END)";
705     sqlite3_stmt *insertLogStmt = nullptr;
706     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
707     if (errCode != E_OK) {
708         LOGE("Get insert log statement failed when save cloud data, %d", errCode);
709         return errCode;
710     }
711 
712     errCode = BindValueToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
713     if (errCode != E_OK) {
714         SQLiteUtils::ResetStatement(insertLogStmt, true, errCode);
715         return errCode;
716     }
717 
718     errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
719     int ret = E_OK;
720     SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
721     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
722         return ret;
723     } else {
724         LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
725         return errCode;
726     }
727 }
728 
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)729 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
730     sqlite3_stmt *updateStmt)
731 {
732     auto it = bindCloudFieldFuncMap_.find(field.type);
733     if (it == bindCloudFieldFuncMap_.end()) {
734         LOGE("unknown cloud type when bind one field.");
735         return -E_CLOUD_ERROR;
736     }
737     return it->second(index, vBucket, field, updateStmt);
738 }
739 
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)740 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
741     const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
742 {
743     int errCode = E_OK;
744     int index = 0;
745     for (const auto &field : fields) {
746         index++;
747         errCode = BindOneField(index, vBucket, field, upsertStmt);
748         if (errCode != E_OK) {
749             return errCode;
750         }
751     }
752     return errCode;
753 }
754 
BindStatusSubQueryHashKeyStatement(sqlite3_stmt * insertLogStmt,std::vector<uint8_t> & hashKey)755 int SQLiteSingleVerRelationalStorageExecutor::BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt,
756     std::vector<uint8_t> &hashKey)
757 {
758     int errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 12, hashKey); // 12 is hash_key
759     if (errCode != E_OK) {
760         LOGE("Bind hash_key to status subQuery statement failed, %d", errCode);
761         return errCode;
762     }
763 
764     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 13, hashKey); // 13 is hash_key
765     if (errCode != E_OK) {
766         LOGE("Bind hash_key to status subQuery2 statement failed, %d", errCode);
767         return errCode;
768     }
769     return errCode;
770 }
771 
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)772 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
773     const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
774 {
775     std::vector<uint8_t> hashKey;
776     int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
777     if (errCode != E_OK) {
778         return errCode;
779     }
780     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 7, hashKey); // 7 is hash_key
781     if (errCode != E_OK) {
782         LOGE("Bind hash_key to insert log statement failed, %d", errCode);
783         return errCode;
784     }
785 
786     std::string cloudGid;
787     if (putDataMode_ == PutDataMode::SYNC) {
788         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
789         if (errCode != E_OK) {
790             LOGE("get gid for insert log statement failed, %d", errCode);
791             return -E_CLOUD_ERROR;
792         }
793     }
794 
795     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 8, cloudGid); // 8 is cloud_gid
796     if (errCode != E_OK) {
797         LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
798         return errCode;
799     }
800 
801     if (trackerTable.GetExtendName().empty() || vBucket.find(trackerTable.GetExtendName()) == vBucket.end()) {
802         errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 9, ""); // 9 is extend_field
803     } else {
804         Type extendValue = vBucket.at(trackerTable.GetExtendName());
805         errCode = SQLiteRelationalUtils::BindStatementByType(insertLogStmt, 9, extendValue); // 9 is extend_field
806     }
807     if (errCode != E_OK) {
808         LOGE("Bind extend_field to insert log statement failed, %d", errCode);
809         return errCode;
810     }
811 
812     errCode = BindShareValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt);
813     if (errCode != E_OK) {
814         return errCode;
815     }
816     return BindStatusSubQueryHashKeyStatement(insertLogStmt, hashKey);
817 }
818 
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)819 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
820     const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
821 {
822     int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
823     int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 1, rowid);
824     if (errCode != E_OK) {
825         LOGE("Bind rowid to insert log statement failed, %d", errCode);
826         return errCode;
827     }
828 
829     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 2, GetDev()); // 2 is device
830     if (errCode != E_OK) {
831         LOGE("Bind device to insert log statement failed, %d", errCode);
832         return errCode;
833     }
834 
835     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 3, GetDev()); // 3 is ori_device
836     if (errCode != E_OK) {
837         LOGE("Bind ori_device to insert log statement failed, %d", errCode);
838         return errCode;
839     }
840 
841     int64_t val = 0;
842     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
843     if (errCode != E_OK) {
844         LOGE("get modify time for insert log statement failed, %d", errCode);
845         return -E_CLOUD_ERROR;
846     }
847 
848     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 4, val); // 4 is timestamp
849     if (errCode != E_OK) {
850         LOGE("Bind timestamp to insert log statement failed, %d", errCode);
851         return errCode;
852     }
853 
854     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
855     if (errCode != E_OK) {
856         LOGE("get create time for insert log statement failed, %d", errCode);
857         return -E_CLOUD_ERROR;
858     }
859 
860     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 5, val); // 5 is wtimestamp
861     if (errCode != E_OK) {
862         LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
863         return errCode;
864     }
865 
866     errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, 6, GetDataFlag())); // 6 is flag
867     if (errCode != E_OK) {
868         LOGE("Bind flag to insert log statement failed, %d", errCode);
869         return errCode;
870     }
871 
872     vBucket[CloudDbConstant::ROW_ID_FIELD_NAME] = rowid; // fill rowid to cloud data to notify user
873     return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
874 }
875 
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)876 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
877     const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
878 {
879     std::string where = " WHERE";
880     if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
881         where += " " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (SELECT data_key FROM " +
882             DBCommon::GetLogTableName(tableName) + " WHERE cloud_gid = '" + gidStr + "')";
883     }
884     if (!pkSet.empty() && queryByPk) {
885         if (!gidStr.empty()) {
886             where += " OR";
887         }
888         where += " (1 = 1";
889         for (const auto &pk : pkSet) {
890             where += (" AND " + pk + " = ?");
891         }
892         where += ");";
893     }
894     return where;
895 }
896 
GetUpdateSqlForCloudSync(const std::vector<Field> & updateFields,const TableSchema & tableSchema,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)897 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields,
898     const TableSchema &tableSchema, const std::string &gidStr, const std::set<std::string> &pkSet,
899     std::string &updateSql)
900 {
901     if (pkSet.empty() && gidStr.empty()) {
902         LOGE("update data fail because both primary key and gid is empty.");
903         return -E_CLOUD_ERROR;
904     }
905     std::string sql = "UPDATE " + tableSchema.name + " SET";
906     for (const auto &field : updateFields) {
907         sql +=  " " + field.colName + " = ?,";
908     }
909     sql.pop_back();
910     sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
911     updateSql = sql;
912     return E_OK;
913 }
914 
// A gid is valid when it contains no single quote; an empty gid is considered valid.
static inline bool IsGidValid(const std::string &gidStr)
{
    return gidStr.find('\'') == std::string::npos;
}
922 
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)923 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
924     const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
925 {
926     std::string gidStr;
927     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
928     if (errCode != E_OK) {
929         LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
930         return errCode;
931     }
932     if (!IsGidValid(gidStr)) {
933         LOGE("invalid char in cloud gid");
934         return -E_CLOUD_ERROR;
935     }
936 
937     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
938     auto updateFields = GetUpdateField(vBucket, tableSchema);
939     std::string updateSql;
940     errCode = GetUpdateSqlForCloudSync(updateFields, tableSchema, gidStr, pkSet, updateSql);
941     if (errCode != E_OK) {
942         return errCode;
943     }
944 
945     errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
946     if (errCode != E_OK) {
947         LOGE("Get update statement failed when update cloud data, %d", errCode);
948         return errCode;
949     }
950 
951     // bind value
952     if (!pkSet.empty()) {
953         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
954         updateFields.insert(updateFields.end(), pkFields.begin(), pkFields.end());
955     }
956     errCode = BindValueToUpsertStatement(vBucket, updateFields, updateStmt);
957     if (errCode != E_OK) {
958         LOGE("bind value to update statement failed when update cloud data, %d", errCode);
959         SQLiteUtils::ResetStatement(updateStmt, true, errCode);
960     }
961     return errCode;
962 }
963 
UpdateCloudData(VBucket & vBucket,const TableSchema & tableSchema)964 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema)
965 {
966     if (putDataMode_ == PutDataMode::SYNC) {
967         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
968     }
969     sqlite3_stmt *updateStmt = nullptr;
970     int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
971     if (errCode != E_OK) {
972         LOGE("Get update data table statement fail, %d", errCode);
973         return errCode;
974     }
975 
976     // update data
977     errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
978     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
979         errCode = E_OK;
980     } else {
981         LOGE("update data failed when save cloud data:%d", errCode);
982         SQLiteUtils::ResetStatement(updateStmt, true, errCode);
983         return errCode;
984     }
985     SQLiteUtils::ResetStatement(updateStmt, true, errCode);
986 
987     // update log
988     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
989     if (errCode != E_OK) {
990         LOGE("update log record failed when update cloud data, errCode = %d", errCode);
991     }
992     return errCode;
993 }
994 
IsAllowWithPrimaryKey(OpType opType)995 static inline bool IsAllowWithPrimaryKey(OpType opType)
996 {
997     return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
998         opType == OpType::ONLY_UPDATE_GID || opType == OpType::LOCKED_NOT_HANDLE);
999 }
1000 
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1001 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
1002     OpType opType)
1003 {
1004     sqlite3_stmt *updateLogStmt = nullptr;
1005     std::vector<std::string> updateColName;
1006     int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
1007     if (errCode != E_OK) {
1008         LOGE("Get update log statement failed, errCode = %d", errCode);
1009         return errCode;
1010     }
1011 
1012     errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
1013         updateLogStmt);
1014     int ret = E_OK;
1015     if (errCode != E_OK) {
1016         LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
1017         SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1018         return errCode != E_OK ? errCode : ret;
1019     }
1020 
1021     errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
1022     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1023         errCode = E_OK;
1024     } else {
1025         LOGE("update log record failed when update cloud data:%d", errCode);
1026     }
1027     SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1028     return errCode != E_OK ? errCode : ret;
1029 }
1030 
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)1031 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
1032     const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
1033     sqlite3_stmt *updateLogStmt)
1034 {
1035     int errCode = CloudStorageUtils::BindUpdateLogStmtFromVBucket(vBucket, tableSchema, colNames, updateLogStmt);
1036     if (errCode != E_OK) {
1037         return errCode;
1038     }
1039     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1040     if (pkMap.empty()) {
1041         return E_OK;
1042     }
1043 
1044     std::vector<uint8_t> hashKey;
1045     errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
1046     if (errCode != E_OK) {
1047         return errCode;
1048     }
1049     return SQLiteUtils::BindBlobToStatement(updateLogStmt, colNames.size() + 1, hashKey);
1050 }
1051 
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)1052 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
1053     const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
1054 {
1055     std::string gidStr;
1056     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1057     if (errCode != E_OK) {
1058         LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
1059         return errCode;
1060     }
1061     if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
1062         LOGE("empty or invalid char in cloud gid");
1063         return -E_CLOUD_ERROR;
1064     }
1065 
1066     bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
1067     std::string deleteSql = "DELETE FROM " + tableSchema.name;
1068     deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
1069     errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
1070     if (errCode != E_OK) {
1071         LOGE("Get delete statement failed when delete data, %d", errCode);
1072         return errCode;
1073     }
1074 
1075     int ret = E_OK;
1076     if (!pkSet.empty() && queryByPk) {
1077         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1078         errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
1079         if (errCode != E_OK) {
1080             LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
1081             SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1082         }
1083     }
1084     return errCode != E_OK ? errCode : ret;
1085 }
1086 
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1087 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1088     const TableSchema &tableSchema, const TrackerTable &trackerTable)
1089 {
1090     if (isLogicDelete_) {
1091         return LogicDeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
1092     }
1093     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1094     sqlite3_stmt *deleteStmt = nullptr;
1095     int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
1096     if (errCode != E_OK) {
1097         return errCode;
1098     }
1099     errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
1100     int ret = E_OK;
1101     SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1102     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1103         LOGE("delete data failed when sync with cloud:%d", errCode);
1104         return errCode;
1105     }
1106     if (ret != E_OK) {
1107         LOGE("reset delete statement failed:%d", ret);
1108         return ret;
1109     }
1110 
1111     // update log
1112     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1113     if (errCode != E_OK) {
1114         LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
1115     }
1116     return errCode;
1117 }
1118 
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1119 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
1120     const TableSchema &tableSchema, OpType opType)
1121 {
1122     return UpdateLogRecord(vBucket, tableSchema, opType);
1123 }
1124 
DeleteTableTrigger(const std::string & missTable) const1125 int SQLiteSingleVerRelationalStorageExecutor::DeleteTableTrigger(const std::string &missTable) const
1126 {
1127     static const char *triggerEndName[] = {
1128         "_ON_INSERT",
1129         "_ON_UPDATE",
1130         "_ON_DELETE"
1131     };
1132     std::string logTableName = DBConstant::SYSTEM_TABLE_PREFIX + missTable;
1133     for (const auto &endName : triggerEndName) {
1134         std::string deleteSql = "DROP TRIGGER IF EXISTS " + logTableName + endName + ";";
1135         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1136         if (errCode != E_OK) {
1137             LOGE("[DeleteTableTrigger] Drop trigger failed. %d", errCode);
1138             return errCode;
1139         }
1140     }
1141     return E_OK;
1142 }
1143 
SetLogicDelete(bool isLogicDelete)1144 void SQLiteSingleVerRelationalStorageExecutor::SetLogicDelete(bool isLogicDelete)
1145 {
1146     isLogicDelete_ = isLogicDelete;
1147 }
1148 
UpdateRecordStatus(const std::string & tableName,const std::string & status,const Key & hashKey)1149 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordStatus(const std::string &tableName,
1150     const std::string &status, const Key &hashKey)
1151 {
1152     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " + status + " WHERE hash_key = ?;";
1153     sqlite3_stmt *stmt = nullptr;
1154     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1155     if (errCode != E_OK) {
1156         LOGE("[Storage Executor] Get stmt failed when update record status, %d", errCode);
1157         return errCode;
1158     }
1159     int ret = E_OK;
1160     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey); // 1 is bind index of hashKey
1161     if (errCode != E_OK) {
1162         LOGE("[Storage Executor] Bind hashKey to update record status stmt failed, %d", errCode);
1163         SQLiteUtils::ResetStatement(stmt, true, ret);
1164         return errCode;
1165     }
1166     errCode = SQLiteUtils::StepWithRetry(stmt);
1167     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1168         errCode = E_OK;
1169     } else {
1170         LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1171     }
1172     SQLiteUtils::ResetStatement(stmt, true, ret);
1173     return errCode == E_OK ? ret : errCode;
1174 }
1175 
SetUploadConfig(int32_t maxUploadCount,int32_t maxUploadSize)1176 void SQLiteSingleVerRelationalStorageExecutor::SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize)
1177 {
1178     maxUploadCount_ = maxUploadCount;
1179     maxUploadSize_ = maxUploadSize;
1180 }
1181 
LogicDeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1182 int SQLiteSingleVerRelationalStorageExecutor::LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1183     const TableSchema &tableSchema, const TrackerTable &trackerTable)
1184 {
1185     LOGD("[RDBExecutor] logic delete skip delete data");
1186     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, CloudStorageUtils::GetCursorIncSql(tableName));
1187     if (errCode != E_OK) {
1188         return errCode;
1189     }
1190     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1191     if (errCode != E_OK) {
1192         return errCode;
1193     }
1194     if (!trackerTable.IsEmpty()) {
1195         return SQLiteRelationalUtils::SelectServerObserver(dbHandle_, tableName, true);
1196     }
1197     return E_OK;
1198 }
1199 
InitCursorToMeta(const std::string & tableName)1200 int SQLiteSingleVerRelationalStorageExecutor::InitCursorToMeta(const std::string &tableName)
1201 {
1202     Value key;
1203     Value cursor;
1204     DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
1205     int errCode = GetKvData(key, cursor);
1206     if (errCode == -E_NOT_FOUND) {
1207         DBCommon::StringToVector(std::string("0"), cursor);
1208         errCode = PutKvData(key, cursor);
1209         if (errCode != E_OK) {
1210             LOGE("Init cursor to meta table failed. %d", errCode);
1211         }
1212         return errCode;
1213     }
1214     if (errCode != E_OK) {
1215         LOGE("Get cursor from meta table failed. %d", errCode);
1216     }
1217     return errCode;
1218 }
1219 
SetTableSchema(const TableSchema & tableSchema)1220 void SQLiteSingleVerRelationalStorageExecutor::SetTableSchema(const TableSchema &tableSchema)
1221 {
1222     tableSchema_ = tableSchema;
1223 }
1224 
GetAssetInfoOnTable(sqlite3_stmt * & stmt,const std::vector<Field> & assetFields,VBucket & assetInfo)1225 int SQLiteSingleVerRelationalStorageExecutor::GetAssetInfoOnTable(sqlite3_stmt *&stmt,
1226     const std::vector<Field> &assetFields, VBucket &assetInfo)
1227 {
1228     int errCode = SQLiteUtils::StepWithRetry(stmt);
1229     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1230         int index = 0;
1231         for (const auto &field: assetFields) {
1232             Type cloudValue;
1233             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1234             if (errCode != E_OK) {
1235                 break;
1236             }
1237             errCode = PutVBucketByType(assetInfo, field, cloudValue);
1238             if (errCode != E_OK) {
1239                 break;
1240             }
1241         }
1242     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1243         errCode = E_OK;
1244     } else {
1245         LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1246     }
1247     return errCode;
1248 }
1249 
IsNeedUpdateAssetIdInner(sqlite3_stmt * selectStmt,const VBucket & vBucket,const Field & field,VBucket & assetInfo)1250 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetIdInner(sqlite3_stmt *selectStmt,
1251     const VBucket &vBucket, const Field &field, VBucket &assetInfo)
1252 {
1253     if (field.type == TYPE_INDEX<Asset>) {
1254         Asset asset;
1255         UpdateLocalAssetId(vBucket, field.colName, asset);
1256         Asset *assetDBPtr = std::get_if<Asset>(&assetInfo[field.colName]);
1257         if (assetDBPtr == nullptr) {
1258             return true;
1259         }
1260         Asset &assetDB = *assetDBPtr;
1261         if (assetDB.assetId != asset.assetId || asset.status != AssetStatus::NORMAL) {
1262             return true;
1263         }
1264     }
1265     if (field.type == TYPE_INDEX<Assets>) {
1266         Assets assets;
1267         UpdateLocalAssetsId(vBucket, field.colName, assets);
1268         Assets *assetsDBPtr = std::get_if<Assets>(&assetInfo[field.colName]);
1269         if (assetsDBPtr == nullptr) {
1270             return true;
1271         }
1272         Assets &assetsDB = *assetsDBPtr;
1273         if (assets.size() != assetsDB.size()) {
1274             return true;
1275         }
1276         for (uint32_t i = 0; i < assets.size(); ++i) {
1277             if (assets[i].assetId != assetsDB[i].assetId || assets[i].status != AssetStatus::NORMAL) {
1278                 return true;
1279             }
1280         }
1281     }
1282     return false;
1283 }
1284 
IsNeedUpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1285 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1286     const VBucket &vBucket)
1287 {
1288     std::vector<Field> assetFields;
1289     for (const auto &field : tableSchema.fields) {
1290         if (field.type == TYPE_INDEX<Asset>) {
1291             assetFields.push_back(field);
1292         }
1293         if (field.type == TYPE_INDEX<Assets>) {
1294             assetFields.push_back(field);
1295         }
1296     }
1297     if (assetFields.empty()) {
1298         return false;
1299     }
1300     sqlite3_stmt *selectStmt = nullptr;
1301     std::string queryAssetsSql = "SELECT ";
1302     for (const auto &field : assetFields) {
1303         queryAssetsSql += field.colName + ",";
1304     }
1305     queryAssetsSql.pop_back();
1306     queryAssetsSql += " FROM '" + tableSchema.name + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " +
1307         std::to_string(dataKey) + ";";
1308     int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1309     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1310         LOGE("Get select assets statement failed, %d.", errCode);
1311         return true;
1312     }
1313     ResFinalizer finalizer([selectStmt]() {
1314         sqlite3_stmt *statementInner = selectStmt;
1315         int ret = E_OK;
1316         SQLiteUtils::ResetStatement(statementInner, true, ret);
1317         if (ret != E_OK) {
1318             LOGW("Reset stmt failed %d when get asset", ret);
1319         }
1320     });
1321     VBucket assetInfo;
1322     errCode = GetAssetInfoOnTable(selectStmt, assetFields, assetInfo);
1323     if (errCode != E_OK) {
1324         return true;
1325     }
1326     for (const auto &field : assetFields) {
1327         if (IsNeedUpdateAssetIdInner(selectStmt, vBucket, field, assetInfo)) {
1328             return true;
1329         }
1330     }
1331     return false;
1332 }
1333 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1334 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsConsistent(const std::string &tableName,
1335     const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1336 {
1337     if (downloadData.data.size() != downloadData.opType.size()) {
1338         LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1339         return -E_CLOUD_ERROR;
1340     }
1341     std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1342         " SET flag=flag&(~0x20), " + CloudDbConstant::UNLOCKING_TO_UNLOCK + " WHERE cloud_gid=? and timestamp=?;";
1343     sqlite3_stmt *stmt = nullptr;
1344     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1345     if (errCode != E_OK) {
1346         LOGE("Get mark flag as consistent stmt failed, %d.", errCode);
1347         return errCode;
1348     }
1349     int ret = E_OK;
1350     int index = 0;
1351     for (const auto &data: downloadData.data) {
1352         SQLiteUtils::ResetStatement(stmt, false, ret);
1353         OpType opType = downloadData.opType[index++];
1354         if (opType == OpType::NOT_HANDLE || opType == OpType::LOCKED_NOT_HANDLE) {
1355             continue;
1356         }
1357         errCode = CloudStorageUtils::BindStepConsistentFlagStmt(stmt, data, gidFilters);
1358         if (errCode != E_OK) {
1359             break;
1360         }
1361     }
1362     SQLiteUtils::ResetStatement(stmt, true, ret);
1363     return errCode == E_OK ? ret : errCode;
1364 }
1365 
FillCloudVersionForUpload(const std::string & tableName,const CloudSyncBatch & batchData)1366 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const std::string &tableName,
1367     const CloudSyncBatch &batchData)
1368 {
1369     if (batchData.extend.empty()) {
1370         return E_OK;
1371     }
1372     if (batchData.hashKey.empty() || batchData.extend.size() != batchData.hashKey.size()) {
1373         LOGE("invalid sync data for filling version.");
1374         return -E_INVALID_ARGS;
1375     }
1376     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(tableName) +
1377         "' SET version = ? WHERE hash_key = ? ";
1378     sqlite3_stmt *stmt = nullptr;
1379     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1380     if (errCode != E_OK) {
1381         return errCode;
1382     }
1383     int ret = E_OK;
1384     for (size_t i = 0; i < batchData.extend.size(); ++i) {
1385         errCode = BindUpdateVersionStatement(batchData.extend[i], batchData.hashKey[i], stmt);
1386         if (errCode != E_OK) {
1387             LOGE("bind update version stmt failed.");
1388             SQLiteUtils::ResetStatement(stmt, true, ret);
1389             return errCode;
1390         }
1391     }
1392     SQLiteUtils::ResetStatement(stmt, true, ret);
1393     return ret;
1394 }
1395 
QueryCount(const std::string & tableName,int64_t & count)1396 int SQLiteSingleVerRelationalStorageExecutor::QueryCount(const std::string &tableName, int64_t &count)
1397 {
1398     return SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, count);
1399 }
1400 
CheckInventoryData(const std::string & tableName)1401 int SQLiteSingleVerRelationalStorageExecutor::CheckInventoryData(const std::string &tableName)
1402 {
1403     int64_t dataCount = 0;
1404     int errCode = SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, dataCount);
1405     if (errCode != E_OK) {
1406         LOGE("Query count failed.", errCode);
1407         return errCode;
1408     }
1409     return dataCount > 0 ? -E_WITH_INVENTORY_DATA : E_OK;
1410 }
1411 
GetUploadCountInner(const Timestamp & timestamp,SqliteQueryHelper & helper,std::string & sql,int64_t & count)1412 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCountInner(const Timestamp &timestamp,
1413     SqliteQueryHelper &helper, std::string &sql, int64_t &count)
1414 {
1415     sqlite3_stmt *stmt = nullptr;
1416     int errCode = helper.GetCloudQueryStatement(false, dbHandle_, timestamp, sql, stmt);
1417     if (errCode != E_OK) {
1418         LOGE("failed to get count statement %d", errCode);
1419         return errCode;
1420     }
1421     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1422     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1423         count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1424         errCode = E_OK;
1425     } else {
1426         LOGE("Failed to get the count to be uploaded. %d", errCode);
1427     }
1428     SQLiteUtils::ResetStatement(stmt, true, errCode);
1429     return errCode;
1430 }
1431 
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1432 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp &timestamp, bool isCloudForcePush,
1433     bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1434 {
1435     int errCode;
1436     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1437     if (errCode != E_OK) {
1438         return errCode;
1439     }
1440     std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask,
1441         CloudWaterType::DELETE);
1442     return GetUploadCountInner(timestamp, helper, sql, count);
1443 }
1444 
GetAllUploadCount(const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1445 int SQLiteSingleVerRelationalStorageExecutor::GetAllUploadCount(const std::vector<Timestamp> &timestampVec,
1446     bool isCloudForcePush, bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1447 {
1448     std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1449     if (timestampVec.size() != typeVec.size()) {
1450         return -E_INVALID_ARGS;
1451     }
1452     int errCode;
1453     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1454     if (errCode != E_OK) {
1455         return errCode;
1456     }
1457     count = 0;
1458     for (size_t i = 0; i < typeVec.size(); i++) {
1459         std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask, typeVec[i]);
1460         int64_t tempCount = 0;
1461         helper.AppendCloudQueryToGetDiffData(sql, typeVec[i]);
1462         errCode = GetUploadCountInner(timestampVec[i], helper, sql, tempCount);
1463         if (errCode != E_OK) {
1464             return errCode;
1465         }
1466         count += tempCount;
1467     }
1468     return E_OK;
1469 }
1470 
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1471 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1472     bool ignoreEmptyGid)
1473 {
1474     if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1475         cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1476         return -E_INVALID_ARGS;
1477     }
1478     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1479         + "' SET cloud_gid = ? WHERE data_key = ? ";
1480     sqlite3_stmt *stmt = nullptr;
1481     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1482     if (errCode != E_OK) {
1483         return errCode;
1484     }
1485     errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1486     int resetCode = E_OK;
1487     SQLiteUtils::ResetStatement(stmt, true, resetCode);
1488     return errCode == E_OK ? resetCode : errCode;
1489 }
1490 
GetSyncCloudData(const CloudUploadRecorder & uploadRecorder,CloudSyncData & cloudDataResult,SQLiteSingleVerRelationalContinueToken & token)1491 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(const CloudUploadRecorder &uploadRecorder,
1492     CloudSyncData &cloudDataResult, SQLiteSingleVerRelationalContinueToken &token)
1493 {
1494     token.GetCloudTableSchema(tableSchema_);
1495     sqlite3_stmt *queryStmt = nullptr;
1496     bool isStepNext = false;
1497     int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult, queryStmt, isStepNext);
1498     if (errCode != E_OK) {
1499         (void)token.ReleaseCloudStatement();
1500         return errCode;
1501     }
1502     uint32_t totalSize = 0;
1503     uint32_t stepNum = -1;
1504     do {
1505         if (isStepNext) {
1506             errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1507             if (errCode != E_OK) {
1508                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1509                 break;
1510             }
1511         }
1512         isStepNext = true;
1513         errCode = GetCloudDataForSync(uploadRecorder, queryStmt, cloudDataResult, ++stepNum, totalSize);
1514     } while (errCode == E_OK);
1515     if (errCode != -E_UNFINISHED) {
1516         (void)token.ReleaseCloudStatement();
1517     }
1518     return errCode;
1519 }
1520 
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1521 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1522 {
1523     if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1524         Asset asset;
1525         int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1526         if (errCode != E_OK) {
1527             return errCode;
1528         }
1529         if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1530             return -E_CLOUD_ERROR;
1531         }
1532         vBucket.insert_or_assign(field.colName, asset);
1533     } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1534         Assets assets;
1535         int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1536         if (errCode != E_OK) {
1537             return errCode;
1538         }
1539         if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets) || !CloudStorageUtils::CheckAssetStatus(assets)) {
1540             return -E_CLOUD_ERROR;
1541         }
1542         vBucket.insert_or_assign(field.colName, assets);
1543     } else {
1544         vBucket.insert_or_assign(field.colName, cloudValue);
1545     }
1546     return E_OK;
1547 }
1548 } // namespace DistributedDB
1549 #endif
1550