1  /*
2   * Copyright (c) 2021 Huawei Device Co., Ltd.
3   * Licensed under the Apache License, Version 2.0 (the "License");
4   * you may not use this file except in compliance with the License.
5   * You may obtain a copy of the License at
6   *
7   *     http://www.apache.org/licenses/LICENSE-2.0
8   *
9   * Unless required by applicable law or agreed to in writing, software
10   * distributed under the License is distributed on an "AS IS" BASIS,
11   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   * See the License for the specific language governing permissions and
13   * limitations under the License.
14   */
15  
16  #include "meta_data.h"
17  
18  #include <openssl/rand.h>
19  
20  #include "db_common.h"
21  #include "db_constant.h"
22  #include "db_errno.h"
23  #include "hash.h"
24  #include "log_print.h"
25  #include "platform_specific.h"
26  #include "securec.h"
27  #include "sync_types.h"
28  #include "time_helper.h"
29  #include "version.h"
30  
31  namespace DistributedDB {
32  namespace {
33      // store local timeoffset;this is a special key;
34      constexpr const char *CLIENT_ID_PREFIX_KEY = "clientId";
35      constexpr const char *LOCAL_META_DATA_KEY = "localMetaData";
36  }
37  
Metadata()38  Metadata::Metadata()
39      : localTimeOffset_(0),
40        naturalStoragePtr_(nullptr),
41        lastLocalTime_(0)
42  {}
43  
~Metadata()44  Metadata::~Metadata()
45  {
46      naturalStoragePtr_ = nullptr;
47      metadataMap_.clear();
48  }
49  
Initialize(ISyncInterface * storage)50  int Metadata::Initialize(ISyncInterface* storage)
51  {
52      naturalStoragePtr_ = storage;
53      std::vector<uint8_t> key;
54      std::vector<uint8_t> timeOffset;
55      DBCommon::StringToVector(std::string(DBConstant::LOCALTIME_OFFSET_KEY), key);
56  
57      int errCode = GetMetadataFromDb(key, timeOffset);
58      if (errCode == -E_NOT_FOUND) {
59          int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
60          if (err != E_OK) {
61              LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
62              return err;
63          }
64      } else if (errCode == E_OK) {
65          localTimeOffset_ = StringToLong(timeOffset);
66      } else {
67          LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
68          return errCode;
69      }
70      {
71          std::lock_guard<std::mutex> lockGuard(metadataLock_);
72          metadataMap_.clear();
73      }
74      (void)querySyncWaterMarkHelper_.Initialize(storage);
75      return LoadAllMetadata();
76  }
77  
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)78  int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
79  {
80      MetaDataValue metadata;
81      std::lock_guard<std::mutex> lockGuard(metadataLock_);
82      GetMetaDataValue(deviceId, metadata, true);
83      metadata.timeOffset = inValue;
84      metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
85      LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
86      return SaveMetaDataValue(deviceId, metadata);
87  }
88  
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)89  void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
90  {
91      MetaDataValue metadata;
92      std::lock_guard<std::mutex> lockGuard(metadataLock_);
93      GetMetaDataValue(deviceId, metadata, true);
94      outValue = metadata.timeOffset;
95  }
96  
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)97  void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
98  {
99      MetaDataValue metadata;
100      std::lock_guard<std::mutex> lockGuard(metadataLock_);
101      GetMetaDataValue(deviceId, metadata, true);
102      outValue = metadata.localWaterMark;
103  }
104  
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)105  int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
106  {
107      MetaDataValue metadata;
108      std::lock_guard<std::mutex> lockGuard(metadataLock_);
109      GetMetaDataValue(deviceId, metadata, true);
110      metadata.localWaterMark = inValue;
111      LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
112      return SaveMetaDataValue(deviceId, metadata);
113  }
114  
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue,bool isNeedHash)115  void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue, bool isNeedHash)
116  {
117      MetaDataValue metadata;
118      std::lock_guard<std::mutex> lockGuard(metadataLock_);
119      GetMetaDataValue(deviceId, metadata, isNeedHash);
120      outValue = metadata.peerWaterMark;
121  }
122  
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)123  int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
124  {
125      MetaDataValue metadata;
126      std::lock_guard<std::mutex> lockGuard(metadataLock_);
127      GetMetaDataValue(deviceId, metadata, isNeedHash);
128      metadata.peerWaterMark = inValue;
129      LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
130      return SaveMetaDataValue(deviceId, metadata, isNeedHash);
131  }
132  
SaveLocalTimeOffset(TimeOffset timeOffset)133  int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset)
134  {
135      std::string timeOffsetString = std::to_string(timeOffset);
136      std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
137      std::string keyStr(DBConstant::LOCALTIME_OFFSET_KEY);
138      std::vector<uint8_t> localTimeOffsetValue(keyStr.begin(), keyStr.end());
139  
140      std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
141      localTimeOffset_ = timeOffset;
142      LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset);
143      int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
144      if (errCode != E_OK) {
145          LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
146      }
147      return errCode;
148  }
149  
GetLocalTimeOffset() const150  TimeOffset Metadata::GetLocalTimeOffset() const
151  {
152      TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
153      return localTimeOffset;
154  }
155  
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)156  int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
157  {
158      return EraseDeviceWaterMark(deviceId, isNeedHash, "");
159  }
160  
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)161  int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
162  {
163      // reload meta data again
164      (void)LoadAllMetadata();
165      std::lock_guard<std::recursive_mutex> autoLock(waterMarkMutex_);
166      // try to erase all the waterMark
167      // erase deleteSync recv waterMark
168      WaterMark waterMark = 0;
169      int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
170      if (errCodeDeleteSync != E_OK) {
171          LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
172          return errCodeDeleteSync;
173      }
174      // erase querySync recv waterMark
175      int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
176      if (errCodeQuerySync != E_OK) {
177          LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
178          return errCodeQuerySync;
179      }
180      // peerWaterMark must be erased at last
181      int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
182      if (errCode != E_OK) {
183          LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
184          return errCode;
185      }
186      return E_OK;
187  }
188  
SetLastLocalTime(Timestamp lastLocalTime)189  void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
190  {
191      std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
192      if (lastLocalTime > lastLocalTime_) {
193          lastLocalTime_ = lastLocalTime;
194      }
195  }
196  
GetLastLocalTime() const197  Timestamp Metadata::GetLastLocalTime() const
198  {
199      std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
200      return lastLocalTime_;
201  }
202  
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)203  int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash)
204  {
205      std::vector<uint8_t> value;
206      int errCode = SerializeMetaData(inValue, value);
207      if (errCode != E_OK) {
208          return errCode;
209      }
210  
211      DeviceID hashDeviceId;
212      GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
213      std::vector<uint8_t> key;
214      DBCommon::StringToVector(hashDeviceId, key);
215      errCode = SetMetadataToDb(key, value);
216      if (errCode != E_OK) {
217          LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
218          return errCode;
219      }
220      PutMetadataToMap(hashDeviceId, inValue);
221      return E_OK;
222  }
223  
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)224  void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
225  {
226      DeviceID hashDeviceId;
227      GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
228      GetMetadataFromMap(hashDeviceId, outValue);
229  }
230  
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)231  int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
232  {
233      outValue.resize(sizeof(MetaDataValue));
234      errno_t err = memcpy_s(outValue.data(), outValue.size(), &inValue, sizeof(MetaDataValue));
235      if (err != EOK) {
236          return -E_SECUREC_ERROR;
237      }
238      return E_OK;
239  }
240  
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)241  int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue)
242  {
243      if (inValue.empty()) {
244          return -E_INVALID_ARGS;
245      }
246      errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), inValue.data(), inValue.size());
247      if (err != EOK) {
248          return -E_SECUREC_ERROR;
249      }
250      return E_OK;
251  }
252  
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const253  int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
254  {
255      if (naturalStoragePtr_ == nullptr) {
256          return -E_INVALID_DB;
257      }
258      return naturalStoragePtr_->GetMetaData(key, outValue);
259  }
260  
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)261  int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
262  {
263      if (naturalStoragePtr_ == nullptr) {
264          return -E_INVALID_DB;
265      }
266      return naturalStoragePtr_->PutMetaData(key, inValue, false);
267  }
268  
PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)269  void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value)
270  {
271      metadataMap_[deviceId] = value;
272  }
273  
GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)274  void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue)
275  {
276      if (metadataMap_.find(deviceId) == metadataMap_.end() && RuntimeContext::GetInstance()->IsTimeChanged()) {
277          metadataMap_[deviceId].syncMark = static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
278      }
279      outValue = metadataMap_[deviceId];
280  }
281  
StringToLong(const std::vector<uint8_t> & value) const282  int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
283  {
284      std::string valueString(value.begin(), value.end());
285      int64_t longData = std::strtoll(valueString.c_str(), nullptr, DBConstant::STR_TO_LL_BY_DEVALUE);
286      LOGD("Metadata::StringToLong longData = %" PRId64, longData);
287      return longData;
288  }
289  
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)290  int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
291  {
292      if (naturalStoragePtr_ == nullptr) {
293          return -E_INVALID_DB;
294      }
295      return naturalStoragePtr_->GetAllMetaKeys(keys);
296  }
297  
298  namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)299  bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
300  {
301      if (inKey.size() < expectPrefix.size()) {
302          return false;
303      }
304      std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
305      if (prefixInKey != expectPrefix) {
306          return false;
307      }
308      return true;
309  }
310  }
311  
LoadAllMetadata()312  int Metadata::LoadAllMetadata()
313  {
314      std::vector<std::vector<uint8_t>> metaDataKeys;
315      int errCode = GetAllMetadataKey(metaDataKeys);
316      if (errCode != E_OK) {
317          LOGE("[Metadata] get all metadata key failed err=%d", errCode);
318          return errCode;
319      }
320  
321      std::vector<std::vector<uint8_t>> querySyncIds;
322      for (const auto &deviceId : metaDataKeys) {
323          if (IsMetaDataKey(deviceId, DBConstant::DEVICEID_PREFIX_KEY)) {
324              errCode = LoadDeviceIdDataToMap(deviceId);
325              if (errCode != E_OK) {
326                  return errCode;
327              }
328          } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
329              querySyncIds.push_back(deviceId);
330          } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
331              errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
332              if (errCode != E_OK) {
333                  return errCode;
334              }
335          }
336      }
337      errCode = querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
338      if (errCode != E_OK) {
339          return errCode;
340      }
341      return InitLocalMetaData();
342  }
343  
LoadDeviceIdDataToMap(const Key & key)344  int Metadata::LoadDeviceIdDataToMap(const Key &key)
345  {
346      std::vector<uint8_t> value;
347      int errCode = GetMetadataFromDb(key, value);
348      if (errCode != E_OK) {
349          return errCode;
350      }
351      MetaDataValue metaValue;
352      std::string metaKey(key.begin(), key.end());
353      errCode = DeSerializeMetaData(value, metaValue);
354      if (errCode != E_OK) {
355          return errCode;
356      }
357      std::lock_guard<std::mutex> lockGuard(metadataLock_);
358      PutMetadataToMap(metaKey, metaValue);
359      return errCode;
360  }
361  
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)362  void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
363  {
364      if (!isNeedHash) {
365          hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + deviceId;
366          return;
367      }
368      if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
369          hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
370          deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
371      } else {
372          hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
373      }
374  }
375  
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)376  int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
377      const std::string &deviceId, WaterMark &waterMark)
378  {
379      QueryWaterMark queryWaterMark;
380      int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
381      if (errCode != E_OK) {
382          return errCode;
383      }
384      WaterMark peerWaterMark;
385      GetPeerWaterMark(deviceId, peerWaterMark);
386      waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
387      return E_OK;
388  }
389  
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)390  int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
391      const std::string &deviceId, const WaterMark &waterMark)
392  {
393      return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
394  }
395  
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)396  int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
397      const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
398  {
399      QueryWaterMark queryWaterMark;
400      int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
401      if (errCode != E_OK) {
402          return errCode;
403      }
404      if (isAutoLift) {
405          WaterMark localWaterMark;
406          GetLocalWaterMark(deviceId, localWaterMark);
407          waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
408      } else {
409          waterMark = queryWaterMark.sendWaterMark;
410      }
411      return E_OK;
412  }
413  
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)414  int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
415      const std::string &deviceId, const WaterMark &waterMark)
416  {
417      return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
418  }
419  
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)420  int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp &timestamp)
421  {
422      QueryWaterMark queryWaterMark;
423      int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
424      if (errCode != E_OK) {
425          return errCode;
426      }
427      timestamp = queryWaterMark.lastQueryTime;
428      return E_OK;
429  }
430  
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)431  int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
432      const Timestamp &timestamp)
433  {
434      return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
435  }
436  
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)437  int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
438  {
439      DeleteWaterMark deleteWaterMark;
440      int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
441      if (errCode != E_OK) {
442          return errCode;
443      }
444      if (isAutoLift) {
445          WaterMark localWaterMark;
446          GetLocalWaterMark(deviceId, localWaterMark);
447          waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
448      } else {
449          waterMark = deleteWaterMark.sendWaterMark;
450      }
451      return E_OK;
452  }
453  
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)454  int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
455  {
456      return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
457  }
458  
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)459  int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
460  {
461      DeleteWaterMark deleteWaterMark;
462      int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
463      if (errCode != E_OK) {
464          return errCode;
465      }
466      WaterMark peerWaterMark;
467      GetPeerWaterMark(deviceId, peerWaterMark);
468      waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
469      return E_OK;
470  }
471  
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)472  int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash)
473  {
474      return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
475  }
476  
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)477  int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash)
478  {
479      return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
480  }
481  
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)482  void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
483  {
484      MetaDataValue metadata;
485      std::lock_guard<std::mutex> lockGuard(metadataLock_);
486      DeviceID hashDeviceId;
487      GetHashDeviceId(deviceId, hashDeviceId, true);
488      if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
489          metadata = metadataMap_[hashDeviceId];
490          outValue = metadata.dbCreateTime;
491          return;
492      }
493      outValue = 0;
494      LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
495  }
496  
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)497  int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
498  {
499      MetaDataValue metadata;
500      std::lock_guard<std::mutex> lockGuard(metadataLock_);
501      DeviceID hashDeviceId;
502      GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
503      if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
504          metadata = metadataMap_[hashDeviceId];
505          if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
506              metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
507              LOGI("Metadata::SetDbCreateTime,set clear data mark,dev=%s,dbCreateTime=%" PRIu64,
508                  STR_MASK(deviceId), inValue);
509          }
510          if (metadata.dbCreateTime == 0) {
511              LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
512          }
513      }
514      metadata.dbCreateTime = inValue;
515      return SaveMetaDataValue(deviceId, metadata);
516  }
517  
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)518  int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
519  {
520      MetaDataValue metadata;
521      std::lock_guard<std::mutex> lockGuard(metadataLock_);
522      DeviceID hashDeviceId;
523      GetHashDeviceId(deviceId, hashDeviceId, true);
524      if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
525          metadata = metadataMap_[hashDeviceId];
526          metadata.clearDeviceDataMark = 0; // clear mark
527          return SaveMetaDataValue(deviceId, metadata);
528      }
529      return -E_NOT_FOUND;
530  }
531  
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)532  void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
533  {
534      MetaDataValue metadata;
535      std::lock_guard<std::mutex> lockGuard(metadataLock_);
536      DeviceID hashDeviceId;
537      GetHashDeviceId(deviceId, hashDeviceId, true);
538      if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
539          metadata = metadataMap_[hashDeviceId];
540          outValue = metadata.clearDeviceDataMark;
541          return;
542      }
543      outValue = 0;
544  }
545  
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const546  uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
547  {
548      std::vector<uint8_t> key;
549      std::vector<uint8_t> value;
550      std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
551      DBCommon::StringToVector(hashqueryId, key);
552      int errCode = GetMetadataFromDb(key, value);
553      std::lock_guard<std::mutex> lockGuard(metadataLock_);
554      if (errCode == -E_NOT_FOUND) {
555          auto iter = queryIdMap_.find(deviceId);
556          if (iter != queryIdMap_.end()) {
557              if (iter->second.find(hashqueryId) == iter->second.end()) {
558                  iter->second.insert(hashqueryId);
559                  return INT64_MAX;
560              }
561              return 0;
562          } else {
563              queryIdMap_[deviceId] = { hashqueryId };
564              return INT64_MAX;
565          }
566      }
567      auto iter = queryIdMap_.find(deviceId);
568      // while value is found in db, it can be found in db later when db is not closed
569      // so no need to record the hashqueryId in map
570      if (errCode == E_OK && iter != queryIdMap_.end()) {
571          iter->second.erase(hashqueryId);
572      }
573      return StringToLong(value);
574  }
575  
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)576  void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
577  {
578      std::lock_guard<std::mutex> lockGuard(metadataLock_);
579      std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
580      auto iter = queryIdMap_.find(deviceId);
581      if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
582          iter->second.erase(hashqueryId);
583      }
584  }
585  
SaveClientId(const std::string & deviceId,const std::string & clientId)586  int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId)
587  {
588      {
589          // already save in cache
590          std::lock_guard<std::mutex> autoLock(clientIdLock_);
591          if (clientIdCache_[deviceId] == clientId) {
592              return E_OK;
593          }
594      }
595      std::string keyStr;
596      keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
597      std::string valueStr = DBCommon::TransferHashString(deviceId);
598      Key key;
599      DBCommon::StringToVector(keyStr, key);
600      Value value;
601      DBCommon::StringToVector(valueStr, value);
602      int errCode = SetMetadataToDb(key, value);
603      if (errCode != E_OK) {
604          return errCode;
605      }
606      std::lock_guard<std::mutex> autoLock(clientIdLock_);
607      clientIdCache_[deviceId] = clientId;
608      return E_OK;
609  }
610  
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const611  int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
612  {
613      // don't use cache here avoid invalid cache
614      std::string keyStr;
615      keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
616      Key key;
617      DBCommon::StringToVector(keyStr, key);
618      Value value;
619      int errCode = GetMetadataFromDb(key, value);
620      if (errCode == -E_NOT_FOUND) {
621          LOGD("[Metadata] not found clientId by %.3s", clientId.c_str());
622          return -E_NOT_SUPPORT;
623      }
624      if (errCode != E_OK) {
625          LOGE("[Metadata] reload clientId failed %d", errCode);
626          return errCode;
627      }
628      DBCommon::VectorToString(value, hashDevId);
629      return E_OK;
630  }
631  
LockWaterMark() const632  void Metadata::LockWaterMark() const
633  {
634      waterMarkMutex_.lock();
635  }
636  
UnlockWaterMark() const637  void Metadata::UnlockWaterMark() const
638  {
639      waterMarkMutex_.unlock();
640  }
641  
GetWaterMarkInfoFromDB(const std::string & dev,bool isNeedHash,WatermarkInfo & info)642  int Metadata::GetWaterMarkInfoFromDB(const std::string &dev, bool isNeedHash, WatermarkInfo &info)
643  {
644      Key devKey;
645      DeviceID hashDeviceId;
646      {
647          std::lock_guard<std::mutex> lockGuard(metadataLock_);
648          GetHashDeviceId(dev, hashDeviceId, isNeedHash);
649          DBCommon::StringToVector(hashDeviceId, devKey);
650      }
651      // read from db avoid watermark update in diff process
652      int errCode = LoadDeviceIdDataToMap(devKey);
653      if (errCode == -E_NOT_FOUND) {
654          LOGD("[Metadata] not found meta value");
655          return E_OK;
656      }
657      if (errCode != E_OK) {
658          LOGE("[Metadata] reload meta value failed %d", errCode);
659          return errCode;
660      }
661      {
662          std::lock_guard<std::mutex> lockGuard(metadataLock_);
663          MetaDataValue metadata;
664          GetMetadataFromMap(hashDeviceId, metadata);
665          info.sendMark = metadata.localWaterMark;
666          info.receiveMark = metadata.peerWaterMark;
667      }
668      return E_OK;
669  }
670  
ClearAllAbilitySyncFinishMark()671  int Metadata::ClearAllAbilitySyncFinishMark()
672  {
673      return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK) |
674          static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION));
675  }
676  
ClearAllTimeSyncFinishMark()677  int Metadata::ClearAllTimeSyncFinishMark()
678  {
679      return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK) |
680          static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET) |
681          static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK));
682  }
683  
ClearAllMetaDataValue(uint32_t innerClearAction)684  int Metadata::ClearAllMetaDataValue(uint32_t innerClearAction)
685  {
686      int errCode = E_OK;
687      std::lock_guard<std::mutex> lockGuard(metadataLock_);
688      for (auto &[hashDev, metaDataValue] : metadataMap_) {
689          ClearMetaDataValue(innerClearAction, metaDataValue);
690          std::vector<uint8_t> value;
691          int innerErrCode = SerializeMetaData(metaDataValue, value);
692          // clear sync mark without transaction here
693          // just ignore error metadata value
694          if (innerErrCode != E_OK) {
695              LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " serialize meta data failed %d", innerClearAction,
696                  innerErrCode);
697              errCode = errCode == E_OK ? innerErrCode : errCode;
698              continue;
699          }
700  
701          std::vector<uint8_t> key;
702          DBCommon::StringToVector(hashDev, key);
703          innerErrCode = SetMetadataToDb(key, value);
704          if (innerErrCode != E_OK) {
705              LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " save meta data failed %d", innerClearAction,
706                  innerErrCode);
707              errCode = errCode == E_OK ? innerErrCode : errCode;
708          }
709      }
710      if (errCode == E_OK) {
711          LOGD("[Metadata][ClearAllMetaDataValue] success clear action %" PRIu32, innerClearAction);
712      }
713      return errCode;
714  }
715  
ClearMetaDataValue(uint32_t innerClearAction,MetaDataValue & metaDataValue)716  void Metadata::ClearMetaDataValue(uint32_t innerClearAction, MetaDataValue &metaDataValue)
717  {
718      auto mark = static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK);
719      if ((innerClearAction & mark) == mark) {
720          metaDataValue.syncMark =
721              DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_ABILITY_SYNC));
722      }
723      mark = static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK);
724      if ((innerClearAction & mark) == mark) {
725          metaDataValue.syncMark =
726              DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_SYNC));
727      }
728      mark = static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION);
729      if ((innerClearAction & mark) == mark) {
730          metaDataValue.remoteSchemaVersion = 0;
731      }
732      mark = static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET);
733      if ((innerClearAction & mark) == mark) {
734          metaDataValue.systemTimeOffset = 0;
735      }
736      mark = static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK);
737      if ((innerClearAction & mark) == mark) {
738          metaDataValue.syncMark |= static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
739      }
740  }
741  
SetAbilitySyncFinishMark(const std::string & deviceId,bool finish)742  int Metadata::SetAbilitySyncFinishMark(const std::string &deviceId, bool finish)
743  {
744      return SetSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC, finish);
745  }
746  
IsAbilitySyncFinish(const std::string & deviceId)747  bool Metadata::IsAbilitySyncFinish(const std::string &deviceId)
748  {
749      return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC);
750  }
751  
SetTimeSyncFinishMark(const std::string & deviceId,bool finish)752  int Metadata::SetTimeSyncFinishMark(const std::string &deviceId, bool finish)
753  {
754      return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC, finish);
755  }
756  
SetTimeChangeMark(const std::string & deviceId,bool change)757  int Metadata::SetTimeChangeMark(const std::string &deviceId, bool change)
758  {
759      return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE, change);
760  }
761  
IsTimeSyncFinish(const std::string & deviceId)762  bool Metadata::IsTimeSyncFinish(const std::string &deviceId)
763  {
764      return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC);
765  }
766  
IsTimeChange(const std::string & deviceId)767  bool Metadata::IsTimeChange(const std::string &deviceId)
768  {
769      return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE);
770  }
771  
SetSyncMark(const std::string & deviceId,SyncMark syncMark,bool finish)772  int Metadata::SetSyncMark(const std::string &deviceId, SyncMark syncMark, bool finish)
773  {
774      MetaDataValue metadata;
775      std::lock_guard<std::mutex> lockGuard(metadataLock_);
776      GetMetaDataValue(deviceId, metadata, true);
777      auto mark = static_cast<uint64_t>(syncMark);
778      if (finish) {
779          metadata.syncMark |= mark;
780      } else {
781          metadata.syncMark = DBCommon::EraseBit(metadata.syncMark, mark);
782      }
783      LOGD("[Metadata] Mark:%" PRIx64 " sync finish:%d sync mark:%" PRIu64, mark, static_cast<int>(finish),
784          metadata.syncMark);
785      return SaveMetaDataValue(deviceId, metadata);
786  }
787  
IsContainSyncMark(const std::string & deviceId,SyncMark syncMark)788  bool Metadata::IsContainSyncMark(const std::string &deviceId, SyncMark syncMark)
789  {
790      MetaDataValue metadata;
791      std::lock_guard<std::mutex> lockGuard(metadataLock_);
792      GetMetaDataValue(deviceId, metadata, true);
793      auto mark = static_cast<uint64_t>(syncMark);
794      return (metadata.syncMark & mark) == mark;
795  }
796  
SetRemoteSchemaVersion(const std::string & deviceId,uint64_t schemaVersion)797  int Metadata::SetRemoteSchemaVersion(const std::string &deviceId, uint64_t schemaVersion)
798  {
799      MetaDataValue metadata;
800      std::lock_guard<std::mutex> lockGuard(metadataLock_);
801      GetMetaDataValue(deviceId, metadata, true);
802      metadata.remoteSchemaVersion = schemaVersion;
803      LOGI("[Metadata] Set %.3s schema version %" PRIu64, deviceId.c_str(), schemaVersion);
804      int errCode = SaveMetaDataValue(deviceId, metadata);
805      if (errCode != E_OK) {
806          LOGW("[Metadata] Set remote schema version failed");
807      }
808      return errCode;
809  }
810  
GetRemoteSchemaVersion(const std::string & deviceId)811  uint64_t Metadata::GetRemoteSchemaVersion(const std::string &deviceId)
812  {
813      MetaDataValue metadata;
814      std::lock_guard<std::mutex> lockGuard(metadataLock_);
815      GetMetaDataValue(deviceId, metadata, true);
816      LOGI("[Metadata] Get %.3s schema version %" PRIu64, deviceId.c_str(), metadata.remoteSchemaVersion);
817      return metadata.remoteSchemaVersion;
818  }
819  
SetSystemTimeOffset(const std::string & deviceId,int64_t systemTimeOffset)820  int Metadata::SetSystemTimeOffset(const std::string &deviceId, int64_t systemTimeOffset)
821  {
822      MetaDataValue metadata;
823      std::lock_guard<std::mutex> lockGuard(metadataLock_);
824      GetMetaDataValue(deviceId, metadata, true);
825      metadata.systemTimeOffset = systemTimeOffset;
826      LOGI("[Metadata] Set %.3s systemTimeOffset %" PRId64, deviceId.c_str(), systemTimeOffset);
827      return SaveMetaDataValue(deviceId, metadata);
828  }
829  
GetSystemTimeOffset(const std::string & deviceId)830  int64_t Metadata::GetSystemTimeOffset(const std::string &deviceId)
831  {
832      MetaDataValue metadata;
833      std::lock_guard<std::mutex> lockGuard(metadataLock_);
834      GetMetaDataValue(deviceId, metadata, true);
835      LOGI("[Metadata] Get %.3s systemTimeOffset %" PRId64, deviceId.c_str(), metadata.systemTimeOffset);
836      return metadata.systemTimeOffset;
837  }
838  
GetLocalSchemaVersion()839  std::pair<int, uint64_t> Metadata::GetLocalSchemaVersion()
840  {
841      std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
842      auto [errCode, localMetaData] = GetLocalMetaData();
843      if (errCode != E_OK) {
844          return {errCode, 0};
845      }
846      LOGI("[Metadata] Get local schema version %" PRIu64, localMetaData.localSchemaVersion);
847      return {errCode, localMetaData.localSchemaVersion};
848  }
849  
SetLocalSchemaVersion(uint64_t schemaVersion)850  int Metadata::SetLocalSchemaVersion(uint64_t schemaVersion)
851  {
852      std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
853      auto [errCode, localMetaData] = GetLocalMetaData();
854      if (errCode != E_OK) {
855          return errCode;
856      }
857      localMetaData.localSchemaVersion = schemaVersion;
858      LOGI("[Metadata] Set local schema version %" PRIu64, schemaVersion);
859      return SaveLocalMetaData(localMetaData);
860  }
861  
SaveLocalMetaData(const LocalMetaData & localMetaData)862  int Metadata::SaveLocalMetaData(const LocalMetaData &localMetaData)
863  {
864      auto [errCode, value] = SerializeLocalMetaData(localMetaData);
865      if (errCode != E_OK) {
866          LOGE("[Metadata] Serialize local meta data failed %d", errCode);
867          return errCode;
868      }
869      std::string k(LOCAL_META_DATA_KEY);
870      Key key(k.begin(), k.end());
871      return SetMetadataToDb(key, value);
872  }
873  
GetLocalMetaData()874  std::pair<int, LocalMetaData> Metadata::GetLocalMetaData()
875  {
876      std::string k(LOCAL_META_DATA_KEY);
877      Key key(k.begin(), k.end());
878      Value value;
879      int errCode = GetMetadataFromDb(key, value);
880      if (errCode != E_OK && errCode != -E_NOT_FOUND) {
881          return {errCode, {}};
882      }
883      return DeSerializeLocalMetaData(value);
884  }
885  
SerializeLocalMetaData(const LocalMetaData & localMetaData)886  std::pair<int, Value> Metadata::SerializeLocalMetaData(const LocalMetaData &localMetaData)
887  {
888      std::pair<int, Value> res = {E_OK, {}};
889      auto &[errCode, value] = res;
890      value.resize(CalculateLocalMetaDataLength());
891      Parcel parcel(value.data(), value.size());
892      (void)parcel.WriteUInt32(localMetaData.version);
893      parcel.EightByteAlign();
894      (void)parcel.WriteUInt64(localMetaData.localSchemaVersion);
895      if (parcel.IsError()) {
896          LOGE("[Metadata] Serialize localMetaData failed");
897          errCode = -E_SERIALIZE_ERROR;
898          value.clear();
899          return res;
900      }
901      return res;
902  }
903  
DeSerializeLocalMetaData(const Value & value)904  std::pair<int, LocalMetaData> Metadata::DeSerializeLocalMetaData(const Value &value)
905  {
906      std::pair<int, LocalMetaData> res;
907      auto &[errCode, meta] = res;
908      if (value.empty()) {
909          errCode = E_OK;
910          return res;
911      }
912      Parcel parcel(const_cast<uint8_t *>(value.data()), value.size());
913      parcel.ReadUInt32(meta.version);
914      if (meta.version >= LOCAL_META_DATA_VERSION_V2) {
915          parcel.EightByteAlign();
916      }
917      parcel.ReadUInt64(meta.localSchemaVersion);
918      if (parcel.IsError()) {
919          LOGE("[Metadata] DeSerialize localMetaData failed");
920          errCode = -E_SERIALIZE_ERROR;
921          return res;
922      }
923      return res;
924  }
925  
CalculateLocalMetaDataLength()926  uint64_t Metadata::CalculateLocalMetaDataLength()
927  {
928      uint64_t length = Parcel::GetUInt32Len(); // version
929      length = Parcel::GetEightByteAlign(length);
930      length += Parcel::GetUInt64Len(); // local schema version
931      return length;
932  }
933  
MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)934  Metadata::MetaWaterMarkAutoLock::MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)
935      : metadataPtr_(std::move(metadata))
936  {
937      if (metadataPtr_ != nullptr) {
938          metadataPtr_->LockWaterMark();
939      }
940  }
941  
~MetaWaterMarkAutoLock()942  Metadata::MetaWaterMarkAutoLock::~MetaWaterMarkAutoLock()
943  {
944      if (metadataPtr_ != nullptr) {
945          metadataPtr_->UnlockWaterMark();
946      }
947  }
948  
InitLocalMetaData()949  int Metadata::InitLocalMetaData()
950  {
951      std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
952      auto [errCode, localMetaData] = GetLocalMetaData();
953      if (errCode != E_OK) {
954          return errCode;
955      }
956      if (localMetaData.localSchemaVersion != 0) {
957          return E_OK;
958      }
959      uint64_t curTime = 0u;
960      errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
961      if (errCode != E_OK) {
962          LOGW("[Metadata] get system time failed when init schema version!");
963          localMetaData.localSchemaVersion += 1;
964      } else {
965          localMetaData.localSchemaVersion = curTime;
966      }
967      errCode = SaveLocalMetaData(localMetaData);
968      if (errCode != E_OK) {
969          LOGE("[Metadata] init local schema version failed:%d", errCode);
970      }
971      return errCode;
972  }
973  }  // namespace DistributedDB