/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef SQLITE_RELATIONAL_STORE_H
#define SQLITE_RELATIONAL_STORE_H
#ifdef RELATIONAL_STORE

// Standard headers for every std:: name this header spells out itself, so it does not rely on
// transitive includes from the project headers below.
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

#include "irelational_store.h"
#include "sqlite_single_relational_storage_engine.h"
#include "isyncer.h"
#include "sync_able_engine.h"
#include "relational_sync_able_storage.h"
#include "runtime_context.h"
#include "cloud/cloud_syncer.h"

namespace DistributedDB {
// Callback type accepted by RegisterObserverAction(): it receives a device string, the changed data
// (passed as an rvalue reference) and an isChangedData flag.
using RelationalObserverAction =
    std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData)>;

// IRelationalStore implementation. It holds a SQLiteSingleRelationalStorageEngine, a
// RelationalSyncAbleStorage, a SyncAbleEngine and a CloudSyncer (see the data members at the bottom).
// Only declarations live here; the behaviour of each method is defined in the corresponding .cpp.
// The class is neither copyable nor movable (DISABLE_COPY_ASSIGN_MOVE).
class SQLiteRelationalStore : public IRelationalStore {
public:
    SQLiteRelationalStore() = default;
    ~SQLiteRelationalStore() override;

    // Delete the copy and assign constructors
    DISABLE_COPY_ASSIGN_MOVE(SQLiteRelationalStore);

    // IRelationalStore overrides. GetDBConnection reports its status through errCode.
    RelationalStoreConnection *GetDBConnection(int &errCode) override;
    int Open(const RelationalDBProperties &properties) override;

    // Registers a close notifier.
    // NOTE(review): presumably appended to closeNotifiers_ and run when the store closes - confirm in the .cpp.
    void OnClose(const std::function<void(void)> &notifier);

    // Obtains / returns a storage executor. GetHandle reports its status through errCode; ReleaseHandle
    // takes the pointer by reference (NOTE(review): presumably so it can be reset - confirm in the .cpp).
    SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode) const;
    void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;

    // Sync overload driven by ISyncer::SyncParma for the given connection.
    int Sync(const ISyncer::SyncParma &syncParam, uint64_t connectionId);

    int32_t GetCloudSyncTaskCount();

    int CleanCloudData(ClearMode mode);

    void ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection);

    void WakeUpSyncer() override;

    // for test mock
    const RelationalSyncAbleStorage *GetStorageEngine()
    {
        // Non-owning view of the raw storageEngine_ member; may be nullptr (its in-class default).
        return storageEngine_;
    }

    int CreateDistributedTable(const std::string &tableName, TableSyncType syncType, bool trackerSchemaChanged = false);

    // Two overloads: one without arguments, one restricted to a device / table pair.
    int RemoveDeviceData();
    int RemoveDeviceData(const std::string &device, const std::string &tableName);

    // Observer registration is keyed by connection id and observer pointer.
    int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
        const RelationalObserverAction &action);
    int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer);
    int RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier);

    std::string GetStorePath() const override;

    RelationalDBProperties GetProperties() const override;

    void StopSync(uint64_t connectionId);

    void Dump(int fd) override;

    // The query result is handed back through the `result` out-parameter.
    int RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
        uint64_t connectionId, std::shared_ptr<ResultSet> &result);

    int SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb);

    int PrepareAndSetCloudDbSchema(const DataBaseSchema &schema);

    int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);

    int ChkSchema(const TableName &tableName);

    // Sync overload driven by CloudSyncOption, with a progress callback and a task id.
    int Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId);

    int SetTrackerTable(const TrackerSchema &trackerSchema);

    // Rows produced by the statement are written into `records`.
    int ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records);

    int CleanTrackerData(const std::string &tableName, int64_t cursor);

    int SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty);

    int Pragma(PragmaCmd cmd, PragmaData &pragmaData);

    int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);

    int SetCloudSyncConfig(const CloudSyncConfig &config);

    SyncProcess GetCloudTaskStatus(uint64_t taskId);
private:
    void ReleaseResources();

    // 1 store 1 connection
    void DecreaseConnectionCounter(uint64_t connectionId);
    int CheckDBMode();
    int GetSchemaFromMeta(RelationalSchemaObject &schema);
    int SaveSchemaToMeta();
    int CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet);
    int SaveTableModeToMeta(DistributedTableMode mode);
    int CheckProperties(RelationalDBProperties properties);

    int SaveLogTableVersionToMeta();

    int CleanDistributedDeviceTable();

    int StopLifeCycleTimer();
    int StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier);
    void HeartBeat();
    int ResetLifeCycleTimer();

    void IncreaseConnectionCounter();
    int InitStorageEngine(const RelationalDBProperties &properties);

    int EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList);

    std::string GetDevTableName(const std::string &device, const std::string &hashDev) const;

    // The executor is returned through the `handle` out-parameter.
    int GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const;

    int RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device,
        const std::string &tableName, bool isNeedHash);

    // Results are written into `hashDevices`.
    int GetExistDevices(std::set<std::string> &hashDevices) const;

    std::vector<std::string> GetAllDistributedTableName();

    int CheckBeforeSync(const CloudSyncOption &option);

    int CheckQueryValid(const CloudSyncOption &option);

    int CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object, bool isFromTable);

    int CheckTableName(const std::vector<std::string> &tableNames);

    // `info` is the out-parameter being filled.
    void FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
        CloudSyncer::CloudTaskInfo &info);

    int CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor *&handle, std::set<std::string> &clearWaterMarkTable);

    int InitTrackerSchemaFromMeta();

    void AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields,
        std::vector<Field> &addFields);

    bool CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo, std::vector<Field> &addFields);

    // The three containers are non-const references, i.e. outputs of this call.
    bool PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames,
        std::map<std::string, std::vector<Field>> &updateTableNames,
        std::map<std::string, std::string> &alterTableNames);

    int ExecuteCreateSharedTable(const DataBaseSchema &schema);

    int CheckParamForUpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);

    int CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records);

    int InitSQLiteStorageEngine(const RelationalDBProperties &properties);

    static int ReFillSyncInfoTable(const std::vector<std::string> &actualTable, CloudSyncer::CloudTaskInfo &info);

    int CheckCloudSchema(const DataBaseSchema &schema);

    // use for sync Interactive
    std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr; // For storage operate sync function
    // use ref obj same as kv
    RelationalSyncAbleStorage *storageEngine_ = nullptr; // For storage operate data
    std::shared_ptr<SQLiteSingleRelationalStorageEngine> sqliteStorageEngine_;
    CloudSyncer *cloudSyncer_ = nullptr;

    std::mutex connectMutex_;
    // Direct-list-initialised: std::atomic is not copyable, so `= 0` only compiles from C++17 onwards.
    std::atomic<int> connectionCount_ {0};
    std::vector<std::function<void(void)>> closeNotifiers_;

    mutable std::mutex initalMutex_;
    bool isInitialized_ = false;

    // lifeCycle
    std::mutex lifeCycleMutex_;
    DatabaseLifeCycleNotifier lifeCycleNotifier_;
    TimerId lifeTimerId_ {};
};
} // namespace DistributedDB
#endif
#endif // SQLITE_RELATIONAL_STORE_H