/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef SQLITE_SINGLE_VER_RELATIONAL_STORAGE_EXECUTOR_H
#define SQLITE_SINGLE_VER_RELATIONAL_STORAGE_EXECUTOR_H
#ifdef RELATIONAL_STORE

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "cloud/asset_operation_utils.h"
#include "cloud/cloud_db_constant.h"
#include "cloud/cloud_store_types.h"
#include "cloud/cloud_upload_recorder.h"
#include "data_transformer.h"
#include "db_types.h"
#include "icloud_sync_storage_interface.h"
#include "macro_utils.h"
#include "query_object.h"
#include "relational_row_data.h"
#include "relational_store_delegate.h"
#include "relational_sync_data_inserter.h"
#include "sqlite_single_ver_relational_continue_token.h"
#include "sqlite_storage_executor.h"
#include "sqlite_utils.h"
#include "tracker_table.h"

namespace DistributedDB {
/*
 * Storage executor for the single-version relational store.
 *
 * The class derives from SQLiteStorageExecutor and is constructed from a raw sqlite3 handle, a writable flag and
 * a DistributedTableMode. This header only declares the interface; the behaviour of every function lives in the
 * implementation file(s) and cannot be verified from here.
 *
 * Unless stated otherwise, functions returning int are assumed to follow the project convention of returning
 * E_OK on success and an error code otherwise (E_OK is the default of UpdateCursorContext::errCode below) --
 * NOTE(review): confirm against the implementation.
 *
 * The whole declaration is compiled only when RELATIONAL_STORE is defined.
 */
class SQLiteSingleVerRelationalStorageExecutor : public SQLiteStorageExecutor {
public:
    // Mode stored by SetPutDataMode().
    // NOTE(review): SYNC/USER presumably distinguish sync-originated from user-originated writes -- confirm.
    enum class PutDataMode {
        SYNC,
        USER
    };
    // Option stored by SetMarkFlagOption().
    // NOTE(review): SET_WAIT_COMPENSATED_SYNC presumably relates to UPDATE_FLAG_WAIT_COMPENSATED_SYNC -- confirm.
    enum class MarkFlagOption {
        DEFAULT,
        SET_WAIT_COMPENSATED_SYNC
    };

    // Context handed to CreateFuncUpdateCursor().
    struct UpdateCursorContext {
        int errCode = E_OK;
        // Initialized so that a default-constructed context never exposes an indeterminate value.
        int cursor = 0;
    };

    SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable, DistributedTableMode mode);
    ~SQLiteSingleVerRelationalStorageExecutor() override = default;

    // Delete the copy and assign constructors
    DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerRelationalStorageExecutor);

    // The parameter "identity" is a hash string that identifies a device
    int CreateDistributedTable(DistributedTableMode mode, bool isUpgraded, const std::string &identity,
        TableInfo &table, TableSyncType syncType);

    int UpgradeDistributedTable(const std::string &tableName, DistributedTableMode mode, bool &schemaChanged,
        RelationalSchemaObject &schema, TableSyncType syncType);

    int StartTransaction(TransactType type);
    int Commit();
    int Rollback();

    // For Get sync data
    int GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength, const DataSizeSpecInfo &sizeInfo,
        std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt, const TableInfo &tableInfo);

    // operation of meta data
    int GetKvData(const Key &key, Value &value) const;
    int PutKvData(const Key &key, const Value &value) const;
    int DeleteMetaData(const std::vector<Key> &keys) const;
    int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const;
    int GetAllMetaKeys(std::vector<Key> &keys) const;

    // For Put sync data
    int SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans = true);

    int CheckDBModeForRelational();

    int DeleteDistributedDeviceTable(const std::string &device, const std::string &tableName);

    int DeleteDistributedAllDeviceTableLog(const std::string &tableName);

    int DeleteDistributedDeviceTableLog(const std::string &device, const std::string &tableName);

    int DeleteDistributedLogTable(const std::string &tableName);

    int CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
        std::vector<std::string> &missingTables);

    int CreateDistributedDeviceTable(const std::string &device, const TableInfo &baseTbl, const StoreInfo &info);

    int CheckQueryObjectLegal(const TableInfo &table, QueryObject &query, const std::string &schemaVersion);

    int CheckQueryObjectLegal(const QuerySyncObject &query);

    int GetMaxTimestamp(const std::vector<std::string> &tablesName, Timestamp &maxTimestamp) const;

    int ExecuteQueryBySqlStmt(const std::string &sql, const std::vector<std::string> &bindArgs, int packetSize,
        std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data);

    int SaveSyncDataItems(RelationalSyncDataInserter &inserter);

    int CheckEncryptedOrCorrupted() const;

    int GetExistsDeviceList(std::set<std::string> &devices) const;

    // Cloud sync: upload counting, gid bookkeeping and data retrieval
    int GetUploadCount(const Timestamp &timestamp, bool isCloudForcePush, bool isCompensatedTask,
        QuerySyncObject &query, int64_t &count);

    int GetAllUploadCount(const std::vector<Timestamp> &timestampVec, bool isCloudForcePush, bool isCompensatedTask,
        QuerySyncObject &query, int64_t &count);

    int UpdateCloudLogGid(const CloudSyncData &cloudDataResult, bool ignoreEmptyGid);

    int GetSyncCloudData(const CloudUploadRecorder &uploadRecorder, CloudSyncData &cloudDataResult,
        SQLiteSingleVerRelationalContinueToken &token);

    int GetSyncCloudGid(QuerySyncObject &query, const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy,
        bool isCompensatedTask, std::vector<std::string> &cloudGid);

    void SetLocalSchema(const RelationalSchemaObject &localSchema);

    int GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema, const VBucket &vBucket,
        DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);

    int PutCloudSyncData(const std::string &tableName, const TableSchema &tableSchema,
        const TrackerTable &trackerTable, DownloadData &downloadData);

    int FillCloudAssetForDownload(const TableSchema &tableSchema, VBucket &vBucket, bool isDownloadSuccess);
    int DoCleanInner(ClearMode mode, const std::vector<std::string> &tableNameList,
        const RelationalSchemaObject &localSchema, std::vector<Asset> &assets,
        std::vector<std::string> &notifyTableList);

    int FillCloudAssetForUpload(OpType opType, const TableSchema &tableSchema, const CloudSyncBatch &data);
    int FillCloudVersionForUpload(const OpType opType, const CloudSyncData &data);

    int SetLogTriggerStatus(bool status);
    int SetCursorIncFlag(bool flag);

    // Tracker table, shared table and trigger management
    int AnalysisTrackerTable(const TrackerTable &trackerTable, TableInfo &tableInfo);
    int CreateTrackerTable(const TrackerTable &trackerTable, bool isUpgrade);
    int GetOrInitTrackerSchemaFromMeta(RelationalSchemaObject &schema);
    int ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records);

    int GetClearWaterMarkTables(const std::vector<TableReferenceProperty> &tableReferenceProperty,
        const RelationalSchemaObject &schema, std::set<std::string> &clearWaterMarkTables);
    int CreateTempSyncTrigger(const TrackerTable &trackerTable, bool flag);
    int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties);
    int ClearAllTempSyncTrigger();
    int CleanTrackerData(const std::string &tableName, int64_t cursor, bool isOnlyTrackTable);
    int CreateSharedTable(const TableSchema &schema);
    int DeleteTable(const std::vector<std::string> &tableNames);
    int UpdateSharedTable(const std::map<std::string, std::vector<Field>> &updateTableNames);
    int AlterTableName(const std::map<std::string, std::string> &tableNames);
    int DeleteTableTrigger(const std::string &missTable) const;

    int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
        const std::map<std::string, std::vector<TableReferenceProperty>> &tableReference,
        std::map<int64_t, Entries> &referenceGid);

    int CheckIfExistUserTable(const std::string &tableName);

    void SetLogicDelete(bool isLogicDelete);
    int RenewTableTrigger(DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType);

    // NOTE(review): the pair is presumably {errCode, status/flag value} -- confirm against the implementation.
    std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
        const Bytes &hashKey, VBucket &assets);

    int FillHandleWithOpType(const OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid,
        const TableSchema &tableSchema);

    void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);

    int CleanResourceForDroppedTable(const std::string &tableName);

    void SetPutDataMode(PutDataMode mode);

    void SetMarkFlagOption(MarkFlagOption option);

    int UpgradedLogForExistedData(TableInfo &tableInfo, bool schemaChanged);

    int UpdateRecordFlag(const std::string &tableName, const std::string &sql, const LogInfo &logInfo);

    void MarkFlagAsUploadFinished(const std::string &tableName, const Key &hashKey, Timestamp timestamp);

    int CleanUploadFinishedFlag(const std::string &tableName);

    int GetWaitCompensatedSyncDataPk(const TableSchema &table, std::vector<VBucket> &data);

    int ClearUnLockingStatus(const std::string &tableName);

    int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
        const std::set<std::string> &gidFilters);

    int CheckInventoryData(const std::string &tableName);

    int UpdateRecordStatus(const std::string &tableName, const std::string &status, const Key &hashKey);

    void SetUploadConfig(int32_t maxUploadCount, int32_t maxUploadSize);

    int InitCursorToMeta(const std::string &tableName);

    void SetTableSchema(const TableSchema &tableSchema);

    int CompareSchemaTableColumns(const std::string &tableName);
private:
    int DoCleanLogs(const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema);

    int DoCleanLogAndData(const std::vector<std::string> &tableNameList,
        const RelationalSchemaObject &localSchema, std::vector<Asset> &assets);

    int CleanCloudDataOnLogTable(const std::string &logTableName, ClearMode mode);

    int CleanCloudDataAndLogOnUserTable(const std::string &tableName, const std::string &logTableName,
        const RelationalSchemaObject &localSchema);

    int ChangeCloudDataFlagOnLogTable(const std::string &logTableName);

    int SetDataOnUserTableWithLogicDelete(const std::string &tableName, const std::string &logTableName);

    // Has the (sqlite3_context *, int, sqlite3_value **) signature accepted by CreateFuncUpdateCursor() below.
    static void UpdateCursor(sqlite3_context *ctx, int argc, sqlite3_value **argv);

    int CreateFuncUpdateCursor(UpdateCursorContext &context,
        void(*updateCursorFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const;

    int GetCursor(const std::string &tableName);

    int SetCursor(const std::string &tableName, int cursor);

    int IncreaseCursorOnAssetData(const std::string &tableName, const std::string &gid);

    int GetCleanCloudDataKeys(const std::string &logTableName, std::vector<int64_t> &dataKeys,
        bool distinguishCloudFlag);

    int GetCloudAssets(const std::string &tableName, const std::vector<FieldInfo> &fieldInfos,
        const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets);

    int GetAssetOnTable(const std::string &tableName, const std::string &fieldName,
        const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets);

    int GetCloudAssetsOnTable(const std::string &tableName, const std::string &fieldName,
        const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets);

    int GetDataItemForSync(sqlite3_stmt *statement, DataItem &dataItem, bool isGettingDeletedData) const;

    int GetSyncDataPre(const DataItem &dataItem, sqlite3_stmt *queryStmt, DataItem &itemGet);

    int CheckDataConflictDefeated(const DataItem &item, sqlite3_stmt *queryStmt, bool &isDefeated);

    int SaveSyncDataItem(RelationalSyncDataInserter &inserter, SaveSyncDataStmt &saveStmt, DataItem &item);

    int SaveSyncDataItem(const DataItem &dataItem, SaveSyncDataStmt &saveStmt, RelationalSyncDataInserter &inserter,
        int64_t &rowid);

    int DeleteSyncDataItem(const DataItem &dataItem, RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt);

    int GetLogInfoPre(sqlite3_stmt *queryStmt, const DataItem &dataItem, LogInfo &logInfoGet);

    int SaveSyncLog(sqlite3_stmt *statement, sqlite3_stmt *queryStmt, const DataItem &dataItem, int64_t rowid);

    int AlterAuxTableForUpgrade(const TableInfo &oldTableInfo, const TableInfo &newTableInfo);

    int DeleteSyncLog(const DataItem &item, RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmLogStmt);
    int ProcessMissQueryData(const DataItem &item, RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt,
        sqlite3_stmt *&rmLogStmt);
    int GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item);
    int GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData, sqlite3_stmt *queryStmt, DataItem &item,
        Timestamp &queryTime);
    int GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item, Timestamp &missQueryTime);

    void SetTableInfo(const TableInfo &tableInfo);  // When put or get sync data, must call the func first.

    int GeneLogInfoForExistedData(sqlite3 *db, const std::string &tableName, const std::string &calPrimaryKeyHash,
        TableInfo &tableInfo);
    int CleanExtendAndCursorForDeleteData(const std::string &tableName);

    int GetCloudDataForSync(const CloudUploadRecorder &uploadRecorder, sqlite3_stmt *statement,
        CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize);

    int PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue);

    int ExecutePutCloudData(const std::string &tableName, const TableSchema &tableSchema,
        const TrackerTable &trackerTable, DownloadData &downloadData, std::map<int, int> &statisticMap);

    std::string GetInsertSqlForCloudSync(const TableSchema &tableSchema);

    int GetPrimaryKeyHashValue(const VBucket &vBucket, const TableSchema &tableSchema, std::vector<uint8_t> &hashValue,
        bool allowEmpty = false);

    int GetQueryLogStatement(const TableSchema &tableSchema, const VBucket &vBucket, const std::string &querySql,
        const Key &hashKey, sqlite3_stmt *&selectStmt);

    int GetQueryLogSql(const std::string &tableName, const VBucket &vBucket, const std::set<std::string> &pkSet,
        std::string &querySql);

    int GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket, std::set<std::string> &pkSet,
        std::vector<Field> &assetFields, std::string &querySql);

    int GetFillDownloadAssetStatement(const std::string &tableName, const VBucket &vBucket,
        const std::vector<Field> &fields, sqlite3_stmt *&statement);

    int InitFillUploadAssetStatement(OpType opType, const TableSchema &tableSchema, const CloudSyncBatch &data,
        const int &index, sqlite3_stmt *&statement);

    int GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo);

    int GetInfoByStatement(sqlite3_stmt *statement, const std::vector<Field> &assetFields,
        const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);

    int InsertCloudData(VBucket &vBucket, const TableSchema &tableSchema, const TrackerTable &trackerTable,
        int64_t dataKey);

    int InsertLogRecord(const TableSchema &tableSchema, const TrackerTable &trackerTable, VBucket &vBucket);

    int BindOneField(int index, const VBucket &vBucket, const Field &field, sqlite3_stmt *updateStmt);

    int BindValueToUpsertStatement(const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *upsertStmt);

    int BindStatusSubQueryHashKeyStatement(sqlite3_stmt *insertLogStmt, std::vector<uint8_t> &hashKey);

    int BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket, const TableSchema &tableSchema,
        const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt);

    int BindShareValueToInsertLogStatement(const VBucket &vBucket, const TableSchema &tableSchema,
        sqlite3_stmt *insertLogStmt);

    int BindValueToInsertLogStatement(VBucket &vBucket, const TableSchema &tableSchema,
        const TrackerTable &trackerTable, sqlite3_stmt *insertLogStmt);

    std::string GetWhereConditionForDataTable(const std::string &gidStr, const std::set<std::string> &pkSet,
        const std::string &tableName, bool queryByPk = true);

    int GetUpdateSqlForCloudSync(const std::vector<Field> &updateFields, const TableSchema &tableSchema,
        const std::string &gidStr, const std::set<std::string> &pkSet, std::string &updateSql);

    int GetUpdateDataTableStatement(const VBucket &vBucket, const TableSchema &tableSchema, sqlite3_stmt *&updateStmt);

    int UpdateCloudData(VBucket &vBucket, const TableSchema &tableSchema);

    int GetUpdateLogRecordStatement(const TableSchema &tableSchema, const VBucket &vBucket, OpType opType,
        std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt);
    int AppendUpdateLogRecordWhereSqlCondition(const TableSchema &tableSchema, const VBucket &vBucket,
        std::string &sql);

    int UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema, OpType opType);

    int BindValueToUpdateLogStatement(const VBucket &vBucket, const TableSchema &tableSchema,
        const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty, sqlite3_stmt *updateLogStmt);

    int GetDeleteStatementForCloudSync(const TableSchema &tableSchema, const std::set<std::string> &pkSet,
        const VBucket &vBucket, sqlite3_stmt *&deleteStmt);

    int DeleteCloudData(const std::string &tableName, const VBucket &vBucket, const TableSchema &tableSchema,
        const TrackerTable &trackerTable);

    int IsTableOnceDropped(const std::string &tableName, bool &onceDropped);

    int OnlyUpdateLogTable(const VBucket &vBucket, const TableSchema &tableSchema, OpType opType);

    int BindUpdateVersionStatement(const VBucket &vBucket, const Bytes &hashKey, sqlite3_stmt *&stmt);
    int DoCleanShareTableDataAndLog(const std::vector<std::string> &tableNameList);

    int GetReferenceGidInner(const std::string &sourceTable, const std::string &targetTable,
        const CloudSyncBatch &syncBatch, const std::vector<TableReferenceProperty> &targetTableReference,
        std::map<int64_t, Entries> &referenceGid);

    int GetReferenceGidByStmt(sqlite3_stmt *statement, const CloudSyncBatch &syncBatch,
        const std::string &targetTable, std::map<int64_t, Entries> &referenceGid);

    static std::string GetReferenceGidSql(const std::string &sourceTable, const std::string &targetTable,
        const std::vector<std::string> &sourceFields, const std::vector<std::string> &targetFields);

    static std::pair<std::vector<std::string>, std::vector<std::string>> SplitReferenceByField(
        const std::vector<TableReferenceProperty> &targetTableReference);

    int BindStmtWithCloudGid(const CloudSyncData &cloudDataResult, bool ignoreEmptyGid, sqlite3_stmt *&stmt);

    std::string GetCloudDeleteSql(const std::string &table);

    int RemoveDataAndLog(const std::string &tableName, int64_t dataKey);

    int64_t GetLocalDataKey(size_t index, const DownloadData &downloadData);

    int BindStmtWithCloudGidInner(const std::string &gid, int64_t rowid,
        sqlite3_stmt *&stmt, int &fillGidCount);

    int DoCleanAssetId(const std::string &tableName, const RelationalSchemaObject &localSchema);

    int CleanAssetId(const std::string &tableName, const std::vector<FieldInfo> &fieldInfos,
        const std::vector<int64_t> &dataKeys);

    int UpdateAssetIdOnUserTable(const std::string &tableName, const std::string &fieldName,
        const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets);

    int GetAssetsAndUpdateAssetsId(const std::string &tableName, const std::string &fieldName,
        const std::vector<int64_t> &dataKeys);

    int CleanAssetsIdOnUserTable(const std::string &tableName, const std::string &fieldName, const int64_t rowId,
        const std::vector<uint8_t> &assetsValue);

    int InitGetAssetStmt(const std::string &sql, const std::string &gid, const Bytes &hashKey,
        sqlite3_stmt *&stmt);

    int GetAssetsByRowId(sqlite3_stmt *&selectStmt, Assets &assets);

    int ExecuteFillDownloadAssetStatement(sqlite3_stmt *&stmt, int beginIndex, const std::string &cloudGid);

    int CleanDownloadChangedAssets(const VBucket &vBucket, const AssetOperationUtils::RecordAssetOpType &assetOpType);

    int GetAndBindFillUploadAssetStatement(const std::string &tableName, const VBucket &assets,
        sqlite3_stmt *&statement);

    int OnlyUpdateAssetId(const std::string &tableName, const TableSchema &tableSchema, const VBucket &vBucket,
        int64_t dataKey, OpType opType);

    void UpdateLocalAssetId(const VBucket &vBucket, const std::string &fieldName, Asset &asset);

    void UpdateLocalAssetsId(const VBucket &vBucket, const std::string &fieldName, Assets &assets);

    void UpdateLocalAssetsIdInner(const Assets &cloudAssets, Assets &assets);

    int BindAssetToBlobStatement(const Asset &asset, int index, sqlite3_stmt *&stmt);

    int BindAssetsToBlobStatement(const Assets &assets, int index, sqlite3_stmt *&stmt);

    int GetAssetInfoOnTable(sqlite3_stmt *&stmt, const std::vector<Field> &assetFields, VBucket &assetInfo);

    int GetAssetOnTableInner(sqlite3_stmt *&stmt, Asset &asset);

    int GetAssetOnTable(const std::string &tableName, const std::string &fieldName, const int64_t dataKey,
        Asset &asset);

    int GetAssetsOnTableInner(sqlite3_stmt *&stmt, Assets &assets);

    int GetAssetsOnTable(const std::string &tableName, const std::string &fieldName, const int64_t dataKey,
        Assets &assets);

    int BindAssetFiledToBlobStatement(const TableSchema &tableSchema, const std::vector<Asset> &assetOfOneRecord,
        const std::vector<Assets> &assetsOfOneRecord, sqlite3_stmt *&stmt);

    int UpdateAssetsIdForOneRecord(const TableSchema &tableSchema, const std::string &sql,
        const std::vector<Asset> &assetOfOneRecord, const std::vector<Assets> &assetsOfOneRecord);

    bool IsNeedUpdateAssetId(const TableSchema &tableSchema, int64_t dataKey, const VBucket &vBucket);

    bool IsNeedUpdateAssetIdInner(sqlite3_stmt *selectStmt, const VBucket &vBucket, const Field &field,
        VBucket &assetInfo);

    int UpdateAssetId(const TableSchema &tableSchema, int64_t dataKey, const VBucket &vBucket);

    int64_t GetDataFlag();

    std::string GetUpdateDataFlagSql();

    std::string GetDev();

    std::vector<Field> GetUpdateField(const VBucket &vBucket, const TableSchema &tableSchema);

    int GetRecordFromStmt(sqlite3_stmt *stmt, const std::vector<Field> &fields, int startIndex, VBucket &record);

    int FillCloudVersionForUpload(const std::string &tableName, const CloudSyncBatch &batchData);

    int QueryCount(const std::string &tableName, int64_t &count);

    int GetUploadCountInner(const Timestamp &timestamp, SqliteQueryHelper &helper, std::string &sql, int64_t &count);

    int LogicDeleteCloudData(const std::string &tableName, const VBucket &vBucket,
        const TableSchema &tableSchema, const TrackerTable &trackerTable);

    // SQL text fragments; they are spliced into statements by the implementation (not visible in this header).
    // NOTE(review): 0x20 and 0x10 are presumably the "consistent" and "wait compensated sync" bits of the
    // log-table flag column -- confirm against the flag definitions.
    static constexpr const char *CONSISTENT_FLAG = "0x20";
    static constexpr const char *UPDATE_FLAG_CLOUD = "flag = flag & 0x20";
    static constexpr const char *UPDATE_FLAG_WAIT_COMPENSATED_SYNC = "flag = flag | 0x10";
    static constexpr const char *FLAG_IS_WAIT_COMPENSATED_SYNC =
        "(a.flag & 0x10 != 0 and a.status = 0) or a.status = 1";

    std::string baseTblName_;
    TableInfo table_;  // Always operating table, user table when get, device table when put.
    TableSchema tableSchema_; // for cloud table
    RelationalSchemaObject localSchema_;

    DistributedTableMode mode_;

    // Dispatch tables keyed by an int32_t; NOTE(review): the key is presumably the cloud field type -- confirm.
    std::map<int32_t, std::function<int(int, const VBucket &, const Field &, sqlite3_stmt *)>> bindCloudFieldFuncMap_;
    std::map<int32_t, std::function<int(const VBucket &, const Field &, std::vector<uint8_t> &)>> toVectorFuncMap_;

    std::atomic<bool> isLogicDelete_;
    std::shared_ptr<IAssetLoader> assetLoader_;

    PutDataMode putDataMode_;
    MarkFlagOption markFlagOption_;

    std::atomic<int32_t> maxUploadCount_;
    std::atomic<int32_t> maxUploadSize_;
};
} // namespace DistributedDB
#endif
#endif // SQLITE_SINGLE_VER_RELATIONAL_STORAGE_EXECUTOR_H