1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef NATIVE_RDB_RDB_STORE_IMPL_H
17 #define NATIVE_RDB_RDB_STORE_IMPL_H
18 
19 #include <list>
20 #include <map>
21 #include <memory>
22 #include <mutex>
23 #include <shared_mutex>
24 #include <thread>
25 
26 #include "concurrent_map.h"
27 #include "connection_pool.h"
28 #include "data_ability_observer_stub.h"
29 #include "dataobs_mgr_client.h"
30 #include "rdb_errno.h"
31 #include "rdb_service.h"
32 #include "rdb_store.h"
33 #include "rdb_store_config.h"
34 #include "rdb_types.h"
35 #include "refbase.h"
36 #include "sqlite_statement.h"
37 #include "value_object.h"
38 
39 namespace OHOS {
40 class ExecutorPool;
41 }
42 
43 namespace OHOS::NativeRdb {
44 class DelayNotify;
45 class RdbStoreLocalObserver {
46 public:
RdbStoreLocalObserver(DistributedRdb::RdbStoreObserver * observer)47     explicit RdbStoreLocalObserver(DistributedRdb::RdbStoreObserver *observer) : observer_(observer){};
~RdbStoreLocalObserver()48     virtual ~RdbStoreLocalObserver(){};
OnChange()49     void OnChange()
50     {
51         observer_->OnChange();
52     }
getObserver()53     DistributedRdb::RdbStoreObserver *getObserver()
54     {
55         return observer_;
56     }
57 
58 private:
59     DistributedRdb::RdbStoreObserver *observer_ = nullptr;
60 };
61 
62 class RdbStoreLocalSharedObserver : public AAFwk::DataAbilityObserverStub {
63 public:
RdbStoreLocalSharedObserver(DistributedRdb::RdbStoreObserver * observer)64     explicit RdbStoreLocalSharedObserver(DistributedRdb::RdbStoreObserver *observer) : observer_(observer){};
~RdbStoreLocalSharedObserver()65     virtual ~RdbStoreLocalSharedObserver(){};
OnChange()66     void OnChange() override
67     {
68         observer_->OnChange();
69     }
getObserver()70     DistributedRdb::RdbStoreObserver *getObserver()
71     {
72         return observer_;
73     }
74 
75 private:
76     DistributedRdb::RdbStoreObserver *observer_ = nullptr;
77 };
78 
79 class RdbStoreImpl : public RdbStore {
80 public:
81     RdbStoreImpl(const RdbStoreConfig &config);
82     RdbStoreImpl(const RdbStoreConfig &config, int &errCode);
83     ~RdbStoreImpl() override;
84     std::pair<int, int64_t> Insert(const std::string &table, const Row &row, Resolution resolution) override;
85     std::pair<int, int64_t> BatchInsert(const std::string& table, const ValuesBuckets& values) override;
86     std::pair<int, int> Update(const std::string &table, const Row &row, const std::string &where, const Values &args,
87         Resolution resolution) override;
88     int Delete(int &deletedRows, const std::string &table, const std::string &whereClause, const Values &args) override;
89     std::shared_ptr<AbsSharedResultSet> QuerySql(const std::string &sql, const Values &args) override;
90     std::shared_ptr<ResultSet> QueryByStep(const std::string &sql, const Values &args, bool preCount) override;
91     std::shared_ptr<ResultSet> RemoteQuery(const std::string &device, const AbsRdbPredicates &predicates,
92         const Fields &columns, int &errCode) override;
93     std::pair<int32_t, std::shared_ptr<ResultSet>> QuerySharingResource(const AbsRdbPredicates &predicates,
94         const Fields &columns) override;
95     int ExecuteSql(const std::string &sql, const Values &args) override;
96     std::pair<int32_t, ValueObject> Execute(const std::string &sql, const Values &args, int64_t trxId) override;
97     int ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args) override;
98     int ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args) override;
99     int ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args) override;
100     int ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args) override;
101     int GetVersion(int &version) override;
102     int SetVersion(int version) override;
103     int BeginTransaction() override;
104     std::pair<int, int64_t> BeginTrans() override;
105     int RollBack() override;
106     int RollBack(int64_t trxId) override;
107     int Commit() override;
108     int Commit(int64_t trxId) override;
109     bool IsInTransaction() override;
110     bool IsOpen() const override;
111     std::string GetPath() override;
112     bool IsReadOnly() const override;
113     bool IsMemoryRdb() const override;
114     bool IsHoldingConnection() override;
115     bool IsSlaveDiffFromMaster() const override;
116     int Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey) override;
117     int Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey) override;
118     int Count(int64_t &outValue, const AbsRdbPredicates &predicates) override;
119     int SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
120         const DistributedRdb::DistributedConfig &distributedConfig) override;
121     std::string ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode) override;
122     int Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &async) override;
123     int Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async) override;
124     int Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async) override;
125     int Subscribe(const SubscribeOption &option, RdbStoreObserver *observer) override;
126     int UnSubscribe(const SubscribeOption &option, RdbStoreObserver *observer) override;
127     int SubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer) override;
128     int UnsubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer) override;
129     int RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer) override;
130     int UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer) override;
131     int Notify(const std::string &event) override;
132     int SetSearchable(bool isSearchable) override;
133     ModifyTime GetModifyTime(const std::string &table, const std::string &columnName,
134         std::vector<PRIKey> &keys) override;
135     int GetRebuilt(RebuiltType &rebuilt) override;
136     int CleanDirtyData(const std::string &table, uint64_t cursor) override;
137     std::pair<int32_t, int32_t> Attach(const RdbStoreConfig &config, const std::string &attachName,
138         int32_t waitTime) override;
139     std::pair<int32_t, int32_t> Detach(const std::string &attachName, int32_t waitTime) override;
140     int ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock) override;
141     int32_t GetDbType() const override;
142     std::pair<int32_t, uint32_t> LockCloudContainer() override;
143     int32_t UnlockCloudContainer() override;
144     int InterruptBackup() override;
145     int32_t GetBackupStatus() const override;
146     std::pair<int32_t, std::shared_ptr<Transaction>> CreateTransaction(int32_t type) override;
147 
148     // not virtual functions /
149     const RdbStoreConfig &GetConfig();
150     int ConfigLocale(const std::string &localeStr);
151     std::string GetName();
152     std::string GetFileType();
153     int32_t ExchangeSlaverToMaster();
154 
155 protected:
156     std::string GetLogTableName(const std::string &tableName) override;
157 
158 private:
159     using Stmt = std::shared_ptr<Statement>;
160     using RdbParam = DistributedRdb::RdbSyncerParam;
161     using Options = DistributedRdb::RdbService::Option;
162     using Memo = DistributedRdb::PredicatesMemo;
163     class CloudTables {
164     public:
165         int32_t AddTables(const std::vector<std::string> &tables);
166         int32_t RmvTables(const std::vector<std::string> &tables);
167         bool Change(const std::string &table);
168         std::set<std::string> Steal();
169 
170     private:
171         std::mutex mutex_;
172         std::set<std::string> tables_;
173         std::set<std::string> changes_;
174     };
175 
176     static void AfterOpen(const RdbParam &param, int32_t retry = 0);
177     int InnerOpen();
178     void InitSyncerParam(const RdbStoreConfig &config, bool created);
179     int ExecuteByTrxId(const std::string &sql, int64_t trxId, bool closeConnAfterExecute = false,
180         const std::vector<ValueObject> &bindArgs = {});
181     std::pair<int32_t, ValueObject> HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,
182         const std::string &sql, const ValueObject &object, int sqlType);
183     int CheckAttach(const std::string &sql);
184     std::pair<int32_t, Stmt> BeginExecuteSql(const std::string &sql);
185     int GetDataBasePath(const std::string &databasePath, std::string &backupFilePath);
186     void DoCloudSync(const std::string &table);
187     static int InnerSync(const RdbParam &param, const Options &option, const Memo &predicates,
188         const AsyncDetail &async);
189     int InnerBackup(const std::string &databasePath,
190         const std::vector<uint8_t> &destEncryptKey = std::vector<uint8_t>());
191     ModifyTime GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys);
192     Uri GetUri(const std::string &event);
193     int SubscribeLocal(const SubscribeOption &option, RdbStoreObserver *observer);
194     int SubscribeLocalShared(const SubscribeOption &option, RdbStoreObserver *observer);
195     int32_t SubscribeLocalDetail(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer);
196     int SubscribeRemote(const SubscribeOption &option, RdbStoreObserver *observer);
197     int UnSubscribeLocal(const SubscribeOption &option, RdbStoreObserver *observer);
198     int UnSubscribeLocalAll(const SubscribeOption &option);
199     int UnSubscribeLocalShared(const SubscribeOption &option, RdbStoreObserver *observer);
200     int UnSubscribeLocalSharedAll(const SubscribeOption &option);
201     int32_t UnsubscribeLocalDetail(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer);
202     int UnSubscribeRemote(const SubscribeOption &option, RdbStoreObserver *observer);
203     int RegisterDataChangeCallback();
204     void InitDelayNotifier();
205     std::pair<int32_t, std::shared_ptr<Connection>> CreateWritableConn();
206     std::vector<ValueObject> CreateBackupBindArgs(const std::string &databasePath,
207         const std::vector<uint8_t> &destEncryptKey);
208     std::pair<int32_t, Stmt> GetStatement(const std::string &sql, std::shared_ptr<Connection> conn) const;
209     std::pair<int32_t, Stmt> GetStatement(const std::string &sql, bool read = false) const;
210     int AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
211         const std::vector<uint8_t> &key, int32_t waitTime);
212     int SetDefaultEncryptSql(const std::shared_ptr<Statement> &statement, std::string sql,
213         const RdbStoreConfig &config);
214     int SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config);
215     int GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys);
216     int GetSlaveName(const std::string &dbName, std::string &backupFilePath);
217     void NotifyDataChange();
218     bool TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore = false);
219     int GetDestPath(const std::string &backupPath, std::string &destPath);
220 
221     static constexpr char SCHEME_RDB[] = "rdb://";
222     static constexpr uint32_t EXPANSION = 2;
223     static inline constexpr uint32_t INTERVAL = 10;
224     static inline constexpr uint32_t RETRY_INTERVAL = 5; // s
225     static inline constexpr int32_t MAX_RETRY_TIMES = 5;
226     static constexpr const char *ROW_ID = "ROWID";
227 
228     bool isOpen_ = false;
229     bool isReadOnly_ = false;
230     bool isMemoryRdb_;
231     uint32_t rebuild_ = RebuiltType::NONE;
232     SlaveStatus slaveStatus_ = SlaveStatus::UNDEFINED;
233     int64_t vSchema_ = 0;
234     std::atomic<int64_t> newTrxId_ = 1;
235     const RdbStoreConfig config_;
236     DistributedRdb::RdbSyncerParam syncerParam_;
237     std::string path_;
238     std::string name_;
239     std::string fileType_;
240     mutable std::shared_mutex rwMutex_;
241     std::mutex mutex_;
242     std::shared_ptr<ConnectionPool> connectionPool_ = nullptr;
243     std::shared_ptr<DelayNotify> delayNotifier_ = nullptr;
244     std::shared_ptr<CloudTables> cloudInfo_ = std::make_shared<CloudTables>();
245     std::map<std::string, std::list<std::shared_ptr<RdbStoreLocalObserver>>> localObservers_;
246     std::map<std::string, std::list<sptr<RdbStoreLocalSharedObserver>>> localSharedObservers_;
247     ConcurrentMap<std::string, std::string> attachedInfo_;
248     ConcurrentMap<int64_t, std::shared_ptr<Connection>> trxConnMap_ = {};
249     std::list<std::weak_ptr<Transaction>> transactions_;
250 };
251 } // namespace OHOS::NativeRdb
252 #endif
253