1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32
33 namespace DistributedDB {
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)34 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
35 const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
36 {
37 std::string querySql;
38 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
39 std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
40 int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
41 if (errCode != E_OK) {
42 LOGE("Get query log sql fail, %d", errCode);
43 return errCode;
44 }
45 if (!pkSet.empty()) {
46 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, dataInfoWithLog.logInfo.hashKey, true);
47 if (errCode != E_OK) {
48 LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
49 return errCode;
50 }
51 }
52 sqlite3_stmt *selectStmt = nullptr;
53 errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, dataInfoWithLog.logInfo.hashKey, selectStmt);
54 if (errCode != E_OK) {
55 LOGE("Get query log statement fail, %d", errCode);
56 return errCode;
57 }
58
59 bool alreadyFound = false;
60 do {
61 errCode = SQLiteUtils::StepWithRetry(selectStmt);
62 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
63 if (alreadyFound) {
64 LOGE("found more than one records in log table for one primary key or gid.");
65 errCode = -E_CLOUD_ERROR;
66 break;
67 }
68 alreadyFound = true;
69 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
70 errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
71 if (errCode != E_OK) {
72 LOGE("Get info by statement fail, %d", errCode);
73 break;
74 }
75 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
76 errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
77 break;
78 } else {
79 LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
80 break;
81 }
82 } while (errCode == E_OK);
83
84 int ret = E_OK;
85 SQLiteUtils::ResetStatement(selectStmt, true, ret);
86 return errCode != E_OK ? errCode : ret;
87 }
88
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)89 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
90 {
91 int index = 0;
92 logInfo.dataKey = sqlite3_column_int64(statement, index++);
93 std::vector<uint8_t> device;
94 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, device); // 1 is device
95 DBCommon::VectorToString(device, logInfo.device);
96 std::vector<uint8_t> originDev;
97 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, originDev); // 2 is originDev
98 DBCommon::VectorToString(originDev, logInfo.originDev);
99 logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 3 is timestamp
100 logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, index++)); // 4 is wtimestamp
101 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, index++)); // 5 is flag
102 (void)SQLiteUtils::GetColumnBlobValue(statement, index++, logInfo.hashKey); // 6 is hash_key
103 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.cloudGid); // 7 is cloud_gid
104 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.sharingResource); // 8 is sharing_resource
105 logInfo.status = static_cast<uint64_t>(sqlite3_column_int64(statement, index++)); // 9 is status
106 (void)SQLiteUtils::GetColumnTextValue(statement, index++, logInfo.version); // 10 is version
107 return index;
108 }
109
GetInfoByStatement(sqlite3_stmt * statement,const std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)110 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
111 const std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
112 VBucket &assetInfo)
113 {
114 int index = GetLogInfoByStatement(statement, dataInfoWithLog.logInfo); // start index of assetInfo or primary key
115 int errCode = E_OK;
116 for (const auto &field: assetFields) {
117 Type cloudValue;
118 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
119 if (errCode != E_OK) {
120 break;
121 }
122 errCode = PutVBucketByType(assetInfo, field, cloudValue);
123 if (errCode != E_OK) {
124 break;
125 }
126 }
127 if (errCode != E_OK) {
128 LOGE("set asset field failed, errCode = %d", errCode);
129 return errCode;
130 }
131
132 // fill primary key
133 for (const auto &item : pkMap) {
134 Type cloudValue;
135 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
136 if (errCode != E_OK) {
137 break;
138 }
139 errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
140 if (errCode != E_OK) {
141 break;
142 }
143 }
144 return errCode;
145 }
146
GetInsertSqlForCloudSync(const TableSchema & tableSchema)147 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
148 {
149 std::string sql = "insert into " + tableSchema.name + "(";
150 for (const auto &field : tableSchema.fields) {
151 sql += field.colName + ",";
152 }
153 sql.pop_back();
154 sql += ") values(";
155 for (size_t i = 0; i < tableSchema.fields.size(); i++) {
156 sql += "?,";
157 }
158 sql.pop_back();
159 sql += ");";
160 return sql;
161 }
162
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)163 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
164 const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
165 {
166 int errCode = E_OK;
167 TableInfo localTable = localSchema_.GetTable(tableSchema.name);
168 // table name in cloud schema is in lower case
169 if (!DBCommon::CaseInsensitiveCompare(localTable.GetTableName(), tableSchema.name)) {
170 LOGE("localSchema doesn't contain table from cloud");
171 return -E_INTERNAL_ERROR;
172 }
173
174 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
175 if (pkMap.size() == 0) {
176 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
177 std::vector<uint8_t> value;
178 DBCommon::StringToVector(std::to_string(rowid), value);
179 errCode = DBCommon::CalcValueHash(value, hashValue);
180 } else {
181 std::tie(errCode, hashValue) = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(vBucket,
182 tableSchema, localTable, pkMap, allowEmpty);
183 }
184 return errCode;
185 }
186
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,const Key & hashKey,sqlite3_stmt * & selectStmt)187 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
188 const VBucket &vBucket, const std::string &querySql, const Key &hashKey, sqlite3_stmt *&selectStmt)
189 {
190 int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
191 if (errCode != E_OK) {
192 LOGE("Get select log statement failed, %d", errCode);
193 return errCode;
194 }
195
196 std::string cloudGid;
197 int ret = E_OK;
198 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
199 if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
200 SQLiteUtils::ResetStatement(selectStmt, true, ret);
201 LOGE("Get cloud gid fail when bind query log statement.");
202 return errCode;
203 }
204
205 int index = 0;
206 if (!cloudGid.empty()) {
207 index++;
208 errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
209 if (errCode != E_OK) {
210 LOGE("Bind cloud gid to query log statement failed. %d", errCode);
211 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
212 return errCode;
213 }
214 }
215
216 index++;
217 errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashKey, true);
218 if (errCode != E_OK) {
219 LOGE("Bind hash key to query log statement failed. %d", errCode);
220 SQLiteUtils::ResetStatement(selectStmt, true, ret);
221 }
222 return errCode != E_OK ? errCode : ret;
223 }
224
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,const std::set<std::string> & pkSet,std::string & querySql)225 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
226 const std::set<std::string> &pkSet, std::string &querySql)
227 {
228 std::string cloudGid;
229 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
230 if (errCode != E_OK) {
231 LOGE("Get cloud gid fail when query log table.");
232 return errCode;
233 }
234
235 if (pkSet.empty() && cloudGid.empty()) {
236 LOGE("query log table failed because of both primary key and gid are empty.");
237 return -E_CLOUD_ERROR;
238 }
239 std::string sql = "SELECT data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid,"
240 " sharing_resource, status, version FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE ";
241 if (!cloudGid.empty()) {
242 sql += "cloud_gid = ? OR ";
243 }
244 sql += "hash_key = ?";
245
246 querySql = sql;
247 return E_OK;
248 }
249
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData,std::map<int,int> & statisticMap)250 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
251 const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData,
252 std::map<int, int> &statisticMap)
253 {
254 int index = 0;
255 int errCode = E_OK;
256 for (OpType op : downloadData.opType) {
257 VBucket &vBucket = downloadData.data[index];
258 switch (op) {
259 case OpType::INSERT:
260 errCode = InsertCloudData(vBucket, tableSchema, trackerTable, GetLocalDataKey(index, downloadData));
261 break;
262 case OpType::UPDATE:
263 errCode = UpdateCloudData(vBucket, tableSchema);
264 break;
265 case OpType::DELETE:
266 errCode = DeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
267 break;
268 case OpType::ONLY_UPDATE_GID:
269 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
270 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
271 case OpType::UPDATE_TIMESTAMP:
272 case OpType::CLEAR_GID:
273 case OpType::LOCKED_NOT_HANDLE:
274 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
275 [[fallthrough]];
276 case OpType::NOT_HANDLE:
277 errCode = errCode == E_OK ? OnlyUpdateAssetId(tableName, tableSchema, vBucket,
278 GetLocalDataKey(index, downloadData), op) : errCode;
279 break;
280 default:
281 errCode = -E_CLOUD_ERROR;
282 break;
283 }
284 if (errCode != E_OK) {
285 LOGE("put cloud sync data fail: %d", errCode);
286 return errCode;
287 }
288 statisticMap[static_cast<int>(op)]++;
289 index++;
290 }
291 return errCode;
292 }
293
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets,std::vector<std::string> & notifyTableList)294 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
295 const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
296 std::vector<Asset> &assets, std::vector<std::string> ¬ifyTableList)
297 {
298 int errCode = SetLogTriggerStatus(false);
299 if (errCode != E_OK) {
300 LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
301 return errCode;
302 }
303 if (mode == FLAG_ONLY) {
304 errCode = DoCleanLogs(tableNameList, localSchema);
305 if (errCode != E_OK) {
306 LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
307 return errCode;
308 }
309 notifyTableList = tableNameList;
310 } else if (mode == FLAG_AND_DATA) {
311 errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
312 if (errCode != E_OK) {
313 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
314 return errCode;
315 }
316 notifyTableList = tableNameList;
317 } else if (mode == CLEAR_SHARED_TABLE) {
318 errCode = DoCleanShareTableDataAndLog(tableNameList);
319 if (errCode != E_OK) {
320 LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
321 return errCode;
322 }
323 }
324 errCode = SetLogTriggerStatus(true);
325 if (errCode != E_OK) {
326 LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
327 }
328
329 return errCode;
330 }
331
DoCleanLogs(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)332 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList,
333 const RelationalSchemaObject &localSchema)
334 {
335 int errCode = E_OK;
336 int i = 1;
337 for (const auto &tableName: tableNameList) {
338 std::string logTableName = DBCommon::GetLogTableName(tableName);
339 LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
340 errCode = DoCleanAssetId(tableName, localSchema);
341 if (errCode != E_OK) {
342 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
343 return errCode;
344 }
345 errCode = CleanCloudDataOnLogTable(logTableName, FLAG_ONLY);
346 if (errCode != E_OK) {
347 LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
348 return errCode;
349 }
350 i++;
351 }
352
353 return errCode;
354 }
355
UpdateCursor(sqlite3_context * ctx,int argc,sqlite3_value ** argv)356 void SQLiteSingleVerRelationalStorageExecutor::UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv)
357 {
358 if (ctx == nullptr || argc != 0 || argv == nullptr) {
359 LOGW("[SqlSinRDBExe][UpdateCursor] invalid param=%d", argc);
360 return;
361 }
362 auto context = static_cast<UpdateCursorContext *>(sqlite3_user_data(ctx));
363 if (context == nullptr) {
364 LOGW("[SqlSinRDBExe][UpdateCursor] invalid context");
365 return;
366 }
367 context->cursor++;
368 sqlite3_result_int64(ctx, static_cast<sqlite3_int64>(context->cursor));
369 }
370
CreateFuncUpdateCursor(UpdateCursorContext & context,void (* updateCursor)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const371 int SQLiteSingleVerRelationalStorageExecutor::CreateFuncUpdateCursor(UpdateCursorContext &context,
372 void (*updateCursor)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
373 {
374 std::string sql = "update_cursor";
375 int errCode = sqlite3_create_function_v2(dbHandle_, sql.c_str(), 0, SQLITE_UTF8 | SQLITE_DIRECTONLY,
376 &context, updateCursor, nullptr, nullptr, nullptr);
377 if (errCode != SQLITE_OK) {
378 LOGE("[Storage Executor][UpdateCursor] Create func=updateCursor failed=%d", errCode);
379 return SQLiteUtils::MapSQLiteErrno(errCode);
380 }
381 return E_OK;
382 }
383
GetCursor(const std::string & tableName)384 int SQLiteSingleVerRelationalStorageExecutor::GetCursor(const std::string &tableName)
385 {
386 int cursor = -1;
387 std::string sql = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX + "metadata where key = ?;";
388 sqlite3_stmt *stmt = nullptr;
389 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
390 if (errCode != E_OK) {
391 LOGE("[Storage Executor]get cursor failed=%d", errCode);
392 return cursor;
393 }
394 ResFinalizer finalizer([stmt]() {
395 sqlite3_stmt *statement = stmt;
396 int ret = E_OK;
397 SQLiteUtils::ResetStatement(statement, true, ret);
398 if (ret != E_OK) {
399 LOGW("Reset stmt failed %d when get cursor", ret);
400 }
401 });
402 Key key;
403 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
404 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, key, false); // first arg.
405 if (errCode != E_OK) {
406 return cursor;
407 }
408 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
409 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
410 cursor = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
411 }
412 return cursor;
413 }
414
SetCursor(const std::string & tableName,int cursor)415 int SQLiteSingleVerRelationalStorageExecutor::SetCursor(const std::string &tableName, int cursor)
416 {
417 std::string sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + "metadata SET VALUE = ? where KEY = ?;";
418 sqlite3_stmt *stmt = nullptr;
419 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
420 if (errCode != E_OK) {
421 LOGE("Set cursor sql failed=%d", errCode);
422 return cursor;
423 }
424 ResFinalizer finalizer([stmt]() {
425 sqlite3_stmt *statement = stmt;
426 int ret = E_OK;
427 SQLiteUtils::ResetStatement(statement, true, ret);
428 if (ret != E_OK) {
429 LOGW("Reset stmt failed %d when set cursor", ret);
430 }
431 });
432 int index = 1;
433 errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, cursor);
434 if (errCode != E_OK) {
435 LOGE("Bind saved cursor failed:%d", errCode);
436 return errCode;
437 }
438 Key key;
439 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
440 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, key, false);
441 if (errCode != E_OK) {
442 return cursor;
443 }
444 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
445 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
446 errCode = E_OK;
447 }
448 return errCode;
449 }
450
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)451 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
452 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
453 {
454 int errCode = E_OK;
455 for (size_t i = 0; i < tableNameList.size(); i++) {
456 std::string tableName = tableNameList[i];
457 std::string logTableName = DBCommon::GetLogTableName(tableName);
458 std::vector<int64_t> dataKeys;
459 errCode = GetCleanCloudDataKeys(logTableName, dataKeys, true);
460 if (errCode != E_OK) {
461 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
462 return errCode;
463 }
464
465 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
466 errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
467 if (errCode != E_OK) {
468 LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
469 return errCode;
470 }
471 if (isLogicDelete_) {
472 errCode = SetDataOnUserTableWithLogicDelete(tableName, logTableName);
473 } else {
474 errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName, localSchema);
475 }
476 if (errCode != E_OK) {
477 LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
478 return errCode;
479 }
480 }
481
482 return errCode;
483 }
484
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)485 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
486 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
487 {
488 int errCode = E_OK;
489 int ret = E_OK;
490 sqlite3_stmt *selectStmt = nullptr;
491 for (const auto &rowId : dataKeys) {
492 std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
493 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
494 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
495 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
496 LOGE("Get select asset statement failed, %d", errCode);
497 return errCode;
498 }
499 errCode = SQLiteUtils::StepWithRetry(selectStmt);
500 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
501 std::vector<uint8_t> blobValue;
502 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
503 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
504 LOGE("Get column blob value failed, %d", errCode);
505 goto END;
506 }
507 if (blobValue.empty()) {
508 SQLiteUtils::ResetStatement(selectStmt, true, ret);
509 continue;
510 }
511 Asset asset;
512 errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
513 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
514 LOGE("Transfer blob to asset failed, %d", errCode);
515 goto END;
516 }
517 assets.push_back(asset);
518 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
519 errCode = E_OK;
520 Asset asset;
521 assets.push_back(asset);
522 }
523 SQLiteUtils::ResetStatement(selectStmt, true, ret);
524 }
525 return errCode != E_OK ? errCode : ret;
526 END:
527 SQLiteUtils::ResetStatement(selectStmt, true, ret);
528 return errCode != E_OK ? errCode : ret;
529 }
530
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)531 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
532 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
533 {
534 int errCode = E_OK;
535 int ret = E_OK;
536 sqlite3_stmt *selectStmt = nullptr;
537 for (const auto &rowId : dataKeys) {
538 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
539 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
540 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
541 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
542 LOGE("Get select assets statement failed, %d", errCode);
543 goto END;
544 }
545 errCode = SQLiteUtils::StepWithRetry(selectStmt);
546 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
547 std::vector<uint8_t> blobValue;
548 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
549 if (errCode != E_OK) {
550 goto END;
551 }
552 Assets tmpAssets;
553 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
554 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
555 goto END;
556 }
557 for (const auto &asset: tmpAssets) {
558 assets.push_back(asset);
559 }
560 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
561 errCode = E_OK;
562 }
563 SQLiteUtils::ResetStatement(selectStmt, true, ret);
564 }
565 return errCode != E_OK ? errCode : ret;
566 END:
567 SQLiteUtils::ResetStatement(selectStmt, true, ret);
568 return errCode != E_OK ? errCode : ret;
569 }
570
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)571 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
572 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
573 {
574 int errCode = E_OK;
575 for (const auto &fieldInfo: fieldInfos) {
576 if (fieldInfo.IsAssetType()) {
577 errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
578 if (errCode != E_OK) {
579 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
580 return errCode;
581 }
582 } else if (fieldInfo.IsAssetsType()) {
583 errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
584 if (errCode != E_OK) {
585 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
586 return errCode;
587 }
588 }
589 }
590 return errCode;
591 }
592
SetCursorIncFlag(bool flag)593 int SQLiteSingleVerRelationalStorageExecutor::SetCursorIncFlag(bool flag)
594 {
595 std::string sql = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX + "metadata" +
596 " VALUES ('cursor_inc_flag', ";
597 if (flag) {
598 sql += "'true'";
599 } else {
600 sql += "'false'";
601 }
602 sql += ");";
603 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
604 if (errCode != E_OK) {
605 LOGE("set cursor inc flag fail, errCode = %d", errCode);
606 }
607 return errCode;
608 }
609
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,const TrackerTable & trackerTable,DownloadData & downloadData)610 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
611 const TableSchema &tableSchema, const TrackerTable &trackerTable, DownloadData &downloadData)
612 {
613 if (downloadData.data.size() != downloadData.opType.size()) {
614 LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
615 downloadData.opType.size());
616 return -E_CLOUD_ERROR;
617 }
618
619 int errCode = SetLogTriggerStatus(false);
620 if (errCode != E_OK) {
621 LOGE("Fail to set log trigger off, %d", errCode);
622 return errCode;
623 }
624
625 std::map<int, int> statisticMap = {};
626 errCode = ExecutePutCloudData(tableName, tableSchema, trackerTable, downloadData, statisticMap);
627 int ret = SetLogTriggerStatus(true);
628 if (ret != E_OK) {
629 LOGE("Fail to set log trigger on, %d", ret);
630 }
631 LOGI("save cloud data:%d, ins:%d, upd:%d, del:%d, only gid:%d, flag zero:%d, flag one:%d, upd timestamp:%d,"
632 "clear gid:%d, not handle:%d, lock:%d",
633 errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
634 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
635 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
636 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
637 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
638 statisticMap[static_cast<int>(OpType::NOT_HANDLE)], statisticMap[static_cast<int>(OpType::LOCKED_NOT_HANDLE)]);
639 return errCode == E_OK ? ret : errCode;
640 }
641
InsertCloudData(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,int64_t dataKey)642 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema,
643 const TrackerTable &trackerTable, int64_t dataKey)
644 {
645 int errCode = E_OK;
646 if (dataKey > 0) {
647 errCode = RemoveDataAndLog(tableSchema.name, dataKey);
648 if (errCode != E_OK) {
649 return errCode;
650 }
651 }
652 std::string sql = GetInsertSqlForCloudSync(tableSchema);
653 sqlite3_stmt *insertStmt = nullptr;
654 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
655 if (errCode != E_OK) {
656 LOGE("Get insert statement failed when save cloud data, %d", errCode);
657 return errCode;
658 }
659 if (putDataMode_ == PutDataMode::SYNC) {
660 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
661 }
662 errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
663 if (errCode != E_OK) {
664 SQLiteUtils::ResetStatement(insertStmt, true, errCode);
665 return errCode;
666 }
667 // insert data
668 errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
669 int ret = E_OK;
670 SQLiteUtils::ResetStatement(insertStmt, true, ret);
671 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
672 LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
673 return errCode;
674 }
675
676 // insert log
677 return InsertLogRecord(tableSchema, trackerTable, vBucket);
678 }
679
InsertLogRecord(const TableSchema & tableSchema,const TrackerTable & trackerTable,VBucket & vBucket)680 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema,
681 const TrackerTable &trackerTable, VBucket &vBucket)
682 {
683 if (putDataMode_ == PutDataMode::SYNC && !CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
684 // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
685 // so we need to delete the old log record according to the gid first
686 std::string gidStr;
687 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
688 if (errCode != E_OK || gidStr.empty()) {
689 LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
690 return errCode;
691 }
692 std::string sql = "DELETE FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE cloud_gid = '"
693 + gidStr + "';";
694 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
695 if (errCode != E_OK) {
696 LOGE("delete log record according gid fail, errCode = %d", errCode);
697 return errCode;
698 }
699 }
700
701 std::string sql = "INSERT OR REPLACE INTO " + DBCommon::GetLogTableName(tableSchema.name) +
702 " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, " + "CASE WHEN (SELECT status FROM " +
703 DBCommon::GetLogTableName(tableSchema.name) + " WHERE hash_key=?) IS NULL THEN 0 ELSE " +
704 "(SELECT status FROM " + DBCommon::GetLogTableName(tableSchema.name) + " WHERE hash_key=?) " + "END)";
705 sqlite3_stmt *insertLogStmt = nullptr;
706 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
707 if (errCode != E_OK) {
708 LOGE("Get insert log statement failed when save cloud data, %d", errCode);
709 return errCode;
710 }
711
712 errCode = BindValueToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
713 if (errCode != E_OK) {
714 SQLiteUtils::ResetStatement(insertLogStmt, true, errCode);
715 return errCode;
716 }
717
718 errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
719 int ret = E_OK;
720 SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
721 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
722 return ret;
723 } else {
724 LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
725 return errCode;
726 }
727 }
728
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)729 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
730 sqlite3_stmt *updateStmt)
731 {
732 auto it = bindCloudFieldFuncMap_.find(field.type);
733 if (it == bindCloudFieldFuncMap_.end()) {
734 LOGE("unknown cloud type when bind one field.");
735 return -E_CLOUD_ERROR;
736 }
737 return it->second(index, vBucket, field, updateStmt);
738 }
739
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)740 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
741 const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
742 {
743 int errCode = E_OK;
744 int index = 0;
745 for (const auto &field : fields) {
746 index++;
747 errCode = BindOneField(index, vBucket, field, upsertStmt);
748 if (errCode != E_OK) {
749 return errCode;
750 }
751 }
752 return errCode;
753 }
754
BindStatusSubQueryHashKeyStatement(sqlite3_stmt * insertLogStmt,std::vector<uint8_t> & hashKey)755 int SQLiteSingleVerRelationalStorageExecutor::BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt,
756 std::vector<uint8_t> &hashKey)
757 {
758 int errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 12, hashKey); // 12 is hash_key
759 if (errCode != E_OK) {
760 LOGE("Bind hash_key to status subQuery statement failed, %d", errCode);
761 return errCode;
762 }
763
764 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 13, hashKey); // 13 is hash_key
765 if (errCode != E_OK) {
766 LOGE("Bind hash_key to status subQuery2 statement failed, %d", errCode);
767 return errCode;
768 }
769 return errCode;
770 }
771
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)772 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
773 const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
774 {
775 std::vector<uint8_t> hashKey;
776 int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
777 if (errCode != E_OK) {
778 return errCode;
779 }
780 errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 7, hashKey); // 7 is hash_key
781 if (errCode != E_OK) {
782 LOGE("Bind hash_key to insert log statement failed, %d", errCode);
783 return errCode;
784 }
785
786 std::string cloudGid;
787 if (putDataMode_ == PutDataMode::SYNC) {
788 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
789 if (errCode != E_OK) {
790 LOGE("get gid for insert log statement failed, %d", errCode);
791 return -E_CLOUD_ERROR;
792 }
793 }
794
795 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 8, cloudGid); // 8 is cloud_gid
796 if (errCode != E_OK) {
797 LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
798 return errCode;
799 }
800
801 if (trackerTable.GetExtendName().empty() || vBucket.find(trackerTable.GetExtendName()) == vBucket.end()) {
802 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 9, ""); // 9 is extend_field
803 } else {
804 Type extendValue = vBucket.at(trackerTable.GetExtendName());
805 errCode = SQLiteRelationalUtils::BindStatementByType(insertLogStmt, 9, extendValue); // 9 is extend_field
806 }
807 if (errCode != E_OK) {
808 LOGE("Bind extend_field to insert log statement failed, %d", errCode);
809 return errCode;
810 }
811
812 errCode = BindShareValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt);
813 if (errCode != E_OK) {
814 return errCode;
815 }
816 return BindStatusSubQueryHashKeyStatement(insertLogStmt, hashKey);
817 }
818
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable,sqlite3_stmt * insertLogStmt)819 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
820 const TableSchema &tableSchema, const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt)
821 {
822 int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
823 int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 1, rowid);
824 if (errCode != E_OK) {
825 LOGE("Bind rowid to insert log statement failed, %d", errCode);
826 return errCode;
827 }
828
829 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 2, GetDev()); // 2 is device
830 if (errCode != E_OK) {
831 LOGE("Bind device to insert log statement failed, %d", errCode);
832 return errCode;
833 }
834
835 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 3, GetDev()); // 3 is ori_device
836 if (errCode != E_OK) {
837 LOGE("Bind ori_device to insert log statement failed, %d", errCode);
838 return errCode;
839 }
840
841 int64_t val = 0;
842 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
843 if (errCode != E_OK) {
844 LOGE("get modify time for insert log statement failed, %d", errCode);
845 return -E_CLOUD_ERROR;
846 }
847
848 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 4, val); // 4 is timestamp
849 if (errCode != E_OK) {
850 LOGE("Bind timestamp to insert log statement failed, %d", errCode);
851 return errCode;
852 }
853
854 errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
855 if (errCode != E_OK) {
856 LOGE("get create time for insert log statement failed, %d", errCode);
857 return -E_CLOUD_ERROR;
858 }
859
860 errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 5, val); // 5 is wtimestamp
861 if (errCode != E_OK) {
862 LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
863 return errCode;
864 }
865
866 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, 6, GetDataFlag())); // 6 is flag
867 if (errCode != E_OK) {
868 LOGE("Bind flag to insert log statement failed, %d", errCode);
869 return errCode;
870 }
871
872 vBucket[CloudDbConstant::ROW_ID_FIELD_NAME] = rowid; // fill rowid to cloud data to notify user
873 return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, trackerTable, insertLogStmt);
874 }
875
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)876 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
877 const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
878 {
879 std::string where = " WHERE";
880 if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
881 where += " " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (SELECT data_key FROM " +
882 DBCommon::GetLogTableName(tableName) + " WHERE cloud_gid = '" + gidStr + "')";
883 }
884 if (!pkSet.empty() && queryByPk) {
885 if (!gidStr.empty()) {
886 where += " OR";
887 }
888 where += " (1 = 1";
889 for (const auto &pk : pkSet) {
890 where += (" AND " + pk + " = ?");
891 }
892 where += ");";
893 }
894 return where;
895 }
896
GetUpdateSqlForCloudSync(const std::vector<Field> & updateFields,const TableSchema & tableSchema,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)897 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields,
898 const TableSchema &tableSchema, const std::string &gidStr, const std::set<std::string> &pkSet,
899 std::string &updateSql)
900 {
901 if (pkSet.empty() && gidStr.empty()) {
902 LOGE("update data fail because both primary key and gid is empty.");
903 return -E_CLOUD_ERROR;
904 }
905 std::string sql = "UPDATE " + tableSchema.name + " SET";
906 for (const auto &field : updateFields) {
907 sql += " " + field.colName + " = ?,";
908 }
909 sql.pop_back();
910 sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
911 updateSql = sql;
912 return E_OK;
913 }
914
IsGidValid(const std::string & gidStr)915 static inline bool IsGidValid(const std::string &gidStr)
916 {
917 if (!gidStr.empty()) {
918 return gidStr.find("'") == std::string::npos;
919 }
920 return true;
921 }
922
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)923 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
924 const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
925 {
926 std::string gidStr;
927 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
928 if (errCode != E_OK) {
929 LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
930 return errCode;
931 }
932 if (!IsGidValid(gidStr)) {
933 LOGE("invalid char in cloud gid");
934 return -E_CLOUD_ERROR;
935 }
936
937 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
938 auto updateFields = GetUpdateField(vBucket, tableSchema);
939 std::string updateSql;
940 errCode = GetUpdateSqlForCloudSync(updateFields, tableSchema, gidStr, pkSet, updateSql);
941 if (errCode != E_OK) {
942 return errCode;
943 }
944
945 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
946 if (errCode != E_OK) {
947 LOGE("Get update statement failed when update cloud data, %d", errCode);
948 return errCode;
949 }
950
951 // bind value
952 if (!pkSet.empty()) {
953 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
954 updateFields.insert(updateFields.end(), pkFields.begin(), pkFields.end());
955 }
956 errCode = BindValueToUpsertStatement(vBucket, updateFields, updateStmt);
957 if (errCode != E_OK) {
958 LOGE("bind value to update statement failed when update cloud data, %d", errCode);
959 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
960 }
961 return errCode;
962 }
963
UpdateCloudData(VBucket & vBucket,const TableSchema & tableSchema)964 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema)
965 {
966 if (putDataMode_ == PutDataMode::SYNC) {
967 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
968 }
969 sqlite3_stmt *updateStmt = nullptr;
970 int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
971 if (errCode != E_OK) {
972 LOGE("Get update data table statement fail, %d", errCode);
973 return errCode;
974 }
975
976 // update data
977 errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
978 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
979 errCode = E_OK;
980 } else {
981 LOGE("update data failed when save cloud data:%d", errCode);
982 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
983 return errCode;
984 }
985 SQLiteUtils::ResetStatement(updateStmt, true, errCode);
986
987 // update log
988 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
989 if (errCode != E_OK) {
990 LOGE("update log record failed when update cloud data, errCode = %d", errCode);
991 }
992 return errCode;
993 }
994
IsAllowWithPrimaryKey(OpType opType)995 static inline bool IsAllowWithPrimaryKey(OpType opType)
996 {
997 return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
998 opType == OpType::ONLY_UPDATE_GID || opType == OpType::LOCKED_NOT_HANDLE);
999 }
1000
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1001 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
1002 OpType opType)
1003 {
1004 sqlite3_stmt *updateLogStmt = nullptr;
1005 std::vector<std::string> updateColName;
1006 int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
1007 if (errCode != E_OK) {
1008 LOGE("Get update log statement failed, errCode = %d", errCode);
1009 return errCode;
1010 }
1011
1012 errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
1013 updateLogStmt);
1014 int ret = E_OK;
1015 if (errCode != E_OK) {
1016 LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
1017 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1018 return errCode != E_OK ? errCode : ret;
1019 }
1020
1021 errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
1022 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1023 errCode = E_OK;
1024 } else {
1025 LOGE("update log record failed when update cloud data:%d", errCode);
1026 }
1027 SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
1028 return errCode != E_OK ? errCode : ret;
1029 }
1030
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)1031 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
1032 const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
1033 sqlite3_stmt *updateLogStmt)
1034 {
1035 int errCode = CloudStorageUtils::BindUpdateLogStmtFromVBucket(vBucket, tableSchema, colNames, updateLogStmt);
1036 if (errCode != E_OK) {
1037 return errCode;
1038 }
1039 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1040 if (pkMap.empty()) {
1041 return E_OK;
1042 }
1043
1044 std::vector<uint8_t> hashKey;
1045 errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
1046 if (errCode != E_OK) {
1047 return errCode;
1048 }
1049 return SQLiteUtils::BindBlobToStatement(updateLogStmt, colNames.size() + 1, hashKey);
1050 }
1051
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)1052 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
1053 const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
1054 {
1055 std::string gidStr;
1056 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
1057 if (errCode != E_OK) {
1058 LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
1059 return errCode;
1060 }
1061 if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
1062 LOGE("empty or invalid char in cloud gid");
1063 return -E_CLOUD_ERROR;
1064 }
1065
1066 bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
1067 std::string deleteSql = "DELETE FROM " + tableSchema.name;
1068 deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
1069 errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
1070 if (errCode != E_OK) {
1071 LOGE("Get delete statement failed when delete data, %d", errCode);
1072 return errCode;
1073 }
1074
1075 int ret = E_OK;
1076 if (!pkSet.empty() && queryByPk) {
1077 std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema, true);
1078 errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
1079 if (errCode != E_OK) {
1080 LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
1081 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1082 }
1083 }
1084 return errCode != E_OK ? errCode : ret;
1085 }
1086
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1087 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1088 const TableSchema &tableSchema, const TrackerTable &trackerTable)
1089 {
1090 if (isLogicDelete_) {
1091 return LogicDeleteCloudData(tableName, vBucket, tableSchema, trackerTable);
1092 }
1093 std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1094 sqlite3_stmt *deleteStmt = nullptr;
1095 int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
1096 if (errCode != E_OK) {
1097 return errCode;
1098 }
1099 errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
1100 int ret = E_OK;
1101 SQLiteUtils::ResetStatement(deleteStmt, true, ret);
1102 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1103 LOGE("delete data failed when sync with cloud:%d", errCode);
1104 return errCode;
1105 }
1106 if (ret != E_OK) {
1107 LOGE("reset delete statement failed:%d", ret);
1108 return ret;
1109 }
1110
1111 // update log
1112 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1113 if (errCode != E_OK) {
1114 LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
1115 }
1116 return errCode;
1117 }
1118
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)1119 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
1120 const TableSchema &tableSchema, OpType opType)
1121 {
1122 return UpdateLogRecord(vBucket, tableSchema, opType);
1123 }
1124
DeleteTableTrigger(const std::string & missTable) const1125 int SQLiteSingleVerRelationalStorageExecutor::DeleteTableTrigger(const std::string &missTable) const
1126 {
1127 static const char *triggerEndName[] = {
1128 "_ON_INSERT",
1129 "_ON_UPDATE",
1130 "_ON_DELETE"
1131 };
1132 std::string logTableName = DBConstant::SYSTEM_TABLE_PREFIX + missTable;
1133 for (const auto &endName : triggerEndName) {
1134 std::string deleteSql = "DROP TRIGGER IF EXISTS " + logTableName + endName + ";";
1135 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1136 if (errCode != E_OK) {
1137 LOGE("[DeleteTableTrigger] Drop trigger failed. %d", errCode);
1138 return errCode;
1139 }
1140 }
1141 return E_OK;
1142 }
1143
SetLogicDelete(bool isLogicDelete)1144 void SQLiteSingleVerRelationalStorageExecutor::SetLogicDelete(bool isLogicDelete)
1145 {
1146 isLogicDelete_ = isLogicDelete;
1147 }
1148
UpdateRecordStatus(const std::string & tableName,const std::string & status,const Key & hashKey)1149 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordStatus(const std::string &tableName,
1150 const std::string &status, const Key &hashKey)
1151 {
1152 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " + status + " WHERE hash_key = ?;";
1153 sqlite3_stmt *stmt = nullptr;
1154 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1155 if (errCode != E_OK) {
1156 LOGE("[Storage Executor] Get stmt failed when update record status, %d", errCode);
1157 return errCode;
1158 }
1159 int ret = E_OK;
1160 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey); // 1 is bind index of hashKey
1161 if (errCode != E_OK) {
1162 LOGE("[Storage Executor] Bind hashKey to update record status stmt failed, %d", errCode);
1163 SQLiteUtils::ResetStatement(stmt, true, ret);
1164 return errCode;
1165 }
1166 errCode = SQLiteUtils::StepWithRetry(stmt);
1167 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1168 errCode = E_OK;
1169 } else {
1170 LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1171 }
1172 SQLiteUtils::ResetStatement(stmt, true, ret);
1173 return errCode == E_OK ? ret : errCode;
1174 }
1175
SetUploadConfig(int32_t maxUploadCount,int32_t maxUploadSize)1176 void SQLiteSingleVerRelationalStorageExecutor::SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize)
1177 {
1178 maxUploadCount_ = maxUploadCount;
1179 maxUploadSize_ = maxUploadSize;
1180 }
1181
LogicDeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema,const TrackerTable & trackerTable)1182 int SQLiteSingleVerRelationalStorageExecutor::LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
1183 const TableSchema &tableSchema, const TrackerTable &trackerTable)
1184 {
1185 LOGD("[RDBExecutor] logic delete skip delete data");
1186 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, CloudStorageUtils::GetCursorIncSql(tableName));
1187 if (errCode != E_OK) {
1188 return errCode;
1189 }
1190 errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
1191 if (errCode != E_OK) {
1192 return errCode;
1193 }
1194 if (!trackerTable.IsEmpty()) {
1195 return SQLiteRelationalUtils::SelectServerObserver(dbHandle_, tableName, true);
1196 }
1197 return E_OK;
1198 }
1199
InitCursorToMeta(const std::string & tableName)1200 int SQLiteSingleVerRelationalStorageExecutor::InitCursorToMeta(const std::string &tableName)
1201 {
1202 Value key;
1203 Value cursor;
1204 DBCommon::StringToVector(DBCommon::GetCursorKey(tableName), key);
1205 int errCode = GetKvData(key, cursor);
1206 if (errCode == -E_NOT_FOUND) {
1207 DBCommon::StringToVector(std::string("0"), cursor);
1208 errCode = PutKvData(key, cursor);
1209 if (errCode != E_OK) {
1210 LOGE("Init cursor to meta table failed. %d", errCode);
1211 }
1212 return errCode;
1213 }
1214 if (errCode != E_OK) {
1215 LOGE("Get cursor from meta table failed. %d", errCode);
1216 }
1217 return errCode;
1218 }
1219
SetTableSchema(const TableSchema & tableSchema)1220 void SQLiteSingleVerRelationalStorageExecutor::SetTableSchema(const TableSchema &tableSchema)
1221 {
1222 tableSchema_ = tableSchema;
1223 }
1224
GetAssetInfoOnTable(sqlite3_stmt * & stmt,const std::vector<Field> & assetFields,VBucket & assetInfo)1225 int SQLiteSingleVerRelationalStorageExecutor::GetAssetInfoOnTable(sqlite3_stmt *&stmt,
1226 const std::vector<Field> &assetFields, VBucket &assetInfo)
1227 {
1228 int errCode = SQLiteUtils::StepWithRetry(stmt);
1229 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1230 int index = 0;
1231 for (const auto &field: assetFields) {
1232 Type cloudValue;
1233 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1234 if (errCode != E_OK) {
1235 break;
1236 }
1237 errCode = PutVBucketByType(assetInfo, field, cloudValue);
1238 if (errCode != E_OK) {
1239 break;
1240 }
1241 }
1242 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1243 errCode = E_OK;
1244 } else {
1245 LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1246 }
1247 return errCode;
1248 }
1249
IsNeedUpdateAssetIdInner(sqlite3_stmt * selectStmt,const VBucket & vBucket,const Field & field,VBucket & assetInfo)1250 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetIdInner(sqlite3_stmt *selectStmt,
1251 const VBucket &vBucket, const Field &field, VBucket &assetInfo)
1252 {
1253 if (field.type == TYPE_INDEX<Asset>) {
1254 Asset asset;
1255 UpdateLocalAssetId(vBucket, field.colName, asset);
1256 Asset *assetDBPtr = std::get_if<Asset>(&assetInfo[field.colName]);
1257 if (assetDBPtr == nullptr) {
1258 return true;
1259 }
1260 Asset &assetDB = *assetDBPtr;
1261 if (assetDB.assetId != asset.assetId || asset.status != AssetStatus::NORMAL) {
1262 return true;
1263 }
1264 }
1265 if (field.type == TYPE_INDEX<Assets>) {
1266 Assets assets;
1267 UpdateLocalAssetsId(vBucket, field.colName, assets);
1268 Assets *assetsDBPtr = std::get_if<Assets>(&assetInfo[field.colName]);
1269 if (assetsDBPtr == nullptr) {
1270 return true;
1271 }
1272 Assets &assetsDB = *assetsDBPtr;
1273 if (assets.size() != assetsDB.size()) {
1274 return true;
1275 }
1276 for (uint32_t i = 0; i < assets.size(); ++i) {
1277 if (assets[i].assetId != assetsDB[i].assetId || assets[i].status != AssetStatus::NORMAL) {
1278 return true;
1279 }
1280 }
1281 }
1282 return false;
1283 }
1284
IsNeedUpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1285 bool SQLiteSingleVerRelationalStorageExecutor::IsNeedUpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1286 const VBucket &vBucket)
1287 {
1288 std::vector<Field> assetFields;
1289 for (const auto &field : tableSchema.fields) {
1290 if (field.type == TYPE_INDEX<Asset>) {
1291 assetFields.push_back(field);
1292 }
1293 if (field.type == TYPE_INDEX<Assets>) {
1294 assetFields.push_back(field);
1295 }
1296 }
1297 if (assetFields.empty()) {
1298 return false;
1299 }
1300 sqlite3_stmt *selectStmt = nullptr;
1301 std::string queryAssetsSql = "SELECT ";
1302 for (const auto &field : assetFields) {
1303 queryAssetsSql += field.colName + ",";
1304 }
1305 queryAssetsSql.pop_back();
1306 queryAssetsSql += " FROM '" + tableSchema.name + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " +
1307 std::to_string(dataKey) + ";";
1308 int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1309 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1310 LOGE("Get select assets statement failed, %d.", errCode);
1311 return true;
1312 }
1313 ResFinalizer finalizer([selectStmt]() {
1314 sqlite3_stmt *statementInner = selectStmt;
1315 int ret = E_OK;
1316 SQLiteUtils::ResetStatement(statementInner, true, ret);
1317 if (ret != E_OK) {
1318 LOGW("Reset stmt failed %d when get asset", ret);
1319 }
1320 });
1321 VBucket assetInfo;
1322 errCode = GetAssetInfoOnTable(selectStmt, assetFields, assetInfo);
1323 if (errCode != E_OK) {
1324 return true;
1325 }
1326 for (const auto &field : assetFields) {
1327 if (IsNeedUpdateAssetIdInner(selectStmt, vBucket, field, assetInfo)) {
1328 return true;
1329 }
1330 }
1331 return false;
1332 }
1333
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)1334 int SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsConsistent(const std::string &tableName,
1335 const DownloadData &downloadData, const std::set<std::string> &gidFilters)
1336 {
1337 if (downloadData.data.size() != downloadData.opType.size()) {
1338 LOGE("The num of data:%zu an opType:%zu is not equal.", downloadData.data.size(), downloadData.opType.size());
1339 return -E_CLOUD_ERROR;
1340 }
1341 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) +
1342 " SET flag=flag&(~0x20), " + CloudDbConstant::UNLOCKING_TO_UNLOCK + " WHERE cloud_gid=? and timestamp=?;";
1343 sqlite3_stmt *stmt = nullptr;
1344 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1345 if (errCode != E_OK) {
1346 LOGE("Get mark flag as consistent stmt failed, %d.", errCode);
1347 return errCode;
1348 }
1349 int ret = E_OK;
1350 int index = 0;
1351 for (const auto &data: downloadData.data) {
1352 SQLiteUtils::ResetStatement(stmt, false, ret);
1353 OpType opType = downloadData.opType[index++];
1354 if (opType == OpType::NOT_HANDLE || opType == OpType::LOCKED_NOT_HANDLE) {
1355 continue;
1356 }
1357 errCode = CloudStorageUtils::BindStepConsistentFlagStmt(stmt, data, gidFilters);
1358 if (errCode != E_OK) {
1359 break;
1360 }
1361 }
1362 SQLiteUtils::ResetStatement(stmt, true, ret);
1363 return errCode == E_OK ? ret : errCode;
1364 }
1365
FillCloudVersionForUpload(const std::string & tableName,const CloudSyncBatch & batchData)1366 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const std::string &tableName,
1367 const CloudSyncBatch &batchData)
1368 {
1369 if (batchData.extend.empty()) {
1370 return E_OK;
1371 }
1372 if (batchData.hashKey.empty() || batchData.extend.size() != batchData.hashKey.size()) {
1373 LOGE("invalid sync data for filling version.");
1374 return -E_INVALID_ARGS;
1375 }
1376 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(tableName) +
1377 "' SET version = ? WHERE hash_key = ? ";
1378 sqlite3_stmt *stmt = nullptr;
1379 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1380 if (errCode != E_OK) {
1381 return errCode;
1382 }
1383 int ret = E_OK;
1384 for (size_t i = 0; i < batchData.extend.size(); ++i) {
1385 errCode = BindUpdateVersionStatement(batchData.extend[i], batchData.hashKey[i], stmt);
1386 if (errCode != E_OK) {
1387 LOGE("bind update version stmt failed.");
1388 SQLiteUtils::ResetStatement(stmt, true, ret);
1389 return errCode;
1390 }
1391 }
1392 SQLiteUtils::ResetStatement(stmt, true, ret);
1393 return ret;
1394 }
1395
QueryCount(const std::string & tableName,int64_t & count)1396 int SQLiteSingleVerRelationalStorageExecutor::QueryCount(const std::string &tableName, int64_t &count)
1397 {
1398 return SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, count);
1399 }
1400
CheckInventoryData(const std::string & tableName)1401 int SQLiteSingleVerRelationalStorageExecutor::CheckInventoryData(const std::string &tableName)
1402 {
1403 int64_t dataCount = 0;
1404 int errCode = SQLiteRelationalUtils::QueryCount(dbHandle_, tableName, dataCount);
1405 if (errCode != E_OK) {
1406 LOGE("Query count failed.", errCode);
1407 return errCode;
1408 }
1409 return dataCount > 0 ? -E_WITH_INVENTORY_DATA : E_OK;
1410 }
1411
GetUploadCountInner(const Timestamp & timestamp,SqliteQueryHelper & helper,std::string & sql,int64_t & count)1412 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCountInner(const Timestamp ×tamp,
1413 SqliteQueryHelper &helper, std::string &sql, int64_t &count)
1414 {
1415 sqlite3_stmt *stmt = nullptr;
1416 int errCode = helper.GetCloudQueryStatement(false, dbHandle_, timestamp, sql, stmt);
1417 if (errCode != E_OK) {
1418 LOGE("failed to get count statement %d", errCode);
1419 return errCode;
1420 }
1421 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1422 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1423 count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1424 errCode = E_OK;
1425 } else {
1426 LOGE("Failed to get the count to be uploaded. %d", errCode);
1427 }
1428 SQLiteUtils::ResetStatement(stmt, true, errCode);
1429 return errCode;
1430 }
1431
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1432 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp ×tamp, bool isCloudForcePush,
1433 bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1434 {
1435 int errCode;
1436 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1437 if (errCode != E_OK) {
1438 return errCode;
1439 }
1440 std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask,
1441 CloudWaterType::DELETE);
1442 return GetUploadCountInner(timestamp, helper, sql, count);
1443 }
1444
GetAllUploadCount(const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1445 int SQLiteSingleVerRelationalStorageExecutor::GetAllUploadCount(const std::vector<Timestamp> ×tampVec,
1446 bool isCloudForcePush, bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1447 {
1448 std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1449 if (timestampVec.size() != typeVec.size()) {
1450 return -E_INVALID_ARGS;
1451 }
1452 int errCode;
1453 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1454 if (errCode != E_OK) {
1455 return errCode;
1456 }
1457 count = 0;
1458 for (size_t i = 0; i < typeVec.size(); i++) {
1459 std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask, typeVec[i]);
1460 int64_t tempCount = 0;
1461 helper.AppendCloudQueryToGetDiffData(sql, typeVec[i]);
1462 errCode = GetUploadCountInner(timestampVec[i], helper, sql, tempCount);
1463 if (errCode != E_OK) {
1464 return errCode;
1465 }
1466 count += tempCount;
1467 }
1468 return E_OK;
1469 }
1470
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1471 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1472 bool ignoreEmptyGid)
1473 {
1474 if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1475 cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1476 return -E_INVALID_ARGS;
1477 }
1478 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1479 + "' SET cloud_gid = ? WHERE data_key = ? ";
1480 sqlite3_stmt *stmt = nullptr;
1481 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1482 if (errCode != E_OK) {
1483 return errCode;
1484 }
1485 errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1486 int resetCode = E_OK;
1487 SQLiteUtils::ResetStatement(stmt, true, resetCode);
1488 return errCode == E_OK ? resetCode : errCode;
1489 }
1490
GetSyncCloudData(const CloudUploadRecorder & uploadRecorder,CloudSyncData & cloudDataResult,SQLiteSingleVerRelationalContinueToken & token)1491 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(const CloudUploadRecorder &uploadRecorder,
1492 CloudSyncData &cloudDataResult, SQLiteSingleVerRelationalContinueToken &token)
1493 {
1494 token.GetCloudTableSchema(tableSchema_);
1495 sqlite3_stmt *queryStmt = nullptr;
1496 bool isStepNext = false;
1497 int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult, queryStmt, isStepNext);
1498 if (errCode != E_OK) {
1499 (void)token.ReleaseCloudStatement();
1500 return errCode;
1501 }
1502 uint32_t totalSize = 0;
1503 uint32_t stepNum = -1;
1504 do {
1505 if (isStepNext) {
1506 errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1507 if (errCode != E_OK) {
1508 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1509 break;
1510 }
1511 }
1512 isStepNext = true;
1513 errCode = GetCloudDataForSync(uploadRecorder, queryStmt, cloudDataResult, ++stepNum, totalSize);
1514 } while (errCode == E_OK);
1515 if (errCode != -E_UNFINISHED) {
1516 (void)token.ReleaseCloudStatement();
1517 }
1518 return errCode;
1519 }
1520
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1521 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1522 {
1523 if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1524 Asset asset;
1525 int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1526 if (errCode != E_OK) {
1527 return errCode;
1528 }
1529 if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1530 return -E_CLOUD_ERROR;
1531 }
1532 vBucket.insert_or_assign(field.colName, asset);
1533 } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1534 Assets assets;
1535 int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1536 if (errCode != E_OK) {
1537 return errCode;
1538 }
1539 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets) || !CloudStorageUtils::CheckAssetStatus(assets)) {
1540 return -E_CLOUD_ERROR;
1541 }
1542 vBucket.insert_or_assign(field.colName, assets);
1543 } else {
1544 vBucket.insert_or_assign(field.colName, cloudValue);
1545 }
1546 return E_OK;
1547 }
1548 } // namespace DistributedDB
1549 #endif
1550