1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef NATIVE_RDB_RDB_STORE_IMPL_H 17 #define NATIVE_RDB_RDB_STORE_IMPL_H 18 19 #include <list> 20 #include <map> 21 #include <memory> 22 #include <mutex> 23 #include <shared_mutex> 24 #include <thread> 25 26 #include "concurrent_map.h" 27 #include "connection_pool.h" 28 #include "data_ability_observer_stub.h" 29 #include "dataobs_mgr_client.h" 30 #include "rdb_errno.h" 31 #include "rdb_service.h" 32 #include "rdb_store.h" 33 #include "rdb_store_config.h" 34 #include "rdb_types.h" 35 #include "refbase.h" 36 #include "sqlite_statement.h" 37 #include "value_object.h" 38 39 namespace OHOS { 40 class ExecutorPool; 41 } 42 43 namespace OHOS::NativeRdb { 44 class DelayNotify; 45 class RdbStoreLocalObserver { 46 public: RdbStoreLocalObserver(DistributedRdb::RdbStoreObserver * observer)47 explicit RdbStoreLocalObserver(DistributedRdb::RdbStoreObserver *observer) : observer_(observer){}; ~RdbStoreLocalObserver()48 virtual ~RdbStoreLocalObserver(){}; OnChange()49 void OnChange() 50 { 51 observer_->OnChange(); 52 } getObserver()53 DistributedRdb::RdbStoreObserver *getObserver() 54 { 55 return observer_; 56 } 57 58 private: 59 DistributedRdb::RdbStoreObserver *observer_ = nullptr; 60 }; 61 62 class RdbStoreLocalSharedObserver : public AAFwk::DataAbilityObserverStub { 63 public: RdbStoreLocalSharedObserver(DistributedRdb::RdbStoreObserver * observer)64 explicit RdbStoreLocalSharedObserver(DistributedRdb::RdbStoreObserver *observer) : observer_(observer){}; ~RdbStoreLocalSharedObserver()65 virtual ~RdbStoreLocalSharedObserver(){}; OnChange()66 void OnChange() override 67 { 68 observer_->OnChange(); 69 } getObserver()70 DistributedRdb::RdbStoreObserver *getObserver() 71 { 72 return observer_; 73 } 74 75 private: 76 DistributedRdb::RdbStoreObserver *observer_ = nullptr; 77 }; 78 79 class RdbStoreImpl : public RdbStore { 80 public: 81 RdbStoreImpl(const RdbStoreConfig &config); 82 RdbStoreImpl(const RdbStoreConfig &config, int &errCode); 83 ~RdbStoreImpl() override; 84 std::pair<int, int64_t> Insert(const std::string &table, const Row &row, Resolution resolution) override; 85 std::pair<int, int64_t> BatchInsert(const std::string& table, const ValuesBuckets& values) override; 86 std::pair<int, int> Update(const std::string &table, const Row &row, const std::string &where, const Values &args, 87 Resolution resolution) override; 88 int Delete(int &deletedRows, const std::string &table, const std::string &whereClause, const Values &args) override; 89 std::shared_ptr<AbsSharedResultSet> QuerySql(const std::string &sql, const Values &args) override; 90 std::shared_ptr<ResultSet> QueryByStep(const std::string &sql, const Values &args, bool preCount) override; 91 std::shared_ptr<ResultSet> RemoteQuery(const std::string &device, const AbsRdbPredicates &predicates, 92 const Fields &columns, int &errCode) override; 93 std::pair<int32_t, std::shared_ptr<ResultSet>> QuerySharingResource(const AbsRdbPredicates &predicates, 94 const Fields &columns) override; 95 int ExecuteSql(const std::string &sql, const Values &args) override; 96 std::pair<int32_t, ValueObject> Execute(const std::string &sql, const Values &args, int64_t trxId) override; 97 int ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args) override; 98 int ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args) override; 99 int ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args) override; 100 int ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args) override; 101 int GetVersion(int &version) override; 102 int SetVersion(int version) override; 103 int BeginTransaction() override; 104 std::pair<int, int64_t> BeginTrans() override; 105 int RollBack() override; 106 int RollBack(int64_t trxId) override; 107 int Commit() override; 108 int Commit(int64_t trxId) override; 109 bool IsInTransaction() override; 110 bool IsOpen() const override; 111 std::string GetPath() override; 112 bool IsReadOnly() const override; 113 bool IsMemoryRdb() const override; 114 bool IsHoldingConnection() override; 115 bool IsSlaveDiffFromMaster() const override; 116 int Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey) override; 117 int Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey) override; 118 int Count(int64_t &outValue, const AbsRdbPredicates &predicates) override; 119 int SetDistributedTables(const std::vector<std::string> &tables, int32_t type, 120 const DistributedRdb::DistributedConfig &distributedConfig) override; 121 std::string ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode) override; 122 int Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &async) override; 123 int Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async) override; 124 int Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async) override; 125 int Subscribe(const SubscribeOption &option, RdbStoreObserver *observer) override; 126 int UnSubscribe(const SubscribeOption &option, RdbStoreObserver *observer) override; 127 int SubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer) override; 128 int UnsubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer) override; 129 int RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer) override; 130 int UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer) override; 131 int Notify(const std::string &event) override; 132 int SetSearchable(bool isSearchable) override; 133 ModifyTime GetModifyTime(const std::string &table, const std::string &columnName, 134 std::vector<PRIKey> &keys) override; 135 int GetRebuilt(RebuiltType &rebuilt) override; 136 int CleanDirtyData(const std::string &table, uint64_t cursor) override; 137 std::pair<int32_t, int32_t> Attach(const RdbStoreConfig &config, const std::string &attachName, 138 int32_t waitTime) override; 139 std::pair<int32_t, int32_t> Detach(const std::string &attachName, int32_t waitTime) override; 140 int ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock) override; 141 int32_t GetDbType() const override; 142 std::pair<int32_t, uint32_t> LockCloudContainer() override; 143 int32_t UnlockCloudContainer() override; 144 int InterruptBackup() override; 145 int32_t GetBackupStatus() const override; 146 std::pair<int32_t, std::shared_ptr<Transaction>> CreateTransaction(int32_t type) override; 147 148 // not virtual functions / 149 const RdbStoreConfig &GetConfig(); 150 int ConfigLocale(const std::string &localeStr); 151 std::string GetName(); 152 std::string GetFileType(); 153 int32_t ExchangeSlaverToMaster(); 154 155 protected: 156 std::string GetLogTableName(const std::string &tableName) override; 157 158 private: 159 using Stmt = std::shared_ptr<Statement>; 160 using RdbParam = DistributedRdb::RdbSyncerParam; 161 using Options = DistributedRdb::RdbService::Option; 162 using Memo = DistributedRdb::PredicatesMemo; 163 class CloudTables { 164 public: 165 int32_t AddTables(const std::vector<std::string> &tables); 166 int32_t RmvTables(const std::vector<std::string> &tables); 167 bool Change(const std::string &table); 168 std::set<std::string> Steal(); 169 170 private: 171 std::mutex mutex_; 172 std::set<std::string> tables_; 173 std::set<std::string> changes_; 174 }; 175 176 static void AfterOpen(const RdbParam ¶m, int32_t retry = 0); 177 int InnerOpen(); 178 void InitSyncerParam(const RdbStoreConfig &config, bool created); 179 int ExecuteByTrxId(const std::string &sql, int64_t trxId, bool closeConnAfterExecute = false, 180 const std::vector<ValueObject> &bindArgs = {}); 181 std::pair<int32_t, ValueObject> HandleDifferentSqlTypes(std::shared_ptr<Statement> statement, 182 const std::string &sql, const ValueObject &object, int sqlType); 183 int CheckAttach(const std::string &sql); 184 std::pair<int32_t, Stmt> BeginExecuteSql(const std::string &sql); 185 int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath); 186 void DoCloudSync(const std::string &table); 187 static int InnerSync(const RdbParam ¶m, const Options &option, const Memo &predicates, 188 const AsyncDetail &async); 189 int InnerBackup(const std::string &databasePath, 190 const std::vector<uint8_t> &destEncryptKey = std::vector<uint8_t>()); 191 ModifyTime GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys); 192 Uri GetUri(const std::string &event); 193 int SubscribeLocal(const SubscribeOption &option, RdbStoreObserver *observer); 194 int SubscribeLocalShared(const SubscribeOption &option, RdbStoreObserver *observer); 195 int32_t SubscribeLocalDetail(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer); 196 int SubscribeRemote(const SubscribeOption &option, RdbStoreObserver *observer); 197 int UnSubscribeLocal(const SubscribeOption &option, RdbStoreObserver *observer); 198 int UnSubscribeLocalAll(const SubscribeOption &option); 199 int UnSubscribeLocalShared(const SubscribeOption &option, RdbStoreObserver *observer); 200 int UnSubscribeLocalSharedAll(const SubscribeOption &option); 201 int32_t UnsubscribeLocalDetail(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer); 202 int UnSubscribeRemote(const SubscribeOption &option, RdbStoreObserver *observer); 203 int RegisterDataChangeCallback(); 204 void InitDelayNotifier(); 205 std::pair<int32_t, std::shared_ptr<Connection>> CreateWritableConn(); 206 std::vector<ValueObject> CreateBackupBindArgs(const std::string &databasePath, 207 const std::vector<uint8_t> &destEncryptKey); 208 std::pair<int32_t, Stmt> GetStatement(const std::string &sql, std::shared_ptr<Connection> conn) const; 209 std::pair<int32_t, Stmt> GetStatement(const std::string &sql, bool read = false) const; 210 int AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath, 211 const std::vector<uint8_t> &key, int32_t waitTime); 212 int SetDefaultEncryptSql(const std::shared_ptr<Statement> &statement, std::string sql, 213 const RdbStoreConfig &config); 214 int SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config); 215 int GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys); 216 int GetSlaveName(const std::string &dbName, std::string &backupFilePath); 217 void NotifyDataChange(); 218 bool TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore = false); 219 int GetDestPath(const std::string &backupPath, std::string &destPath); 220 221 static constexpr char SCHEME_RDB[] = "rdb://"; 222 static constexpr uint32_t EXPANSION = 2; 223 static inline constexpr uint32_t INTERVAL = 10; 224 static inline constexpr uint32_t RETRY_INTERVAL = 5; // s 225 static inline constexpr int32_t MAX_RETRY_TIMES = 5; 226 static constexpr const char *ROW_ID = "ROWID"; 227 228 bool isOpen_ = false; 229 bool isReadOnly_ = false; 230 bool isMemoryRdb_; 231 uint32_t rebuild_ = RebuiltType::NONE; 232 SlaveStatus slaveStatus_ = SlaveStatus::UNDEFINED; 233 int64_t vSchema_ = 0; 234 std::atomic<int64_t> newTrxId_ = 1; 235 const RdbStoreConfig config_; 236 DistributedRdb::RdbSyncerParam syncerParam_; 237 std::string path_; 238 std::string name_; 239 std::string fileType_; 240 mutable std::shared_mutex rwMutex_; 241 std::mutex mutex_; 242 std::shared_ptr<ConnectionPool> connectionPool_ = nullptr; 243 std::shared_ptr<DelayNotify> delayNotifier_ = nullptr; 244 std::shared_ptr<CloudTables> cloudInfo_ = std::make_shared<CloudTables>(); 245 std::map<std::string, std::list<std::shared_ptr<RdbStoreLocalObserver>>> localObservers_; 246 std::map<std::string, std::list<sptr<RdbStoreLocalSharedObserver>>> localSharedObservers_; 247 ConcurrentMap<std::string, std::string> attachedInfo_; 248 ConcurrentMap<int64_t, std::shared_ptr<Connection>> trxConnMap_ = {}; 249 std::list<std::weak_ptr<Transaction>> transactions_; 250 }; 251 } // namespace OHOS::NativeRdb 252 #endif 253