1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include "cloud/asset_operation_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "cloud/cloud_storage_utils.h"
21 #include "db_common.h"
22 #include "log_table_manager_factory.h"
23 #include "res_finalizer.h"
24 #include "runtime_context.h"
25 #include "simple_tracker_log_table_manager.h"
26 #include "sqlite_relational_utils.h"
27
28 namespace DistributedDB {
// Offset added to the number of bound asset columns to get the bind position of the row id
// (used by InitFillUploadAssetStatement).
static constexpr int ROW_ID_INDEX = 1;
// Key under which the hash key is looked up in a VBucket.
static constexpr const char *HASH_KEY = "HASH_KEY";
// SQL condition that is true when the logic-delete bit (0x08) of the flag is not set.
static constexpr const char *FLAG_NOT_LOGIC_DELETE = "FLAG & 0x08 = 0";

// Two string lists carried together as one value.
using PairStringVector = std::pair<std::vector<std::string>, std::vector<std::string>>;
34
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)35 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
36 std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
37 {
38 if (assetFields.empty() && pkSet.empty()) {
39 return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
40 }
41 std::string gid;
42 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
43 if (putDataMode_ == PutDataMode::SYNC && errCode != E_OK) {
44 LOGE("Get cloud gid fail when query log table.");
45 return errCode;
46 }
47
48 if (pkSet.empty() && gid.empty()) {
49 LOGE("query log table failed because of both primary key and gid are empty.");
50 return -E_CLOUD_ERROR;
51 }
52 std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
53 " a.cloud_gid, a.sharing_resource, a.status, a.version";
54 for (const auto &field : assetFields) {
55 sql += ", b." + field.colName;
56 }
57 for (const auto &pk : pkSet) {
58 sql += ", b." + pk;
59 }
60 sql += CloudStorageUtils::GetLeftJoinLogSql(tableName) + " WHERE ";
61 if (!gid.empty()) {
62 sql += " a.cloud_gid = ? or ";
63 }
64 sql += "a.hash_key = ?";
65 querySql = sql;
66 return E_OK;
67 }
68
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)69 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
70 const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
71 {
72 std::string sql = "UPDATE " + tableName + " SET ";
73 for (const auto &field: fields) {
74 sql += field.colName + " = ?,";
75 }
76 sql.pop_back();
77 sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = (";
78 sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
79 sqlite3_stmt *stmt = nullptr;
80 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
81 if (errCode != E_OK) {
82 LOGE("Get fill asset statement failed, %d.", errCode);
83 return errCode;
84 }
85 for (size_t i = 0; i < fields.size(); ++i) {
86 errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
87 if (errCode != E_OK) {
88 SQLiteUtils::ResetStatement(stmt, true, errCode);
89 return errCode;
90 }
91 }
92 statement = stmt;
93 return errCode;
94 }
95
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess)96 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
97 VBucket &vBucket, bool isDownloadSuccess)
98 {
99 std::string cloudGid;
100 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
101 if (errCode != E_OK) {
102 LOGE("Miss gid when fill Asset.");
103 return errCode;
104 }
105 std::vector<Field> assetsField;
106 errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
107 if (errCode != E_OK) {
108 LOGE("No assets need to be filled.");
109 return errCode;
110 }
111 CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
112
113 Bytes hashKey;
114 (void)CloudStorageUtils::GetValueFromVBucket<Bytes>(HASH_KEY, vBucket, hashKey);
115 VBucket dbAssets;
116 std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, hashKey, dbAssets);
117 if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
118 LOGE("get assets by gid or hashkey failed %d.", errCode);
119 return errCode;
120 }
121 AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
122 AssetOperationUtils::CloudSyncAction::END_DOWNLOAD);
123
124 if (isDownloadSuccess) {
125 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
126 CloudStorageUtils::FillAssetAfterDownload, CloudStorageUtils::FillAssetsAfterDownload);
127 errCode = IncreaseCursorOnAssetData(tableSchema.name, cloudGid);
128 if (errCode != E_OK) {
129 return errCode;
130 }
131 } else {
132 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
133 CloudStorageUtils::FillAssetAfterDownloadFail, CloudStorageUtils::FillAssetsAfterDownloadFail);
134 }
135
136 sqlite3_stmt *stmt = nullptr;
137 errCode = GetFillDownloadAssetStatement(tableSchema.name, dbAssets, assetsField, stmt);
138 if (errCode != E_OK) {
139 return errCode;
140 }
141 errCode = ExecuteFillDownloadAssetStatement(stmt, assetsField.size() + 1, cloudGid);
142 int ret = CleanDownloadChangedAssets(vBucket, assetOpType);
143 return errCode == E_OK ? ret : errCode;
144 }
145
IncreaseCursorOnAssetData(const std::string & tableName,const std::string & gid)146 int SQLiteSingleVerRelationalStorageExecutor::IncreaseCursorOnAssetData(const std::string &tableName,
147 const std::string &gid)
148 {
149 int cursor = GetCursor(tableName);
150 cursor++;
151 std::string sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + tableName + "_log";
152 sql += " SET cursor = ? where cloud_gid = ?;";
153 sqlite3_stmt *statement = nullptr;
154 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
155 if (errCode != E_OK) {
156 LOGE("get update asset data cursor stmt failed %d.", errCode);
157 return errCode;
158 }
159 ResFinalizer finalizer([statement]() {
160 sqlite3_stmt *statementInner = statement;
161 int ret = E_OK;
162 SQLiteUtils::ResetStatement(statementInner, true, ret);
163 if (ret != E_OK) {
164 LOGW("Reset stmt failed %d when increase cursor on asset data", ret);
165 }
166 });
167 int index = 1;
168 errCode = SQLiteUtils::BindInt64ToStatement(statement, index++, cursor);
169 if (errCode != E_OK) {
170 LOGE("bind cursor data stmt failed %d.", errCode);
171 return errCode;
172 }
173 errCode = SQLiteUtils::BindTextToStatement(statement, index, gid);
174 if (errCode != E_OK) {
175 LOGE("bind cursor gid data stmt failed %d.", errCode);
176 return errCode;
177 }
178 errCode = SQLiteUtils::StepWithRetry(statement, false);
179 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
180 LOGE("Fill upload asset failed:%d.", errCode);
181 return errCode;
182 }
183 LOGI("Upgrade cursor to %d after asset download success.", cursor);
184 errCode = SetCursor(tableName, cursor);
185 if (errCode != E_OK) {
186 LOGE("Upgrade cursor failed after asset download success %d.", errCode);
187 }
188 return errCode;
189 }
190
FillCloudAssetForUpload(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data)191 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema,
192 const CloudSyncBatch &data)
193 {
194 int errCode = E_OK;
195 if (CloudStorageUtils::ChkFillCloudAssetParam(data, errCode)) {
196 return errCode;
197 }
198 errCode = SetLogTriggerStatus(false);
199 if (errCode != E_OK) {
200 LOGE("Fail to set log trigger off, %d.", errCode);
201 return errCode;
202 }
203 sqlite3_stmt *stmt = nullptr;
204 for (size_t i = 0; i < data.assets.size(); ++i) {
205 if (data.assets.at(i).empty()) {
206 continue;
207 }
208 if (DBCommon::IsRecordIgnored(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i]) ||
209 DBCommon::IsCloudRecordNotFound(data.extend[i]) || DBCommon::IsCloudRecordAlreadyExisted(data.extend[i])) {
210 continue;
211 }
212 errCode = InitFillUploadAssetStatement(opType, tableSchema, data, i, stmt);
213 if (errCode != E_OK) {
214 if (errCode == -E_NOT_FOUND) {
215 errCode = E_OK;
216 continue;
217 }
218 break;
219 }
220 errCode = SQLiteUtils::StepWithRetry(stmt, false);
221 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
222 LOGE("Fill upload asset failed:%d.", errCode);
223 break;
224 }
225 errCode = E_OK;
226 SQLiteUtils::ResetStatement(stmt, true, errCode);
227 stmt = nullptr;
228 if (errCode != E_OK) {
229 break;
230 }
231 }
232 int ret = E_OK;
233 SQLiteUtils::ResetStatement(stmt, true, ret);
234 int endCode = SetLogTriggerStatus(true);
235 if (endCode != E_OK) {
236 LOGE("Fail to set log trigger off, %d.", endCode);
237 return endCode;
238 }
239 return errCode != E_OK ? errCode : ret;
240 }
241
FillCloudVersionForUpload(const OpType opType,const CloudSyncData & data)242 int SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data)
243 {
244 switch (opType) {
245 case OpType::UPDATE_VERSION:
246 return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.updData);
247 case OpType::INSERT_VERSION:
248 return SQLiteSingleVerRelationalStorageExecutor::FillCloudVersionForUpload(data.tableName, data.insData);
249 default:
250 LOGE("Fill version with unknown type %d", static_cast<int>(opType));
251 return -E_INVALID_ARGS;
252 }
253 }
254
BindUpdateVersionStatement(const VBucket & vBucket,const Bytes & hashKey,sqlite3_stmt * & stmt)255 int SQLiteSingleVerRelationalStorageExecutor::BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey,
256 sqlite3_stmt *&stmt)
257 {
258 int errCode = E_OK;
259 std::string version;
260 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
261 vBucket, version) != E_OK) {
262 LOGW("get version from vBucket failed.");
263 }
264 if (hashKey.empty()) {
265 LOGE("hash key is empty when update version.");
266 return -E_CLOUD_ERROR;
267 }
268 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, version);
269 if (errCode != E_OK) {
270 return errCode;
271 }
272 errCode = SQLiteUtils::BindBlobToStatement(stmt, 2, hashKey); // 2 means the second bind args
273 if (errCode != E_OK) {
274 return errCode;
275 }
276 errCode = SQLiteUtils::StepWithRetry(stmt, false);
277 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
278 errCode = E_OK;
279 SQLiteUtils::ResetStatement(stmt, false, errCode);
280 } else {
281 LOGE("step version stmt failed: %d.", errCode);
282 }
283 return errCode;
284 }
285
InitFillUploadAssetStatement(OpType opType,const TableSchema & tableSchema,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)286 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpType opType,
287 const TableSchema &tableSchema, const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
288 {
289 VBucket vBucket = data.assets.at(index);
290 VBucket dbAssets;
291 std::string cloudGid;
292 int errCode;
293 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
294 std::tie(errCode, std::ignore) = GetAssetsByGidOrHashKey(tableSchema, cloudGid, data.hashKey.at(index), dbAssets);
295 if (errCode != E_OK && errCode != -E_CLOUD_GID_MISMATCH) {
296 return errCode;
297 }
298 AssetOperationUtils::CloudSyncAction action = opType == OpType::SET_UPLOADING ?
299 AssetOperationUtils::CloudSyncAction::START_UPLOAD : AssetOperationUtils::CloudSyncAction::END_UPLOAD;
300 AssetOperationUtils::RecordAssetOpType assetOpType = AssetOperationUtils::CalAssetOperation(vBucket, dbAssets,
301 action);
302 if (action == AssetOperationUtils::CloudSyncAction::START_UPLOAD) {
303 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
304 CloudStorageUtils::FillAssetBeforeUpload, CloudStorageUtils::FillAssetsBeforeUpload);
305 } else {
306 if (DBCommon::IsRecordError(data.extend.at(index))) {
307 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
308 CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed);
309 } else {
310 CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets,
311 CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload);
312 }
313 }
314
315 errCode = GetAndBindFillUploadAssetStatement(tableSchema.name, dbAssets, statement);
316 if (errCode != E_OK) {
317 LOGE("get and bind asset failed %d.", errCode);
318 return errCode;
319 }
320 int64_t rowid = data.rowid[index];
321 return SQLiteUtils::BindInt64ToStatement(statement, dbAssets.size() + ROW_ID_INDEX, rowid);
322 }
323
AnalysisTrackerTable(const TrackerTable & trackerTable,TableInfo & tableInfo)324 int SQLiteSingleVerRelationalStorageExecutor::AnalysisTrackerTable(const TrackerTable &trackerTable,
325 TableInfo &tableInfo)
326 {
327 return SQLiteRelationalUtils::AnalysisTrackerTable(dbHandle_, trackerTable, tableInfo);
328 }
329
CreateTrackerTable(const TrackerTable & trackerTable,bool isUpgrade)330 int SQLiteSingleVerRelationalStorageExecutor::CreateTrackerTable(const TrackerTable &trackerTable, bool isUpgrade)
331 {
332 TableInfo table;
333 table.SetTableSyncType(TableSyncType::CLOUD_COOPERATION);
334 int errCode = AnalysisTrackerTable(trackerTable, table);
335 if (errCode != E_OK) {
336 return errCode;
337 }
338 auto tableManager = std::make_unique<SimpleTrackerLogTableManager>();
339 if (trackerTable.GetTrackerColNames().empty()) {
340 // drop trigger
341 return tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
342 }
343
344 // create log table
345 errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
346 if (errCode != E_OK) {
347 return errCode;
348 }
349 // init cursor
350 errCode = InitCursorToMeta(table.GetTableName());
351 if (errCode != E_OK) {
352 return errCode;
353 }
354 std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, "");
355 if (isUpgrade) {
356 errCode = CleanExtendAndCursorForDeleteData(table.GetTableName());
357 if (errCode != E_OK) {
358 LOGE("clean tracker log info for deleted data failed %d.", errCode);
359 return errCode;
360 }
361 }
362 errCode = GeneLogInfoForExistedData(dbHandle_, trackerTable.GetTableName(), calPrimaryKeyHash, table);
363 if (errCode != E_OK) {
364 LOGE("general tracker log info for existed data failed %d.", errCode);
365 return errCode;
366 }
367 errCode = SetLogTriggerStatus(true);
368 if (errCode != E_OK) {
369 return errCode;
370 }
371 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, "");
372 if (errCode != E_OK) {
373 return errCode;
374 }
375 if (!isUpgrade) {
376 return CheckInventoryData(DBCommon::GetLogTableName(table.GetTableName()));
377 }
378 return E_OK;
379 }
380
GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject & schema)381 int SQLiteSingleVerRelationalStorageExecutor::GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema)
382 {
383 if (!schema.ToSchemaString().empty()) {
384 return E_OK;
385 }
386 const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.begin(),
387 DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.end());
388 Value schemaVal;
389 int errCode = GetKvData(schemaKey, schemaVal); // save schema to meta_data
390 if (errCode != E_OK) {
391 return errCode;
392 }
393 if (schemaVal.empty()) {
394 return -E_NOT_FOUND;
395 }
396 std::string schemaStr;
397 DBCommon::VectorToString(schemaVal, schemaStr);
398 errCode = schema.ParseFromTrackerSchemaString(schemaStr);
399 if (errCode != E_OK) {
400 LOGE("Parse from tracker schema string err.");
401 }
402 return errCode;
403 }
404
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)405 int SQLiteSingleVerRelationalStorageExecutor::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
406 {
407 sqlite3_stmt *statement = nullptr;
408 int errCode = SQLiteUtils::GetStatement(dbHandle_, condition.sql, statement);
409 if (errCode != E_OK) {
410 LOGE("Execute sql failed when prepare stmt.");
411 return errCode;
412 }
413 size_t bindCount = static_cast<size_t>(sqlite3_bind_parameter_count(statement));
414 if (bindCount > condition.bindArgs.size() || bindCount < condition.bindArgs.size()) {
415 LOGE("Sql bind args mismatch.");
416 SQLiteUtils::ResetStatement(statement, true, errCode);
417 return -E_INVALID_ARGS;
418 }
419 for (size_t i = 0; i < condition.bindArgs.size(); i++) {
420 Type type = condition.bindArgs[i];
421 errCode = SQLiteRelationalUtils::BindStatementByType(statement, i + 1, type);
422 if (errCode != E_OK) {
423 int ret = E_OK;
424 SQLiteUtils::ResetStatement(statement, true, ret);
425 return errCode;
426 }
427 }
428 while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
429 VBucket bucket;
430 errCode = SQLiteRelationalUtils::GetSelectVBucket(statement, bucket);
431 if (errCode != E_OK) {
432 int ret = E_OK;
433 SQLiteUtils::ResetStatement(statement, true, ret);
434 return errCode;
435 }
436 records.push_back(std::move(bucket));
437 }
438 int ret = E_OK;
439 SQLiteUtils::ResetStatement(statement, true, ret);
440 return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
441 }
442
GetClearWaterMarkTables(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema,std::set<std::string> & clearWaterMarkTables)443 int SQLiteSingleVerRelationalStorageExecutor::GetClearWaterMarkTables(
444 const std::vector<TableReferenceProperty> &tableReferenceProperty, const RelationalSchemaObject &schema,
445 std::set<std::string> &clearWaterMarkTables)
446 {
447 std::set<std::string> changeTables = schema.CompareReferenceProperty(tableReferenceProperty);
448 for (const auto &table : changeTables) {
449 std::string logTableName = DBCommon::GetLogTableName(table);
450 bool isExists = false;
451 int errCode = SQLiteUtils::CheckTableExists(dbHandle_, logTableName, isExists);
452 if (errCode != E_OK) {
453 LOGE("[GetClearWaterMarkTables] check table exists failed, errCode = %d.", errCode);
454 return errCode;
455 }
456 if (!isExists) { // table maybe dropped after set reference
457 LOGI("[GetClearWaterMarkTables] log table not exists, skip this table.");
458 continue;
459 }
460
461 bool isEmpty = true;
462 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, logTableName, isEmpty);
463 if (errCode != E_OK) {
464 LOGE("[GetClearWaterMarkTables] check table empty failed, errCode = %d.", errCode);
465 clearWaterMarkTables.clear();
466 return errCode;
467 }
468 if (!isEmpty) {
469 clearWaterMarkTables.insert(table);
470 }
471 }
472 LOGI("[GetClearWaterMarkTables] clearWaterMarkTables size = %zu.", clearWaterMarkTables.size());
473 return E_OK;
474 }
475
UpgradedLogForExistedData(TableInfo & tableInfo,bool schemaChanged)476 int SQLiteSingleVerRelationalStorageExecutor::UpgradedLogForExistedData(TableInfo &tableInfo, bool schemaChanged)
477 {
478 if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
479 return E_OK;
480 }
481 std::string logTable = DBCommon::GetLogTableName(tableInfo.GetTableName());
482 if (schemaChanged) {
483 std::string markAsInconsistent = "UPDATE " + logTable + " SET flag=" +
484 "(CASE WHEN (cloud_gid='' and data_key=-1 and flag&0x02=0x02) then flag else flag|0x20 END)";
485 int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, markAsInconsistent);
486 if (ret != E_OK) {
487 LOGE("Mark upgrade log info as inconsistent failed:%d", ret);
488 return ret;
489 }
490 }
491 if (tableInfo.GetTrackerTable().IsEmpty()) {
492 return E_OK;
493 }
494 LOGI("Upgrade tracker table log, schemaChanged:%d.", schemaChanged);
495 int errCode = SetLogTriggerStatus(false);
496 if (errCode != E_OK) {
497 return errCode;
498 }
499 std::string sql = "UPDATE " + tableInfo.GetTableName() + " SET _rowid_=_rowid_";
500 TrackerTable trackerTable = tableInfo.GetTrackerTable();
501 errCode = trackerTable.ReBuildTempTrigger(dbHandle_, TriggerMode::TriggerModeEnum::UPDATE,
502 [this, &sql]() {
503 int ret = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
504 if (ret != E_OK) {
505 LOGE("Upgrade log for extend field failed.");
506 }
507 return ret;
508 });
509 return SetLogTriggerStatus(true);
510 }
511
CreateTempSyncTrigger(const TrackerTable & trackerTable,bool flag)512 int SQLiteSingleVerRelationalStorageExecutor::CreateTempSyncTrigger(const TrackerTable &trackerTable, bool flag)
513 {
514 int errCode = E_OK;
515 std::vector<std::string> dropSql = trackerTable.GetDropTempTriggerSql();
516 for (const auto &sql: dropSql) {
517 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
518 if (errCode != E_OK) {
519 LOGE("[RDBExecutor] execute drop sql failed %d.", errCode);
520 return errCode;
521 }
522 }
523 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempInsertTriggerSql(flag));
524 if (errCode != E_OK) {
525 LOGE("[RDBExecutor] create temp insert trigger failed %d.", errCode);
526 return errCode;
527 }
528 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempUpdateTriggerSql(flag));
529 if (errCode != E_OK) {
530 LOGE("[RDBExecutor] create temp update trigger failed %d.", errCode);
531 return errCode;
532 }
533 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, trackerTable.GetTempDeleteTriggerSql(flag));
534 if (errCode != E_OK) {
535 LOGE("[RDBExecutor] create temp delete trigger failed %d.", errCode);
536 }
537 return errCode;
538 }
539
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)540 int SQLiteSingleVerRelationalStorageExecutor::GetAndResetServerObserverData(const std::string &tableName,
541 ChangeProperties &changeProperties)
542 {
543 std::string fileName;
544 if (!SQLiteRelationalUtils::GetDbFileName(dbHandle_, fileName)) {
545 LOGE("get db file name failed.");
546 return -E_INVALID_DB;
547 }
548 SQLiteUtils::GetAndResetServerObserverData(fileName, tableName, changeProperties);
549 return E_OK;
550 }
551
ClearAllTempSyncTrigger()552 int SQLiteSingleVerRelationalStorageExecutor::ClearAllTempSyncTrigger()
553 {
554 sqlite3_stmt *stmt = nullptr;
555 static const std::string sql = "SELECT name FROM sqlite_temp_master WHERE type = 'trigger';";
556 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
557 if (errCode != E_OK) {
558 LOGE("get clear all temp trigger stmt failed %d.", errCode);
559 return errCode;
560 }
561 int ret = E_OK;
562 while ((errCode = SQLiteUtils::StepNext(stmt, isMemDb_)) == E_OK) {
563 std::string str;
564 (void)SQLiteUtils::GetColumnTextValue(stmt, 0, str);
565 if (errCode != E_OK) {
566 SQLiteUtils::ResetStatement(stmt, true, ret);
567 return errCode;
568 }
569 std::string dropSql = "DROP TRIGGER IF EXISTS '" + str + "';";
570 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, dropSql);
571 if (errCode != E_OK) {
572 LOGE("drop temp trigger failed %d.", errCode);
573 SQLiteUtils::ResetStatement(stmt, true, ret);
574 return errCode;
575 }
576 }
577 SQLiteUtils::ResetStatement(stmt, true, ret);
578 return errCode == -E_FINISHED ? (ret == E_OK ? E_OK : ret) : errCode;
579 }
580
CleanTrackerData(const std::string & tableName,int64_t cursor,bool isOnlyTrackTable)581 int SQLiteSingleVerRelationalStorageExecutor::CleanTrackerData(const std::string &tableName, int64_t cursor,
582 bool isOnlyTrackTable)
583 {
584 std::string sql;
585 if (isOnlyTrackTable) {
586 sql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log";
587 } else {
588 sql = "UPDATE " + DBConstant::RELATIONAL_PREFIX + tableName + "_log SET extend_field = NULL";
589 }
590 sql += " where data_key = -1 and cursor <= ?;";
591 sqlite3_stmt *statement = nullptr;
592 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
593 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
594 LOGE("get clean tracker data stmt failed %d.", errCode);
595 return errCode;
596 }
597 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, cursor);
598 int ret = E_OK;
599 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
600 LOGE("bind clean tracker data stmt failed %d.", errCode);
601 SQLiteUtils::ResetStatement(statement, true, ret);
602 return errCode;
603 }
604 errCode = SQLiteUtils::StepWithRetry(statement);
605 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
606 errCode = E_OK;
607 } else {
608 LOGE("clean tracker step failed: %d.", errCode);
609 }
610 SQLiteUtils::ResetStatement(statement, true, ret);
611 return errCode == E_OK ? ret : errCode;
612 }
613
CreateSharedTable(const TableSchema & tableSchema)614 int SQLiteSingleVerRelationalStorageExecutor::CreateSharedTable(const TableSchema &tableSchema)
615 {
616 std::map<int32_t, std::string> cloudFieldTypeMap;
617 cloudFieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
618 cloudFieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
619 cloudFieldTypeMap[TYPE_INDEX<double>] = "REAL";
620 cloudFieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
621 cloudFieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
622 cloudFieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
623 cloudFieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
624 cloudFieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
625
626 std::string createTableSql = "CREATE TABLE IF NOT EXISTS " + tableSchema.sharedTableName + "(";
627 std::string primaryKey = ", PRIMARY KEY (";
628 createTableSql += CloudDbConstant::CLOUD_OWNER;
629 createTableSql += " TEXT, ";
630 createTableSql += CloudDbConstant::CLOUD_PRIVILEGE;
631 createTableSql += " TEXT";
632 primaryKey += CloudDbConstant::CLOUD_OWNER;
633 bool hasPrimaryKey = false;
634 for (const auto &field : tableSchema.fields) {
635 createTableSql += ", " + field.colName + " ";
636 createTableSql += cloudFieldTypeMap[field.type];
637 createTableSql += field.nullable ? "" : " NOT NULL";
638 if (field.primary) {
639 primaryKey += ", " + field.colName;
640 hasPrimaryKey = true;
641 }
642 }
643 if (hasPrimaryKey) {
644 createTableSql += primaryKey + ")";
645 }
646 createTableSql += ");";
647 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, createTableSql);
648 if (errCode != E_OK) {
649 LOGE("Create shared table failed, %d.", errCode);
650 }
651 return errCode;
652 }
653
DeleteTable(const std::vector<std::string> & tableNames)654 int SQLiteSingleVerRelationalStorageExecutor::DeleteTable(const std::vector<std::string> &tableNames)
655 {
656 for (const auto &tableName : tableNames) {
657 std::string deleteTableSql = "DROP TABLE IF EXISTS " + tableName + ";";
658 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteTableSql);
659 if (errCode != E_OK) {
660 LOGE("Delete table failed, %d.", errCode);
661 return errCode;
662 }
663 }
664 return E_OK;
665 }
666
UpdateSharedTable(const std::map<std::string,std::vector<Field>> & updateTableNames)667 int SQLiteSingleVerRelationalStorageExecutor::UpdateSharedTable(
668 const std::map<std::string, std::vector<Field>> &updateTableNames)
669 {
670 int errCode = E_OK;
671 std::map<int32_t, std::string> fieldTypeMap;
672 fieldTypeMap[TYPE_INDEX<Nil>] = "NULL";
673 fieldTypeMap[TYPE_INDEX<int64_t>] = "INT";
674 fieldTypeMap[TYPE_INDEX<double>] = "REAL";
675 fieldTypeMap[TYPE_INDEX<std::string>] = "TEXT";
676 fieldTypeMap[TYPE_INDEX<bool>] = "BOOLEAN";
677 fieldTypeMap[TYPE_INDEX<Bytes>] = "BLOB";
678 fieldTypeMap[TYPE_INDEX<Asset>] = "ASSET";
679 fieldTypeMap[TYPE_INDEX<Assets>] = "ASSETS";
680 for (const auto &table : updateTableNames) {
681 if (table.second.empty()) {
682 continue;
683 }
684 std::string addColumnSql = "";
685 for (const auto &field : table.second) {
686 addColumnSql += "ALTER TABLE " + table.first + " ADD ";
687 addColumnSql += field.colName + " ";
688 addColumnSql += fieldTypeMap[field.type];
689 addColumnSql += field.primary ? " PRIMARY KEY" : "";
690 addColumnSql += field.nullable ? ";" : " NOT NULL;";
691 }
692 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, addColumnSql);
693 if (errCode != E_OK) {
694 LOGE("Shared table add column failed, %d.", errCode);
695 return errCode;
696 }
697 }
698 return errCode;
699 }
700
AlterTableName(const std::map<std::string,std::string> & tableNames)701 int SQLiteSingleVerRelationalStorageExecutor::AlterTableName(const std::map<std::string, std::string> &tableNames)
702 {
703 for (const auto &tableName : tableNames) {
704 std::string alterTableSql = "ALTER TABLE " + tableName.first + " RENAME TO " + tableName.second + ";";
705 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, alterTableSql);
706 if (errCode != E_OK) {
707 LOGE("Alter table name failed, %d.", errCode);
708 return errCode;
709 }
710 }
711 return E_OK;
712 }
713
AppendUpdateLogRecordWhereSqlCondition(const TableSchema & tableSchema,const VBucket & vBucket,std::string & sql)714 int SQLiteSingleVerRelationalStorageExecutor::AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema,
715 const VBucket &vBucket, std::string &sql)
716 {
717 std::string gidStr;
718 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
719 if (errCode != E_OK) {
720 LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d.", errCode);
721 return errCode;
722 }
723
724 sql += " WHERE ";
725 if (!gidStr.empty()) {
726 sql += "cloud_gid = '" + gidStr + "'";
727 }
728 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
729 if (!pkMap.empty()) {
730 if (!gidStr.empty()) {
731 sql += " OR ";
732 }
733 sql += "(hash_key = ?);";
734 }
735 return E_OK;
736 }
737
DoCleanShareTableDataAndLog(const std::vector<std::string> & tableNameList)738 int SQLiteSingleVerRelationalStorageExecutor::DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList)
739 {
740 int ret = E_OK;
741 int errCode = E_OK;
742 for (const auto &tableName: tableNameList) {
743 std::string delDataSql = "DELETE FROM '" + tableName + "';";
744 sqlite3_stmt *statement = nullptr;
745 errCode = SQLiteUtils::GetStatement(dbHandle_, delDataSql, statement);
746 if (errCode != E_OK) {
747 LOGE("get clean shared data stmt failed %d.", errCode);
748 return errCode;
749 }
750 errCode = SQLiteUtils::StepWithRetry(statement);
751 SQLiteUtils::ResetStatement(statement, true, ret);
752 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
753 errCode = E_OK;
754 } else {
755 LOGE("clean shared data failed: %d.", errCode);
756 break;
757 }
758 statement = nullptr;
759 std::string delLogSql = "DELETE FROM '" + DBConstant::RELATIONAL_PREFIX + tableName + "_log';";
760 errCode = SQLiteUtils::GetStatement(dbHandle_, delLogSql, statement);
761 if (errCode != E_OK) {
762 LOGE("get clean shared log stmt failed %d.", errCode);
763 return errCode;
764 }
765 errCode = SQLiteUtils::StepWithRetry(statement);
766 SQLiteUtils::ResetStatement(statement, true, ret);
767 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
768 errCode = E_OK;
769 } else {
770 LOGE("clean shared log failed: %d.", errCode);
771 break;
772 }
773 }
774 return errCode == E_OK ? ret : errCode;
775 }
776
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,const std::map<std::string,std::vector<TableReferenceProperty>> & tableReference,std::map<int64_t,Entries> & referenceGid)777 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGid(const std::string &tableName,
778 const CloudSyncBatch &syncBatch, const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
779 std::map<int64_t, Entries> &referenceGid)
780 {
781 int errCode = E_OK;
782 for (const auto &[targetTable, targetReference] : tableReference) {
783 errCode = GetReferenceGidInner(tableName, targetTable, syncBatch, targetReference, referenceGid);
784 if (errCode != E_OK) {
785 LOGE("[RDBExecutor] get reference gid inner failed %d.", errCode);
786 return errCode;
787 }
788 }
789 return errCode;
790 }
791
GetReferenceGidInner(const std::string & sourceTable,const std::string & targetTable,const CloudSyncBatch & syncBatch,const std::vector<TableReferenceProperty> & targetTableReference,std::map<int64_t,Entries> & referenceGid)792 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidInner(const std::string &sourceTable,
793 const std::string &targetTable, const CloudSyncBatch &syncBatch,
794 const std::vector<TableReferenceProperty> &targetTableReference, std::map<int64_t, Entries> &referenceGid)
795 {
796 auto [sourceFields, targetFields] = SplitReferenceByField(targetTableReference);
797 if (sourceFields.empty()) {
798 LOGD("[RDBExecutor] source field is empty.");
799 return E_OK;
800 }
801 if (sourceFields.size() != targetFields.size()) {
802 LOGE("[RDBExecutor] reference field size not equal.");
803 return -E_INTERNAL_ERROR;
804 }
805 std::string sql = GetReferenceGidSql(sourceTable, targetTable, sourceFields, targetFields);
806 sqlite3_stmt *statement = nullptr;
807 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
808 if (errCode != E_OK) {
809 LOGE("[RDBExecutor] get ref gid data stmt failed. %d", errCode);
810 return errCode;
811 }
812 errCode = GetReferenceGidByStmt(statement, syncBatch, targetTable, referenceGid);
813 int ret = E_OK;
814 SQLiteUtils::ResetStatement(statement, true, ret);
815 return errCode == E_OK ? ret : errCode;
816 }
817
GetReferenceGidSql(const std::string & sourceTable,const std::string & targetTable,const std::vector<std::string> & sourceFields,const std::vector<std::string> & targetFields)818 std::string SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidSql(const std::string &sourceTable,
819 const std::string &targetTable, const std::vector<std::string> &sourceFields,
820 const std::vector<std::string> &targetFields)
821 {
822 // sql like this:
823 // SELECT naturalbase_rdb_aux_parent_log.cloud_gid FROM naturalbase_rdb_aux_parent_log,
824 // (SELECT parent._rowid_ AS rowid_b FROM parent,
825 // (SELECT child._rowid_, name FROM child, naturalbase_rdb_aux_child_log
826 // WHERE child._rowid_ = ? AND naturalbase_rdb_aux_child_log.timestamp = ? ) source_a
827 // WHERE parent.name = source_a.name ) temp_table
828 // WHERE naturalbase_rdb_aux_parent_log.data_key = temp_table.rowid_b
829 std::string logTargetTable = DBCommon::GetLogTableName(targetTable);
830 std::string logSourceTable = DBCommon::GetLogTableName(sourceTable);
831 std::string sql;
832 sql += "SELECT " + logTargetTable + ".cloud_gid" + " FROM " + logTargetTable + ", ";
833 sql += "(";
834 sql += "SELECT " + targetTable + "._rowid_ AS rowid_b FROM " + targetTable
835 + ", ";
836 sql += "(SELECT " + sourceTable + "._rowid_,";
837 std::set<std::string> sourceFieldSet;
838 for (const auto &item : sourceFields) {
839 sourceFieldSet.insert(item);
840 }
841 for (const auto &sourceField : sourceFieldSet) {
842 sql += sourceField + ",";
843 }
844 sql.pop_back();
845 sql += " FROM " + sourceTable + ", " + logSourceTable;
846 sql +=" WHERE " + sourceTable + "._rowid_ = ? AND " + logSourceTable + ".timestamp = ? ";
847 sql += " AND " + logSourceTable + ".flag&0x08=0x00) source_a";
848 sql += " WHERE ";
849 for (size_t i = 0u; i < sourceFields.size(); ++i) {
850 if (i != 0u) {
851 sql += " AND ";
852 }
853 sql += targetTable + "." + targetFields[i] + " = source_a." + sourceFields[i];
854 }
855 sql += ") temp_table ";
856 sql += "WHERE " + logTargetTable + ".data_key = temp_table.rowid_b";
857 sql += " AND " + logTargetTable + ".flag&0x08=0x00";
858 return sql;
859 }
860
GetReferenceGidByStmt(sqlite3_stmt * statement,const CloudSyncBatch & syncBatch,const std::string & targetTable,std::map<int64_t,Entries> & referenceGid)861 int SQLiteSingleVerRelationalStorageExecutor::GetReferenceGidByStmt(sqlite3_stmt *statement,
862 const CloudSyncBatch &syncBatch, const std::string &targetTable, std::map<int64_t, Entries> &referenceGid)
863 {
864 int errCode = E_OK;
865 if (syncBatch.rowid.size() != syncBatch.timestamp.size()) {
866 LOGE("[RDBExecutor] rowid size [%zu] not equal to timestamp size [%zu].", syncBatch.rowid.size(),
867 syncBatch.timestamp.size());
868 return -E_INVALID_ARGS;
869 }
870 int matchCount = 0;
871 for (size_t i = 0u; i < syncBatch.rowid.size(); i++) {
872 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, syncBatch.rowid[i]); // 1 is rowid index
873 if (errCode != E_OK) {
874 LOGE("[RDBExecutor] bind rowid to stmt failed %d.", errCode);
875 break;
876 }
877 errCode = SQLiteUtils::BindInt64ToStatement(statement, 2, syncBatch.timestamp[i]); // 2 is timestamp index
878 if (errCode != E_OK) {
879 LOGE("[RDBExecutor] bind timestamp to stmt failed %d.", errCode);
880 break;
881 }
882 while ((errCode = SQLiteUtils::StepNext(statement, isMemDb_)) == E_OK) {
883 std::string gid;
884 (void)SQLiteUtils::GetColumnTextValue(statement, 0, gid);
885 if (gid.empty()) {
886 LOGE("[RDBExecutor] reference data don't contain gid.");
887 errCode = -E_CLOUD_ERROR;
888 break;
889 }
890 referenceGid[syncBatch.rowid[i]][targetTable] = gid;
891 matchCount++;
892 }
893 if (errCode == -E_FINISHED) {
894 errCode = E_OK;
895 }
896 if (errCode != E_OK) {
897 LOGE("[RDBExecutor] step stmt failed %d.", errCode);
898 break;
899 }
900 SQLiteUtils::ResetStatement(statement, false, errCode);
901 if (errCode != E_OK) {
902 LOGE("[RDBExecutor] reset stmt failed %d.", errCode);
903 break;
904 }
905 }
906 if (matchCount != 0) {
907 LOGD("[RDBExecutor] get reference gid match %d.", matchCount);
908 }
909 return errCode;
910 }
911
SplitReferenceByField(const std::vector<TableReferenceProperty> & targetTableReference)912 PairStringVector SQLiteSingleVerRelationalStorageExecutor::SplitReferenceByField(
913 const std::vector<TableReferenceProperty> &targetTableReference)
914 {
915 PairStringVector sourceTargetFiled;
916 for (const auto &reference : targetTableReference) {
917 for (const auto &column : reference.columns) {
918 sourceTargetFiled.first.push_back(column.first);
919 sourceTargetFiled.second.push_back(column.second);
920 }
921 }
922 return sourceTargetFiled;
923 }
924
BindStmtWithCloudGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid,sqlite3_stmt * & stmt)925 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSyncData &cloudDataResult,
926 bool ignoreEmptyGid, sqlite3_stmt *&stmt)
927 {
928 int fillGidCount = 0;
929 int errCode = E_OK;
930 for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
931 auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
932 if (gidEntry == cloudDataResult.insData.extend[i].end()) {
933 bool isSkipAssetsMissRecord = false;
934 if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) {
935 LOGI("[RDBExecutor] Local assets missing and skip filling assets.");
936 isSkipAssetsMissRecord = true;
937 }
938 if (ignoreEmptyGid || isSkipAssetsMissRecord) {
939 continue;
940 }
941 errCode = -E_INVALID_ARGS;
942 LOGE("[RDBExecutor] Extend not contain gid.");
943 break;
944 }
945 bool containError = DBCommon::IsRecordError(cloudDataResult.insData.extend[i]);
946 if (ignoreEmptyGid && containError) {
947 continue;
948 }
949 std::string val;
950 if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
951 cloudDataResult.insData.extend[i], val) != E_OK) {
952 errCode = -E_CLOUD_ERROR;
953 LOGE("[RDBExecutor] Can't get string gid from extend.");
954 break;
955 }
956 if (val.empty()) {
957 errCode = -E_CLOUD_ERROR;
958 LOGE("[RDBExecutor] Get empty gid from extend.");
959 break;
960 }
961 errCode = BindStmtWithCloudGidInner(val, cloudDataResult.insData.rowid[i], stmt, fillGidCount);
962 if (errCode != E_OK) {
963 LOGE("[RDBExecutor] Bind stmt error %d.", errCode);
964 break;
965 }
966 }
967 LOGD("[RDBExecutor] Fill gid count %d.", fillGidCount);
968 return errCode;
969 }
970
CleanExtendAndCursorForDeleteData(const std::string & tableName)971 int SQLiteSingleVerRelationalStorageExecutor::CleanExtendAndCursorForDeleteData(const std::string &tableName)
972 {
973 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
974 std::string sql = "DELETE FROM " + logTable + " where flag&0x01=0x01;";
975 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
976 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
977 LOGE("update extend field and cursor failed %d.", errCode);
978 }
979 return errCode;
980 }
981
CheckIfExistUserTable(const std::string & tableName)982 int SQLiteSingleVerRelationalStorageExecutor::CheckIfExistUserTable(const std::string &tableName)
983 {
984 std::string sql = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?";
985 sqlite3_stmt *statement = nullptr;
986 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
987 if (errCode != E_OK) {
988 LOGE("[RDBExecutor] Prepare the sql statement error: %d.", errCode);
989 return errCode;
990 }
991 errCode = SQLiteUtils::BindTextToStatement(statement, 1, tableName);
992 if (errCode != E_OK) {
993 LOGE("[RDBExecutor] Bind table name failed: %d.", errCode);
994 SQLiteUtils::ResetStatement(statement, true, errCode);
995 return errCode;
996 }
997 if (SQLiteUtils::StepWithRetry(statement) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
998 LOGE("[RDBExecutor] local exists user table which shared table name is same as.");
999 SQLiteUtils::ResetStatement(statement, true, errCode);
1000 return -E_INVALID_ARGS;
1001 }
1002 SQLiteUtils::ResetStatement(statement, true, errCode);
1003 return E_OK;
1004 }
1005
GetCloudDeleteSql(const std::string & table)1006 std::string SQLiteSingleVerRelationalStorageExecutor::GetCloudDeleteSql(const std::string &table)
1007 {
1008 std::string logTable = DBCommon::GetLogTableName(table);
1009 int cursor = GetCursor(table);
1010 std::string sql;
1011 sql += " cloud_gid = '', version = '', ";
1012 if (isLogicDelete_) {
1013 // cursor already increased by DeleteCloudData, can be assigned directly here
1014 // 1001 which is logicDelete|cloudForcePush|local|delete
1015 sql += "flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1016 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE) |
1017 static_cast<uint32_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) + ", cursor = " + std::to_string(cursor) + " ";
1018 } else {
1019 sql += "data_key = -1, flag = flag&" + std::string(CONSISTENT_FLAG) + "|" +
1020 std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) + ", sharing_resource = ''";
1021 int errCode = SetCursor(table, cursor + 1);
1022 if (errCode == E_OK) {
1023 sql += ", cursor = " + std::to_string(cursor + 1) + " ";
1024 } else {
1025 LOGW("[RDBExecutor] Increase cursor failed when delete log: %d.", errCode);
1026 }
1027 }
1028 return sql;
1029 }
1030
RemoveDataAndLog(const std::string & tableName,int64_t dataKey)1031 int SQLiteSingleVerRelationalStorageExecutor::RemoveDataAndLog(const std::string &tableName, int64_t dataKey)
1032 {
1033 int errCode = E_OK;
1034 std::string removeDataSql = "DELETE FROM " + tableName + " WHERE " + DBConstant::SQLITE_INNER_ROWID + " = " +
1035 std::to_string(dataKey);
1036 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeDataSql);
1037 if (errCode != E_OK) {
1038 LOGE("[RDBExecutor] remove data failed %d", errCode);
1039 return errCode;
1040 }
1041 std::string removeLogSql = "DELETE FROM " + DBCommon::GetLogTableName(tableName) + " WHERE data_key = " +
1042 std::to_string(dataKey);
1043 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, removeLogSql);
1044 if (errCode != E_OK) {
1045 LOGE("[RDBExecutor] remove log failed %d", errCode);
1046 }
1047 return errCode;
1048 }
1049
GetLocalDataKey(size_t index,const DownloadData & downloadData)1050 int64_t SQLiteSingleVerRelationalStorageExecutor::GetLocalDataKey(size_t index,
1051 const DownloadData &downloadData)
1052 {
1053 if (index >= downloadData.existDataKey.size()) {
1054 LOGW("[RDBExecutor] index out of range when get local data key."); // should not happen
1055 return -1; // -1 means not exist
1056 }
1057 return downloadData.existDataKey[index];
1058 }
1059
BindStmtWithCloudGidInner(const std::string & gid,int64_t rowid,sqlite3_stmt * & stmt,int & fillGidCount)1060 int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
1061 sqlite3_stmt *&stmt, int &fillGidCount)
1062 {
1063 int errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gid); // 1 means the gid index
1064 if (errCode != E_OK) {
1065 return errCode;
1066 }
1067 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means rowid
1068 if (errCode != E_OK) {
1069 return errCode;
1070 }
1071 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1072 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1073 errCode = E_OK;
1074 fillGidCount++;
1075 SQLiteUtils::ResetStatement(stmt, false, errCode);
1076 } else {
1077 LOGE("[RDBExecutor] Update cloud log failed: %d.", errCode);
1078 }
1079 return errCode;
1080 }
1081
RenewTableTrigger(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType)1082 int SQLiteSingleVerRelationalStorageExecutor::RenewTableTrigger(DistributedTableMode mode,
1083 const TableInfo &tableInfo, TableSyncType syncType)
1084 {
1085 auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
1086 return tableManager->AddRelationalLogTableTrigger(dbHandle_, tableInfo, "");
1087 }
1088
DoCleanAssetId(const std::string & tableName,const RelationalSchemaObject & localSchema)1089 int SQLiteSingleVerRelationalStorageExecutor::DoCleanAssetId(const std::string &tableName,
1090 const RelationalSchemaObject &localSchema)
1091 {
1092 std::vector<int64_t> dataKeys;
1093 std::string logTableName = DBCommon::GetLogTableName(tableName);
1094 int errCode = GetCleanCloudDataKeys(logTableName, dataKeys, false);
1095 if (errCode != E_OK) {
1096 LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
1097 return errCode;
1098 }
1099 std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
1100 errCode = CleanAssetId(tableName, fieldInfos, dataKeys);
1101 if (errCode != E_OK) {
1102 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d.", errCode);
1103 }
1104 return errCode;
1105 }
1106
CleanAssetId(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys)1107 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetId(const std::string &tableName,
1108 const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys)
1109 {
1110 int errCode = E_OK;
1111 for (const auto &fieldInfo : fieldInfos) {
1112 if (fieldInfo.IsAssetType()) {
1113 Assets assets;
1114 errCode = GetAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1115 if (errCode != E_OK) {
1116 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
1117 return errCode;
1118 }
1119 errCode = UpdateAssetIdOnUserTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
1120 if (errCode != E_OK) {
1121 LOGE("[Storage Executor] failed to save clean asset id on table, %d.", errCode);
1122 return errCode;
1123 }
1124 } else if (fieldInfo.IsAssetsType()) {
1125 errCode = GetAssetsAndUpdateAssetsId(tableName, fieldInfo.GetFieldName(), dataKeys);
1126 if (errCode != E_OK) {
1127 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
1128 return errCode;
1129 }
1130 }
1131 }
1132 return errCode;
1133 }
1134
UpdateAssetIdOnUserTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)1135 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetIdOnUserTable(const std::string &tableName,
1136 const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
1137 {
1138 if (assets.empty()) { // LCOV_EXCL_BR_LINE
1139 return E_OK;
1140 }
1141 int errCode = E_OK;
1142 int ret = E_OK;
1143 sqlite3_stmt *stmt = nullptr;
1144 size_t index = 0;
1145 for (const auto &rowId : dataKeys) {
1146 if (rowId == -1) { // -1 means data is deleted
1147 continue;
1148 }
1149 if (assets[index].name.empty()) { // LCOV_EXCL_BR_LINE
1150 index++;
1151 continue;
1152 }
1153 std::string cleanAssetIdSql = "UPDATE " + tableName + " SET " + fieldName + " = ? WHERE " +
1154 std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1155 errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1156 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1157 LOGE("Get statement failed, %d", errCode);
1158 return errCode;
1159 }
1160 assets[index].assetId = "";
1161 assets[index].status &= ~AssetStatus::UPLOADING;
1162 errCode = BindAssetToBlobStatement(assets[index], 1, stmt); // 1 means sqlite statement index
1163 index++;
1164 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1165 LOGE("Bind asset to blob statement failed, %d", errCode);
1166 goto END;
1167 }
1168 errCode = SQLiteUtils::StepWithRetry(stmt);
1169 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1170 errCode = E_OK;
1171 } else {
1172 LOGE("Step statement failed, %d", errCode);
1173 goto END;
1174 }
1175 SQLiteUtils::ResetStatement(stmt, true, ret);
1176 }
1177 return errCode != E_OK ? errCode : ret;
1178 END:
1179 SQLiteUtils::ResetStatement(stmt, true, ret);
1180 return errCode != E_OK ? errCode : ret;
1181 }
1182
GetAssetsAndUpdateAssetsId(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys)1183 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsAndUpdateAssetsId(const std::string &tableName,
1184 const std::string &fieldName, const std::vector<int64_t> &dataKeys)
1185 {
1186 int errCode = E_OK;
1187 int ret = E_OK;
1188 sqlite3_stmt *selectStmt = nullptr;
1189 for (const auto &rowId : dataKeys) {
1190 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1191 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1192 errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1193 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1194 LOGE("Get select assets statement failed, %d.", errCode);
1195 goto END;
1196 }
1197 Assets assets;
1198 errCode = GetAssetsByRowId(selectStmt, assets);
1199 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1200 LOGE("Get assets by rowId failed, %d.", errCode);
1201 goto END;
1202 }
1203 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1204 if (assets.empty()) { // LCOV_EXCL_BR_LINE
1205 continue;
1206 }
1207 for (auto &asset : assets) {
1208 asset.assetId = "";
1209 asset.status &= ~AssetStatus::UPLOADING;
1210 }
1211 std::vector<uint8_t> assetsValue;
1212 errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsValue);
1213 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1214 LOGE("[CleanAssetsIdOnUserTable] failed to transfer assets to blob, %d.", errCode);
1215 return errCode;
1216 }
1217 errCode = CleanAssetsIdOnUserTable(tableName, fieldName, rowId, assetsValue);
1218 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1219 LOGE("[CleanAssetsIdOnUserTable] clean assets id on user table failed, %d", errCode);
1220 return errCode;
1221 }
1222 }
1223 return errCode != E_OK ? errCode : ret;
1224 END:
1225 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1226 return errCode != E_OK ? errCode : ret;
1227 }
1228
CleanAssetsIdOnUserTable(const std::string & tableName,const std::string & fieldName,const int64_t rowId,const std::vector<uint8_t> & assetsValue)1229 int SQLiteSingleVerRelationalStorageExecutor::CleanAssetsIdOnUserTable(const std::string &tableName,
1230 const std::string &fieldName, const int64_t rowId, const std::vector<uint8_t> &assetsValue)
1231 {
1232 std::string cleanAssetIdSql = "UPDATE " + tableName + " SET " + fieldName + " = ? WHERE " +
1233 std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(rowId) + ";";
1234 sqlite3_stmt *stmt = nullptr;
1235 int errCode = SQLiteUtils::GetStatement(dbHandle_, cleanAssetIdSql, stmt);
1236 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1237 LOGE("Get statement failed, %d", errCode);
1238 SQLiteUtils::ResetStatement(stmt, true, errCode);
1239 return errCode;
1240 }
1241 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, assetsValue, false);
1242 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1243 SQLiteUtils::ResetStatement(stmt, true, errCode);
1244 return errCode;
1245 }
1246 errCode = SQLiteUtils::StepWithRetry(stmt);
1247 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // LCOV_EXCL_BR_LINE
1248 errCode = E_OK;
1249 }
1250 SQLiteUtils::ResetStatement(stmt, true, errCode);
1251 return errCode;
1252 }
1253
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1254 std::pair<int, uint32_t> SQLiteSingleVerRelationalStorageExecutor::GetAssetsByGidOrHashKey(
1255 const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
1256 {
1257 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
1258 auto &[errCode, status] = res;
1259 std::vector<Field> assetFields;
1260 std::string sql = "SELECT";
1261 for (const auto &field: tableSchema.fields) {
1262 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
1263 assetFields.emplace_back(field);
1264 sql += " b." + field.colName + ",";
1265 }
1266 }
1267 if (assetFields.empty()) {
1268 return { -E_NOT_FOUND, status };
1269 }
1270 sql += "a.cloud_gid, a.status ";
1271 sql += CloudStorageUtils::GetLeftJoinLogSql(tableSchema.name) + " WHERE (a." + FLAG_NOT_LOGIC_DELETE + ") AND (" +
1272 (gid.empty() ? "a.hash_key = ?);" : " a.cloud_gid = ? OR a.hash_key = ?);");
1273 sqlite3_stmt *stmt = nullptr;
1274 errCode = InitGetAssetStmt(sql, gid, hashKey, stmt);
1275 if (errCode != E_OK) {
1276 return res;
1277 }
1278 errCode = SQLiteUtils::StepWithRetry(stmt);
1279 int index = 0;
1280 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1281 for (const auto &field: assetFields) {
1282 Type cloudValue;
1283 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, index++, cloudValue);
1284 if (errCode != E_OK) {
1285 break;
1286 }
1287 errCode = PutVBucketByType(assets, field, cloudValue);
1288 if (errCode != E_OK) {
1289 break;
1290 }
1291 }
1292 std::string curGid;
1293 errCode = SQLiteUtils::GetColumnTextValue(stmt, index++, curGid);
1294 if (errCode == E_OK && CloudStorageUtils::IsCloudGidMismatch(gid, curGid)) {
1295 // Gid is different, there may be duplicate primary keys in the cloud
1296 errCode = -E_CLOUD_GID_MISMATCH;
1297 }
1298 status = static_cast<uint32_t>(sqlite3_column_int(stmt, index++));
1299 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1300 errCode = -E_NOT_FOUND;
1301 } else {
1302 LOGE("step get asset stmt failed %d.", errCode);
1303 }
1304 SQLiteUtils::ResetStatement(stmt, true, errCode);
1305 return res;
1306 }
1307
InitGetAssetStmt(const std::string & sql,const std::string & gid,const Bytes & hashKey,sqlite3_stmt * & stmt)1308 int SQLiteSingleVerRelationalStorageExecutor::InitGetAssetStmt(const std::string &sql, const std::string &gid,
1309 const Bytes &hashKey, sqlite3_stmt *&stmt)
1310 {
1311 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1312 if (errCode != E_OK) {
1313 LOGE("Get asset statement failed, %d.", errCode);
1314 return errCode;
1315 }
1316 int index = 1;
1317 if (!gid.empty()) {
1318 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
1319 if (errCode != E_OK) {
1320 LOGE("bind gid failed %d.", errCode);
1321 SQLiteUtils::ResetStatement(stmt, true, errCode);
1322 return errCode;
1323 }
1324 }
1325 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, hashKey);
1326 if (errCode != E_OK) {
1327 LOGE("bind hash failed %d.", errCode);
1328 SQLiteUtils::ResetStatement(stmt, true, errCode);
1329 }
1330 return errCode;
1331 }
1332
FillHandleWithOpType(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid,const TableSchema & tableSchema)1333 int SQLiteSingleVerRelationalStorageExecutor::FillHandleWithOpType(const OpType opType, const CloudSyncData &data,
1334 bool fillAsset, bool ignoreEmptyGid, const TableSchema &tableSchema)
1335 {
1336 int errCode = E_OK;
1337 switch (opType) {
1338 case OpType::UPDATE_VERSION: // fallthrough
1339 case OpType::INSERT_VERSION: {
1340 errCode = FillCloudVersionForUpload(opType, data);
1341 break;
1342 }
1343 case OpType::SET_UPLOADING: {
1344 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1345 if (errCode != E_OK) {
1346 LOGE("Failed to set uploading for ins data, %d.", errCode);
1347 return errCode;
1348 }
1349 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1350 break;
1351 }
1352 case OpType::INSERT: {
1353 errCode = UpdateCloudLogGid(data, ignoreEmptyGid);
1354 if (errCode != E_OK) {
1355 LOGE("Failed to fill cloud log gid, %d.", errCode);
1356 return errCode;
1357 }
1358 if (fillAsset) {
1359 errCode = FillCloudAssetForUpload(opType, tableSchema, data.insData);
1360 if (errCode != E_OK) {
1361 LOGE("Failed to fill asset for ins, %d.", errCode);
1362 return errCode;
1363 }
1364 }
1365 errCode = FillCloudVersionForUpload(OpType::INSERT_VERSION, data);
1366 break;
1367 }
1368 case OpType::UPDATE: {
1369 if (fillAsset && !data.updData.assets.empty()) {
1370 errCode = FillCloudAssetForUpload(opType, tableSchema, data.updData);
1371 if (errCode != E_OK) {
1372 LOGE("Failed to fill asset for upd, %d.", errCode);
1373 return errCode;
1374 }
1375 }
1376 errCode = FillCloudVersionForUpload(OpType::UPDATE_VERSION, data);
1377 break;
1378 }
1379 default:
1380 break;
1381 }
1382 return errCode;
1383 }
1384
GetAssetsByRowId(sqlite3_stmt * & selectStmt,Assets & assets)1385 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets)
1386 {
1387 int errCode = SQLiteUtils::StepWithRetry(selectStmt);
1388 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1389 std::vector<uint8_t> blobValue;
1390 errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
1391 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1392 LOGE("Get column blob value failed %d.", errCode);
1393 return errCode;
1394 }
1395 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1396 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1397 LOGE("Transfer blob to assets failed %d", errCode);
1398 }
1399 return errCode;
1400 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1401 return E_OK;
1402 } else {
1403 LOGE("Step select statement failed %d.", errCode);
1404 return errCode;
1405 }
1406 }
1407
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1408 void SQLiteSingleVerRelationalStorageExecutor::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1409 {
1410 assetLoader_ = loader;
1411 }
1412
ExecuteFillDownloadAssetStatement(sqlite3_stmt * & stmt,int beginIndex,const std::string & cloudGid)1413 int SQLiteSingleVerRelationalStorageExecutor::ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt,
1414 int beginIndex, const std::string &cloudGid)
1415 {
1416 int errCode = SQLiteUtils::BindTextToStatement(stmt, beginIndex, cloudGid);
1417 if (errCode != E_OK) {
1418 LOGE("Bind cloud gid to statement failed %d.", errCode);
1419 int ret = E_OK;
1420 SQLiteUtils::ResetStatement(stmt, true, ret);
1421 return errCode;
1422 }
1423 errCode = SQLiteUtils::StepWithRetry(stmt);
1424 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1425 errCode = E_OK;
1426 } else {
1427 LOGE("Fill cloud asset failed: %d.", errCode);
1428 }
1429 int ret = E_OK;
1430 SQLiteUtils::ResetStatement(stmt, true, ret);
1431 return errCode != E_OK ? errCode : ret;
1432 }
1433
CleanDownloadChangedAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType)1434 int SQLiteSingleVerRelationalStorageExecutor::CleanDownloadChangedAssets(
1435 const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType)
1436 {
1437 if (assetLoader_ == nullptr) {
1438 LOGE("assetLoader may be not set.");
1439 return -E_NOT_SET;
1440 }
1441 std::vector<Asset> toDeleteAssets;
1442 CloudStorageUtils::GetToBeRemoveAssets(vBucket, assetOpType, toDeleteAssets);
1443 if (toDeleteAssets.empty()) {
1444 return E_OK;
1445 }
1446 DBStatus ret = assetLoader_->RemoveLocalAssets(toDeleteAssets);
1447 if (ret != OK) {
1448 LOGE("remove local assets failed %d.", ret);
1449 return -E_REMOVE_ASSETS_FAILED;
1450 }
1451 return E_OK;
1452 }
1453
GetAndBindFillUploadAssetStatement(const std::string & tableName,const VBucket & assets,sqlite3_stmt * & statement)1454 int SQLiteSingleVerRelationalStorageExecutor::GetAndBindFillUploadAssetStatement(const std::string &tableName,
1455 const VBucket &assets, sqlite3_stmt *&statement)
1456 {
1457 std::string sql = "UPDATE '" + tableName + "' SET ";
1458 for (const auto &item: assets) {
1459 sql += item.first + " = ?,";
1460 }
1461 sql.pop_back();
1462 sql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = ?;";
1463 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1464 if (errCode != E_OK) {
1465 return errCode;
1466 }
1467 int bindIndex = 1;
1468 for (const auto &item: assets) {
1469 Field field = {
1470 .colName = item.first, .type = static_cast<int32_t>(item.second.index())
1471 };
1472 errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](bindIndex++, assets, field, statement);
1473 if (errCode != E_OK) {
1474 return errCode;
1475 }
1476 }
1477 return errCode;
1478 }
1479
OnlyUpdateAssetId(const std::string & tableName,const TableSchema & tableSchema,const VBucket & vBucket,int64_t dataKey,OpType opType)1480 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateAssetId(const std::string &tableName,
1481 const TableSchema &tableSchema, const VBucket &vBucket, int64_t dataKey, OpType opType)
1482 {
1483 if (opType != OpType::ONLY_UPDATE_GID && opType != OpType::NOT_HANDLE &&
1484 opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1485 return E_OK;
1486 }
1487 if (CloudStorageUtils::IsSharedTable(tableSchema)) {
1488 // this is shared table, not need to update asset id.
1489 return E_OK;
1490 }
1491 if (!IsNeedUpdateAssetId(tableSchema, dataKey, vBucket)) {
1492 return E_OK;
1493 }
1494 int errCode = UpdateAssetId(tableSchema, dataKey, vBucket);
1495 if (errCode != E_OK) {
1496 LOGE("[Storage Executor] failed to update assetId on table, %d.", errCode);
1497 }
1498 return errCode;
1499 }
1500
UpdateLocalAssetId(const VBucket & vBucket,const std::string & fieldName,Asset & asset)1501 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName,
1502 Asset &asset)
1503 {
1504 for (const auto &[col, value] : vBucket) {
1505 if (value.index() == TYPE_INDEX<Asset> && col == fieldName) {
1506 asset = std::get<Asset>(value);
1507 }
1508 }
1509 }
1510
UpdateLocalAssetsId(const VBucket & vBucket,const std::string & fieldName,Assets & assets)1511 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName,
1512 Assets &assets)
1513 {
1514 for (const auto &[col, value] : vBucket) {
1515 if (value.index() == TYPE_INDEX<Assets> && col == fieldName) {
1516 assets = std::get<Assets>(value);
1517 }
1518 }
1519 }
1520
UpdateLocalAssetsIdInner(const Assets & cloudAssets,Assets & assets)1521 void SQLiteSingleVerRelationalStorageExecutor::UpdateLocalAssetsIdInner(const Assets &cloudAssets, Assets &assets)
1522 {
1523 for (const auto &cloudAsset : cloudAssets) {
1524 for (auto &asset : assets) {
1525 if (asset.name == cloudAsset.name) {
1526 asset.assetId = cloudAsset.assetId;
1527 }
1528 }
1529 }
1530 }
1531
BindAssetToBlobStatement(const Asset & asset,int index,sqlite3_stmt * & stmt)1532 int SQLiteSingleVerRelationalStorageExecutor::BindAssetToBlobStatement(const Asset &asset, int index,
1533 sqlite3_stmt *&stmt)
1534 {
1535 std::vector<uint8_t> blobValue;
1536 int errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, blobValue);
1537 if (errCode != E_OK) {
1538 LOGE("Transfer asset to blob failed, %d.", errCode);
1539 return errCode;
1540 }
1541 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1542 if (errCode != E_OK) {
1543 LOGE("Bind asset blob to statement failed, %d.", errCode);
1544 }
1545 return errCode;
1546 }
1547
BindAssetsToBlobStatement(const Assets & assets,int index,sqlite3_stmt * & stmt)1548 int SQLiteSingleVerRelationalStorageExecutor::BindAssetsToBlobStatement(const Assets &assets, int index,
1549 sqlite3_stmt *&stmt)
1550 {
1551 std::vector<uint8_t> blobValue;
1552 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, blobValue);
1553 if (errCode != E_OK) {
1554 LOGE("Transfer asset to blob failed, %d.", errCode);
1555 return errCode;
1556 }
1557 errCode = SQLiteUtils::BindBlobToStatement(stmt, index, blobValue, false);
1558 if (errCode != E_OK) {
1559 LOGE("Bind asset blob to statement failed, %d.", errCode);
1560 }
1561 return errCode;
1562 }
1563
GetAssetOnTableInner(sqlite3_stmt * & stmt,Asset & asset)1564 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTableInner(sqlite3_stmt *&stmt, Asset &asset)
1565 {
1566 int errCode = SQLiteUtils::StepWithRetry(stmt);
1567 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1568 std::vector<uint8_t> blobValue;
1569 errCode = SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue);
1570 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1571 LOGE("[RDBExecutor][GetAssetOnTableInner] Get column blob value failed, %d.", errCode);
1572 return errCode;
1573 }
1574 errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
1575 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1576 LOGE("[RDBExecutor] Transfer blob to asset failed, %d.", errCode);
1577 }
1578 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1579 errCode = E_OK;
1580 } else {
1581 LOGE("[RDBExecutor] Step failed when get asset from table, errCode = %d.", errCode);
1582 }
1583 return errCode;
1584 }
1585
GetAssetOnTable(const std::string & tableName,const std::string & fieldName,const int64_t dataKey,Asset & asset)1586 int SQLiteSingleVerRelationalStorageExecutor::GetAssetOnTable(const std::string &tableName,
1587 const std::string &fieldName, const int64_t dataKey, Asset &asset)
1588 {
1589 sqlite3_stmt *selectStmt = nullptr;
1590 std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
1591 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1592 int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
1593 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1594 LOGE("Get select asset statement failed, %d.", errCode);
1595 return errCode;
1596 }
1597 errCode = GetAssetOnTableInner(selectStmt, asset);
1598 int ret = E_OK;
1599 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1600 return errCode != E_OK ? errCode : ret;
1601 }
1602
GetAssetsOnTableInner(sqlite3_stmt * & stmt,Assets & assets)1603 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsOnTableInner(sqlite3_stmt *&stmt, Assets &assets)
1604 {
1605 int errCode = SQLiteUtils::StepWithRetry(stmt);
1606 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { // LCOV_EXCL_BR_LINE
1607 std::vector<uint8_t> blobValue;
1608 errCode = SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue);
1609 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1610 LOGE("[RDBExecutor][GetAssetsOnTableInner] Get column blob value failed, %d.", errCode);
1611 return errCode;
1612 }
1613 errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets);
1614 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1615 LOGE("[RDBExecutor] Transfer blob to assets failed, %d.", errCode);
1616 }
1617 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1618 errCode = E_OK;
1619 } else {
1620 LOGE("[RDBExecutor] Step failed when get assets from table, errCode = %d.", errCode);
1621 }
1622 return errCode;
1623 }
1624
GetAssetsOnTable(const std::string & tableName,const std::string & fieldName,const int64_t dataKey,Assets & assets)1625 int SQLiteSingleVerRelationalStorageExecutor::GetAssetsOnTable(const std::string &tableName,
1626 const std::string &fieldName, const int64_t dataKey, Assets &assets)
1627 {
1628 sqlite3_stmt *selectStmt = nullptr;
1629 std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
1630 "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1631 int errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
1632 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
1633 LOGE("Get select assets statement failed, %d.", errCode);
1634 return errCode;
1635 }
1636 errCode = GetAssetsOnTableInner(selectStmt, assets);
1637 int ret = E_OK;
1638 SQLiteUtils::ResetStatement(selectStmt, true, ret);
1639 return errCode != E_OK ? errCode : ret;
1640 }
1641
BindAssetFiledToBlobStatement(const TableSchema & tableSchema,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord,sqlite3_stmt * & stmt)1642 int SQLiteSingleVerRelationalStorageExecutor::BindAssetFiledToBlobStatement(const TableSchema &tableSchema,
1643 const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt)
1644 {
1645 int assetIndex = 0;
1646 int assetsIndex = 0;
1647 for (const auto &field : tableSchema.fields) {
1648 if (field.type == TYPE_INDEX<Asset>) {
1649 if (assetOfOneRecord[assetIndex].name.empty()) {
1650 continue;
1651 }
1652 int errCode = BindAssetToBlobStatement(assetOfOneRecord[assetIndex], assetIndex + assetsIndex + 1, stmt);
1653 if (errCode != E_OK) {
1654 LOGE("Bind asset to blob statement failed, %d.", errCode);
1655 return errCode;
1656 }
1657 assetIndex++;
1658 } else if (field.type == TYPE_INDEX<Assets>) {
1659 if (assetsOfOneRecord[assetsIndex].empty()) {
1660 continue;
1661 }
1662 int errCode = BindAssetsToBlobStatement(assetsOfOneRecord[assetsIndex], assetIndex + assetsIndex + 1, stmt);
1663 if (errCode != E_OK) {
1664 LOGE("Bind assets to blob statement failed, %d.", errCode);
1665 return errCode;
1666 }
1667 assetsIndex++;
1668 }
1669 }
1670 return E_OK;
1671 }
1672
UpdateAssetsIdForOneRecord(const TableSchema & tableSchema,const std::string & sql,const std::vector<Asset> & assetOfOneRecord,const std::vector<Assets> & assetsOfOneRecord)1673 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetsIdForOneRecord(const TableSchema &tableSchema,
1674 const std::string &sql, const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord)
1675 {
1676 int errCode = E_OK;
1677 int ret = E_OK;
1678 sqlite3_stmt *stmt = nullptr;
1679 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1680 if (errCode != E_OK) {
1681 LOGE("Get update asset statement failed, %d.", errCode);
1682 return errCode;
1683 }
1684 errCode = BindAssetFiledToBlobStatement(tableSchema, assetOfOneRecord, assetsOfOneRecord, stmt);
1685 if (errCode != E_OK) {
1686 LOGE("Asset field Bind asset to blob statement failed, %d.", errCode);
1687 SQLiteUtils::ResetStatement(stmt, true, ret);
1688 return errCode != E_OK ? errCode : ret;
1689 }
1690 errCode = SQLiteUtils::StepWithRetry(stmt);
1691 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1692 errCode = E_OK;
1693 } else {
1694 LOGE("Step statement failed, %d", errCode);
1695 }
1696 SQLiteUtils::ResetStatement(stmt, true, ret);
1697 return errCode != E_OK ? errCode : ret;
1698 }
1699
UpdateAssetId(const TableSchema & tableSchema,int64_t dataKey,const VBucket & vBucket)1700 int SQLiteSingleVerRelationalStorageExecutor::UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey,
1701 const VBucket &vBucket)
1702 {
1703 int errCode = E_OK;
1704 std::vector<Asset> assetOfOneRecord;
1705 std::vector<Assets> assetsOfOneRecord;
1706 std::string updateAssetIdSql = "UPDATE " + tableSchema.name + " SET";
1707 for (const auto &field : tableSchema.fields) {
1708 if (field.type == TYPE_INDEX<Asset>) {
1709 Asset asset;
1710 UpdateLocalAssetId(vBucket, field.colName, asset);
1711 assetOfOneRecord.push_back(asset);
1712 if (!asset.name.empty()) {
1713 updateAssetIdSql += " " + field.colName + " = ?,";
1714 }
1715 }
1716 if (field.type == TYPE_INDEX<Assets>) {
1717 Assets assets;
1718 UpdateLocalAssetsId(vBucket, field.colName, assets);
1719 assetsOfOneRecord.push_back(assets);
1720 if (!assets.empty()) {
1721 updateAssetIdSql += " " + field.colName + " = ?,";
1722 }
1723 }
1724 }
1725 if (updateAssetIdSql == "UPDATE " + tableSchema.name + " SET") {
1726 return E_OK;
1727 }
1728 updateAssetIdSql.pop_back();
1729 updateAssetIdSql += " WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " = " + std::to_string(dataKey) + ";";
1730 errCode = UpdateAssetsIdForOneRecord(tableSchema, updateAssetIdSql, assetOfOneRecord, assetsOfOneRecord);
1731 if (errCode != E_OK) {
1732 LOGE("[Storage Executor] failed to update asset id on table, %d.", errCode);
1733 }
1734 return errCode;
1735 }
1736
SetPutDataMode(PutDataMode mode)1737 void SQLiteSingleVerRelationalStorageExecutor::SetPutDataMode(PutDataMode mode)
1738 {
1739 putDataMode_ = mode;
1740 }
1741
SetMarkFlagOption(MarkFlagOption option)1742 void SQLiteSingleVerRelationalStorageExecutor::SetMarkFlagOption(MarkFlagOption option)
1743 {
1744 markFlagOption_ = option;
1745 }
1746
GetDataFlag()1747 int64_t SQLiteSingleVerRelationalStorageExecutor::GetDataFlag()
1748 {
1749 if (putDataMode_ != PutDataMode::USER) {
1750 return static_cast<int64_t>(LogInfoFlag::FLAG_CLOUD) |
1751 static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1752 }
1753 uint32_t flag = static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL);
1754 if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1755 flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1756 }
1757 flag |= static_cast<int64_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY);
1758 return static_cast<int64_t>(flag);
1759 }
1760
GetUpdateDataFlagSql()1761 std::string SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataFlagSql()
1762 {
1763 if (putDataMode_ == PutDataMode::SYNC) {
1764 return UPDATE_FLAG_CLOUD;
1765 }
1766 if (markFlagOption_ == MarkFlagOption::SET_WAIT_COMPENSATED_SYNC) {
1767 return UPDATE_FLAG_WAIT_COMPENSATED_SYNC;
1768 }
1769 return UPDATE_FLAG_CLOUD;
1770 }
1771
GetDev()1772 std::string SQLiteSingleVerRelationalStorageExecutor::GetDev()
1773 {
1774 return putDataMode_ == PutDataMode::SYNC ? "cloud" : "";
1775 }
1776
GetUpdateField(const VBucket & vBucket,const TableSchema & tableSchema)1777 std::vector<Field> SQLiteSingleVerRelationalStorageExecutor::GetUpdateField(const VBucket &vBucket,
1778 const TableSchema &tableSchema)
1779 {
1780 std::set<std::string> useFields;
1781 std::vector<Field> fields;
1782 if (putDataMode_ == PutDataMode::SYNC) {
1783 for (const auto &field : tableSchema.fields) {
1784 useFields.insert(field.colName);
1785 }
1786 fields = tableSchema.fields;
1787 } else {
1788 for (const auto &field : vBucket) {
1789 if (field.first.empty() || field.first[0] == '#') {
1790 continue;
1791 }
1792 useFields.insert(field.first);
1793 }
1794 for (const auto &field : tableSchema.fields) {
1795 if (useFields.find(field.colName) == useFields.end()) {
1796 continue;
1797 }
1798 fields.push_back(field);
1799 }
1800 }
1801 return fields;
1802 }
1803
UpdateRecordFlag(const std::string & tableName,const std::string & sql,const LogInfo & logInfo)1804 int SQLiteSingleVerRelationalStorageExecutor::UpdateRecordFlag(const std::string &tableName, const std::string &sql,
1805 const LogInfo &logInfo)
1806 {
1807 bool useHashKey = false;
1808 if (logInfo.cloudGid.empty() && logInfo.dataKey == DBConstant::DEFAULT_ROW_ID) {
1809 if (logInfo.hashKey.empty()) {
1810 LOGE("[RDBExecutor] Update record flag failed with invalid args!");
1811 return -E_INVALID_ARGS;
1812 }
1813 useHashKey = true;
1814 }
1815 sqlite3_stmt *stmt = nullptr;
1816 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1817 if (errCode != E_OK) {
1818 LOGE("[Storage Executor] Get stmt failed when update record flag, %d", errCode);
1819 return errCode;
1820 }
1821 int ret = E_OK;
1822 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, logInfo.timestamp); // 1 is timestamp
1823 if (errCode != E_OK) {
1824 LOGE("[Storage Executor] Bind timestamp to update record flag stmt failed, %d", errCode);
1825 SQLiteUtils::ResetStatement(stmt, true, ret);
1826 return errCode;
1827 }
1828 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, logInfo.timestamp); // 2 is timestamp
1829 if (errCode != E_OK) {
1830 LOGE("[Storage Executor] Bind timestamp to update record status stmt failed, %d", errCode);
1831 SQLiteUtils::ResetStatement(stmt, true, ret);
1832 return errCode;
1833 }
1834 if (useHashKey) {
1835 errCode = SQLiteUtils::BindBlobToStatement(stmt, 3, logInfo.hashKey); // 3 is hash_key
1836 if (errCode != E_OK) {
1837 LOGE("[Storage Executor] Bind hashKey to update record flag stmt failed, %d", errCode);
1838 SQLiteUtils::ResetStatement(stmt, true, ret);
1839 return errCode;
1840 }
1841 }
1842 errCode = SQLiteUtils::StepWithRetry(stmt);
1843 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1844 errCode = E_OK;
1845 } else {
1846 LOGE("[Storage Executor]Step update record flag stmt failed, %d", errCode);
1847 }
1848 SQLiteUtils::ResetStatement(stmt, true, ret);
1849 return errCode == E_OK ? ret : errCode;
1850 }
1851
MarkFlagAsUploadFinished(const std::string & tableName,const Key & hashKey,Timestamp timestamp)1852 void SQLiteSingleVerRelationalStorageExecutor::MarkFlagAsUploadFinished(const std::string &tableName,
1853 const Key &hashKey, Timestamp timestamp)
1854 {
1855 sqlite3_stmt *stmt = nullptr;
1856 int errCode = SQLiteUtils::GetStatement(dbHandle_, CloudStorageUtils::GetUpdateUploadFinishedSql(tableName),
1857 stmt);
1858 int index = 1;
1859 errCode = SQLiteUtils::BindInt64ToStatement(stmt, index++, timestamp);
1860 if (errCode != E_OK) {
1861 SQLiteUtils::ResetStatement(stmt, true, errCode);
1862 LOGW("[Storage Executor] Bind timestamp to update record flag for upload finished stmt failed, %d", errCode);
1863 return;
1864 }
1865 errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, hashKey);
1866 if (errCode != E_OK) {
1867 SQLiteUtils::ResetStatement(stmt, true, errCode);
1868 LOGW("[Storage Executor] Bind hashKey to update record flag for upload finished stmt failed, %d", errCode);
1869 return;
1870 }
1871 errCode = SQLiteUtils::StepWithRetry(stmt);
1872 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1873 errCode = E_OK;
1874 } else {
1875 LOGE("[Storage Executor]Step update record flag for upload finished stmt failed, %d", errCode);
1876 }
1877 SQLiteUtils::ResetStatement(stmt, true, errCode);
1878 }
1879
GetWaitCompensatedSyncDataPk(const TableSchema & table,std::vector<VBucket> & data)1880 int SQLiteSingleVerRelationalStorageExecutor::GetWaitCompensatedSyncDataPk(const TableSchema &table,
1881 std::vector<VBucket> &data)
1882 {
1883 std::string sql = "SELECT ";
1884 std::vector<Field> pkFields;
1885 for (const auto &field : table.fields) {
1886 if (!field.primary) {
1887 continue;
1888 }
1889 sql += "b." + field.colName + ",";
1890 pkFields.push_back(field);
1891 }
1892 if (pkFields.empty()) {
1893 // ignore no pk table
1894 return E_OK;
1895 }
1896 sql.pop_back();
1897 sql += CloudStorageUtils::GetLeftJoinLogSql(table.name) + " WHERE " + FLAG_IS_WAIT_COMPENSATED_SYNC;
1898 sqlite3_stmt *stmt = nullptr;
1899 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1900 if (errCode != E_OK) {
1901 LOGE("[RDBExecutor] Get stmt failed when get wait compensated sync pk! errCode = %d..", errCode);
1902 return errCode;
1903 }
1904 do {
1905 errCode = SQLiteUtils::StepWithRetry(stmt);
1906 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1907 VBucket pkData;
1908 errCode = GetRecordFromStmt(stmt, pkFields, 0, pkData);
1909 if (errCode != E_OK) {
1910 LOGE("[RDBExecutor] Get record failed when get wait compensated sync pk! errCode = %d.", errCode);
1911 break;
1912 }
1913 data.push_back(pkData);
1914 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1915 errCode = E_OK;
1916 break;
1917 } else {
1918 LOGE("[RDBExecutor] Step failed when get wait compensated sync pk! errCode = %d.", errCode);
1919 break;
1920 }
1921 } while (errCode == E_OK);
1922 int ret = E_OK;
1923 SQLiteUtils::ResetStatement(stmt, true, ret);
1924 return errCode == E_OK ? ret : errCode;
1925 }
1926
ClearUnLockingStatus(const std::string & tableName)1927 int SQLiteSingleVerRelationalStorageExecutor::ClearUnLockingStatus(const std::string &tableName)
1928 {
1929 std::string sql;
1930 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET status = (CASE WHEN status == 1 "+
1931 "AND (cloud_gid = '' AND flag & 0x01 != 0) THEN 0 ELSE status END);";
1932 sqlite3_stmt *stmt = nullptr;
1933 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1934 if (errCode != E_OK) {
1935 LOGE("[RDBExecutor] Get stmt failed when clear unlocking status errCode = %d.", errCode);
1936 return errCode;
1937 }
1938 int ret = E_OK;
1939 errCode = SQLiteUtils::StepWithRetry(stmt);
1940 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1941 errCode = E_OK;
1942 } else {
1943 LOGE("[Storage Executor]Step update record status stmt failed, %d", errCode);
1944 }
1945 SQLiteUtils::ResetStatement(stmt, true, ret);
1946 return errCode == E_OK ? ret : errCode;
1947 }
1948
GetRecordFromStmt(sqlite3_stmt * stmt,const std::vector<Field> & fields,int startIndex,VBucket & record)1949 int SQLiteSingleVerRelationalStorageExecutor::GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields,
1950 int startIndex, VBucket &record)
1951 {
1952 int errCode = E_OK;
1953 for (const auto &field : fields) {
1954 Type cloudValue;
1955 errCode = SQLiteRelationalUtils::GetCloudValueByType(stmt, field.type, startIndex, cloudValue);
1956 if (errCode != E_OK) {
1957 break;
1958 }
1959 errCode = PutVBucketByType(record, field, cloudValue);
1960 if (errCode != E_OK) {
1961 break;
1962 }
1963 startIndex++;
1964 }
1965 return errCode;
1966 }
1967
BindShareValueToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt)1968 int SQLiteSingleVerRelationalStorageExecutor::BindShareValueToInsertLogStatement(const VBucket &vBucket,
1969 const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt)
1970 {
1971 int errCode = E_OK;
1972 std::string version;
1973 if (putDataMode_ == PutDataMode::SYNC) {
1974 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, vBucket, version);
1975 if ((errCode != E_OK && errCode != -E_NOT_FOUND)) {
1976 LOGE("get version for insert log statement failed, %d", errCode);
1977 return -E_CLOUD_ERROR;
1978 }
1979 }
1980 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 10, version); // 10 is version
1981 if (errCode != E_OK) {
1982 LOGE("Bind version to insert log statement failed, %d", errCode);
1983 return errCode;
1984 }
1985
1986 std::string shareUri;
1987 if (putDataMode_ == PutDataMode::SYNC) {
1988 errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
1989 vBucket, shareUri);
1990 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1991 LOGE("get shareUri for insert log statement failed, %d", errCode);
1992 return -E_CLOUD_ERROR;
1993 }
1994 }
1995
1996 errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 11, shareUri); // 11 is sharing_resource
1997 if (errCode != E_OK) {
1998 LOGE("Bind shareUri to insert log statement failed, %d", errCode);
1999 }
2000 return errCode;
2001 }
2002 } // namespace DistributedDB
2003 #endif