1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include "cloud/asset_operation_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "cloud/cloud_storage_utils.h"
21 #include "db_common.h"
22 #include "log_table_manager_factory.h"
23 #include "res_finalizer.h"
24 #include "runtime_context.h"
25 #include "simple_tracker_log_table_manager.h"
26 #include "sqlite_relational_utils.h"
27 
28 namespace DistributedDB {
// Offset added to the number of bound asset columns to obtain the bind index of the row id
// in the fill-upload-asset statement.
static constexpr const int ROW_ID_INDEX = 1;
// Key under which the hash key bytes are looked up in a VBucket.
static constexpr const char *HASH_KEY = "HASH_KEY";
static constexpr const char *FLAG_NOT_LOGIC_DELETE = "FLAG & 0x08 = 0"; // holds when the 0x08 flag bit (logic delete) is not set

// Pair of string lists. NOTE(review): not used in the visible part of this file; confirm what each half holds.
using PairStringVector = std::pair<std::vector<std::string>, std::vector<std::string>>;
34 
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)35 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
36     std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
37 {
38     if (assetFields.empty() && pkSet.empty()) {
39         return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
40     }
41     std::string gid;
42     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
43     if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
44         LOGE("Get cloud gid fail when query log table.");
45         return errCode;
46     }
47 
48     if (pkSet.empty() && gid.empty()) {
49         LOGE("query log table failed because of both primary key and gid are empty.");
50         return -E_CLOUD_ERROR;
51     }
52     std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
53         " a.cloud_gid, a.sharing_resource, a.status, a.version";
54     for (const auto &field : assetFields) {
55         sql += ", b." + field.colName;
56     }
57     for (const auto &pk : pkSet) {
58         sql += ", b." + pk;
59     }
60     sql += CloudStorageUtils::GetLeftJoinLogSql(tableName) + " WHERE ";
61     if (!gid.empty()) {
62         sql += " a.cloud_gid = ? or ";
63     }
64     sql += "a.hash_key = ?";
65     querySql = sql;
66     return E_OK;
67 }
68 
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)69 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
70     const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
71 {
72     std::string sql = "UPDATE " + tableName + " SET ";
73     for (const auto &field: fields) {
74         sql += field.colName + " = ?,";
75     }
76     sql.pop_back();
77     sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (";
78     sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
79     sqlite3_stmt *stmt = nullptr;
80     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
81     if (errCode != E_OK) {
82         LOGE("Get fill asset statement failed, %d.", errCode);
83         return errCode;
84     }
85     for (size_t i = 0; i < fields.size(); ++i) {
86         errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
87         if (errCode != E_OK) {
88             SQLiteUtils::ResetStatement(stmt, true, errCode);
89             return errCode;
90         }
91     }
92     statement = stmt;
93     return errCode;
94 }
95 
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess)96 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
97     VBucket &vBucket, bool isDownloadSuccess)
98 {
99     std::string cloudGid;
100     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
101     if (errCode != E_OK) {
102         LOGE("Miss gid when fill Asset.");
103         return errCode;
104     }
105     std::vector<Field> assetsField;
106     errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
107     if (errCode != E_OK) {
108         LOGE("No assets need to be filled.");
109         return errCode;
110     }
111     CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
112 
113     Bytes hashKey;
114     (void)CloudStorageUtils::GetValueFromVBucket<Bytes>(HASH_KEY, vBucket, hashKey);
115     VBucket dbAssets;
116     std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, hashKey, dbAssets);
117     if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
118         LOGE("get assets by gid or hashkey failed %d.", errCode);
119         return errCode;
120     }
121     AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
122         AssetOperationUtils::CloudSyncAction::END_DOWNLOAD);
123 
124     if (isDownloadSuccess) {
125         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
126             CloudStorageUtils::FillAssetAfterDownload, CloudStorageUtils::FillAssetsAfterDownload);
127         errCode = IncreaseCursorOnAssetData(tableSchema.name, cloudGid);
128         if (errCode != E_OK) {
129             return errCode;
130         }
131     } else {
132         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
133             CloudStorageUtils::FillAssetAfterDownloadFail, CloudStorageUtils::FillAssetsAfterDownloadFail);
134     }
135 
136     sqlite3_stmt *stmt = nullptr;
137     errCode = GetFillDownloadAssetStatement(tableSchema.name, dbAssets, assetsField, stmt);
138     if (errCode != E_OK) {
139         return errCode;
140     }
141     errCode = ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
142     int ret = CleanDownloadChangedAssets(vBucket, assetOpType);
143     return errCode == E_OK ? ret : errCode;
144 }
145 
IncreaseCursorOnAssetData(const std::string & tableName,const std::string & gid)146 int SQLiteSingleVerRelationalStorageExecutor::IncreaseCursorOnAssetData(const std::string &tableName,
147     const std::string &gid)
148 {
149     int cursor = GetCursor(tableName);
150     cursor++;
151     std::string sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + tableName + "_log";
152     sql += " SET cursor = ? where cloud_gid = ?;";
153     sqlite3_stmt *statement = nullptr;
154     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
155     if (errCode != E_OK) {
156         LOGE("get update asset data cursor stmt failed %d.", errCode);
157         return errCode;
158     }
159     ResFinalizer finalizer([statement]() {
160         sqlite3_stmt *statementInner = statement;
161         int ret = E_OK;
162         SQLiteUtils::ResetStatement(statementInner, true, ret);
163         if (ret != E_OK) {
164             LOGW("Reset  stmt failed %d when increase cursor on asset data", ret);
165         }
166     });
167     int index = 1;
168     errCode = SQLiteUtils::BindInt64ToStatement(statement, index++, cursor);
169     if (errCode != E_OK) {
170         LOGE("bind cursor data stmt failed %d.", errCode);
171         return errCode;
172     }
173     errCode = SQLiteUtils::BindTextToStatement(statement, index, gid);
174     if (errCode != E_OK) {
175         LOGE("bind cursor gid data stmt failed %d.", errCode);
176         return errCode;
177     }
178     errCode = SQLiteUtils::StepWithRetry(statement, false);
179     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
180         LOGE("Fill upload asset failed:%d.", errCode);
181         return errCode;
182     }
183     LOGI("Upgrade cursor to %d after asset download success.", cursor);
184     errCode = SetCursor(tableName, cursor);
185     if (errCode != E_OK) {
186         LOGE("Upgrade cursor failed after asset download success %d.", errCode);
187     }
188     return errCode;
189 }
190 
FillCloudAssetForUpload(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data)191 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema,
192     const CloudSyncBatch &data)
193 {
194     int errCode = E_OK;
195     if (CloudStorageUtils::ChkFillCloudAssetParam(data, errCode)) {
196         return errCode;
197     }
198     errCode = SetLogTriggerStatus(false);
199     if (errCode != E_OK) {
200         LOGE("Fail to set log trigger off, %d.", errCode);
201         return errCode;
202     }
203     sqlite3_stmt *stmt = nullptr;
204     for (size_t i = 0; i < data.assets.size(); ++i) {
205         if (data.assets.at(i).empty()) {
206             continue;
207         }
208         if (DBCommon::IsRecordIgnored(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i]) ||
209             DBCommon::IsCloudRecordNotFound(data.extend[i]) || DBCommon::IsCloudRecordAlreadyExisted(data.extend[i])) {
210             continue;
211         }
212         errCode = InitFillUploadAssetStatement(opType, tableSchema, data, i, stmt);
213         if (errCode != E_OK) {
214             if (errCode == -E_NOT_FOUND) {
215                 errCode = E_OK;
216                 continue;
217             }
218             break;
219         }
220         errCode = SQLiteUtils::StepWithRetry(stmt, false);
221         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
222             LOGE("Fill upload asset failed:%d.", errCode);
223             break;
224         }
225         errCode = E_OK;
226         SQLiteUtils::ResetStatement(stmt, true, errCode);
227         stmt = nullptr;
228         if (errCode != E_OK) {
229             break;
230         }
231     }
232     int ret = E_OK;
233     SQLiteUtils::ResetStatement(stmt, true, ret);
234     int endCode = SetLogTriggerStatus(true);
235     if (endCode != E_OK) {
236         LOGE("Fail to set log trigger off, %d.", endCode);
237         return endCode;
238     }
239     return errCode != E_OK ? errCode : ret;
240 }
241 
FillCloudVersionForUpload(const OpType opType,const CloudSyncData & data)242 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data)
243 {
244     switch (opType) {
245         case OpType::UPDATE_VERSION:
246             return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.updData);
247         case OpType::INSERT_VERSION:
248             return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.insData);
249         default:
250             LOGE("Fill version with unknown type %d", static_cast<int>(opType));
251             return -E_INVALID_ARGS;
252     }
253 }
254 
BindUpdateVersionStatement(const VBucket & vBucket,const Bytes & hashKey,sqlite3_stmt * & stmt)255 int SQLiteSingleVerRelationalStorageExecutor::BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey,
256     sqlite3_stmt *&stmt)
257 {
258     int errCode = E_OK;
259     std::string version;
260     if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
261         vBucket, version) != E_OK) {
262         LOGW("get version from vBucket failed.");
263     }
264     if (hashKey.empty()) {
265         LOGE("hash key is empty when update version.");
266         return -E_CLOUD_ERROR;
267     }
268     errCode = SQLiteUtils::BindTextToStatement(stmt, 1, version);
269     if (errCode != E_OK) {
270         return errCode;
271     }
272     errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, hashKey); // 2 means the second bind args
273     if (errCode != E_OK) {
274         return errCode;
275     }
276     errCode = SQLiteUtils::StepWithRetry(stmt, false);
277     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
278         errCode = E_OK;
279         SQLiteUtils::ResetStatement(stmt, false, errCode);
280     } else {
281         LOGE("step version stmt failed: %d.", errCode);
282     }
283     return errCode;
284 }
285 
InitFillUploadAssetStatement(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)286 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpType opType,
287     const TableSchema &tableSchema, const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
288 {
289     VBucket vBucket = data.assets.at(index);
290     VBucket dbAssets;
291     std::string cloudGid;
292     int errCode;
293     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
294     std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, data.hashKey.at(index), dbAssets);
295     if (errCode != E_OK && errCode != -E_CLOUD_GID_MISMATCH) {
296         return errCode;
297     }
298     AssetOperationUtils::CloudSyncAction action = opType == OpType::SET_UPLOADING ?
299         AssetOperationUtils::CloudSyncAction::START_UPLOAD : AssetOperationUtils::CloudSyncAction::END_UPLOAD;
300     AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
301         action);
302     if (action == AssetOperationUtils::CloudSyncAction::START_UPLOAD) {
303         CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
304             CloudStorageUtils::FillAssetBeforeUpload, CloudStorageUtils::FillAssetsBeforeUpload);
305     } else {
306         if (DBCommon::IsRecordError(data.extend.at(index))) {
307             CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
308                 CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed);
309         } else {
310             CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
311                 CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload);
312         }
313     }
314 
315     errCode = GetAndBindFillUploadAssetStatement(tableSchema.name, dbAssets, statement);
316     if (errCode != E_OK) {
317         LOGE("get and bind asset failed %d.", errCode);
318         return errCode;
319     }
320     int64_t rowid = data.rowid[index];
321     return SQLiteUtils::BindInt64ToStatement(statement, dbAssets.size() + ROW_ID_INDEX, rowid);
322 }
323 
AnalysisTrackerTable(const TrackerTable & trackerTable,TableInfo & tableInfo)324 int SQLiteSingleVerRelationalStorageExecutor::AnalysisTrackerTable(const TrackerTable &trackerTable,
325     TableInfo &tableInfo)
326 {
327     return SQLiteRelationalUtils::AnalysisTrackerTable(dbHandle_, trackerTable, tableInfo);
328 }
329 
CreateTrackerTable(const TrackerTable & trackerTable,bool isUpgrade)330 int SQLiteSingleVerRelationalStorageExecutor::CreateTrackerTable(const TrackerTable &trackerTable, bool isUpgrade)
331 {
332     TableInfo table;
333     table.SetTableSyncType(TableSyncType::CLOUD_COOPERATION);
334     int errCode = AnalysisTrackerTable(trackerTable, table);
335     if (errCode != E_OK) {
336         return errCode;
337     }
338     auto tableManager = std::make_unique<SimpleTrackerLogTableManager>();
339     if (trackerTable.GetTrackerColNames().empty()) {
340         // drop trigger
341         return tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
342     }
343 
344     // create log table
345     errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
346     if (errCode != E_OK) {
347         return errCode;
348     }
349     // init cursor
350     errCode = InitCursorToMeta(table.GetTableName());
351     if (errCode != E_OK) {
352         return errCode;
353     }
354     std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, "");
355     if (isUpgrade) {
356         errCode = CleanExtendAndCursorForDeleteData(table.GetTableName());
357         if (errCode != E_OK) {
358             LOGE("clean tracker log info for deleted data failed %d.", errCode);
359             return errCode;
360         }
361     }
362     errCode = GeneLogInfoForExistedData(dbHandle_, trackerTable.GetTableName(), calPrimaryKeyHash, table);
363     if (errCode != E_OK) {
364         LOGE("general tracker log info for existed data failed %d.", errCode);
365         return errCode;
366     }
367     errCode = SetLogTriggerStatus(true);
368     if (errCode != E_OK) {
369         return errCode;
370     }
371     errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
372     if (errCode != E_OK) {
373         return errCode;
374     }
375     if (!isUpgrade) {
376         return CheckInventoryData(DBCommon::GetLogTableName(table.GetTableName()));
377     }
378     return E_OK;
379 }
380 
GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject & schema)381 int SQLiteSingleVerRelationalStorageExecutor::GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema)
382 {
383     if (!schema.ToSchemaString().empty()) {
384         return E_OK;
385     }
386     const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.begin(),
387         DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.end());
388     Value schemaVal;
389     int errCode = GetKvData(schemaKey, schemaVal); // save schema to meta_data
390     if (errCode != E_OK) {
391         return errCode;
392     }
393     if (schemaVal.empty()) {
394         return -E_NOT_FOUND;
395     }
396     std::string schemaStr;
397     DBCommon::VectorToString(schemaVal, schemaStr);
398     errCode = schema.ParseFromTrackerSchemaString(schemaStr);
399     if (errCode != E_OK) {
400         LOGE("Parse from tracker schema string err.");
401     }
402     return errCode;
403 }
404 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)405 int SQLiteSingleVerRelationalStorageExecutor::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
406 {
407     sqlite3_stmt *statement = nullptr;
408     int errCode = SQLiteUtils::GetStatement(dbHandle_, condition.sql, statement);
409     if (errCode != E_OK) {
410         LOGE("Execute sql failed when prepare stmt.");
411         return errCode;
412     }
413     size_t bindCount = static_cast<size_t>(sqlite3_bind_parameter_count(statement));
414     if (bindCount > condition.bindArgs.size() || bindCount < condition.bindArgs.size()) {
415         LOGE("Sql bind args mismatch.");
416         SQLiteUtils::ResetStatement(statement, true, errCode);
417         return -E_INVALID_ARGS;
418     }
419     for (size_t i = 0; i < condition.bindArgs.size(); i++) {
420         Type type = condition.bindArgs[i];
421         errCode = SQLiteRelationalUtils::BindStatementByType(statement, i + 1, type);
422         if (errCode != E_OK) {
423             int ret = E_OK;
424             SQLiteUtils::ResetStatement(statement, true, ret);
425             return errCode;
426         }
427     }
428     while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
429         VBucket bucket;
430         errCode = SQLiteRelationalUtils::GetSelectVBucket(statement, bucket);
431         if (errCode != E_OK) {
432             int ret = E_OK;
433             SQLiteUtils::ResetStatement(statement, true, ret);
434             return errCode;
435         }
436         records.push_back(std::move(bucket));
437     }
438     int ret = E_OK;
439     SQLiteUtils::ResetStatement(statement, true, ret);
440     return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
441 }
442 
GetClearWaterMarkTables(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema,std::set<std::string> & clearWaterMarkTables)443 int SQLiteSingleVerRelationalStorageExecutor::GetClearWaterMarkTables(
444     const std::vector<TableReferenceProperty> &tableReferenceProperty, const RelationalSchemaObject &schema,
445     std::set<std::string> &clearWaterMarkTables)
446 {
447     std::set<std::string> changeTables = schema.CompareReferenceProperty(tableReferenceProperty);
448     for (const auto &table : changeTables) {
449         std::string logTableName = DBCommon::GetLogTableName(table);
450         bool isExists = false;
451         int errCode = SQLiteUtils::CheckTableExists(dbHandle_, logTableName, isExists);
452         if (errCode != E_OK) {
453             LOGE("[GetClearWaterMarkTables] check table exists failed, errCode = %d.", errCode);
454             return errCode;
455         }
456         if (!isExists) { // table maybe dropped after set reference
457             LOGI("[GetClearWaterMarkTables] log table not exists, skip this table.");
458             continue;
459         }
460 
461         bool isEmpty = true;
462         errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, logTableName, isEmpty);
463         if (errCode != E_OK) {
464             LOGE("[GetClearWaterMarkTables] check table empty failed, errCode = %d.", errCode);
465             clearWaterMarkTables.clear();
466             return errCode;
467         }
468         if (!isEmpty) {
469             clearWaterMarkTables.insert(table);
470         }
471     }
472     LOGI("[GetClearWaterMarkTables] clearWaterMarkTables size = %zu.", clearWaterMarkTables.size());
473     return E_OK;
474 }
475 
UpgradedLogForExistedData(TableInfo & tableInfo,bool schemaChanged)476 int SQLiteSingleVerRelationalStorageExecutor::UpgradedLogForExistedData(TableInfo &tableInfo, bool schemaChanged)
477 {
478     if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
479         return E_OK;
480     }
481     std::string logTable = DBCommon::GetLogTableName(tableInfo.GetTableName());
482     if (schemaChanged) {
483         std::string markAsInconsistent = "UPDATE " + logTable + " SET flag=" +
484             "(CASE WHEN (cloud_gid='' and data_key=-1 and flag&0x02=0x02) then flag else flag|0x20 END)";
485         int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, markAsInconsistent);
486         if (ret != E_OK) {
487             LOGE("Mark upgrade log info as inconsistent failed:%d", ret);
488             return ret;
489         }
490     }
491     if (tableInfo.GetTrackerTable().IsEmpty()) {
492         return E_OK;
493     }
494     LOGI("Upgrade tracker table log, schemaChanged:%d.", schemaChanged);
495     int errCode = SetLogTriggerStatus(false);
496     if (errCode != E_OK) {
497         return errCode;
498     }
499     std::string sql = "UPDATE " + tableInfo.GetTableName() + " SET _rowid_=_rowid_";
500     TrackerTable trackerTable = tableInfo.GetTrackerTable();
501     errCode = trackerTable.ReBuildTempTrigger(dbHandle_, TriggerMode::TriggerModeEnum::UPDATE,
502         [this, &sql]() {
503         int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
504         if (ret != E_OK) {
505             LOGE("Upgrade log for extend field failed.");
506         }
507         return ret;
508     });
509     return SetLogTriggerStatus(true);
510 }
511 
CreateTempSyncTrigger(const TrackerTable & trackerTable,bool flag)512 int SQLiteSingleVerRelationalStorageExecutor::CreateTempSyncTrigger(const TrackerTable &trackerTable, bool flag)
513 {
514     int errCode = E_OK;
515     std::vector<std::string> dropSql = trackerTable.GetDropTempTriggerSql();
516     for (const auto &sql: dropSql) {
517         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
518         if (errCode != E_OK) {
519             LOGE("[RDBExecutor] execute drop sql failed %d.", errCode);
520             return errCode;
521         }
522     }
523     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempInsertTriggerSql(flag));
524     if (errCode != E_OK) {
525         LOGE("[RDBExecutor] create temp insert trigger failed %d.", errCode);
526         return errCode;
527     }
528     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempUpdateTriggerSql(flag));
529     if (errCode != E_OK) {
530         LOGE("[RDBExecutor] create temp update trigger failed %d.", errCode);
531         return errCode;
532     }
533     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempDeleteTriggerSql(flag));
534     if (errCode != E_OK) {
535         LOGE("[RDBExecutor] create temp delete trigger failed %d.", errCode);
536     }
537     return errCode;
538 }
539 
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)540 int SQLiteSingleVerRelationalStorageExecutor::GetAndResetServerObserverData(const std::string &tableName,
541     ChangeProperties &changeProperties)
542 {
543     std::string fileName;
544     if (!SQLiteRelationalUtils::GetDbFileName(dbHandle_, fileName)) {
545         LOGE("get db file name failed.");
546         return -E_INVALID_DB;
547     }
548     SQLiteUtils::GetAndResetServerObserverData(fileName, tableName, changeProperties);
549     return E_OK;
550 }
551 
ClearAllTempSyncTrigger()552 int SQLiteSingleVerRelationalStorageExecutor::ClearAllTempSyncTrigger()
553 {
554     sqlite3_stmt *stmt = nullptr;
555     static const std::string sql = "SELECT name FROM sqlite_temp_master WHERE type = 'trigger';";
556     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
557     if (errCode != E_OK) {
558         LOGE("get clear all temp trigger stmt failed %d.", errCode);
559         return errCode;
560     }
561     int ret = E_OK;
562     while ((errCode = SQLiteUtils::StepNext(stmt, isMemDb_)) == E_OK) {
563         std::string str;
564         (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
565         if (errCode != E_OK) {
566             SQLiteUtils::ResetStatement(stmt, true, ret);
567             return errCode;
568         }
569         std::string dropSql = "DROP TRIGGER IF EXISTS '" + str + "';";
570         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, dropSql);
571         if (errCode != E_OK) {
572             LOGE("drop temp trigger failed %d.", errCode);
573             SQLiteUtils::ResetStatement(stmt, true, ret);
574             return errCode;
575         }
576     }
577     SQLiteUtils::ResetStatement(stmt, true, ret);
578     return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
579 }
580 
CleanTrackerData(const std::string & tableName,int64_t cursor,bool isOnlyTrackTable)581 int SQLiteSingleVerRelationalStorageExecutor::CleanTrackerData(const std::string &tableName, int64_t cursor,
582     bool isOnlyTrackTable)
583 {
584     std::string sql;
585     if (isOnlyTrackTable) {
586         sql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log";
587     } else {
588         sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + tableName + "_log SET extend_field = NULL";
589     }
590     sql += " where data_key = -1 and cursor <= ?;";
591     sqlite3_stmt *statement = nullptr;
592     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
593     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
594         LOGE("get clean tracker data stmt failed %d.", errCode);
595         return errCode;
596     }
597     errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, cursor);
598     int ret = E_OK;
599     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
600         LOGE("bind clean tracker data stmt failed %d.", errCode);
601         SQLiteUtils::ResetStatement(statement, true, ret);
602         return errCode;
603     }
604     errCode = SQLiteUtils::StepWithRetry(statement);
605     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
606         errCode = E_OK;
607     } else {
608         LOGE("clean tracker step failed: %d.", errCode);
609     }
610     SQLiteUtils::ResetStatement(statement, true, ret);
611     return errCode == E_OK ? ret : errCode;
612 }
613 
CreateSharedTable(const TableSchema & tableSchema)614 int SQLiteSingleVerRelationalStorageExecutor::CreateSharedTable(const TableSchema &tableSchema)
615 {
616     std::map<int32_t, std::string> cloudFieldTypeMap;
617     cloudFieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
618     cloudFieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
619     cloudFieldTypeMap[TYPE_INDEX<double>] = "REAL";
620     cloudFieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
621     cloudFieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
622     cloudFieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
623     cloudFieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
624     cloudFieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
625 
626     std::string createTableSql = "CREATE TABLE IF NOT EXISTS " + tableSchema.sharedTableName + "(";
627     std::string primaryKey = ", PRIMARY KEY (";
628     createTableSql += CloudDbConstant::CLOUD_OWNER;
629     createTableSql += " TEXT, ";
630     createTableSql += CloudDbConstant::CLOUD_PRIVILEGE;
631     createTableSql += " TEXT";
632     primaryKey += CloudDbConstant::CLOUD_OWNER;
633     bool hasPrimaryKey = false;
634     for (const auto &field : tableSchema.fields) {
635         createTableSql += ", " + field.colName + " ";
636         createTableSql += cloudFieldTypeMap[field.type];
637         createTableSql += field.nullable ? "" : " NOT NULL";
638         if (field.primary) {
639             primaryKey += ", " + field.colName;
640             hasPrimaryKey = true;
641         }
642     }
643     if (hasPrimaryKey) {
644         createTableSql += primaryKey + ")";
645     }
646     createTableSql += ");";
647     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, createTableSql);
648     if (errCode != E_OK) {
649         LOGE("Create shared table failed, %d.", errCode);
650     }
651     return errCode;
652 }
653 
DeleteTable(const std::vector<std::string> & tableNames)654 int SQLiteSingleVerRelationalStorageExecutor::DeleteTable(const std::vector<std::string> &tableNames)
655 {
656     for (const auto &tableName : tableNames) {
657         std::string deleteTableSql = "DROP TABLE IF EXISTS " + tableName + ";";
658         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteTableSql);
659         if (errCode != E_OK) {
660             LOGE("Delete table failed, %d.", errCode);
661             return errCode;
662         }
663     }
664     return E_OK;
665 }
666 
UpdateSharedTable(const std::map<std::string,std::vector<Field>> & updateTableNames)667 int SQLiteSingleVerRelationalStorageExecutor::UpdateSharedTable(
668     const std::map<std::string, std::vector<Field>> &updateTableNames)
669 {
670     int errCode = E_OK;
671     std::map<int32_t, std::string> fieldTypeMap;
672     fieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
673     fieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
674     fieldTypeMap[TYPE_INDEX<double>] = "REAL";
675     fieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
676     fieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
677     fieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
678     fieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
679     fieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
680     for (const auto &table : updateTableNames) {
681         if (table.second.empty()) {
682             continue;
683         }
684         std::string addColumnSql = "";
685         for (const auto &field : table.second) {
686             addColumnSql += "ALTER TABLE " + table.first + " ADD ";
687             addColumnSql += field.colName + " ";
688             addColumnSql += fieldTypeMap[field.type];
689             addColumnSql += field.primary ? " PRIMARY KEY" : "";
690             addColumnSql += field.nullable ? ";" : " NOT NULL;";
691         }
692         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, addColumnSql);
693         if (errCode != E_OK) {
694             LOGE("Shared table add column failed, %d.", errCode);
695             return errCode;
696         }
697     }
698     return errCode;
699 }
700 
AlterTableName(const std::map<std::string,std::string> & tableNames)701 int SQLiteSingleVerRelationalStorageExecutor::AlterTableName(const std::map<std::string, std::string> &tableNames)
702 {
703     for (const auto &tableName : tableNames) {
704         std::string alterTableSql = "ALTER TABLE " + tableName.first + " RENAME TO " + tableName.second + ";";
705         int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, alterTableSql);
706         if (errCode != E_OK) {
707             LOGE("Alter table name failed, %d.", errCode);
708             return errCode;
709         }
710     }
711     return E_OK;
712 }
713 
AppendUpdateLogRecordWhereSqlCondition(const TableSchema & tableSchema,const VBucket & vBucket,std::string & sql)714 int SQLiteSingleVerRelationalStorageExecutor::AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema,
715     const VBucket &vBucket, std::string &sql)
716 {
717     std::string gidStr;
718     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
719     if (errCode != E_OK) {
720         LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d.", errCode);
721         return errCode;
722     }
723 
724     sql += " WHERE ";
725     if (!gidStr.empty()) {
726         sql += "cloud_gid = '" + gidStr + "'";
727     }
728     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
729     if (!pkMap.empty()) {
730         if (!gidStr.empty()) {
731             sql += " OR ";
732         }
733         sql += "(hash_key = ?);";
734     }
735     return E_OK;
736 }
737 
DoCleanShareTableDataAndLog(const std::vector<std::string> & tableNameList)738 int SQLiteSingleVerRelationalStorageExecutor::DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList)
739 {
740     int ret = E_OK;
741     int errCode = E_OK;
742     for (const auto &tableName: tableNameList) {
743         std::string delDataSql = "DELETE FROM '" + tableName + "';";
744         sqlite3_stmt *statement = nullptr;
745         errCode = SQLiteUtils::GetStatement(dbHandle_, delDataSql, statement);
746         if (errCode != E_OK) {
747             LOGE("get clean shared data stmt failed %d.", errCode);
748             return errCode;
749         }
750         errCode = SQLiteUtils::StepWithRetry(statement);
751         SQLiteUtils::ResetStatement(statement, true, ret);
752         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
753             errCode = E_OK;
754         } else {
755             LOGE("clean shared data failed: %d.", errCode);
756             break;
757         }
758         statement = nullptr;
759         std::string delLogSql = "DELETE FROM '" + DBConstant::RELATIONAL_PREFIX + tableName + "_log';";
760         errCode = SQLiteUtils::GetStatement(dbHandle_, delLogSql, statement);
761         if (errCode != E_OK) {
762             LOGE("get clean shared log stmt failed %d.", errCode);
763             return errCode;
764         }
765         errCode = SQLiteUtils::StepWithRetry(statement);
766         SQLiteUtils::ResetStatement(statement, true, ret);
767         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
768             errCode = E_OK;
769         } else {
770             LOGE("clean shared log failed: %d.", errCode);
771             break;
772         }
773     }
774     return errCode == E_OK ? ret : errCode;
775 }
776 
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,const std::map<std::string,std::vector<TableReferenceProperty>> & tableReference,std::map<int64_t,Entries> & referenceGid)777 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGid(const std::string &tableName,
778     const CloudSyncBatch &syncBatch, const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
779     std::map<int64_t, Entries> &referenceGid)
780 {
781     int errCode = E_OK;
782     for (const auto &[targetTable, targetReference] : tableReference) {
783         errCode = GetReferenceGidInner(tableName, targetTable, syncBatch, targetReference, referenceGid);
784         if (errCode != E_OK) {
785             LOGE("[RDBExecutor] get reference gid inner failed %d.", errCode);
786             return errCode;
787         }
788     }
789     return errCode;
790 }
791 
GetReferenceGidInner(const std::string & sourceTable,const std::string & targetTable,const CloudSyncBatch & syncBatch,const std::vector<TableReferenceProperty> & targetTableReference,std::map<int64_t,Entries> & referenceGid)792 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidInner(const std::string &sourceTable,
793     const std::string &targetTable, const CloudSyncBatch &syncBatch,
794     const std::vector<TableReferenceProperty> &targetTableReference, std::map<int64_t, Entries> &referenceGid)
795 {
796     auto [sourceFields, targetFields] = SplitReferenceByField(targetTableReference);
797     if (sourceFields.empty()) {
798         LOGD("[RDBExecutor] source field is empty.");
799         return E_OK;
800     }
801     if (sourceFields.size() != targetFields.size()) {
802         LOGE("[RDBExecutor] reference field size not equal.");
803         return -E_INTERNAL_ERROR;
804     }
805     std::string sql = GetReferenceGidSql(sourceTable, targetTable, sourceFields, targetFields);
806     sqlite3_stmt *statement = nullptr;
807     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
808     if (errCode != E_OK) {
809         LOGE("[RDBExecutor] get ref gid data stmt failed. %d", errCode);
810         return errCode;
811     }
812     errCode = GetReferenceGidByStmt(statement, syncBatch, targetTable, referenceGid);
813     int ret = E_OK;
814     SQLiteUtils::ResetStatement(statement, true, ret);
815     return errCode == E_OK ? ret : errCode;
816 }
817 
GetReferenceGidSql(const std::string & sourceTable,const std::string & targetTable,const std::vector<std::string> & sourceFields,const std::vector<std::string> & targetFields)818 std::string SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidSql(const std::string &sourceTable,
819     const std::string &targetTable, const std::vector<std::string> &sourceFields,
820     const std::vector<std::string> &targetFields)
821 {
822     // sql like this:
823     // SELECT naturalbase_rdb_aux_parent_log.cloud_gid FROM naturalbase_rdb_aux_parent_log,
824     //   (SELECT parent._rowid_ AS rowid_b FROM parent,
825     //     (SELECT child._rowid_, name FROM child, naturalbase_rdb_aux_child_log
826     //     WHERE child._rowid_ = ? AND naturalbase_rdb_aux_child_log.timestamp = ? ) source_a
827     //   WHERE parent.name = source_a.name ) temp_table
828     // WHERE naturalbase_rdb_aux_parent_log.data_key = temp_table.rowid_b
829     std::string logTargetTable = DBCommon::GetLogTableName(targetTable);
830     std::string logSourceTable = DBCommon::GetLogTableName(sourceTable);
831     std::string sql;
832     sql += "SELECT " + logTargetTable + ".cloud_gid" + " FROM " + logTargetTable + ", ";
833     sql += "(";
834     sql += "SELECT " + targetTable + "._rowid_ AS rowid_b FROM " + targetTable
835            + ", ";
836     sql += "(SELECT " + sourceTable + "._rowid_,";
837     std::set<std::string> sourceFieldSet;
838     for (const auto &item : sourceFields) {
839         sourceFieldSet.insert(item);
840     }
841     for (const auto &sourceField : sourceFieldSet) {
842         sql += sourceField + ",";
843     }
844     sql.pop_back();
845     sql += " FROM " + sourceTable + ", " + logSourceTable;
846     sql +=" WHERE " + sourceTable + "._rowid_ = ? AND " + logSourceTable + ".timestamp = ? ";
847     sql += " AND " + logSourceTable + ".flag&0x08=0x00) source_a";
848     sql += " WHERE ";
849     for (size_t i = 0u; i < sourceFields.size(); ++i) {
850         if (i != 0u) {
851             sql += " AND ";
852         }
853         sql += targetTable + "." + targetFields[i] + " = source_a." + sourceFields[i];
854     }
855     sql += ") temp_table ";
856     sql += "WHERE " + logTargetTable + ".data_key = temp_table.rowid_b";
857     sql += " AND " + logTargetTable + ".flag&0x08=0x00";
858     return sql;
859 }
860 
GetReferenceGidByStmt(sqlite3_stmt * statement,const CloudSyncBatch & syncBatch,const std::string & targetTable,std::map<int64_t,Entries> & referenceGid)861 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidByStmt(sqlite3_stmt *statement,
862     const CloudSyncBatch &syncBatch, const std::string &targetTable, std::map<int64_t, Entries> &referenceGid)
863 {
864     int errCode = E_OK;
865     if (syncBatch.rowid.size() != syncBatch.timestamp.size()) {
866         LOGE("[RDBExecutor] rowid size [%zu] not equal to timestamp size [%zu].", syncBatch.rowid.size(),
867             syncBatch.timestamp.size());
868         return -E_INVALID_ARGS;
869     }
870     int matchCount = 0;
871     for (size_t i = 0u; i < syncBatch.rowid.size(); i++) {
872         errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, syncBatch.rowid[i]); // 1 is rowid index
873         if (errCode != E_OK) {
874             LOGE("[RDBExecutor] bind rowid to stmt failed %d.", errCode);
875             break;
876         }
877         errCode = SQLiteUtils::BindInt64ToStatement(statement, 2, syncBatch.timestamp[i]); // 2 is timestamp index
878         if (errCode != E_OK) {
879             LOGE("[RDBExecutor] bind timestamp to stmt failed %d.", errCode);
880             break;
881         }
882         while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
883             std::string gid;
884             (void)SQLiteUtils::GetColumnTextValue(statement, 0, gid);
885             if (gid.empty()) {
886                 LOGE("[RDBExecutor] reference data don't contain gid.");
887                 errCode = -E_CLOUD_ERROR;
888                 break;
889             }
890             referenceGid[syncBatch.rowid[i]][targetTable] = gid;
891             matchCount++;
892         }
893         if (errCode == -E_FINISHED) {
894             errCode = E_OK;
895         }
896         if (errCode != E_OK) {
897             LOGE("[RDBExecutor] step stmt failed %d.", errCode);
898             break;
899         }
900         SQLiteUtils::ResetStatement(statement, false, errCode);
901         if (errCode != E_OK) {
902             LOGE("[RDBExecutor] reset stmt failed %d.", errCode);
903             break;
904         }
905     }
906     if (matchCount != 0) {
907         LOGD("[RDBExecutor] get reference gid match %d.", matchCount);
908     }
909     return errCode;
910 }
911 
SplitReferenceByField(const std::vector<TableReferenceProperty> & targetTableReference)912 PairStringVector SQLiteSingleVerRelationalStorageExecutor::SplitReferenceByField(
913     const std::vector<TableReferenceProperty> &targetTableReference)
914 {
915     PairStringVector sourceTargetFiled;
916     for (const auto &reference : targetTableReference) {
917         for (const auto &column : reference.columns) {
918             sourceTargetFiled.first.push_back(column.first);
919             sourceTargetFiled.second.push_back(column.second);
920         }
921     }
922     return sourceTargetFiled;
923 }
924 
BindStmtWithCloudGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid,sqlite3_stmt * & stmt)925 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSyncData &cloudDataResult,
926     bool ignoreEmptyGid, sqlite3_stmt *&stmt)
927 {
928     int fillGidCount = 0;
929     int errCode = E_OK;
930     for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
931         auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
932         if (gidEntry == cloudDataResult.insData.extend[i].end()) {
933             bool isSkipAssetsMissRecord = false;
934             if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) {
935                 LOGI("[RDBExecutor] Local assets missing and skip filling assets.");
936                 isSkipAssetsMissRecord = true;
937             }
938             if (ignoreEmptyGid || isSkipAssetsMissRecord) {
939                 continue;
940             }
941             errCode = -E_INVALID_ARGS;
942             LOGE("[RDBExecutor] Extend not contain gid.");
943             break;
944         }
945         bool containError = DBCommon::IsRecordError(cloudDataResult.insData.extend[i]);
946         if (ignoreEmptyGid && containError) {
947             continue;
948         }
949         std::string val;
950         if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
951             cloudDataResult.insData.extend[i], val) != E_OK) {
952             errCode = -E_CLOUD_ERROR;
953             LOGE("[RDBExecutor] Can't get string gid from extend.");
954             break;
955         }
956         if (val.empty()) {
957             errCode = -E_CLOUD_ERROR;
958             LOGE("[RDBExecutor] Get empty gid from extend.");
959             break;
960         }
961         errCode = BindStmtWithCloudGidInner(val, cloudDataResult.insData.rowid[i], stmt, fillGidCount);
962         if (errCode != E_OK) {
963             LOGE("[RDBExecutor] Bind stmt error %d.", errCode);
964             break;
965         }
966     }
967     LOGD("[RDBExecutor] Fill gid count %d.", fillGidCount);
968     return errCode;
969 }
970 
CleanExtendAndCursorForDeleteData(const std::string & tableName)971 int SQLiteSingleVerRelationalStorageExecutor::CleanExtendAndCursorForDeleteData(const std::string &tableName)
972 {
973     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
974     std::string sql = "DELETE FROM " + logTable + " where flag&0x01=0x01;";
975     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
976     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
977         LOGE("update extend field and cursor failed %d.", errCode);
978     }
979     return errCode;
980 }
981 
CheckIfExistUserTable(const std::string & tableName)982 int SQLiteSingleVerRelationalStorageExecutor::CheckIfExistUserTable(const std::string &tableName)
983 {
984     std::string sql = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?";
985     sqlite3_stmt *statement = nullptr;
986     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
987     if (errCode != E_OK) {
988         LOGE("[RDBExecutor] Prepare the sql statement error: %d.", errCode);
989         return errCode;
990     }
991     errCode = SQLiteUtils::BindTextToStatement(statement, 1, tableName);
992     if (errCode != E_OK) {
993         LOGE("[RDBExecutor] Bind table name failed: %d.", errCode);
994         SQLiteUtils::ResetStatement(statement, true, errCode);
995         return errCode;
996     }
997     if (SQLiteUtils::StepWithRetry(statement) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
998         LOGE("[RDBExecutor] local exists user table which shared table name is same as.");
999         SQLiteUtils::ResetStatement(statement, true, errCode);
1000         return -E_INVALID_ARGS;
1001     }
1002     SQLiteUtils::ResetStatement(statement, true, errCode);
1003     return E_OK;
1004 }
1005 
GetCloudDeleteSql(const std::string & table)1006 std::string SQLiteSingleVerRelationalStorageExecutor::GetCloudDeleteSql(const std::string &table)
1007 {
1008     std::string logTable = DBCommon::GetLogTableName(table);
1009     int cursor = GetCursor(table);
1010     std::string sql;
1011     sql += " cloud_gid = '', version = '', ";
1012     if (isLogicDelete_) {
1013         // cursor already increased by DeleteCloudData, can be assigned directly here
1014         // 1001 which is logicDelete|cloudForcePush|local|delete
1015         sql += "flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1016             std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE) |
1017             static_cast<uint32_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) + ", cursor = " + std::to_string(cursor) + " ";
1018     } else {
1019         sql += "data_key = -1, flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1020             std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) + ", sharing_resource = ''";
1021         int errCode = SetCursor(table, cursor + 1);
1022         if (errCode == E_OK) {
1023             sql += ", cursor = " + std::to_string(cursor + 1) + " ";
1024         } else {
1025             LOGW("[RDBExecutor] Increase cursor failed when delete log: %d.", errCode);
1026         }
1027     }
1028     return sql;
1029 }
1030 
RemoveDataAndLog(const std::string & tableName,int64_t dataKey)1031 int SQLiteSingleVerRelationalStorageExecutor::RemoveDataAndLog(const std::string &tableName, int64_t dataKey)
1032 {
1033     int errCode = E_OK;
1034     std::string removeDataSql = "DELETE FROM " + tableName + " WHERE " + DBConstant::SQLITE_INNER_ROWID + " = " +
1035         std::to_string(dataKey);
1036     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeDataSql);
1037     if (errCode != E_OK) {
1038         LOGE("[RDBExecutor] remove data failed %d", errCode);
1039         return errCode;
1040     }
1041     std::string removeLogSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key = " +
1042         std::to_string(dataKey);
1043     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeLogSql);
1044     if (errCode != E_OK) {
1045         LOGE("[RDBExecutor] remove log failed %d", errCode);
1046     }
1047     return errCode;
1048 }
1049 
GetLocalDataKey(size_t index,const DownloadData & downloadData)1050 int64_t SQLiteSingleVerRelationalStorageExecutor::GetLocalDataKey(size_t index,
1051     const DownloadData &downloadData)
1052 {
1053     if (index >= downloadData.existDataKey.size()) {
1054         LOGW("[RDBExecutor] index out of range when get local data key."); // should not happen
1055         return -1; // -1 means not exist
1056     }
1057     return downloadData.existDataKey[index];
1058 }
1059 
BindStmtWithCloudGidInner(const std::string & gid,int64_t rowid,sqlite3_stmt * & stmt,int & fillGidCount)1060 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
1061     sqlite3_stmt *&stmt, int &fillGidCount)
1062 {
1063     int errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid); // 1 means the gid index
1064     if (errCode != E_OK) {
1065         return errCode;
1066     }
1067     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means rowid
1068     if (errCode != E_OK) {
1069         return errCode;
1070     }
1071     errCode = SQLiteUtils::StepWithRetry(stmt, false);
1072     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1073         errCode = E_OK;
1074         fillGidCount++;
1075         SQLiteUtils::ResetStatement(stmt, false, errCode);
1076     } else {
1077         LOGE("[RDBExecutor] Update cloud log failed: %d.", errCode);
1078     }
1079     return errCode;
1080 }
1081 
RenewTableTrigger(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType)1082 int SQLiteSingleVerRelationalStorageExecutor::RenewTableTrigger(DistributedTableMode mode,
1083     const TableInfo &tableInfo, TableSyncType syncType)
1084 {
1085     auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
1086     return tableManager->AddRelationalLogTableTrigger(dbHandle_, tableInfo, "");
1087 }
1088 
DoCleanAssetId(const std::string & tableName,const RelationalSchemaObject & localSchema)1089 int SQLiteSingleVerRelationalStorageExecutor::DoCleanAssetId(const std::string &tableName,
1090     const RelationalSchemaObject &localSchema)
1091 {
1092     std::vector<int64_t> dataKeys;
1093     std::string logTableName = DBCommon::GetLogTableName(tableName);
1094     int errCode = GetCleanCloudDataKeys(logTableName, dataKeys, false);
1095     if (errCode != E_OK) {
1096         LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
1097         return errCode;
1098     }
1099     std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
1100     errCode = CleanAssetId(tableName, fieldInfos, dataKeys);
1101     if (errCode != E_OK) {
1102         LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d.", errCode);
1103     }
1104     return errCode;
1105 }
1106 
CleanAssetId(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys)1107 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetId(const std::string &tableName,
1108     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys)
1109 {
1110     int errCode = E_OK;
1111     for (const auto &fieldInfo : fieldInfos) {
1112         if (fieldInfo.IsAssetType()) {
1113             Assets assets;
1114             errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1115             if (errCode != E_OK) {
1116                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
1117                 return errCode;
1118             }
1119             errCode = UpdateAssetIdOnUserTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1120             if (errCode != E_OK) {
1121                 LOGE("[Storage Executor] failed to save clean asset id on table, %d.", errCode);
1122                 return errCode;
1123             }
1124         } else if (fieldInfo.IsAssetsType()) {
1125             errCode = GetAssetsAndUpdateAssetsId(tableName, fieldInfo.GetFieldName(), dataKeys);
1126             if (errCode != E_OK) {
1127                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
1128                 return errCode;
1129             }
1130         }
1131     }
1132     return errCode;
1133 }
1134 
UpdateAssetIdOnUserTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)1135 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetIdOnUserTable(const std::string &tableName,
1136     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
1137 {
1138     if (assets.empty()) { // LCOV_EXCL_BR_LINE
1139         return E_OK;
1140     }
1141     int errCode = E_OK;
1142     int ret = E_OK;
1143     sqlite3_stmt *stmt = nullptr;
1144     size_t index = 0;
1145     for (const auto &rowId : dataKeys) {
1146         if (rowId == -1) { // -1 means data is deleted
1147             continue;
1148         }
1149         if (assets[index].name.empty()) { // LCOV_EXCL_BR_LINE
1150             index++;
1151             continue;
1152         }
1153         std::string cleanAssetIdSql = "UPDATE " + tableName  + " SET " + fieldName + " = ? WHERE " +
1154             std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1155         errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1156         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1157             LOGE("Get statement failed, %d", errCode);
1158             return errCode;
1159         }
1160         assets[index].assetId = "";
1161         assets[index].status &= ~AssetStatus::UPLOADING;
1162         errCode = BindAssetToBlobStatement(assets[index], 1, stmt); // 1 means sqlite statement index
1163         index++;
1164         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1165             LOGE("Bind asset to blob statement failed, %d", errCode);
1166             goto END;
1167         }
1168         errCode = SQLiteUtils::StepWithRetry(stmt);
1169         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1170             errCode = E_OK;
1171         } else {
1172             LOGE("Step statement failed, %d", errCode);
1173             goto END;
1174         }
1175         SQLiteUtils::ResetStatement(stmt, true, ret);
1176     }
1177     return errCode != E_OK ? errCode : ret;
1178 END:
1179     SQLiteUtils::ResetStatement(stmt, true, ret);
1180     return errCode != E_OK ? errCode : ret;
1181 }
1182 
GetAssetsAndUpdateAssetsId(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys)1183 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsAndUpdateAssetsId(const std::string &tableName,
1184     const std::string &fieldName, const std::vector<int64_t> &dataKeys)
1185 {
1186     int errCode = E_OK;
1187     int ret = E_OK;
1188     sqlite3_stmt *selectStmt = nullptr;
1189     for (const auto &rowId : dataKeys) {
1190         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1191             "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1192         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1193         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1194             LOGE("Get select assets statement failed, %d.", errCode);
1195             goto END;
1196         }
1197         Assets assets;
1198         errCode = GetAssetsByRowId(selectStmt, assets);
1199         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1200             LOGE("Get assets by rowId failed, %d.", errCode);
1201             goto END;
1202         }
1203         SQLiteUtils::ResetStatement(selectStmt, true, ret);
1204         if (assets.empty()) { // LCOV_EXCL_BR_LINE
1205             continue;
1206         }
1207         for (auto &asset : assets) {
1208             asset.assetId = "";
1209             asset.status &= ~AssetStatus::UPLOADING;
1210         }
1211         std::vector<uint8_t> assetsValue;
1212         errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsValue);
1213         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1214             LOGE("[CleanAssetsIdOnUserTable] failed to transfer assets to blob, %d.", errCode);
1215             return errCode;
1216         }
1217         errCode = CleanAssetsIdOnUserTable(tableName, fieldName, rowId, assetsValue);
1218         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1219             LOGE("[CleanAssetsIdOnUserTable] clean assets id on user table failed, %d", errCode);
1220             return errCode;
1221         }
1222     }
1223     return errCode != E_OK ? errCode : ret;
1224 END:
1225     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1226     return errCode != E_OK ? errCode : ret;
1227 }
1228 
CleanAssetsIdOnUserTable(const std::string & tableName,const std::string & fieldName,const int64_t rowId,const std::vector<uint8_t> & assetsValue)1229 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetsIdOnUserTable(const std::string &tableName,
1230     const std::string &fieldName, const int64_t rowId, const std::vector<uint8_t> &assetsValue)
1231 {
1232     std::string cleanAssetIdSql = "UPDATE " + tableName  + " SET " + fieldName + " = ? WHERE " +
1233         std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1234     sqlite3_stmt *stmt = nullptr;
1235     int errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1236     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1237         LOGE("Get statement failed, %d", errCode);
1238         SQLiteUtils::ResetStatement(stmt, true, errCode);
1239         return errCode;
1240     }
1241     errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, assetsValue, false);
1242     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1243         SQLiteUtils::ResetStatement(stmt, true, errCode);
1244         return errCode;
1245     }
1246     errCode = SQLiteUtils::StepWithRetry(stmt);
1247     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1248         errCode = E_OK;
1249     }
1250     SQLiteUtils::ResetStatement(stmt, true, errCode);
1251     return errCode;
1252 }
1253 
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1254 std::pair<int, uint32_t> SQLiteSingleVerRelationalStorageExecutor::GetAssetsByGidOrHashKey(
1255     const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
1256 {
1257     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
1258     auto &[errCode, status] = res;
1259     std::vector<Field> assetFields;
1260     std::string sql = "SELECT";
1261     for (const auto &field: tableSchema.fields) {
1262         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1263             assetFields.emplace_back(field);
1264             sql += " b." + field.colName + ",";
1265         }
1266     }
1267     if (assetFields.empty()) {
1268         return { -E_NOT_FOUND, status };
1269     }
1270     sql += "a.cloud_gid, a.status ";
1271     sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE (a." + FLAG_NOT_LOGIC_DELETE + ") AND (" +
1272         (gid.empty() ? "a.hash_key = ?);" : " a.cloud_gid = ? OR  a.hash_key = ?);");
1273     sqlite3_stmt *stmt = nullptr;
1274     errCode = InitGetAssetStmt(sql, gid, hashKey, stmt);
1275     if (errCode != E_OK) {
1276         return res;
1277     }
1278     errCode = SQLiteUtils::StepWithRetry(stmt);
1279     int index = 0;
1280     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1281         for (const auto &field: assetFields) {
1282             Type cloudValue;
1283             errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1284             if (errCode != E_OK) {
1285                 break;
1286             }
1287             errCode = PutVBucketByType(assets, field, cloudValue);
1288             if (errCode != E_OK) {
1289                 break;
1290             }
1291         }
1292         std::string curGid;
1293         errCode = SQLiteUtils::GetColumnTextValue(stmt, index++, curGid);
1294         if (errCode == E_OK && CloudStorageUtils::IsCloudGidMismatch(gid, curGid)) {
1295             // Gid is different, there may be duplicate primary keys in the cloud
1296             errCode = -E_CLOUD_GID_MISMATCH;
1297         }
1298         status = static_cast<uint32_t>(sqlite3_column_int(stmt, index++));
1299     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1300         errCode = -E_NOT_FOUND;
1301     } else {
1302         LOGE("step get asset stmt failed %d.", errCode);
1303     }
1304     SQLiteUtils::ResetStatement(stmt, true, errCode);
1305     return res;
1306 }
1307 
InitGetAssetStmt(const std::string & sql,const std::string & gid,const Bytes & hashKey,sqlite3_stmt * & stmt)1308 int SQLiteSingleVerRelationalStorageExecutor::InitGetAssetStmt(const std::string &sql, const std::string &gid,
1309     const Bytes &hashKey, sqlite3_stmt *&stmt)
1310 {
1311     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1312     if (errCode != E_OK) {
1313         LOGE("Get asset statement failed, %d.", errCode);
1314         return errCode;
1315     }
1316     int index = 1;
1317     if (!gid.empty()) {
1318         errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
1319         if (errCode != E_OK) {
1320             LOGE("bind gid failed %d.", errCode);
1321             SQLiteUtils::ResetStatement(stmt, true, errCode);
1322             return errCode;
1323         }
1324     }
1325     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, hashKey);
1326     if (errCode != E_OK) {
1327         LOGE("bind hash failed %d.", errCode);
1328         SQLiteUtils::ResetStatement(stmt, true, errCode);
1329     }
1330     return errCode;
1331 }
1332 
FillHandleWithOpType(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid,const TableSchema & tableSchema)1333 int SQLiteSingleVerRelationalStorageExecutor::FillHandleWithOpType(const OpType opType, const CloudSyncData &data,
1334     bool fillAsset, bool ignoreEmptyGid, const TableSchema &tableSchema)
1335 {
1336     int errCode = E_OK;
1337     switch (opType) {
1338         case OpType::UPDATE_VERSION: // fallthrough
1339         case OpType::INSERT_VERSION: {
1340             errCode = FillCloudVersionForUpload(opType, data);
1341             break;
1342         }
1343         case OpType::SET_UPLOADING: {
1344             errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1345             if (errCode != E_OK) {
1346                 LOGE("Failed to set uploading for ins data, %d.", errCode);
1347                 return errCode;
1348             }
1349             errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1350             break;
1351         }
1352         case OpType::INSERT: {
1353             errCode = UpdateCloudLogGid(data, ignoreEmptyGid);
1354             if (errCode != E_OK) {
1355                 LOGE("Failed to fill cloud log gid, %d.", errCode);
1356                 return errCode;
1357             }
1358             if (fillAsset) {
1359                 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1360                 if (errCode != E_OK) {
1361                     LOGE("Failed to fill asset for ins, %d.", errCode);
1362                     return errCode;
1363                 }
1364             }
1365             errCode = FillCloudVersionForUpload(OpType::INSERT_VERSION, data);
1366             break;
1367         }
1368         case OpType::UPDATE: {
1369             if (fillAsset && !data.updData.assets.empty()) {
1370                 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1371                 if (errCode != E_OK) {
1372                     LOGE("Failed to fill asset for upd, %d.", errCode);
1373                     return errCode;
1374                 }
1375             }
1376             errCode = FillCloudVersionForUpload(OpType::UPDATE_VERSION, data);
1377             break;
1378         }
1379         default:
1380             break;
1381     }
1382     return errCode;
1383 }
1384 
GetAssetsByRowId(sqlite3_stmt * & selectStmt,Assets & assets)1385 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets)
1386 {
1387     int errCode = SQLiteUtils::StepWithRetry(selectStmt);
1388     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1389         std::vector<uint8_t> blobValue;
1390         errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
1391         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1392             LOGE("Get column blob value failed %d.", errCode);
1393             return errCode;
1394         }
1395         errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1396         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1397             LOGE("Transfer blob to assets failed %d", errCode);
1398         }
1399         return errCode;
1400     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1401         return E_OK;
1402     } else {
1403         LOGE("Step select statement failed %d.", errCode);
1404         return errCode;
1405     }
1406 }
1407 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1408 void SQLiteSingleVerRelationalStorageExecutor::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1409 {
1410     assetLoader_ = loader;
1411 }
1412 
ExecuteFillDownloadAssetStatement(sqlite3_stmt * & stmt,int beginIndex,const std::string & cloudGid)1413 int SQLiteSingleVerRelationalStorageExecutor::ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt,
1414     int beginIndex, const std::string &cloudGid)
1415 {
1416     int errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, cloudGid);
1417     if (errCode != E_OK) {
1418         LOGE("Bind cloud gid to statement failed %d.", errCode);
1419         int ret = E_OK;
1420         SQLiteUtils::ResetStatement(stmt, true, ret);
1421         return errCode;
1422     }
1423     errCode = SQLiteUtils::StepWithRetry(stmt);
1424     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1425         errCode = E_OK;
1426     } else {
1427         LOGE("Fill cloud asset failed: %d.", errCode);
1428     }
1429     int ret = E_OK;
1430     SQLiteUtils::ResetStatement(stmt, true, ret);
1431     return errCode != E_OK ? errCode : ret;
1432 }
1433 
CleanDownloadChangedAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType)1434 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadChangedAssets(
1435     const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType)
1436 {
1437     if (assetLoader_ == nullptr) {
1438         LOGE("assetLoader may be not set.");
1439         return -E_NOT_SET;
1440     }
1441     std::vector<Asset> toDeleteAssets;
1442     CloudStorageUtils::GetToBeRemoveAssets(vBucket, assetOpType, toDeleteAssets);
1443     if (toDeleteAssets.empty()) {
1444         return E_OK;
1445     }
1446     DBStatus ret = assetLoader_->RemoveLocalAssets(toDeleteAssets);
1447     if (ret != OK) {
1448         LOGE("remove local assets failed %d.", ret);
1449         return -E_REMOVE_ASSETS_FAILED;
1450     }
1451     return E_OK;
1452 }
1453 
GetAndBindFillUploadAssetStatement(const std::string & tableName,const VBucket & assets,sqlite3_stmt * & statement)1454 int SQLiteSingleVerRelationalStorageExecutor::GetAndBindFillUploadAssetStatement(const std::string &tableName,
1455     const VBucket &assets, sqlite3_stmt *&statement)
1456 {
1457     std::string sql = "UPDATE '" + tableName + "' SET ";
1458     for (const auto &item: assets) {
1459         sql += item.first + " = ?,";
1460     }
1461     sql.pop_back();
1462     sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = ?;";
1463     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1464     if (errCode != E_OK) {
1465         return errCode;
1466     }
1467     int bindIndex = 1;
1468     for (const auto &item: assets) {
1469         Field field = {
1470             .colName = item.first, .type = static_cast<int32_t>(item.second.index())
1471         };
1472         errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](bindIndex++, assets, field, statement);
1473         if (errCode != E_OK) {
1474             return errCode;
1475         }
1476     }
1477     return errCode;
1478 }
1479 
OnlyUpdateAssetId(const std::string & tableName,const TableSchema & tableSchema,const VBucket & vBucket,int64_t dataKey,OpType opType)1480 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateAssetId(const std::string &tableName,
1481     const TableSchema &tableSchema, const VBucket &vBucket, int64_t dataKey, OpType opType)
1482 {
1483     if (opType != OpType::ONLY_UPDATE_GID && opType != OpType::NOT_HANDLE &&
1484         opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1485         return E_OK;
1486     }
1487     if (CloudStorageUtils::IsSharedTable(tableSchema)) {
1488         // this is shared table, not need to update asset id.
1489         return E_OK;
1490     }
1491     if (!IsNeedUpdateAssetId(tableSchema, dataKey, vBucket)) {
1492         return E_OK;
1493     }
1494     int errCode = UpdateAssetId(tableSchema, dataKey, vBucket);
1495     if (errCode != E_OK) {
1496         LOGE("[Storage Executor] failed to update assetId on table, %d.", errCode);
1497     }
1498     return errCode;
1499 }
1500 
UpdateLocalAssetId(const VBucket & vBucket,const std::string & fieldName,Asset & asset)1501 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName,
1502     Asset &asset)
1503 {
1504     for (const auto &[col, value] : vBucket) {
1505         if (value.index() == TYPE_INDEX<Asset> && col == fieldName) {
1506             asset = std::get<Asset>(value);
1507         }
1508     }
1509 }
1510 
UpdateLocalAssetsId(const VBucket & vBucket,const std::string & fieldName,Assets & assets)1511 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName,
1512     Assets &assets)
1513 {
1514     for (const auto &[col, value] : vBucket) {
1515         if (value.index() == TYPE_INDEX<Assets> && col == fieldName) {
1516             assets = std::get<Assets>(value);
1517         }
1518     }
1519 }
1520 
UpdateLocalAssetsIdInner(const Assets & cloudAssets,Assets & assets)1521 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsIdInner(const Assets &cloudAssets, Assets &assets)
1522 {
1523     for (const auto &cloudAsset : cloudAssets) {
1524         for (auto &asset : assets) {
1525             if (asset.name == cloudAsset.name) {
1526                 asset.assetId = cloudAsset.assetId;
1527             }
1528         }
1529     }
1530 }
1531 
BindAssetToBlobStatement(const Asset & asset,int index,sqlite3_stmt * & stmt)1532 int SQLiteSingleVerRelationalStorageExecutor::BindAssetToBlobStatement(const Asset &asset, int index,
1533     sqlite3_stmt *&stmt)
1534 {
1535     std::vector<uint8_t> blobValue;
1536     int errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, blobValue);
1537     if (errCode != E_OK) {
1538         LOGE("Transfer asset to blob failed, %d.", errCode);
1539         return errCode;
1540     }
1541     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1542     if (errCode != E_OK) {
1543         LOGE("Bind asset blob to statement failed, %d.", errCode);
1544     }
1545     return errCode;
1546 }
1547 
BindAssetsToBlobStatement(const Assets & assets,int index,sqlite3_stmt * & stmt)1548 int SQLiteSingleVerRelationalStorageExecutor::BindAssetsToBlobStatement(const Assets &assets, int index,
1549     sqlite3_stmt *&stmt)
1550 {
1551     std::vector<uint8_t> blobValue;
1552     int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blobValue);
1553     if (errCode != E_OK) {
1554         LOGE("Transfer asset to blob failed, %d.", errCode);
1555         return errCode;
1556     }
1557     errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1558     if (errCode != E_OK) {
1559         LOGE("Bind asset blob to statement failed, %d.", errCode);
1560     }
1561     return errCode;
1562 }
1563 
GetAssetOnTableInner(sqlite3_stmt * & stmt,Asset & asset)1564 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTableInner(sqlite3_stmt *&stmt, Asset &asset)
1565 {
1566     int errCode = SQLiteUtils::StepWithRetry(stmt);
1567     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1568         std::vector<uint8_t> blobValue;
1569         errCode = SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue);
1570         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1571             LOGE("[RDBExecutor][GetAssetOnTableInner] Get column blob value failed, %d.", errCode);
1572             return errCode;
1573         }
1574         errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
1575         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1576             LOGE("[RDBExecutor] Transfer blob to asset failed, %d.", errCode);
1577         }
1578     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1579         errCode = E_OK;
1580     } else {
1581         LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1582     }
1583     return errCode;
1584 }
1585 
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const int64_t dataKey,Asset & asset)1586 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
1587     const std::string &fieldName, const int64_t dataKey, Asset &asset)
1588 {
1589     sqlite3_stmt *selectStmt = nullptr;
1590     std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
1591         "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1592     int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
1593     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1594         LOGE("Get select asset statement failed, %d.", errCode);
1595         return errCode;
1596     }
1597     errCode = GetAssetOnTableInner(selectStmt, asset);
1598     int ret = E_OK;
1599     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1600     return errCode != E_OK ? errCode : ret;
1601 }
1602 
GetAssetsOnTableInner(sqlite3_stmt * & stmt,Assets & assets)1603 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsOnTableInner(sqlite3_stmt *&stmt, Assets &assets)
1604 {
1605     int errCode = SQLiteUtils::StepWithRetry(stmt);
1606     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1607         std::vector<uint8_t> blobValue;
1608         errCode = SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue);
1609         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1610             LOGE("[RDBExecutor][GetAssetsOnTableInner] Get column blob value failed, %d.", errCode);
1611             return errCode;
1612         }
1613         errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1614         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1615             LOGE("[RDBExecutor] Transfer blob to assets failed, %d.", errCode);
1616         }
1617     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1618         errCode = E_OK;
1619     } else {
1620         LOGE("[RDBExecutor] Step failed when get assets from table, errCode = %d.", errCode);
1621     }
1622     return errCode;
1623 }
1624 
GetAssetsOnTable(const std::string & tableName,const std::string & fieldName,const int64_t dataKey,Assets & assets)1625 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsOnTable(const std::string &tableName,
1626     const std::string &fieldName, const int64_t dataKey, Assets &assets)
1627 {
1628     sqlite3_stmt *selectStmt = nullptr;
1629     std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1630         "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1631     int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1632     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1633         LOGE("Get select assets statement failed, %d.", errCode);
1634         return errCode;
1635     }
1636     errCode = GetAssetsOnTableInner(selectStmt, assets);
1637     int ret = E_OK;
1638     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1639     return errCode != E_OK ? errCode : ret;
1640 }
1641 
BindAssetFiledToBlobStatement(const TableSchema & tableSchema,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord,sqlite3_stmt * & stmt)1642 int SQLiteSingleVerRelationalStorageExecutor::BindAssetFiledToBlobStatement(const TableSchema &tableSchema,
1643     const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt)
1644 {
1645     int assetIndex = 0;
1646     int assetsIndex = 0;
1647     for (const auto &field : tableSchema.fields) {
1648         if (field.type == TYPE_INDEX<Asset>) {
1649             if (assetOfOneRecord[assetIndex].name.empty()) {
1650                 continue;
1651             }
1652             int errCode = BindAssetToBlobStatement(assetOfOneRecord[assetIndex], assetIndex + assetsIndex + 1, stmt);
1653             if (errCode != E_OK) {
1654                 LOGE("Bind asset to blob statement failed, %d.", errCode);
1655                 return errCode;
1656             }
1657             assetIndex++;
1658         } else if (field.type == TYPE_INDEX<Assets>) {
1659             if (assetsOfOneRecord[assetsIndex].empty()) {
1660                 continue;
1661             }
1662             int errCode = BindAssetsToBlobStatement(assetsOfOneRecord[assetsIndex], assetIndex + assetsIndex + 1, stmt);
1663             if (errCode != E_OK) {
1664                 LOGE("Bind assets to blob statement failed, %d.", errCode);
1665                 return errCode;
1666             }
1667             assetsIndex++;
1668         }
1669     }
1670     return E_OK;
1671 }
1672 
UpdateAssetsIdForOneRecord(const TableSchema & tableSchema,const std::string & sql,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord)1673 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetsIdForOneRecord(const TableSchema &tableSchema,
1674     const std::string &sql, const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord)
1675 {
1676     int errCode = E_OK;
1677     int ret = E_OK;
1678     sqlite3_stmt *stmt = nullptr;
1679     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1680     if (errCode != E_OK) {
1681         LOGE("Get update asset statement failed, %d.", errCode);
1682         return errCode;
1683     }
1684     errCode = BindAssetFiledToBlobStatement(tableSchema, assetOfOneRecord, assetsOfOneRecord, stmt);
1685     if (errCode != E_OK) {
1686         LOGE("Asset field Bind asset to blob statement failed, %d.", errCode);
1687         SQLiteUtils::ResetStatement(stmt, true, ret);
1688         return errCode != E_OK ? errCode : ret;
1689     }
1690     errCode = SQLiteUtils::StepWithRetry(stmt);
1691     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1692         errCode = E_OK;
1693     } else {
1694         LOGE("Step statement failed, %d", errCode);
1695     }
1696     SQLiteUtils::ResetStatement(stmt, true, ret);
1697     return errCode != E_OK ? errCode : ret;
1698 }
1699 
UpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1700 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1701     const VBucket &vBucket)
1702 {
1703     int errCode = E_OK;
1704     std::vector<Asset> assetOfOneRecord;
1705     std::vector<Assets> assetsOfOneRecord;
1706     std::string updateAssetIdSql = "UPDATE " + tableSchema.name  + " SET";
1707     for (const auto &field : tableSchema.fields) {
1708         if (field.type == TYPE_INDEX<Asset>) {
1709             Asset asset;
1710             UpdateLocalAssetId(vBucket, field.colName, asset);
1711             assetOfOneRecord.push_back(asset);
1712             if (!asset.name.empty()) {
1713                 updateAssetIdSql += " " + field.colName + " = ?,";
1714             }
1715         }
1716         if (field.type == TYPE_INDEX<Assets>) {
1717             Assets assets;
1718             UpdateLocalAssetsId(vBucket, field.colName, assets);
1719             assetsOfOneRecord.push_back(assets);
1720             if (!assets.empty()) {
1721                 updateAssetIdSql += " " + field.colName + " = ?,";
1722             }
1723         }
1724     }
1725     if (updateAssetIdSql == "UPDATE " + tableSchema.name  + " SET") {
1726         return E_OK;
1727     }
1728     updateAssetIdSql.pop_back();
1729     updateAssetIdSql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1730     errCode = UpdateAssetsIdForOneRecord(tableSchema, updateAssetIdSql, assetOfOneRecord, assetsOfOneRecord);
1731     if (errCode != E_OK) {
1732         LOGE("[Storage Executor] failed to update asset id on table, %d.", errCode);
1733     }
1734     return errCode;
1735 }
1736 
SetPutDataMode(PutDataMode mode)1737 void SQLiteSingleVerRelationalStorageExecutor::SetPutDataMode(PutDataMode mode)
1738 {
1739     putDataMode_ = mode;
1740 }
1741 
SetMarkFlagOption(MarkFlagOption option)1742 void SQLiteSingleVerRelationalStorageExecutor::SetMarkFlagOption(MarkFlagOption option)
1743 {
1744     markFlagOption_ = option;
1745 }
1746 
GetDataFlag()1747 int64_t SQLiteSingleVerRelationalStorageExecutor::GetDataFlag()
1748 {
1749     if (putDataMode_ != PutDataMode::USER) {
1750         return static_cast<int64_t>(LogInfoFlag::FLAG_CLOUD) |
1751             static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1752     }
1753     uint32_t flag = static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
1754     if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1755         flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1756     }
1757     flag |= static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1758     return static_cast<int64_t>(flag);
1759 }
1760 
GetUpdateDataFlagSql()1761 std::string SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataFlagSql()
1762 {
1763     if (putDataMode_ == PutDataMode::SYNC) {
1764         return UPDATE_FLAG_CLOUD;
1765     }
1766     if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1767         return UPDATE_FLAG_WAIT_COMPENSATED_SYNC;
1768     }
1769     return UPDATE_FLAG_CLOUD;
1770 }
1771 
GetDev()1772 std::string SQLiteSingleVerRelationalStorageExecutor::GetDev()
1773 {
1774     return putDataMode_ == PutDataMode::SYNC ? "cloud" : "";
1775 }
1776 
GetUpdateField(const VBucket & vBucket,const TableSchema & tableSchema)1777 std::vector<Field> SQLiteSingleVerRelationalStorageExecutor::GetUpdateField(const VBucket &vBucket,
1778     const TableSchema &tableSchema)
1779 {
1780     std::set<std::string> useFields;
1781     std::vector<Field> fields;
1782     if (putDataMode_ == PutDataMode::SYNC) {
1783         for (const auto &field : tableSchema.fields) {
1784             useFields.insert(field.colName);
1785         }
1786         fields = tableSchema.fields;
1787     } else {
1788         for (const auto &field : vBucket) {
1789             if (field.first.empty() || field.first[0] == '#') {
1790                 continue;
1791             }
1792             useFields.insert(field.first);
1793         }
1794         for (const auto &field : tableSchema.fields) {
1795             if (useFields.find(field.colName) == useFields.end()) {
1796                 continue;
1797             }
1798             fields.push_back(field);
1799         }
1800     }
1801     return fields;
1802 }
1803 
UpdateRecordFlag(const std::string & tableName,const std::string & sql,const LogInfo & logInfo)1804 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordFlag(const std::string &tableName, const std::string &sql,
1805     const LogInfo &logInfo)
1806 {
1807     bool useHashKey = false;
1808     if (logInfo.cloudGid.empty() && logInfo.dataKey == DBConstant::DEFAULT_ROW_ID) {
1809         if (logInfo.hashKey.empty()) {
1810             LOGE("[RDBExecutor] Update record flag failed with invalid args!");
1811             return -E_INVALID_ARGS;
1812         }
1813         useHashKey = true;
1814     }
1815     sqlite3_stmt *stmt = nullptr;
1816     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1817     if (errCode != E_OK) {
1818         LOGE("[Storage Executor] Get stmt failed when update record flag, %d", errCode);
1819         return errCode;
1820     }
1821     int ret = E_OK;
1822     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, logInfo.timestamp); // 1 is timestamp
1823     if (errCode != E_OK) {
1824         LOGE("[Storage Executor] Bind timestamp to update record flag stmt failed, %d", errCode);
1825         SQLiteUtils::ResetStatement(stmt, true, ret);
1826         return errCode;
1827     }
1828     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, logInfo.timestamp); // 2 is timestamp
1829     if (errCode != E_OK) {
1830         LOGE("[Storage Executor] Bind timestamp to update record status stmt failed, %d", errCode);
1831         SQLiteUtils::ResetStatement(stmt, true, ret);
1832         return errCode;
1833     }
1834     if (useHashKey) {
1835         errCode = SQLiteUtils::BindBlobToStatement(stmt, 3, logInfo.hashKey); // 3 is hash_key
1836         if (errCode != E_OK) {
1837             LOGE("[Storage Executor] Bind hashKey to update record flag stmt failed, %d", errCode);
1838             SQLiteUtils::ResetStatement(stmt, true, ret);
1839             return errCode;
1840         }
1841     }
1842     errCode = SQLiteUtils::StepWithRetry(stmt);
1843     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1844         errCode = E_OK;
1845     } else {
1846         LOGE("[Storage Executor]Step update record flag stmt failed, %d", errCode);
1847     }
1848     SQLiteUtils::ResetStatement(stmt, true, ret);
1849     return errCode == E_OK ? ret : errCode;
1850 }
1851 
MarkFlagAsUploadFinished(const std::string & tableName,const Key & hashKey,Timestamp timestamp)1852 void SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsUploadFinished(const std::string &tableName,
1853     const Key &hashKey, Timestamp timestamp)
1854 {
1855     sqlite3_stmt *stmt = nullptr;
1856     int errCode = SQLiteUtils::GetStatement(dbHandle_, CloudStorageUtils::GetUpdateUploadFinishedSql(tableName),
1857         stmt);
1858     int index = 1;
1859     errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, timestamp);
1860     if (errCode != E_OK) {
1861         SQLiteUtils::ResetStatement(stmt, true, errCode);
1862         LOGW("[Storage Executor] Bind timestamp to update record flag for upload finished stmt failed, %d", errCode);
1863         return;
1864     }
1865     errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, hashKey);
1866     if (errCode != E_OK) {
1867         SQLiteUtils::ResetStatement(stmt, true, errCode);
1868         LOGW("[Storage Executor] Bind hashKey to update record flag for upload finished stmt failed, %d", errCode);
1869         return;
1870     }
1871     errCode = SQLiteUtils::StepWithRetry(stmt);
1872     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1873         errCode = E_OK;
1874     } else {
1875         LOGE("[Storage Executor]Step update record flag for upload finished stmt failed, %d", errCode);
1876     }
1877     SQLiteUtils::ResetStatement(stmt, true, errCode);
1878 }
1879 
GetWaitCompensatedSyncDataPk(const TableSchema & table,std::vector<VBucket> & data)1880 int SQLiteSingleVerRelationalStorageExecutor::GetWaitCompensatedSyncDataPk(const TableSchema &table,
1881     std::vector<VBucket> &data)
1882 {
1883     std::string sql = "SELECT ";
1884     std::vector<Field> pkFields;
1885     for (const auto &field : table.fields) {
1886         if (!field.primary) {
1887             continue;
1888         }
1889         sql += "b." + field.colName + ",";
1890         pkFields.push_back(field);
1891     }
1892     if (pkFields.empty()) {
1893         // ignore no pk table
1894         return E_OK;
1895     }
1896     sql.pop_back();
1897     sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " + FLAG_IS_WAIT_COMPENSATED_SYNC;
1898     sqlite3_stmt *stmt = nullptr;
1899     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1900     if (errCode != E_OK) {
1901         LOGE("[RDBExecutor] Get stmt failed when get wait compensated sync pk! errCode = %d..", errCode);
1902         return errCode;
1903     }
1904     do {
1905         errCode = SQLiteUtils::StepWithRetry(stmt);
1906         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1907             VBucket pkData;
1908             errCode = GetRecordFromStmt(stmt, pkFields, 0, pkData);
1909             if (errCode != E_OK) {
1910                 LOGE("[RDBExecutor] Get record failed when get wait compensated sync pk! errCode = %d.", errCode);
1911                 break;
1912             }
1913             data.push_back(pkData);
1914         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1915             errCode = E_OK;
1916             break;
1917         } else {
1918             LOGE("[RDBExecutor] Step failed when get wait compensated sync pk! errCode = %d.", errCode);
1919             break;
1920         }
1921     } while (errCode == E_OK);
1922     int ret = E_OK;
1923     SQLiteUtils::ResetStatement(stmt, true, ret);
1924     return errCode == E_OK ? ret : errCode;
1925 }
1926 
ClearUnLockingStatus(const std::string & tableName)1927 int SQLiteSingleVerRelationalStorageExecutor::ClearUnLockingStatus(const std::string &tableName)
1928 {
1929     std::string sql;
1930     sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET status = (CASE WHEN status == 1 "+
1931         "AND (cloud_gid = '' AND flag & 0x01 != 0) THEN 0 ELSE status END);";
1932     sqlite3_stmt *stmt = nullptr;
1933     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1934     if (errCode != E_OK) {
1935         LOGE("[RDBExecutor] Get stmt failed when clear unlocking status errCode = %d.", errCode);
1936         return errCode;
1937     }
1938     int ret = E_OK;
1939     errCode = SQLiteUtils::StepWithRetry(stmt);
1940     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1941         errCode = E_OK;
1942     } else {
1943         LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1944     }
1945     SQLiteUtils::ResetStatement(stmt, true, ret);
1946     return errCode == E_OK ? ret : errCode;
1947 }
1948 
GetRecordFromStmt(sqlite3_stmt * stmt,const std::vector<Field> & fields,int startIndex,VBucket & record)1949 int SQLiteSingleVerRelationalStorageExecutor::GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields,
1950     int startIndex, VBucket &record)
1951 {
1952     int errCode = E_OK;
1953     for (const auto &field : fields) {
1954         Type cloudValue;
1955         errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, startIndex, cloudValue);
1956         if (errCode != E_OK) {
1957             break;
1958         }
1959         errCode = PutVBucketByType(record, field, cloudValue);
1960         if (errCode != E_OK) {
1961             break;
1962         }
1963         startIndex++;
1964     }
1965     return errCode;
1966 }
1967 
BindShareValueToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt)1968 int SQLiteSingleVerRelationalStorageExecutor::BindShareValueToInsertLogStatement(const VBucket &vBucket,
1969     const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt)
1970 {
1971     int errCode = E_OK;
1972     std::string version;
1973     if (putDataMode_ == PutDataMode::SYNC) {
1974         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, vBucket, version);
1975         if ((errCode != E_OK && errCode != -E_NOT_FOUND)) {
1976             LOGE("get version for insert log statement failed, %d", errCode);
1977             return -E_CLOUD_ERROR;
1978         }
1979     }
1980     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 10, version); // 10 is version
1981     if (errCode != E_OK) {
1982         LOGE("Bind version to insert log statement failed, %d", errCode);
1983         return errCode;
1984     }
1985 
1986     std::string shareUri;
1987     if (putDataMode_ == PutDataMode::SYNC) {
1988         errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
1989             vBucket, shareUri);
1990         if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1991             LOGE("get shareUri for insert log statement failed, %d", errCode);
1992             return -E_CLOUD_ERROR;
1993         }
1994     }
1995 
1996     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 11, shareUri); // 11 is sharing_resource
1997     if (errCode != E_OK) {
1998         LOGE("Bind shareUri to insert log statement failed, %d", errCode);
1999     }
2000     return errCode;
2001 }
2002 } // namespace DistributedDB
2003 #endif