1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef META_DATA_H
17 #define META_DATA_H
18 
#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "db_types.h"
#include "ikvdb_sync_interface.h"
#include "version.h"
#include "query_sync_water_mark_helper.h"
28 
29 namespace DistributedDB {
30 struct MetaDataValue {
31     TimeOffset timeOffset = 0;
32     uint64_t lastUpdateTime = 0;
33     uint64_t localWaterMark = 0;
34     uint64_t peerWaterMark = 0;
35     Timestamp dbCreateTime = 0;
36     uint64_t clearDeviceDataMark = 0; // Default 0 for not remove device data.
37     uint64_t syncMark = 0; // 0x1 ability sync finish 0x2 time sync finish
38     uint64_t remoteSchemaVersion = 0; // reset zero when local schema change
39     int64_t systemTimeOffset = 0; // record dev time offset
40 };
41 
42 struct LocalMetaData {
43     uint32_t version = LOCAL_META_DATA_VERSION_V2; // start at 108
44     uint64_t localSchemaVersion = 0;
45 };
46 
// Single-bit flags, one bit per mark, so that several marks can be combined in one integer.
enum class SyncMark : uint32_t {
    SYNC_MARK_ABILITY_SYNC = 1U << 0, // 0x01
    SYNC_MARK_TIME_SYNC = 1U << 1,    // 0x02
    SYNC_MARK_TIME_CHANGE = 1U << 2,  // 0x04
};
52 
53 class Metadata {
54 public:
55     class MetaWaterMarkAutoLock final {
56     public:
57         explicit MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata);
58         ~MetaWaterMarkAutoLock();
59     private:
60         DISABLE_COPY_ASSIGN_MOVE(MetaWaterMarkAutoLock);
61         const std::shared_ptr<Metadata> metadataPtr_;
62     };
63 
64     Metadata();
65     virtual ~Metadata();
66 
67     int Initialize(ISyncInterface *storage);
68 
69     int SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue);
70 
71     void GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue);
72 
73     virtual void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue);
74 
75     int SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue);
76 
77     void GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue, bool isNeedHash = true);
78 
79     int SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash);
80 
81     int SaveLocalTimeOffset(TimeOffset timeOffset);
82 
83     TimeOffset GetLocalTimeOffset() const;
84 
85     int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash);
86 
87     int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName);
88 
89     void SetLastLocalTime(Timestamp lastLocalTime);
90 
91     Timestamp GetLastLocalTime() const;
92 
93     int SetSendQueryWaterMark(const std::string &queryIdentify,
94         const std::string &deviceId, const WaterMark &waterMark);
95 
96     // the querySync's sendWatermark will increase by the device watermark
97     // if the sendWatermark less than device watermark
98     int GetSendQueryWaterMark(const std::string &queryIdentify,
99         const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true);
100 
101     int SetRecvQueryWaterMark(const std::string &queryIdentify,
102         const std::string &deviceId, const WaterMark &waterMark);
103 
104     // the querySync's recvWatermark will increase by the device watermark
105     // if the watermark less than device watermark
106     int GetRecvQueryWaterMark(const std::string &queryIdentify,
107         const std::string &deviceId, WaterMark &waterMark);
108 
109     virtual int SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
110         const Timestamp &timestamp);
111 
112     virtual int GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp &timestamp);
113 
114     int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark);
115 
116     // the deleteSync's sendWatermark will increase by the device watermark
117     // if the sendWatermark less than device watermark
118     int GetSendDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true);
119 
120     int SetRecvDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark, bool isNeedHash = true);
121 
122     // the deleteSync's recvWatermark will increase by the device watermark
123     // if the recvWatermark less than device watermark
124     int GetRecvDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark);
125 
126     void GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue);
127 
128     int SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash);
129 
130     int ResetMetaDataAfterRemoveData(const DeviceID &deviceId);
131 
132     void GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue);
133 
134     // always get value from db, value updated from storage trigger
135     uint64_t GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const;
136 
137     void RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId);
138 
139     int SaveClientId(const std::string &deviceId, const std::string &clientId);
140 
141     int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const;
142 
143     void LockWaterMark() const;
144 
145     void UnlockWaterMark() const;
146 
147     int GetWaterMarkInfoFromDB(const std::string &dev, bool isNeedHash, WatermarkInfo &info);
148 
149     int ClearAllAbilitySyncFinishMark();
150 
151     int SetAbilitySyncFinishMark(const std::string &deviceId, bool finish);
152 
153     bool IsAbilitySyncFinish(const std::string &deviceId);
154 
155     int ClearAllTimeSyncFinishMark();
156 
157     int SetTimeSyncFinishMark(const std::string &deviceId, bool finish);
158 
159     int SetTimeChangeMark(const std::string &deviceId, bool change);
160 
161     bool IsTimeSyncFinish(const std::string &deviceId);
162 
163     bool IsTimeChange(const std::string &deviceId);
164 
165     int SetRemoteSchemaVersion(const std::string &deviceId, uint64_t schemaVersion);
166 
167     uint64_t GetRemoteSchemaVersion(const std::string &deviceId);
168 
169     int SetSystemTimeOffset(const std::string &deviceId, int64_t systemTimeOffset);
170 
171     int64_t GetSystemTimeOffset(const std::string &deviceId);
172 
173     std::pair<int, uint64_t> GetLocalSchemaVersion();
174 
175     int SetLocalSchemaVersion(uint64_t schemaVersion);
176 private:
177 
178     int SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash = true);
179 
180     // sync module need hash devices id
181     void GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash);
182 
183     static int SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue);
184 
185     static int DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue);
186 
187     int GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const;
188 
189     int SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue);
190 
191     void PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value);
192 
193     void GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue);
194 
195     int64_t StringToLong(const std::vector<uint8_t> &value) const;
196 
197     int GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys);
198 
199     int LoadAllMetadata();
200 
201     void GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash);
202 
203     // this function will read data from db by metaData's key
204     // and then serialize it and put to map
205     int LoadDeviceIdDataToMap(const Key &key);
206 
207     // reset the waterMark to zero
208     int ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash);
209 
210     int SetSyncMark(const std::string &deviceId, SyncMark syncMark, bool finish);
211 
212     bool IsContainSyncMark(const std::string &deviceId, SyncMark syncMark);
213 
214     int SaveLocalMetaData(const LocalMetaData &localMetaData);
215 
216     std::pair<int, LocalMetaData> GetLocalMetaData();
217 
218     enum class MetaValueAction : uint32_t {
219         CLEAR_ABILITY_SYNC_MARK = 0x01,
220         CLEAR_TIME_SYNC_MARK = 0x02,
221         CLEAR_REMOTE_SCHEMA_VERSION = 0x04,
222         CLEAR_SYSTEM_TIME_OFFSET = 0x08,
223         CLEAR_TIME_CHANGE_MARK = 0x10,
224         SET_TIME_CHANGE_MARK = 0x20,
225     };
226 
227     int ClearAllMetaDataValue(uint32_t innerAction);
228 
229     static void ClearMetaDataValue(uint32_t innerAction, MetaDataValue &metaDataValue);
230 
231     static std::pair<int, Value> SerializeLocalMetaData(const LocalMetaData &localMetaData);
232 
233     static std::pair<int, LocalMetaData> DeSerializeLocalMetaData(const Value &value);
234 
235     static uint64_t CalculateLocalMetaDataLength();
236 
237     int InitLocalMetaData();
238 
239     // store localTimeOffset in ram; if change, should add a lock first, change here and metadata,
240     // then release lock
241     std::atomic<TimeOffset> localTimeOffset_;
242     std::mutex localTimeOffsetLock_;
243     ISyncInterface *naturalStoragePtr_;
244 
245     // if changed, it should be locked from save-to-db to change-in-memory.save to db must be first,
246     // if save to db fail, it will not be changed in memory.
247     std::map<std::string, MetaDataValue> metadataMap_;
248     mutable std::mutex metadataLock_;
249     std::map<DeviceID, DeviceID> deviceIdToHashDeviceIdMap_;
250 
251     // store localTimeOffset in ram, used to make timestamp increase
252     mutable std::mutex lastLocalTimeLock_;
253     Timestamp lastLocalTime_;
254 
255     QuerySyncWaterMarkHelper querySyncWaterMarkHelper_;
256 
257     // set value: SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId)
258     // queryId is not in set while key is not found from db first time, and return lastTimestamp = INT64_MAX
259     // if query is in set return 0 while not found from db, means already sync before, don't trigger again
260     mutable std::map<DeviceID, std::set<std::string>> queryIdMap_;
261 
262     std::mutex clientIdLock_;
263     std::map<DeviceID, std::string> clientIdCache_;
264 
265     mutable std::recursive_mutex waterMarkMutex_;
266     mutable std::mutex localMetaDataMutex_;
267 };
268 }  // namespace DistributedDB
269 #endif
270