1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H 16 #define RELATIONAL_SYNC_ABLE_STORAGE_H 17 #ifdef RELATIONAL_STORE 18 19 #include "cloud/cloud_upload_recorder.h" 20 #include "cloud/schema_mgr.h" 21 #include "icloud_sync_storage_interface.h" 22 #include "lru_map.h" 23 #include "relational_db_sync_interface.h" 24 #include "relationaldb_properties.h" 25 #include "sqlite_single_relational_storage_engine.h" 26 #include "sqlite_single_ver_relational_continue_token.h" 27 #include "sync_able_engine.h" 28 29 namespace DistributedDB { 30 using RelationalObserverAction = 31 std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData)>; 32 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface, 33 public virtual RefObject { 34 public: 35 explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine); 36 ~RelationalSyncAbleStorage() override; 37 38 // Get interface type of this kvdb. 39 int GetInterfaceType() const override; 40 41 // Get the interface ref-count, in order to access asynchronously. 42 void IncRefCount() override; 43 44 // Drop the interface ref-count. 45 void DecRefCount() override; 46 47 // Get the identifier of this rdb. 48 std::vector<uint8_t> GetIdentifier() const override; 49 50 // Get the dual tuple identifier of this rdb. 51 std::vector<uint8_t> GetDualTupleIdentifier() const override; 52 53 // Get the max timestamp of all entries in database. 54 void GetMaxTimestamp(Timestamp &stamp) const override; 55 56 // Get the max timestamp of one table. 57 int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override; 58 59 // Get meta data associated with the given key. 60 int GetMetaData(const Key &key, Value &value) const override; 61 62 // Put meta data as a key-value entry. 63 int PutMetaData(const Key &key, const Value &value) override; 64 65 int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override; 66 67 // Delete multiple meta data records in a transaction. 68 int DeleteMetaData(const std::vector<Key> &keys) override; 69 70 // Delete multiple meta data records with key prefix in a transaction. 71 int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override; 72 73 // Get all meta data keys. 74 int GetAllMetaKeys(std::vector<Key> &keys) const override; 75 76 const RelationalDBProperties &GetDbProperties() const override; 77 78 // Get the data which would be synced with query condition 79 int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, 80 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken, 81 std::vector<SingleVerKvEntry *> &entries) const override; 82 83 int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken, 84 const DataSizeSpecInfo &dataSizeInfo) const override; 85 86 int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries, 87 const DeviceID &deviceName) override; 88 89 int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override; 90 91 RelationalSchemaObject GetSchemaInfo() const override; 92 93 int GetSecurityOption(SecurityOption &option) const override; 94 95 void NotifyRemotePushFinished(const std::string &deviceId) const override; 96 97 // Get the timestamp when database created or imported 98 int GetDatabaseCreateTimestamp(Timestamp &outTime) const override; 99 100 std::vector<QuerySyncObject> GetTablesQuery() override; 101 102 int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override; 103 104 int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID, 105 const std::string &targetID, bool isPush) const override; 106 107 int CheckAndInitQueryCondition(QueryObject &query) const override; 108 int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer, 109 const RelationalObserverAction &action); 110 int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer); 111 void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override; 112 113 int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override; 114 115 int RegisterSchemaChangedCallback(const std::function<void()> &callback) override; 116 117 void NotifySchemaChanged(); 118 119 void RegisterHeartBeatListener(const std::function<void()> &listener); 120 121 int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override; 122 123 bool CheckCompatible(const std::string &schema, uint8_t type) const override; 124 125 int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data, 126 ContinueToken &token) const override; 127 128 int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override; 129 130 int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) override; 131 132 void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override; 133 134 // recycling the write handle 135 void SetReusedHandle(StorageExecutor *handle); 136 137 int StartTransaction(TransactType type) override; 138 139 int Commit() override; 140 141 int Rollback() override; 142 143 int GetUploadCount(const QuerySyncObject &query, const Timestamp ×tamp, bool isCloudForcePush, 144 bool isCompensatedTask, int64_t &count) override; 145 146 int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> ×tampVec, 147 bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override; 148 149 int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime, 150 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override; 151 152 int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override; 153 154 int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush, 155 bool isCompensatedTask, std::vector<std::string> &cloudGid) override; 156 157 int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override; 158 159 int GetSchemaFromDB(RelationalSchemaObject &schema) override; 160 161 int ChkSchema(const TableName &tableName) override; 162 163 int SetCloudDbSchema(const DataBaseSchema &schema) override; 164 165 int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override; 166 167 int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override; 168 169 int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket, 170 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override; 171 172 int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override; 173 174 int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList, 175 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override; 176 177 int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override; 178 179 int SetLogTriggerStatus(bool status) override; 180 int SetCursorIncFlag(bool flag) override; 181 182 int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override; 183 184 void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine); 185 186 std::string GetIdentify() const override; 187 188 void EraseDataChangeCallback(uint64_t connectionId); 189 190 void ReleaseContinueToken(ContinueToken &continueStmtToken) const override; 191 192 int CheckQueryValid(const QuerySyncObject &query) override; 193 194 int CreateTempSyncTrigger(const std::string &tableName) override; 195 int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override; 196 int ClearAllTempSyncTrigger() override; 197 bool IsSharedTable(const std::string &tableName) override; 198 199 std::map<std::string, std::string> GetSharedTableOriginNames(); 200 201 void SetLogicDelete(bool logicDelete); 202 203 void SetCloudTaskConfig(const CloudTaskConfig &config) override; 204 205 std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid, 206 const Bytes &hashKey, VBucket &assets) override; 207 208 int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override; 209 210 int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records); 211 212 int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override; 213 214 int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users) override; 215 216 int ClearUnLockingNoNeedCompensated() override; 217 218 int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData, 219 const std::set<std::string> &gidFilters) override; 220 221 CloudSyncConfig GetCloudSyncConfig() const override; 222 223 void SetCloudSyncConfig(const CloudSyncConfig &config); 224 225 bool IsTableExistReference(const std::string &table) override; 226 227 bool IsTableExistReferenceOrReferenceBy(const std::string &table) override; 228 229 void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override; 230 protected: 231 int FillReferenceData(CloudSyncData &syncData); 232 233 int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 234 const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo); 235 236 int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 237 DownloadData &downloadData); 238 239 virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch, 240 std::map<int64_t, Entries> &referenceGid); 241 242 int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType, 243 const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid); 244 245 int UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 246 const CloudSyncBatch &updateData, const CloudWaterType &type, bool isLock = false); 247 248 static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid, 249 const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend); 250 251 private: 252 SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode, 253 OperatePerm perm = OperatePerm::NORMAL_PERM) const; 254 SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode, 255 OperatePerm perm = OperatePerm::NORMAL_PERM) const; 256 void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const; 257 258 // get 259 int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token, 260 const DataSizeSpecInfo &dataSizeInfo) const; 261 int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize, 262 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const; 263 264 int GetTableReference(const std::string &tableName, 265 std::map<std::string, std::vector<TableReferenceProperty>> &reference); 266 267 std::pair<std::string, int> GetSourceTableName(const std::string &tableName); 268 269 std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName); 270 // put 271 int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName); 272 int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName); 273 void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type); 274 StoreInfo GetStoreInfo() const; 275 276 bool IsCurrentLogicDelete() const; 277 278 int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 279 const std::vector<VBucket> &records); 280 281 int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 282 const std::vector<VBucket> &records); 283 284 int GetCloudTableWithoutShared(std::vector<TableSchema> &tables); 285 286 int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle, 287 const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery); 288 289 int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName, 290 bool flag = false); 291 292 bool CheckTableSupportCompensatedSync(const TableSchema &table); 293 294 void ExecuteDataChangeCallback( 295 const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item, 296 const std::string &deviceName, const ChangedData &changedData, bool isChangedData, int &observerCnt); 297 // data 298 std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr; 299 std::function<void()> onSchemaChanged_; 300 mutable std::mutex onSchemaChangedMutex_; 301 std::mutex dataChangeDeviceMutex_; 302 std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_; 303 std::function<void()> heartBeatListener_; 304 mutable std::mutex heartBeatMutex_; 305 306 LruMap<std::string, std::string> remoteDeviceSchema_; 307 StorageExecutor *reusedHandle_; 308 mutable std::mutex reusedHandleMutex_; 309 310 // cache securityOption 311 mutable std::mutex securityOptionMutex_; 312 mutable SecurityOption securityOption_; 313 mutable bool isCachedOption_; 314 315 SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr; 316 mutable std::shared_mutex transactionMutex_; // used for transaction 317 318 SchemaMgr schemaMgr_; 319 mutable std::shared_mutex schemaMgrMutex_; 320 std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr; 321 322 std::atomic<bool> logicDelete_ = false; 323 std::atomic<bool> allowLogicDelete_ = false; 324 325 std::function<void (void)> syncFinishFunc_; 326 std::function<void (void)> uploadStartFunc_; 327 328 mutable std::mutex configMutex_; 329 CloudSyncConfig cloudSyncConfig_; 330 331 CloudUploadRecorder uploadRecorder_; 332 }; 333 } // namespace DistributedDB 334 #endif 335 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H 336