1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef KVDB_SYNCABLE_TEST_H
17 #define KVDB_SYNCABLE_TEST_H
18 
19 #include <map>
20 #include <mutex>
21 #include <vector>
22 
23 #include "single_ver_kvdb_sync_interface.h"
24 #include "query_object.h"
25 #include "store_types.h"
26 
27 namespace DistributedDB {
28 struct VirtualDataItem {
29     Key key;
30     Value value;
31     Timestamp timestamp = 0;
32     Timestamp writeTimestamp = 0;
33     uint64_t flag = 0;
34     bool isLocal = true;
35     uint32_t deviceId = 0; // 0: means local
36     static const uint64_t DELETE_FLAG = 0x01;
37     static const uint64_t LOCAL_FLAG = 0x02;
38 };
39 
40 struct VirtualContinueToken {
41     Timestamp begin = 0u;
42     Timestamp end = 0u;
43 };
44 class VirtualSingleVerSyncDBInterface : public SingleVerKvDBSyncInterface {
45 public:
46     VirtualSingleVerSyncDBInterface();
47     int GetInterfaceType() const override;
48 
49     void IncRefCount() override;
50 
51     void DecRefCount() override;
52 
53     std::vector<uint8_t> GetIdentifier() const override;
54 
55     int GetMetaData(const Key& key, Value& value) const override;
56 
57     int PutMetaData(const Key& key, const Value& value, bool isInTransaction) override;
58 
59     int DeleteMetaData(const std::vector<Key> &keys) override;
60     // Delete multiple meta data records with key prefix in a transaction.
61     int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override;
62 
63     int GetAllMetaKeys(std::vector<Key>& keys) const override;
64 
65     int GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
66         ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const override;
67 
68     int GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
69         const DataSizeSpecInfo &dataSizeInfo) const override;
70 
71     void ReleaseContinueToken(ContinueToken& continueStmtToken) const override;
72 
73     void GetMaxTimestamp(Timestamp& stamp) const override;
74 
75     int RemoveDeviceData(const std::string &deviceName, bool isNeedNotify) override;
76 
77     int GetSyncData(const Key& key, VirtualDataItem& dataItem);
78 
79     int PutSyncData(const DataItem& item);
80 
81     int PutData(const Key &key, const Value &value, const Timestamp &time, int flag);
82 
83     int GetSyncData(Timestamp begin, Timestamp end, std::vector<SingleVerKvEntry *> &entries,
84         ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const override;
85 
86     int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo,
87         ContinueToken &continueStmtToken, std::vector<SingleVerKvEntry *> &entries) const override;
88 
89     int GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
90         const DataSizeSpecInfo &dataSizeInfo) const override;
91 
92     int PutSyncDataWithQuery(const QueryObject &query, const std::vector<SingleVerKvEntry *> &entries,
93         const std::string &deviceName) override;
94 
95     SchemaObject GetSchemaInfo() const override;
96 
97     bool CheckCompatible(const std::string& schema, uint8_t type) const override;
98 
99     void SetSchemaInfo(const std::string& schema);
100 
101     const KvDBProperties &GetDbProperties() const override;
102 
103     void SetSaveDataDelayTime(uint64_t milliDelayTime);
104 
105     int GetSecurityOption(SecurityOption &option) const override;
106 
107     bool IsReadable() const override;
108 
109     void SetSecurityOption(SecurityOption &option);
110 
111     void NotifyRemotePushFinished(const std::string &targetId) const override;
112 
113     int GetDatabaseCreateTimestamp(Timestamp &outTime) const override;
114 
115     int GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const override;
116     int GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const override;
117 
118     // return E_OK if subscribe is legal, ERROR on exception.
119     int CheckAndInitQueryCondition(QueryObject &query) const override;
120 
121     int InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
122         const std::string &targetID, bool isPush) const override;
123 
124     int AddSubscribe(const std::string &subscribeId, const QueryObject &query, bool needCacheSubscribe) override;
125 
126     int RemoveSubscribe(const std::string &subscribeId) override;
127 
128     int RemoveSubscribe(const std::vector<std::string> &subscribeIds) override;
129 
130     void SetBusy(bool busy, bool readBusy = false);
131 
132     void PutDeviceData(const std::string &deviceName, const Key &key, const Value &value);
133 
134     void GetDeviceData(const std::string &deviceName, const Key &key, Value &value);
135 
136     void SetIdentifier(std::vector<uint8_t> &identifier);
137 
138     void SetDbProperties(KvDBProperties &kvDBProperties);
139 
140     void DelayGetSyncData(uint32_t milliDelayTime);
141 
142     void SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl);
143     void ResetDataControl();
144 
145     void SetSaveDataCallback(const std::function<void()> &callback);
146 
147     void ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack);
148 
149     void SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier);
150 
151     void SetCompressSync(bool compressSync);
152 private:
153     int GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
154         std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const;
155 
156     int GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
157         uint32_t blockSize, ContinueToken& continueStmtToken) const;
158 
159     int PutSyncData(std::vector<VirtualDataItem>& dataItems, const std::string &deviceName);
160 
161     int DataControl() const;
162 
163     bool GetDataInner(Timestamp begin, Timestamp end, Timestamp &currentWaterMark,
164         const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const;
165 
166     mutable std::map<std::vector<uint8_t>, std::vector<uint8_t>> metadata_;
167     std::vector<VirtualDataItem> dbData_;
168     std::map<std::string, uint32_t> deviceMapping_; // key: deviceName, value: deviceId
169     uint32_t availableDeviceId_ = 0;
170     std::string schema_;
171     SchemaObject schemaObj_;
172     KvDBProperties properties_;
173     uint64_t saveDataDelayTime_ = 0;
174     SecurityOption secOption_;
175     bool busy_ = false;
176     bool readBusy_ = false;
177 
178     std::mutex deviceDataLock_;
179     std::map<std::string, std::map<Key, Value>> deviceData_;
180     std::vector<uint8_t> identifier_;
181     uint64_t getDataDelayTime_ = 0;
182     uint64_t dbCreateTime_;
183 
184     int countDown_ = -1;
185     int expectedErrCode_ = E_OK;
186     bool isGetDataControl_ = true; // control get data: true, control save data : false
187 
188     std::mutex saveDataMutex_;
189     std::function<void()> saveDataCallback_;
190 
191     std::function<int(SecurityOption &)> getSecurityOptionCallBack_;
192 
193     mutable std::mutex pushNotifierMutex_;
194     std::function<void(const std::string &)> pushNotifier_;
195 
196     bool compressSync_ = false;
197 };
198 }  // namespace DistributedDB
199 
200 #endif // KVDB_SYNCABLE_TEST_H