/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef SQLITE_SINGLE_VER_NATURAL_STORE_H
#define SQLITE_SINGLE_VER_NATURAL_STORE_H
#include <atomic>
#include <cstdint>
#include <map>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "isyncer.h"
#include "kv_storage_handle.h"
#include "kv_store_nb_conflict_data_impl.h"
#include "runtime_context.h"
#include "single_ver_natural_store.h"
#include "single_ver_natural_store_commit_notify_data.h"
#include "sqlite_cloud_kv_store.h"
#include "sqlite_single_ver_continue_token.h"
#include "sqlite_single_ver_storage_engine.h"
#include "sqlite_utils.h"

31 namespace DistributedDB {
32 class SQLiteSingleVerNaturalStore : public SingleVerNaturalStore, public KvStorageHandle {
33 public:
34     SQLiteSingleVerNaturalStore();
35     ~SQLiteSingleVerNaturalStore() override;
36 
37     // Delete the copy and assign constructors
38     DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerNaturalStore);
39 
40     // Open the database
41     int Open(const KvDBProperties &kvDBProp) override;
42 
43     // Invoked automatically when connection count is zero
44     void Close() override;
45 
46     // Create a connection object.
47     GenericKvDBConnection *NewConnection(int &errCode) override;
48 
49     // Get interface type of this kvdb.
50     int GetInterfaceType() const override;
51 
52     // Get the interface ref-count, in order to access asynchronously.1
53     void IncRefCount() override;
54 
55     // Drop the interface ref-count.
56     void DecRefCount() override;
57 
58     // Get the identifier of this kvdb.
59     std::vector<uint8_t> GetIdentifier() const override;
60     // Get the dual tuple identifier of this kvdb.
61     std::vector<uint8_t> GetDualTupleIdentifier() const override;
62 
63     // Get interface for syncer.
64     IKvDBSyncInterface *GetSyncInterface() override;
65 
66     int GetMetaData(const Key &key, Value &value) const override;
67 
68     int PutMetaData(const Key &key, const Value &value, bool isInTransaction) override;
69 
70     // Delete multiple meta data records in a transaction.
71     int DeleteMetaData(const std::vector<Key> &keys) override;
72     // Delete multiple meta data records with key prefix in a transaction.
73     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
74 
75     int GetAllMetaKeys(std::vector<Key> &keys) const override;
76 
77     int GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
78         const DataSizeSpecInfo &dataSizeInfo) const override;
79 
80     int GetSyncData(Timestamp begin, Timestamp end, std::vector<SingleVerKvEntry *> &entries,
81         ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const override;
82 
83     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo,
84         ContinueToken &continueStmtToken, std::vector<SingleVerKvEntry *> &entries) const override;
85 
86     int GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
87         const DataSizeSpecInfo &dataSizeInfo) const override;
88 
89     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
90         const DataSizeSpecInfo &dataSizeInfo) const override;
91 
92     void ReleaseContinueToken(ContinueToken &continueStmtToken) const override;
93 
94     int PutSyncDataWithQuery(const QueryObject &query, const std::vector<SingleVerKvEntry *> &entries,
95         const std::string &deviceName) override;
96 
97     void GetMaxTimestamp(Timestamp &stamp) const override;
98 
99     int Rekey(const CipherPassword &passwd) override;
100 
101     int Export(const std::string &filePath, const CipherPassword &passwd) override;
102 
103     int Import(const std::string &filePath, const CipherPassword &passwd) override;
104 
105     // In sync procedure, call this function
106     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
107 
108     // In local procedure, call this function
109     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify, bool isInSync);
110 
111     // remove device data for cloud
112     int RemoveDeviceData(const std::string &deviceName, ClearMode mode);
113 
114     // remove device data for cloud and user
115     int RemoveDeviceData(const std::string &deviceName, const std::string &user, ClearMode mode);
116     SQLiteSingleVerStorageExecutor *GetHandle(bool isWrite, int &errCode,
117         OperatePerm perm = OperatePerm::NORMAL_PERM) const;
118 
119     void ReleaseHandle(SQLiteSingleVerStorageExecutor *&handle) const;
120 
121     int TransObserverTypeToRegisterFunctionType(int observerType, RegisterFuncType &type) const override;
122 
123     int TransConflictTypeToRegisterFunctionType(int conflictType, RegisterFuncType &type) const override;
124 
125     bool CheckWritePermission() const override;
126 
127     SchemaObject GetSchemaInfo() const override;
128 
129     bool CheckCompatible(const std::string &schema, uint8_t type) const override;
130 
131     Timestamp GetCurrentTimestamp(bool needStartSync = true) override;
132 
133     SchemaObject GetSchemaObject() const;
134 
135     const SchemaObject &GetSchemaObjectConstRef() const;
136 
137     const KvDBProperties &GetDbProperties() const override;
138 
139     int GetKvDBSize(const KvDBProperties &properties, uint64_t &size) const override;
140     KvDBProperties &GetDbPropertyForUpdate();
141 
142     int InitDatabaseContext(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt = false);
143 
144     int RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier);
145 
146     int SetAutoLifeCycleTime(uint32_t time);
147 
148     int GetSecurityOption(SecurityOption &option) const override;
149 
150     bool IsDataMigrating() const override;
151 
152     void SetConnectionFlag(bool isExisted) const override;
153 
154     int TriggerToMigrateData() const;
155 
156     int CheckValueAndAmendIfNeed(ValueSource sourceType, const Value &oriValue, Value &amendValue,
157         bool &useAmendValue) const;
158 
159     int CheckReadDataControlled() const;
160     bool IsCacheDBMode() const;
161     bool IsExtendedCacheDBMode() const;
162 
163     void IncreaseCacheRecordVersion() const;
164     uint64_t GetCacheRecordVersion() const;
165     uint64_t GetAndIncreaseCacheRecordVersion() const;
166 
167     void NotifyRemotePushFinished(const std::string &targetId) const override;
168 
169     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
170 
171     int CheckIntegrity() const override;
172 
173     int GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const override;
174     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
175 
176     // Check and init query object for query sync and subscribe, flatbuffer schema will always return E_NOT_SUPPORT.
177     // return E_OK if subscribe is legal, ERROR on exception.
178     int CheckAndInitQueryCondition(QueryObject &query) const override;
179 
180     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
181         const std::string &targetID, bool isPush) const override;
182 
183     void SetSendDataInterceptor(const PushDataInterceptor &interceptor) override;
184 
185     int AddSubscribe(const std::string &subscribeId, const QueryObject &query, bool needCacheSubscribe) override;
186 
187     int RemoveSubscribe(const std::string &subscribeId) override;
188 
189     int RemoveSubscribe(const std::vector<std::string> &subscribeIds) override;
190 
191     int SetMaxLogSize(uint64_t limit);
192 
193     uint64_t GetMaxLogSize() const;
194 
195     void Dump(int fd) override;
196 
197     int IsSupportSubscribe() const override;
198 
199     void AbortHandle() override;
200 
201     void EnableHandle() override;
202 
203     int TryHandle() const override;
204 
205     std::pair<int, SQLiteSingleVerStorageExecutor*> GetStorageExecutor(bool isWrite) override;
206 
207     void RecycleStorageExecutor(SQLiteSingleVerStorageExecutor *executor) override;
208 
209     TimeOffset GetLocalTimeOffsetForCloud() override;
210 
211     int SetCloudDbSchema(const std::map<std::string, DataBaseSchema> &schema);
212 
213     int RegisterObserverAction(const KvStoreObserver *observer, const ObserverAction &action);
214 
215     int UnRegisterObserverAction(const KvStoreObserver *observer);
216 
217     int GetCloudVersion(const std::string &device, std::map<std::string, std::string> &versionMap);
218 
219     void SetReceiveDataInterceptor(const DataInterceptor &interceptor) override;
220 
221     int SetCloudSyncConfig(const CloudSyncConfig &config);
222 
223     uint64_t GetTimestampFromDB() override;
224 
225     // for test mock
GetCloudKvStore()226     const SqliteCloudKvStore* GetCloudKvStore()
227     {
228         return sqliteCloudKvStore_;
229     }
230 protected:
231     void AsyncDataMigration(SQLiteSingleVerStorageEngine *storageEngine) const;
232 
233     void ReleaseResources();
234 
235     ICloudSyncStorageInterface *GetICloudSyncInterface() const override;
236 
237     std::map<std::string, DataBaseSchema> GetDataBaseSchemas() override;
238 
239     bool CheckSchemaSupportForCloudSync() const override;
240 private:
241 
242     int CheckDatabaseRecovery(const KvDBProperties &kvDBProp);
243 
244     int RegisterNotification();
245 
246     int SaveSyncDataItems(const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo,
247         bool checkValueContent);
248 
249     int InitStorageEngine(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt);
250 
251     void InitialLocalDataTimestamp();
252 
253     int GetSchema(SchemaObject &schema) const;
254 
255     static void InitDataBaseOption(const KvDBProperties &kvDBProp, OpenDbProperties &option);
256 
257     static int SetUserVer(const KvDBProperties &kvDBProp, int version);
258 
259     void NotifyRemovedData(std::vector<Entry> &entries);
260 
261     // Decide read only based on schema situation
262     int DecideReadOnlyBaseOnSchema(const KvDBProperties &kvDBProp, bool &isReadOnly,
263         SchemaObject &savedSchemaObj) const;
264 
265     void HeartBeatForLifeCycle() const;
266 
267     int StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier) const;
268 
269     int ResetLifeCycleTimer() const;
270 
271     int StopLifeCycleTimer() const;
272     void InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *committedData);
273 
274     // Change value that should be amended, and neglect value that is incompatible
275     void CheckAmendValueContentForSyncProcedure(std::vector<DataItem> &dataItems) const;
276 
277     int RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify) const;
278 
279     int RemoveDeviceDataNormally(const std::string &hashDev, bool isNeedNotify);
280 
281     int SaveSyncDataToMain(const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo);
282 
283     // Currently, this function only suitable to be call from sync in insert_record_from_sync procedure
284     // Take attention if future coder attempt to call it in other situation procedure
285     int SaveSyncItems(const QueryObject& query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo,
286         Timestamp &maxTimestamp, SingleVerNaturalStoreCommitNotifyData *commitData) const;
287 
288     int SaveSyncDataToCacheDB(const QueryObject &query, std::vector<DataItem> &dataItems,
289         const DeviceInfo &deviceInfo);
290 
291     int SaveSyncItemsInCacheMode(SQLiteSingleVerStorageExecutor *handle, const QueryObject &query,
292         std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo, Timestamp &maxTimestamp) const;
293 
294     int GetSyncDataForQuerySync(std::vector<DataItem> &dataItems, SQLiteSingleVerContinueToken *&continueStmtToken,
295         const DataSizeSpecInfo &dataSizeInfo) const;
296 
297     int SaveCreateDBTime();
298     int SaveCreateDBTimeIfNotExisted();
299 
300     virtual int GetAndInitStorageEngine(const KvDBProperties &kvDBProp);
301 
302     int RemoveAllSubscribe();
303 
304     int GetExistsDeviceList(std::set<std::string> &devices) const;
305 
306     int RemoveDeviceDataInner(const std::string &hashDev, bool isNeedNotify);
307 
308     int RemoveDeviceDataInner(const std::string &hashDev, ClearMode mode);
309 
310     int RemoveDeviceDataInner(const std::string &hashDev, const std::string &user, ClearMode mode);
311 
312     void GetAndResizeLocalIdentity(std::string &outTarget) const;
313 
314     DECLARE_OBJECT_TAG(SQLiteSingleVerNaturalStore);
315 
316     mutable std::shared_mutex engineMutex_;
317     SQLiteSingleVerStorageEngine *storageEngine_;
318 
319     bool notificationEventsRegistered_;
320     bool notificationConflictEventsRegistered_;
321     bool isInitialized_;
322     bool isReadOnly_;
323     mutable std::mutex initialMutex_;
324     mutable std::mutex lifeCycleMutex_;
325     mutable DatabaseLifeCycleNotifier lifeCycleNotifier_;
326     mutable TimerId lifeTimerId_;
327     uint32_t autoLifeTime_;
328     mutable Timestamp createDBTime_;
329     mutable std::mutex createDBTimeMutex_;
330 
331     mutable std::shared_mutex dataInterceptorMutex_;
332     PushDataInterceptor pushDataInterceptor_;
333     DataInterceptor receiveDataInterceptor_;
334 
335     std::atomic<uint64_t> maxLogSize_;
336 
337     mutable std::shared_mutex abortHandleMutex_;
338     OperatePerm abortPerm_;
339 
340     mutable std::mutex cloudStoreMutex_;
341     SqliteCloudKvStore *sqliteCloudKvStore_;
342 };
343 } // namespace DistributedDB
#endif // SQLITE_SINGLE_VER_NATURAL_STORE_H