1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef META_DATA_H 17 #define META_DATA_H 18 19 #include <atomic> 20 #include <map> 21 #include <mutex> 22 #include <vector> 23 24 #include "db_types.h" 25 #include "ikvdb_sync_interface.h" 26 #include "version.h" 27 #include "query_sync_water_mark_helper.h" 28 29 namespace DistributedDB { 30 struct MetaDataValue { 31 TimeOffset timeOffset = 0; 32 uint64_t lastUpdateTime = 0; 33 uint64_t localWaterMark = 0; 34 uint64_t peerWaterMark = 0; 35 Timestamp dbCreateTime = 0; 36 uint64_t clearDeviceDataMark = 0; // Default 0 for not remove device data. 37 uint64_t syncMark = 0; // 0x1 ability sync finish 0x2 time sync finish 38 uint64_t remoteSchemaVersion = 0; // reset zero when local schema change 39 int64_t systemTimeOffset = 0; // record dev time offset 40 }; 41 42 struct LocalMetaData { 43 uint32_t version = LOCAL_META_DATA_VERSION_V2; // start at 108 44 uint64_t localSchemaVersion = 0; 45 }; 46 47 enum class SyncMark : uint32_t { 48 SYNC_MARK_ABILITY_SYNC = 0x01, 49 SYNC_MARK_TIME_SYNC = 0x02, 50 SYNC_MARK_TIME_CHANGE = 0x04, 51 }; 52 53 class Metadata { 54 public: 55 class MetaWaterMarkAutoLock final { 56 public: 57 explicit MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata); 58 ~MetaWaterMarkAutoLock(); 59 private: 60 DISABLE_COPY_ASSIGN_MOVE(MetaWaterMarkAutoLock); 61 const std::shared_ptr<Metadata> metadataPtr_; 62 }; 63 64 Metadata(); 65 virtual ~Metadata(); 66 67 int Initialize(ISyncInterface *storage); 68 69 int SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue); 70 71 void GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue); 72 73 virtual void GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue); 74 75 int SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue); 76 77 void GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue, bool isNeedHash = true); 78 79 int SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash); 80 81 int SaveLocalTimeOffset(TimeOffset timeOffset); 82 83 TimeOffset GetLocalTimeOffset() const; 84 85 int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash); 86 87 int EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName); 88 89 void SetLastLocalTime(Timestamp lastLocalTime); 90 91 Timestamp GetLastLocalTime() const; 92 93 int SetSendQueryWaterMark(const std::string &queryIdentify, 94 const std::string &deviceId, const WaterMark &waterMark); 95 96 // the querySync's sendWatermark will increase by the device watermark 97 // if the sendWatermark less than device watermark 98 int GetSendQueryWaterMark(const std::string &queryIdentify, 99 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true); 100 101 int SetRecvQueryWaterMark(const std::string &queryIdentify, 102 const std::string &deviceId, const WaterMark &waterMark); 103 104 // the querySync's recvWatermark will increase by the device watermark 105 // if the watermark less than device watermark 106 int GetRecvQueryWaterMark(const std::string &queryIdentify, 107 const std::string &deviceId, WaterMark &waterMark); 108 109 virtual int SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, 110 const Timestamp ×tamp); 111 112 virtual int GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp); 113 114 int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark); 115 116 // the deleteSync's sendWatermark will increase by the device watermark 117 // if the sendWatermark less than device watermark 118 int GetSendDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark, bool isAutoLift = true); 119 120 int SetRecvDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark, bool isNeedHash = true); 121 122 // the deleteSync's recvWatermark will increase by the device watermark 123 // if the recvWatermark less than device watermark 124 int GetRecvDeleteSyncWaterMark(const std::string &deviceId, WaterMark &waterMark); 125 126 void GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue); 127 128 int SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash); 129 130 int ResetMetaDataAfterRemoveData(const DeviceID &deviceId); 131 132 void GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue); 133 134 // always get value from db, value updated from storage trigger 135 uint64_t GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const; 136 137 void RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId); 138 139 int SaveClientId(const std::string &deviceId, const std::string &clientId); 140 141 int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const; 142 143 void LockWaterMark() const; 144 145 void UnlockWaterMark() const; 146 147 int GetWaterMarkInfoFromDB(const std::string &dev, bool isNeedHash, WatermarkInfo &info); 148 149 int ClearAllAbilitySyncFinishMark(); 150 151 int SetAbilitySyncFinishMark(const std::string &deviceId, bool finish); 152 153 bool IsAbilitySyncFinish(const std::string &deviceId); 154 155 int ClearAllTimeSyncFinishMark(); 156 157 int SetTimeSyncFinishMark(const std::string &deviceId, bool finish); 158 159 int SetTimeChangeMark(const std::string &deviceId, bool change); 160 161 bool IsTimeSyncFinish(const std::string &deviceId); 162 163 bool IsTimeChange(const std::string &deviceId); 164 165 int SetRemoteSchemaVersion(const std::string &deviceId, uint64_t schemaVersion); 166 167 uint64_t GetRemoteSchemaVersion(const std::string &deviceId); 168 169 int SetSystemTimeOffset(const std::string &deviceId, int64_t systemTimeOffset); 170 171 int64_t GetSystemTimeOffset(const std::string &deviceId); 172 173 std::pair<int, uint64_t> GetLocalSchemaVersion(); 174 175 int SetLocalSchemaVersion(uint64_t schemaVersion); 176 private: 177 178 int SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash = true); 179 180 // sync module need hash devices id 181 void GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash); 182 183 static int SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue); 184 185 static int DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue); 186 187 int GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const; 188 189 int SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue); 190 191 void PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value); 192 193 void GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue); 194 195 int64_t StringToLong(const std::vector<uint8_t> &value) const; 196 197 int GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys); 198 199 int LoadAllMetadata(); 200 201 void GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash); 202 203 // this function will read data from db by metaData's key 204 // and then serialize it and put to map 205 int LoadDeviceIdDataToMap(const Key &key); 206 207 // reset the waterMark to zero 208 int ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash); 209 210 int SetSyncMark(const std::string &deviceId, SyncMark syncMark, bool finish); 211 212 bool IsContainSyncMark(const std::string &deviceId, SyncMark syncMark); 213 214 int SaveLocalMetaData(const LocalMetaData &localMetaData); 215 216 std::pair<int, LocalMetaData> GetLocalMetaData(); 217 218 enum class MetaValueAction : uint32_t { 219 CLEAR_ABILITY_SYNC_MARK = 0x01, 220 CLEAR_TIME_SYNC_MARK = 0x02, 221 CLEAR_REMOTE_SCHEMA_VERSION = 0x04, 222 CLEAR_SYSTEM_TIME_OFFSET = 0x08, 223 CLEAR_TIME_CHANGE_MARK = 0x10, 224 SET_TIME_CHANGE_MARK = 0x20, 225 }; 226 227 int ClearAllMetaDataValue(uint32_t innerAction); 228 229 static void ClearMetaDataValue(uint32_t innerAction, MetaDataValue &metaDataValue); 230 231 static std::pair<int, Value> SerializeLocalMetaData(const LocalMetaData &localMetaData); 232 233 static std::pair<int, LocalMetaData> DeSerializeLocalMetaData(const Value &value); 234 235 static uint64_t CalculateLocalMetaDataLength(); 236 237 int InitLocalMetaData(); 238 239 // store localTimeOffset in ram; if change, should add a lock first, change here and metadata, 240 // then release lock 241 std::atomic<TimeOffset> localTimeOffset_; 242 std::mutex localTimeOffsetLock_; 243 ISyncInterface *naturalStoragePtr_; 244 245 // if changed, it should be locked from save-to-db to change-in-memory.save to db must be first, 246 // if save to db fail, it will not be changed in memory. 247 std::map<std::string, MetaDataValue> metadataMap_; 248 mutable std::mutex metadataLock_; 249 std::map<DeviceID, DeviceID> deviceIdToHashDeviceIdMap_; 250 251 // store localTimeOffset in ram, used to make timestamp increase 252 mutable std::mutex lastLocalTimeLock_; 253 Timestamp lastLocalTime_; 254 255 QuerySyncWaterMarkHelper querySyncWaterMarkHelper_; 256 257 // set value: SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId) 258 // queryId is not in set while key is not found from db first time, and return lastTimestamp = INT64_MAX 259 // if query is in set return 0 while not found from db, means already sync before, don't trigger again 260 mutable std::map<DeviceID, std::set<std::string>> queryIdMap_; 261 262 std::mutex clientIdLock_; 263 std::map<DeviceID, std::string> clientIdCache_; 264 265 mutable std::recursive_mutex waterMarkMutex_; 266 mutable std::mutex localMetaDataMutex_; 267 }; 268 } // namespace DistributedDB 269 #endif 270