1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef RELATIONAL_SYNC_ABLE_STORAGE_H
16 #define RELATIONAL_SYNC_ABLE_STORAGE_H
17 #ifdef RELATIONAL_STORE
18 
19 #include "cloud/cloud_upload_recorder.h"
20 #include "cloud/schema_mgr.h"
21 #include "icloud_sync_storage_interface.h"
22 #include "lru_map.h"
23 #include "relational_db_sync_interface.h"
24 #include "relationaldb_properties.h"
25 #include "sqlite_single_relational_storage_engine.h"
26 #include "sqlite_single_ver_relational_continue_token.h"
27 #include "sync_able_engine.h"
28 
29 namespace DistributedDB {
30 using RelationalObserverAction =
31     std::function<void(const std::string &device, ChangedData &&changedData, bool isChangedData)>;
32 class RelationalSyncAbleStorage : public RelationalDBSyncInterface, public ICloudSyncStorageInterface,
33     public virtual RefObject {
34 public:
35     explicit RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine);
36     ~RelationalSyncAbleStorage() override;
37 
38     // Get interface type of this kvdb.
39     int GetInterfaceType() const override;
40 
41     // Get the interface ref-count, in order to access asynchronously.
42     void IncRefCount() override;
43 
44     // Drop the interface ref-count.
45     void DecRefCount() override;
46 
47     // Get the identifier of this rdb.
48     std::vector<uint8_t> GetIdentifier() const override;
49 
50     // Get the dual tuple identifier of this rdb.
51     std::vector<uint8_t> GetDualTupleIdentifier() const override;
52 
53     // Get the max timestamp of all entries in database.
54     void GetMaxTimestamp(Timestamp &stamp) const override;
55 
56     // Get the max timestamp of one table.
57     int GetMaxTimestamp(const std::string &tableName, Timestamp &stamp) const override;
58 
59     // Get meta data associated with the given key.
60     int GetMetaData(const Key &key, Value &value) const override;
61 
62     // Put meta data as a key-value entry.
63     int PutMetaData(const Key &key, const Value &value) override;
64 
65     int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;
66 
67     // Delete multiple meta data records in a transaction.
68     int DeleteMetaData(const std::vector<Key> &keys) override;
69 
70     // Delete multiple meta data records with key prefix in a transaction.
71     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
72 
73     // Get all meta data keys.
74     int GetAllMetaKeys(std::vector<Key> &keys) const override;
75 
76     const RelationalDBProperties &GetDbProperties() const override;
77 
78     // Get the data which would be synced with query condition
79     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
80         const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
81         std::vector<SingleVerKvEntry *> &entries) const override;
82 
83     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
84         const DataSizeSpecInfo &dataSizeInfo) const override;
85 
86     int PutSyncDataWithQuery(const QueryObject &object, const std::vector<SingleVerKvEntry *> &entries,
87         const DeviceID &deviceName) override;
88 
89     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
90 
91     RelationalSchemaObject GetSchemaInfo() const override;
92 
93     int GetSecurityOption(SecurityOption &option) const override;
94 
95     void NotifyRemotePushFinished(const std::string &deviceId) const override;
96 
97     // Get the timestamp when database created or imported
98     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
99 
100     std::vector<QuerySyncObject> GetTablesQuery() override;
101 
102     int LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj) override;
103 
104     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
105         const std::string &targetID, bool isPush) const override;
106 
107     int CheckAndInitQueryCondition(QueryObject &query) const override;
108     int RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
109         const RelationalObserverAction &action);
110     int UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer);
111     void TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData, bool isChangedData) override;
112 
113     int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override;
114 
115     int RegisterSchemaChangedCallback(const std::function<void()> &callback) override;
116 
117     void NotifySchemaChanged();
118 
119     void RegisterHeartBeatListener(const std::function<void()> &listener);
120 
121     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
122 
123     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
124 
125     int ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize, RelationalRowDataSet &data,
126         ContinueToken &token) const override;
127 
128     int SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema, uint8_t type) override;
129 
130     int GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj) override;
131 
132     void ReleaseRemoteQueryContinueToken(ContinueToken &token) const override;
133 
134     // recycling the write handle
135     void SetReusedHandle(StorageExecutor *handle);
136 
137     int StartTransaction(TransactType type) override;
138 
139     int Commit() override;
140 
141     int Rollback() override;
142 
143     int GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp, bool isCloudForcePush,
144         bool isCompensatedTask, int64_t &count) override;
145 
146     int GetAllUploadCount(const QuerySyncObject &query, const std::vector<Timestamp> &timestampVec,
147         bool isCloudForcePush, bool isCompensatedTask, int64_t &count) override;
148 
149     int GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &object, const Timestamp &beginTime,
150         ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
151 
152     int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) override;
153 
154     int GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject, bool isCloudForcePush,
155         bool isCompensatedTask, std::vector<std::string> &cloudGid) override;
156 
157     int ReleaseCloudDataToken(ContinueToken &continueStmtToken) override;
158 
159     int GetSchemaFromDB(RelationalSchemaObject &schema) override;
160 
161     int ChkSchema(const TableName &tableName) override;
162 
163     int SetCloudDbSchema(const DataBaseSchema &schema) override;
164 
165     int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema) override;
166 
167     int GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema) override;
168 
169     int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
170         DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo) override;
171 
172     int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData) override;
173 
174     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
175         const RelationalSchemaObject &localSchema, std::vector<Asset> &assets) override;
176 
177     int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess) override;
178 
179     int SetLogTriggerStatus(bool status) override;
180     int SetCursorIncFlag(bool flag) override;
181 
182     int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid) override;
183 
184     void SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine);
185 
186     std::string GetIdentify() const override;
187 
188     void EraseDataChangeCallback(uint64_t connectionId);
189 
190     void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;
191 
192     int CheckQueryValid(const QuerySyncObject &query) override;
193 
194     int CreateTempSyncTrigger(const std::string &tableName) override;
195     int GetAndResetServerObserverData(const std::string &tableName, ChangeProperties &changeProperties) override;
196     int ClearAllTempSyncTrigger() override;
197     bool IsSharedTable(const std::string &tableName) override;
198 
199     std::map<std::string, std::string> GetSharedTableOriginNames();
200 
201     void SetLogicDelete(bool logicDelete);
202 
203     void SetCloudTaskConfig(const CloudTaskConfig &config) override;
204 
205     std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
206         const Bytes &hashKey, VBucket &assets) override;
207 
208     int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader) override;
209 
210     int UpsertData(RecordStatus status, const std::string &tableName, const std::vector<VBucket> &records);
211 
212     int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo) override;
213 
214     int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users) override;
215 
216     int ClearUnLockingNoNeedCompensated() override;
217 
218     int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
219         const std::set<std::string> &gidFilters) override;
220 
221     CloudSyncConfig GetCloudSyncConfig() const override;
222 
223     void SetCloudSyncConfig(const CloudSyncConfig &config);
224 
225     bool IsTableExistReference(const std::string &table) override;
226 
227     bool IsTableExistReferenceOrReferenceBy(const std::string &table) override;
228 
229     void ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type, Timestamp localMark) override;
230 protected:
231     int FillReferenceData(CloudSyncData &syncData);
232 
233     int GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
234         const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);
235 
236     int PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
237         DownloadData &downloadData);
238 
239     virtual int GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
240         std::map<int64_t, Entries> &referenceGid);
241 
242     int FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle, OpType opType,
243         const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid);
244 
245     int UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
246         const CloudSyncBatch &updateData, const CloudWaterType &type, bool isLock = false);
247 
248     static int FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
249         const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend);
250 
251 private:
252     SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode,
253         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
254     SQLiteSingleVerRelationalStorageExecutor *GetHandleExpectTransaction(bool isWrite, int &errCode,
255         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
256     void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const;
257 
258     // get
259     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerRelationalContinueToken *&token,
260         const DataSizeSpecInfo &dataSizeInfo) const;
261     int GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
262         std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const;
263 
264     int GetTableReference(const std::string &tableName,
265         std::map<std::string, std::vector<TableReferenceProperty>> &reference);
266 
267     std::pair<std::string, int> GetSourceTableName(const std::string &tableName);
268 
269     std::pair<std::string, int> GetSharedTargetTableName(const std::string &tableName);
270     // put
271     int PutSyncData(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
272     int SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems, const std::string &deviceName);
273     void FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type);
274     StoreInfo GetStoreInfo() const;
275 
276     bool IsCurrentLogicDelete() const;
277 
278     int UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
279         const std::vector<VBucket> &records);
280 
281     int UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
282         const std::vector<VBucket> &records);
283 
284     int GetCloudTableWithoutShared(std::vector<TableSchema> &tables);
285 
286     int GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
287         const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery);
288 
289     int CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
290         bool flag = false);
291 
292     bool CheckTableSupportCompensatedSync(const TableSchema &table);
293 
294     void ExecuteDataChangeCallback(
295         const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
296         const std::string &deviceName, const ChangedData &changedData, bool isChangedData, int &observerCnt);
297     // data
298     std::shared_ptr<SQLiteSingleRelationalStorageEngine> storageEngine_ = nullptr;
299     std::function<void()> onSchemaChanged_;
300     mutable std::mutex onSchemaChangedMutex_;
301     std::mutex dataChangeDeviceMutex_;
302     std::map<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> dataChangeCallbackMap_;
303     std::function<void()> heartBeatListener_;
304     mutable std::mutex heartBeatMutex_;
305 
306     LruMap<std::string, std::string> remoteDeviceSchema_;
307     StorageExecutor *reusedHandle_;
308     mutable std::mutex reusedHandleMutex_;
309 
310     // cache securityOption
311     mutable std::mutex securityOptionMutex_;
312     mutable SecurityOption securityOption_;
313     mutable bool isCachedOption_;
314 
315     SQLiteSingleVerRelationalStorageExecutor *transactionHandle_ = nullptr;
316     mutable std::shared_mutex transactionMutex_; // used for transaction
317 
318     SchemaMgr schemaMgr_;
319     mutable std::shared_mutex schemaMgrMutex_;
320     std::shared_ptr<SyncAbleEngine> syncAbleEngine_ = nullptr;
321 
322     std::atomic<bool> logicDelete_ = false;
323     std::atomic<bool> allowLogicDelete_ = false;
324 
325     std::function<void (void)> syncFinishFunc_;
326     std::function<void (void)> uploadStartFunc_;
327 
328     mutable std::mutex configMutex_;
329     CloudSyncConfig cloudSyncConfig_;
330 
331     CloudUploadRecorder uploadRecorder_;
332 };
333 }  // namespace DistributedDB
334 #endif
335 #endif // RELATIONAL_SYNC_ABLE_STORAGE_H
336