1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "meta_data.h"
17 
18 #include <openssl/rand.h>
19 
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "platform_specific.h"
26 #include "securec.h"
27 #include "sync_types.h"
28 #include "time_helper.h"
29 #include "version.h"
30 
31 namespace DistributedDB {
namespace {
    // prefix placed in front of a client id to form the metadata key used by SaveClientId
    constexpr auto CLIENT_ID_PREFIX_KEY = "clientId";
    // presumably the key of the local metadata record (see InitLocalMetaData) — TODO confirm
    constexpr auto LOCAL_META_DATA_KEY = "localMetaData";
}
37 
Metadata()38 Metadata::Metadata()
39     : localTimeOffset_(0),
40       naturalStoragePtr_(nullptr),
41       lastLocalTime_(0)
42 {}
43 
~Metadata()44 Metadata::~Metadata()
45 {
46     naturalStoragePtr_ = nullptr;
47     metadataMap_.clear();
48 }
49 
Initialize(ISyncInterface * storage)50 int Metadata::Initialize(ISyncInterface* storage)
51 {
52     naturalStoragePtr_ = storage;
53     std::vector<uint8_t> key;
54     std::vector<uint8_t> timeOffset;
55     DBCommon::StringToVector(std::string(DBConstant::LOCALTIME_OFFSET_KEY), key);
56 
57     int errCode = GetMetadataFromDb(key, timeOffset);
58     if (errCode == -E_NOT_FOUND) {
59         int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
60         if (err != E_OK) {
61             LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
62             return err;
63         }
64     } else if (errCode == E_OK) {
65         localTimeOffset_ = StringToLong(timeOffset);
66     } else {
67         LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
68         return errCode;
69     }
70     {
71         std::lock_guard<std::mutex> lockGuard(metadataLock_);
72         metadataMap_.clear();
73     }
74     (void)querySyncWaterMarkHelper_.Initialize(storage);
75     return LoadAllMetadata();
76 }
77 
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)78 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
79 {
80     MetaDataValue metadata;
81     std::lock_guard<std::mutex> lockGuard(metadataLock_);
82     GetMetaDataValue(deviceId, metadata, true);
83     metadata.timeOffset = inValue;
84     metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
85     LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
86     return SaveMetaDataValue(deviceId, metadata);
87 }
88 
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)89 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
90 {
91     MetaDataValue metadata;
92     std::lock_guard<std::mutex> lockGuard(metadataLock_);
93     GetMetaDataValue(deviceId, metadata, true);
94     outValue = metadata.timeOffset;
95 }
96 
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)97 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
98 {
99     MetaDataValue metadata;
100     std::lock_guard<std::mutex> lockGuard(metadataLock_);
101     GetMetaDataValue(deviceId, metadata, true);
102     outValue = metadata.localWaterMark;
103 }
104 
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)105 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
106 {
107     MetaDataValue metadata;
108     std::lock_guard<std::mutex> lockGuard(metadataLock_);
109     GetMetaDataValue(deviceId, metadata, true);
110     metadata.localWaterMark = inValue;
111     LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
112     return SaveMetaDataValue(deviceId, metadata);
113 }
114 
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue,bool isNeedHash)115 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue, bool isNeedHash)
116 {
117     MetaDataValue metadata;
118     std::lock_guard<std::mutex> lockGuard(metadataLock_);
119     GetMetaDataValue(deviceId, metadata, isNeedHash);
120     outValue = metadata.peerWaterMark;
121 }
122 
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)123 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
124 {
125     MetaDataValue metadata;
126     std::lock_guard<std::mutex> lockGuard(metadataLock_);
127     GetMetaDataValue(deviceId, metadata, isNeedHash);
128     metadata.peerWaterMark = inValue;
129     LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
130     return SaveMetaDataValue(deviceId, metadata, isNeedHash);
131 }
132 
SaveLocalTimeOffset(TimeOffset timeOffset)133 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset)
134 {
135     std::string timeOffsetString = std::to_string(timeOffset);
136     std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
137     std::string keyStr(DBConstant::LOCALTIME_OFFSET_KEY);
138     std::vector<uint8_t> localTimeOffsetValue(keyStr.begin(), keyStr.end());
139 
140     std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
141     localTimeOffset_ = timeOffset;
142     LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset);
143     int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
144     if (errCode != E_OK) {
145         LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
146     }
147     return errCode;
148 }
149 
GetLocalTimeOffset() const150 TimeOffset Metadata::GetLocalTimeOffset() const
151 {
152     TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
153     return localTimeOffset;
154 }
155 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)156 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
157 {
158     return EraseDeviceWaterMark(deviceId, isNeedHash, "");
159 }
160 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)161 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
162 {
163     // reload meta data again
164     (void)LoadAllMetadata();
165     std::lock_guard<std::recursive_mutex> autoLock(waterMarkMutex_);
166     // try to erase all the waterMark
167     // erase deleteSync recv waterMark
168     WaterMark waterMark = 0;
169     int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
170     if (errCodeDeleteSync != E_OK) {
171         LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
172         return errCodeDeleteSync;
173     }
174     // erase querySync recv waterMark
175     int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
176     if (errCodeQuerySync != E_OK) {
177         LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
178         return errCodeQuerySync;
179     }
180     // peerWaterMark must be erased at last
181     int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
182     if (errCode != E_OK) {
183         LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
184         return errCode;
185     }
186     return E_OK;
187 }
188 
SetLastLocalTime(Timestamp lastLocalTime)189 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
190 {
191     std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
192     if (lastLocalTime > lastLocalTime_) {
193         lastLocalTime_ = lastLocalTime;
194     }
195 }
196 
GetLastLocalTime() const197 Timestamp Metadata::GetLastLocalTime() const
198 {
199     std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
200     return lastLocalTime_;
201 }
202 
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)203 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash)
204 {
205     std::vector<uint8_t> value;
206     int errCode = SerializeMetaData(inValue, value);
207     if (errCode != E_OK) {
208         return errCode;
209     }
210 
211     DeviceID hashDeviceId;
212     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
213     std::vector<uint8_t> key;
214     DBCommon::StringToVector(hashDeviceId, key);
215     errCode = SetMetadataToDb(key, value);
216     if (errCode != E_OK) {
217         LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
218         return errCode;
219     }
220     PutMetadataToMap(hashDeviceId, inValue);
221     return E_OK;
222 }
223 
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)224 void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
225 {
226     DeviceID hashDeviceId;
227     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
228     GetMetadataFromMap(hashDeviceId, outValue);
229 }
230 
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)231 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
232 {
233     outValue.resize(sizeof(MetaDataValue));
234     errno_t err = memcpy_s(outValue.data(), outValue.size(), &inValue, sizeof(MetaDataValue));
235     if (err != EOK) {
236         return -E_SECUREC_ERROR;
237     }
238     return E_OK;
239 }
240 
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)241 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue)
242 {
243     if (inValue.empty()) {
244         return -E_INVALID_ARGS;
245     }
246     errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), inValue.data(), inValue.size());
247     if (err != EOK) {
248         return -E_SECUREC_ERROR;
249     }
250     return E_OK;
251 }
252 
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const253 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
254 {
255     if (naturalStoragePtr_ == nullptr) {
256         return -E_INVALID_DB;
257     }
258     return naturalStoragePtr_->GetMetaData(key, outValue);
259 }
260 
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)261 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
262 {
263     if (naturalStoragePtr_ == nullptr) {
264         return -E_INVALID_DB;
265     }
266     return naturalStoragePtr_->PutMetaData(key, inValue, false);
267 }
268 
PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)269 void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value)
270 {
271     metadataMap_[deviceId] = value;
272 }
273 
GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)274 void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue)
275 {
276     if (metadataMap_.find(deviceId) == metadataMap_.end() && RuntimeContext::GetInstance()->IsTimeChanged()) {
277         metadataMap_[deviceId].syncMark = static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
278     }
279     outValue = metadataMap_[deviceId];
280 }
281 
StringToLong(const std::vector<uint8_t> & value) const282 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
283 {
284     std::string valueString(value.begin(), value.end());
285     int64_t longData = std::strtoll(valueString.c_str(), nullptr, DBConstant::STR_TO_LL_BY_DEVALUE);
286     LOGD("Metadata::StringToLong longData = %" PRId64, longData);
287     return longData;
288 }
289 
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)290 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
291 {
292     if (naturalStoragePtr_ == nullptr) {
293         return -E_INVALID_DB;
294     }
295     return naturalStoragePtr_->GetAllMetaKeys(keys);
296 }
297 
298 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)299 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
300 {
301     if (inKey.size() < expectPrefix.size()) {
302         return false;
303     }
304     std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
305     if (prefixInKey != expectPrefix) {
306         return false;
307     }
308     return true;
309 }
310 }
311 
LoadAllMetadata()312 int Metadata::LoadAllMetadata()
313 {
314     std::vector<std::vector<uint8_t>> metaDataKeys;
315     int errCode = GetAllMetadataKey(metaDataKeys);
316     if (errCode != E_OK) {
317         LOGE("[Metadata] get all metadata key failed err=%d", errCode);
318         return errCode;
319     }
320 
321     std::vector<std::vector<uint8_t>> querySyncIds;
322     for (const auto &deviceId : metaDataKeys) {
323         if (IsMetaDataKey(deviceId, DBConstant::DEVICEID_PREFIX_KEY)) {
324             errCode = LoadDeviceIdDataToMap(deviceId);
325             if (errCode != E_OK) {
326                 return errCode;
327             }
328         } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
329             querySyncIds.push_back(deviceId);
330         } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
331             errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
332             if (errCode != E_OK) {
333                 return errCode;
334             }
335         }
336     }
337     errCode = querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
338     if (errCode != E_OK) {
339         return errCode;
340     }
341     return InitLocalMetaData();
342 }
343 
LoadDeviceIdDataToMap(const Key & key)344 int Metadata::LoadDeviceIdDataToMap(const Key &key)
345 {
346     std::vector<uint8_t> value;
347     int errCode = GetMetadataFromDb(key, value);
348     if (errCode != E_OK) {
349         return errCode;
350     }
351     MetaDataValue metaValue;
352     std::string metaKey(key.begin(), key.end());
353     errCode = DeSerializeMetaData(value, metaValue);
354     if (errCode != E_OK) {
355         return errCode;
356     }
357     std::lock_guard<std::mutex> lockGuard(metadataLock_);
358     PutMetadataToMap(metaKey, metaValue);
359     return errCode;
360 }
361 
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)362 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
363 {
364     if (!isNeedHash) {
365         hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + deviceId;
366         return;
367     }
368     if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
369         hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
370         deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
371     } else {
372         hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
373     }
374 }
375 
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)376 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
377     const std::string &deviceId, WaterMark &waterMark)
378 {
379     QueryWaterMark queryWaterMark;
380     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
381     if (errCode != E_OK) {
382         return errCode;
383     }
384     WaterMark peerWaterMark;
385     GetPeerWaterMark(deviceId, peerWaterMark);
386     waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
387     return E_OK;
388 }
389 
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)390 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
391     const std::string &deviceId, const WaterMark &waterMark)
392 {
393     return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
394 }
395 
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)396 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
397     const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
398 {
399     QueryWaterMark queryWaterMark;
400     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
401     if (errCode != E_OK) {
402         return errCode;
403     }
404     if (isAutoLift) {
405         WaterMark localWaterMark;
406         GetLocalWaterMark(deviceId, localWaterMark);
407         waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
408     } else {
409         waterMark = queryWaterMark.sendWaterMark;
410     }
411     return E_OK;
412 }
413 
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)414 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
415     const std::string &deviceId, const WaterMark &waterMark)
416 {
417     return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
418 }
419 
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)420 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp &timestamp)
421 {
422     QueryWaterMark queryWaterMark;
423     int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
424     if (errCode != E_OK) {
425         return errCode;
426     }
427     timestamp = queryWaterMark.lastQueryTime;
428     return E_OK;
429 }
430 
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)431 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
432     const Timestamp &timestamp)
433 {
434     return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
435 }
436 
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)437 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
438 {
439     DeleteWaterMark deleteWaterMark;
440     int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
441     if (errCode != E_OK) {
442         return errCode;
443     }
444     if (isAutoLift) {
445         WaterMark localWaterMark;
446         GetLocalWaterMark(deviceId, localWaterMark);
447         waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
448     } else {
449         waterMark = deleteWaterMark.sendWaterMark;
450     }
451     return E_OK;
452 }
453 
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)454 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
455 {
456     return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
457 }
458 
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)459 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
460 {
461     DeleteWaterMark deleteWaterMark;
462     int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
463     if (errCode != E_OK) {
464         return errCode;
465     }
466     WaterMark peerWaterMark;
467     GetPeerWaterMark(deviceId, peerWaterMark);
468     waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
469     return E_OK;
470 }
471 
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)472 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash)
473 {
474     return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
475 }
476 
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)477 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash)
478 {
479     return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
480 }
481 
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)482 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
483 {
484     MetaDataValue metadata;
485     std::lock_guard<std::mutex> lockGuard(metadataLock_);
486     DeviceID hashDeviceId;
487     GetHashDeviceId(deviceId, hashDeviceId, true);
488     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
489         metadata = metadataMap_[hashDeviceId];
490         outValue = metadata.dbCreateTime;
491         return;
492     }
493     outValue = 0;
494     LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
495 }
496 
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)497 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
498 {
499     MetaDataValue metadata;
500     std::lock_guard<std::mutex> lockGuard(metadataLock_);
501     DeviceID hashDeviceId;
502     GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
503     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
504         metadata = metadataMap_[hashDeviceId];
505         if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
506             metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
507             LOGI("Metadata::SetDbCreateTime,set clear data mark,dev=%s,dbCreateTime=%" PRIu64,
508                 STR_MASK(deviceId), inValue);
509         }
510         if (metadata.dbCreateTime == 0) {
511             LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
512         }
513     }
514     metadata.dbCreateTime = inValue;
515     return SaveMetaDataValue(deviceId, metadata);
516 }
517 
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)518 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
519 {
520     MetaDataValue metadata;
521     std::lock_guard<std::mutex> lockGuard(metadataLock_);
522     DeviceID hashDeviceId;
523     GetHashDeviceId(deviceId, hashDeviceId, true);
524     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
525         metadata = metadataMap_[hashDeviceId];
526         metadata.clearDeviceDataMark = 0; // clear mark
527         return SaveMetaDataValue(deviceId, metadata);
528     }
529     return -E_NOT_FOUND;
530 }
531 
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)532 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
533 {
534     MetaDataValue metadata;
535     std::lock_guard<std::mutex> lockGuard(metadataLock_);
536     DeviceID hashDeviceId;
537     GetHashDeviceId(deviceId, hashDeviceId, true);
538     if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
539         metadata = metadataMap_[hashDeviceId];
540         outValue = metadata.clearDeviceDataMark;
541         return;
542     }
543     outValue = 0;
544 }
545 
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const546 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
547 {
548     std::vector<uint8_t> key;
549     std::vector<uint8_t> value;
550     std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
551     DBCommon::StringToVector(hashqueryId, key);
552     int errCode = GetMetadataFromDb(key, value);
553     std::lock_guard<std::mutex> lockGuard(metadataLock_);
554     if (errCode == -E_NOT_FOUND) {
555         auto iter = queryIdMap_.find(deviceId);
556         if (iter != queryIdMap_.end()) {
557             if (iter->second.find(hashqueryId) == iter->second.end()) {
558                 iter->second.insert(hashqueryId);
559                 return INT64_MAX;
560             }
561             return 0;
562         } else {
563             queryIdMap_[deviceId] = { hashqueryId };
564             return INT64_MAX;
565         }
566     }
567     auto iter = queryIdMap_.find(deviceId);
568     // while value is found in db, it can be found in db later when db is not closed
569     // so no need to record the hashqueryId in map
570     if (errCode == E_OK && iter != queryIdMap_.end()) {
571         iter->second.erase(hashqueryId);
572     }
573     return StringToLong(value);
574 }
575 
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)576 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
577 {
578     std::lock_guard<std::mutex> lockGuard(metadataLock_);
579     std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
580     auto iter = queryIdMap_.find(deviceId);
581     if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
582         iter->second.erase(hashqueryId);
583     }
584 }
585 
SaveClientId(const std::string & deviceId,const std::string & clientId)586 int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId)
587 {
588     {
589         // already save in cache
590         std::lock_guard<std::mutex> autoLock(clientIdLock_);
591         if (clientIdCache_[deviceId] == clientId) {
592             return E_OK;
593         }
594     }
595     std::string keyStr;
596     keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
597     std::string valueStr = DBCommon::TransferHashString(deviceId);
598     Key key;
599     DBCommon::StringToVector(keyStr, key);
600     Value value;
601     DBCommon::StringToVector(valueStr, value);
602     int errCode = SetMetadataToDb(key, value);
603     if (errCode != E_OK) {
604         return errCode;
605     }
606     std::lock_guard<std::mutex> autoLock(clientIdLock_);
607     clientIdCache_[deviceId] = clientId;
608     return E_OK;
609 }
610 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const611 int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
612 {
613     // don't use cache here avoid invalid cache
614     std::string keyStr;
615     keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
616     Key key;
617     DBCommon::StringToVector(keyStr, key);
618     Value value;
619     int errCode = GetMetadataFromDb(key, value);
620     if (errCode == -E_NOT_FOUND) {
621         LOGD("[Metadata] not found clientId by %.3s", clientId.c_str());
622         return -E_NOT_SUPPORT;
623     }
624     if (errCode != E_OK) {
625         LOGE("[Metadata] reload clientId failed %d", errCode);
626         return errCode;
627     }
628     DBCommon::VectorToString(value, hashDevId);
629     return E_OK;
630 }
631 
LockWaterMark() const632 void Metadata::LockWaterMark() const
633 {
634     waterMarkMutex_.lock();
635 }
636 
UnlockWaterMark() const637 void Metadata::UnlockWaterMark() const
638 {
639     waterMarkMutex_.unlock();
640 }
641 
GetWaterMarkInfoFromDB(const std::string & dev,bool isNeedHash,WatermarkInfo & info)642 int Metadata::GetWaterMarkInfoFromDB(const std::string &dev, bool isNeedHash, WatermarkInfo &info)
643 {
644     Key devKey;
645     DeviceID hashDeviceId;
646     {
647         std::lock_guard<std::mutex> lockGuard(metadataLock_);
648         GetHashDeviceId(dev, hashDeviceId, isNeedHash);
649         DBCommon::StringToVector(hashDeviceId, devKey);
650     }
651     // read from db avoid watermark update in diff process
652     int errCode = LoadDeviceIdDataToMap(devKey);
653     if (errCode == -E_NOT_FOUND) {
654         LOGD("[Metadata] not found meta value");
655         return E_OK;
656     }
657     if (errCode != E_OK) {
658         LOGE("[Metadata] reload meta value failed %d", errCode);
659         return errCode;
660     }
661     {
662         std::lock_guard<std::mutex> lockGuard(metadataLock_);
663         MetaDataValue metadata;
664         GetMetadataFromMap(hashDeviceId, metadata);
665         info.sendMark = metadata.localWaterMark;
666         info.receiveMark = metadata.peerWaterMark;
667     }
668     return E_OK;
669 }
670 
ClearAllAbilitySyncFinishMark()671 int Metadata::ClearAllAbilitySyncFinishMark()
672 {
673     return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK) |
674         static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION));
675 }
676 
ClearAllTimeSyncFinishMark()677 int Metadata::ClearAllTimeSyncFinishMark()
678 {
679     return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK) |
680         static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET) |
681         static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK));
682 }
683 
ClearAllMetaDataValue(uint32_t innerClearAction)684 int Metadata::ClearAllMetaDataValue(uint32_t innerClearAction)
685 {
686     int errCode = E_OK;
687     std::lock_guard<std::mutex> lockGuard(metadataLock_);
688     for (auto &[hashDev, metaDataValue] : metadataMap_) {
689         ClearMetaDataValue(innerClearAction, metaDataValue);
690         std::vector<uint8_t> value;
691         int innerErrCode = SerializeMetaData(metaDataValue, value);
692         // clear sync mark without transaction here
693         // just ignore error metadata value
694         if (innerErrCode != E_OK) {
695             LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " serialize meta data failed %d", innerClearAction,
696                 innerErrCode);
697             errCode = errCode == E_OK ? innerErrCode : errCode;
698             continue;
699         }
700 
701         std::vector<uint8_t> key;
702         DBCommon::StringToVector(hashDev, key);
703         innerErrCode = SetMetadataToDb(key, value);
704         if (innerErrCode != E_OK) {
705             LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " save meta data failed %d", innerClearAction,
706                 innerErrCode);
707             errCode = errCode == E_OK ? innerErrCode : errCode;
708         }
709     }
710     if (errCode == E_OK) {
711         LOGD("[Metadata][ClearAllMetaDataValue] success clear action %" PRIu32, innerClearAction);
712     }
713     return errCode;
714 }
715 
ClearMetaDataValue(uint32_t innerClearAction,MetaDataValue & metaDataValue)716 void Metadata::ClearMetaDataValue(uint32_t innerClearAction, MetaDataValue &metaDataValue)
717 {
718     auto mark = static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK);
719     if ((innerClearAction & mark) == mark) {
720         metaDataValue.syncMark =
721             DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_ABILITY_SYNC));
722     }
723     mark = static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK);
724     if ((innerClearAction & mark) == mark) {
725         metaDataValue.syncMark =
726             DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_SYNC));
727     }
728     mark = static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION);
729     if ((innerClearAction & mark) == mark) {
730         metaDataValue.remoteSchemaVersion = 0;
731     }
732     mark = static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET);
733     if ((innerClearAction & mark) == mark) {
734         metaDataValue.systemTimeOffset = 0;
735     }
736     mark = static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK);
737     if ((innerClearAction & mark) == mark) {
738         metaDataValue.syncMark |= static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
739     }
740 }
741 
SetAbilitySyncFinishMark(const std::string & deviceId,bool finish)742 int Metadata::SetAbilitySyncFinishMark(const std::string &deviceId, bool finish)
743 {
744     return SetSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC, finish);
745 }
746 
IsAbilitySyncFinish(const std::string & deviceId)747 bool Metadata::IsAbilitySyncFinish(const std::string &deviceId)
748 {
749     return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC);
750 }
751 
SetTimeSyncFinishMark(const std::string & deviceId,bool finish)752 int Metadata::SetTimeSyncFinishMark(const std::string &deviceId, bool finish)
753 {
754     return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC, finish);
755 }
756 
SetTimeChangeMark(const std::string & deviceId,bool change)757 int Metadata::SetTimeChangeMark(const std::string &deviceId, bool change)
758 {
759     return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE, change);
760 }
761 
IsTimeSyncFinish(const std::string & deviceId)762 bool Metadata::IsTimeSyncFinish(const std::string &deviceId)
763 {
764     return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC);
765 }
766 
IsTimeChange(const std::string & deviceId)767 bool Metadata::IsTimeChange(const std::string &deviceId)
768 {
769     return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE);
770 }
771 
SetSyncMark(const std::string & deviceId,SyncMark syncMark,bool finish)772 int Metadata::SetSyncMark(const std::string &deviceId, SyncMark syncMark, bool finish)
773 {
774     MetaDataValue metadata;
775     std::lock_guard<std::mutex> lockGuard(metadataLock_);
776     GetMetaDataValue(deviceId, metadata, true);
777     auto mark = static_cast<uint64_t>(syncMark);
778     if (finish) {
779         metadata.syncMark |= mark;
780     } else {
781         metadata.syncMark = DBCommon::EraseBit(metadata.syncMark, mark);
782     }
783     LOGD("[Metadata] Mark:%" PRIx64 " sync finish:%d sync mark:%" PRIu64, mark, static_cast<int>(finish),
784         metadata.syncMark);
785     return SaveMetaDataValue(deviceId, metadata);
786 }
787 
IsContainSyncMark(const std::string & deviceId,SyncMark syncMark)788 bool Metadata::IsContainSyncMark(const std::string &deviceId, SyncMark syncMark)
789 {
790     MetaDataValue metadata;
791     std::lock_guard<std::mutex> lockGuard(metadataLock_);
792     GetMetaDataValue(deviceId, metadata, true);
793     auto mark = static_cast<uint64_t>(syncMark);
794     return (metadata.syncMark & mark) == mark;
795 }
796 
SetRemoteSchemaVersion(const std::string & deviceId,uint64_t schemaVersion)797 int Metadata::SetRemoteSchemaVersion(const std::string &deviceId, uint64_t schemaVersion)
798 {
799     MetaDataValue metadata;
800     std::lock_guard<std::mutex> lockGuard(metadataLock_);
801     GetMetaDataValue(deviceId, metadata, true);
802     metadata.remoteSchemaVersion = schemaVersion;
803     LOGI("[Metadata] Set %.3s schema version %" PRIu64, deviceId.c_str(), schemaVersion);
804     int errCode = SaveMetaDataValue(deviceId, metadata);
805     if (errCode != E_OK) {
806         LOGW("[Metadata] Set remote schema version failed");
807     }
808     return errCode;
809 }
810 
GetRemoteSchemaVersion(const std::string & deviceId)811 uint64_t Metadata::GetRemoteSchemaVersion(const std::string &deviceId)
812 {
813     MetaDataValue metadata;
814     std::lock_guard<std::mutex> lockGuard(metadataLock_);
815     GetMetaDataValue(deviceId, metadata, true);
816     LOGI("[Metadata] Get %.3s schema version %" PRIu64, deviceId.c_str(), metadata.remoteSchemaVersion);
817     return metadata.remoteSchemaVersion;
818 }
819 
SetSystemTimeOffset(const std::string & deviceId,int64_t systemTimeOffset)820 int Metadata::SetSystemTimeOffset(const std::string &deviceId, int64_t systemTimeOffset)
821 {
822     MetaDataValue metadata;
823     std::lock_guard<std::mutex> lockGuard(metadataLock_);
824     GetMetaDataValue(deviceId, metadata, true);
825     metadata.systemTimeOffset = systemTimeOffset;
826     LOGI("[Metadata] Set %.3s systemTimeOffset %" PRId64, deviceId.c_str(), systemTimeOffset);
827     return SaveMetaDataValue(deviceId, metadata);
828 }
829 
GetSystemTimeOffset(const std::string & deviceId)830 int64_t Metadata::GetSystemTimeOffset(const std::string &deviceId)
831 {
832     MetaDataValue metadata;
833     std::lock_guard<std::mutex> lockGuard(metadataLock_);
834     GetMetaDataValue(deviceId, metadata, true);
835     LOGI("[Metadata] Get %.3s systemTimeOffset %" PRId64, deviceId.c_str(), metadata.systemTimeOffset);
836     return metadata.systemTimeOffset;
837 }
838 
GetLocalSchemaVersion()839 std::pair<int, uint64_t> Metadata::GetLocalSchemaVersion()
840 {
841     std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
842     auto [errCode, localMetaData] = GetLocalMetaData();
843     if (errCode != E_OK) {
844         return {errCode, 0};
845     }
846     LOGI("[Metadata] Get local schema version %" PRIu64, localMetaData.localSchemaVersion);
847     return {errCode, localMetaData.localSchemaVersion};
848 }
849 
SetLocalSchemaVersion(uint64_t schemaVersion)850 int Metadata::SetLocalSchemaVersion(uint64_t schemaVersion)
851 {
852     std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
853     auto [errCode, localMetaData] = GetLocalMetaData();
854     if (errCode != E_OK) {
855         return errCode;
856     }
857     localMetaData.localSchemaVersion = schemaVersion;
858     LOGI("[Metadata] Set local schema version %" PRIu64, schemaVersion);
859     return SaveLocalMetaData(localMetaData);
860 }
861 
SaveLocalMetaData(const LocalMetaData & localMetaData)862 int Metadata::SaveLocalMetaData(const LocalMetaData &localMetaData)
863 {
864     auto [errCode, value] = SerializeLocalMetaData(localMetaData);
865     if (errCode != E_OK) {
866         LOGE("[Metadata] Serialize local meta data failed %d", errCode);
867         return errCode;
868     }
869     std::string k(LOCAL_META_DATA_KEY);
870     Key key(k.begin(), k.end());
871     return SetMetadataToDb(key, value);
872 }
873 
GetLocalMetaData()874 std::pair<int, LocalMetaData> Metadata::GetLocalMetaData()
875 {
876     std::string k(LOCAL_META_DATA_KEY);
877     Key key(k.begin(), k.end());
878     Value value;
879     int errCode = GetMetadataFromDb(key, value);
880     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
881         return {errCode, {}};
882     }
883     return DeSerializeLocalMetaData(value);
884 }
885 
SerializeLocalMetaData(const LocalMetaData & localMetaData)886 std::pair<int, Value> Metadata::SerializeLocalMetaData(const LocalMetaData &localMetaData)
887 {
888     std::pair<int, Value> res = {E_OK, {}};
889     auto &[errCode, value] = res;
890     value.resize(CalculateLocalMetaDataLength());
891     Parcel parcel(value.data(), value.size());
892     (void)parcel.WriteUInt32(localMetaData.version);
893     parcel.EightByteAlign();
894     (void)parcel.WriteUInt64(localMetaData.localSchemaVersion);
895     if (parcel.IsError()) {
896         LOGE("[Metadata] Serialize localMetaData failed");
897         errCode = -E_SERIALIZE_ERROR;
898         value.clear();
899         return res;
900     }
901     return res;
902 }
903 
DeSerializeLocalMetaData(const Value & value)904 std::pair<int, LocalMetaData> Metadata::DeSerializeLocalMetaData(const Value &value)
905 {
906     std::pair<int, LocalMetaData> res;
907     auto &[errCode, meta] = res;
908     if (value.empty()) {
909         errCode = E_OK;
910         return res;
911     }
912     Parcel parcel(const_cast<uint8_t *>(value.data()), value.size());
913     parcel.ReadUInt32(meta.version);
914     if (meta.version >= LOCAL_META_DATA_VERSION_V2) {
915         parcel.EightByteAlign();
916     }
917     parcel.ReadUInt64(meta.localSchemaVersion);
918     if (parcel.IsError()) {
919         LOGE("[Metadata] DeSerialize localMetaData failed");
920         errCode = -E_SERIALIZE_ERROR;
921         return res;
922     }
923     return res;
924 }
925 
CalculateLocalMetaDataLength()926 uint64_t Metadata::CalculateLocalMetaDataLength()
927 {
928     uint64_t length = Parcel::GetUInt32Len(); // version
929     length = Parcel::GetEightByteAlign(length);
930     length += Parcel::GetUInt64Len(); // local schema version
931     return length;
932 }
933 
MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)934 Metadata::MetaWaterMarkAutoLock::MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)
935     : metadataPtr_(std::move(metadata))
936 {
937     if (metadataPtr_ != nullptr) {
938         metadataPtr_->LockWaterMark();
939     }
940 }
941 
~MetaWaterMarkAutoLock()942 Metadata::MetaWaterMarkAutoLock::~MetaWaterMarkAutoLock()
943 {
944     if (metadataPtr_ != nullptr) {
945         metadataPtr_->UnlockWaterMark();
946     }
947 }
948 
InitLocalMetaData()949 int Metadata::InitLocalMetaData()
950 {
951     std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
952     auto [errCode, localMetaData] = GetLocalMetaData();
953     if (errCode != E_OK) {
954         return errCode;
955     }
956     if (localMetaData.localSchemaVersion != 0) {
957         return E_OK;
958     }
959     uint64_t curTime = 0u;
960     errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
961     if (errCode != E_OK) {
962         LOGW("[Metadata] get system time failed when init schema version!");
963         localMetaData.localSchemaVersion += 1;
964     } else {
965         localMetaData.localSchemaVersion = curTime;
966     }
967     errCode = SaveLocalMetaData(localMetaData);
968     if (errCode != E_OK) {
969         LOGE("[Metadata] init local schema version failed:%d", errCode);
970     }
971     return errCode;
972 }
973 }  // namespace DistributedDB