1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "meta_data.h"
17
18 #include <openssl/rand.h>
19
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "platform_specific.h"
26 #include "securec.h"
27 #include "sync_types.h"
28 #include "time_helper.h"
29 #include "version.h"
30
31 namespace DistributedDB {
namespace {
// Prefix of the metadata keys that map a client id to a hashed device id:
// the client id is appended to this prefix to build the key.
constexpr const char *CLIENT_ID_PREFIX_KEY = "clientId";
// Metadata key "localMetaData". It is not referenced in this part of the file;
// presumably it addresses the local metadata record - confirm against its users.
constexpr const char *LOCAL_META_DATA_KEY = "localMetaData";
}
37
Metadata()38 Metadata::Metadata()
39 : localTimeOffset_(0),
40 naturalStoragePtr_(nullptr),
41 lastLocalTime_(0)
42 {}
43
~Metadata()44 Metadata::~Metadata()
45 {
46 naturalStoragePtr_ = nullptr;
47 metadataMap_.clear();
48 }
49
Initialize(ISyncInterface * storage)50 int Metadata::Initialize(ISyncInterface* storage)
51 {
52 naturalStoragePtr_ = storage;
53 std::vector<uint8_t> key;
54 std::vector<uint8_t> timeOffset;
55 DBCommon::StringToVector(std::string(DBConstant::LOCALTIME_OFFSET_KEY), key);
56
57 int errCode = GetMetadataFromDb(key, timeOffset);
58 if (errCode == -E_NOT_FOUND) {
59 int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET);
60 if (err != E_OK) {
61 LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err);
62 return err;
63 }
64 } else if (errCode == E_OK) {
65 localTimeOffset_ = StringToLong(timeOffset);
66 } else {
67 LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode);
68 return errCode;
69 }
70 {
71 std::lock_guard<std::mutex> lockGuard(metadataLock_);
72 metadataMap_.clear();
73 }
74 (void)querySyncWaterMarkHelper_.Initialize(storage);
75 return LoadAllMetadata();
76 }
77
SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)78 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue)
79 {
80 MetaDataValue metadata;
81 std::lock_guard<std::mutex> lockGuard(metadataLock_);
82 GetMetaDataValue(deviceId, metadata, true);
83 metadata.timeOffset = inValue;
84 metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime();
85 LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId));
86 return SaveMetaDataValue(deviceId, metadata);
87 }
88
GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)89 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue)
90 {
91 MetaDataValue metadata;
92 std::lock_guard<std::mutex> lockGuard(metadataLock_);
93 GetMetaDataValue(deviceId, metadata, true);
94 outValue = metadata.timeOffset;
95 }
96
GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)97 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue)
98 {
99 MetaDataValue metadata;
100 std::lock_guard<std::mutex> lockGuard(metadataLock_);
101 GetMetaDataValue(deviceId, metadata, true);
102 outValue = metadata.localWaterMark;
103 }
104
SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)105 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue)
106 {
107 MetaDataValue metadata;
108 std::lock_guard<std::mutex> lockGuard(metadataLock_);
109 GetMetaDataValue(deviceId, metadata, true);
110 metadata.localWaterMark = inValue;
111 LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue);
112 return SaveMetaDataValue(deviceId, metadata);
113 }
114
GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue,bool isNeedHash)115 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue, bool isNeedHash)
116 {
117 MetaDataValue metadata;
118 std::lock_guard<std::mutex> lockGuard(metadataLock_);
119 GetMetaDataValue(deviceId, metadata, isNeedHash);
120 outValue = metadata.peerWaterMark;
121 }
122
SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)123 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
124 {
125 MetaDataValue metadata;
126 std::lock_guard<std::mutex> lockGuard(metadataLock_);
127 GetMetaDataValue(deviceId, metadata, isNeedHash);
128 metadata.peerWaterMark = inValue;
129 LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue);
130 return SaveMetaDataValue(deviceId, metadata, isNeedHash);
131 }
132
SaveLocalTimeOffset(TimeOffset timeOffset)133 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset)
134 {
135 std::string timeOffsetString = std::to_string(timeOffset);
136 std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end());
137 std::string keyStr(DBConstant::LOCALTIME_OFFSET_KEY);
138 std::vector<uint8_t> localTimeOffsetValue(keyStr.begin(), keyStr.end());
139
140 std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_);
141 localTimeOffset_ = timeOffset;
142 LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset);
143 int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue);
144 if (errCode != E_OK) {
145 LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode);
146 }
147 return errCode;
148 }
149
GetLocalTimeOffset() const150 TimeOffset Metadata::GetLocalTimeOffset() const
151 {
152 TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst);
153 return localTimeOffset;
154 }
155
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)156 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
157 {
158 return EraseDeviceWaterMark(deviceId, isNeedHash, "");
159 }
160
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)161 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName)
162 {
163 // reload meta data again
164 (void)LoadAllMetadata();
165 std::lock_guard<std::recursive_mutex> autoLock(waterMarkMutex_);
166 // try to erase all the waterMark
167 // erase deleteSync recv waterMark
168 WaterMark waterMark = 0;
169 int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
170 if (errCodeDeleteSync != E_OK) {
171 LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync);
172 return errCodeDeleteSync;
173 }
174 // erase querySync recv waterMark
175 int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
176 if (errCodeQuerySync != E_OK) {
177 LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync);
178 return errCodeQuerySync;
179 }
180 // peerWaterMark must be erased at last
181 int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash);
182 if (errCode != E_OK) {
183 LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode);
184 return errCode;
185 }
186 return E_OK;
187 }
188
SetLastLocalTime(Timestamp lastLocalTime)189 void Metadata::SetLastLocalTime(Timestamp lastLocalTime)
190 {
191 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
192 if (lastLocalTime > lastLocalTime_) {
193 lastLocalTime_ = lastLocalTime;
194 }
195 }
196
GetLastLocalTime() const197 Timestamp Metadata::GetLastLocalTime() const
198 {
199 std::lock_guard<std::mutex> lock(lastLocalTimeLock_);
200 return lastLocalTime_;
201 }
202
SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)203 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash)
204 {
205 std::vector<uint8_t> value;
206 int errCode = SerializeMetaData(inValue, value);
207 if (errCode != E_OK) {
208 return errCode;
209 }
210
211 DeviceID hashDeviceId;
212 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
213 std::vector<uint8_t> key;
214 DBCommon::StringToVector(hashDeviceId, key);
215 errCode = SetMetadataToDb(key, value);
216 if (errCode != E_OK) {
217 LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode);
218 return errCode;
219 }
220 PutMetadataToMap(hashDeviceId, inValue);
221 return E_OK;
222 }
223
GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)224 void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash)
225 {
226 DeviceID hashDeviceId;
227 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
228 GetMetadataFromMap(hashDeviceId, outValue);
229 }
230
SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)231 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue)
232 {
233 outValue.resize(sizeof(MetaDataValue));
234 errno_t err = memcpy_s(outValue.data(), outValue.size(), &inValue, sizeof(MetaDataValue));
235 if (err != EOK) {
236 return -E_SECUREC_ERROR;
237 }
238 return E_OK;
239 }
240
DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)241 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue)
242 {
243 if (inValue.empty()) {
244 return -E_INVALID_ARGS;
245 }
246 errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), inValue.data(), inValue.size());
247 if (err != EOK) {
248 return -E_SECUREC_ERROR;
249 }
250 return E_OK;
251 }
252
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const253 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const
254 {
255 if (naturalStoragePtr_ == nullptr) {
256 return -E_INVALID_DB;
257 }
258 return naturalStoragePtr_->GetMetaData(key, outValue);
259 }
260
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)261 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
262 {
263 if (naturalStoragePtr_ == nullptr) {
264 return -E_INVALID_DB;
265 }
266 return naturalStoragePtr_->PutMetaData(key, inValue, false);
267 }
268
PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)269 void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value)
270 {
271 metadataMap_[deviceId] = value;
272 }
273
GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)274 void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue)
275 {
276 if (metadataMap_.find(deviceId) == metadataMap_.end() && RuntimeContext::GetInstance()->IsTimeChanged()) {
277 metadataMap_[deviceId].syncMark = static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
278 }
279 outValue = metadataMap_[deviceId];
280 }
281
StringToLong(const std::vector<uint8_t> & value) const282 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const
283 {
284 std::string valueString(value.begin(), value.end());
285 int64_t longData = std::strtoll(valueString.c_str(), nullptr, DBConstant::STR_TO_LL_BY_DEVALUE);
286 LOGD("Metadata::StringToLong longData = %" PRId64, longData);
287 return longData;
288 }
289
GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)290 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys)
291 {
292 if (naturalStoragePtr_ == nullptr) {
293 return -E_INVALID_DB;
294 }
295 return naturalStoragePtr_->GetAllMetaKeys(keys);
296 }
297
298 namespace {
IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)299 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix)
300 {
301 if (inKey.size() < expectPrefix.size()) {
302 return false;
303 }
304 std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size());
305 if (prefixInKey != expectPrefix) {
306 return false;
307 }
308 return true;
309 }
310 }
311
LoadAllMetadata()312 int Metadata::LoadAllMetadata()
313 {
314 std::vector<std::vector<uint8_t>> metaDataKeys;
315 int errCode = GetAllMetadataKey(metaDataKeys);
316 if (errCode != E_OK) {
317 LOGE("[Metadata] get all metadata key failed err=%d", errCode);
318 return errCode;
319 }
320
321 std::vector<std::vector<uint8_t>> querySyncIds;
322 for (const auto &deviceId : metaDataKeys) {
323 if (IsMetaDataKey(deviceId, DBConstant::DEVICEID_PREFIX_KEY)) {
324 errCode = LoadDeviceIdDataToMap(deviceId);
325 if (errCode != E_OK) {
326 return errCode;
327 }
328 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) {
329 querySyncIds.push_back(deviceId);
330 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) {
331 errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId);
332 if (errCode != E_OK) {
333 return errCode;
334 }
335 }
336 }
337 errCode = querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds);
338 if (errCode != E_OK) {
339 return errCode;
340 }
341 return InitLocalMetaData();
342 }
343
LoadDeviceIdDataToMap(const Key & key)344 int Metadata::LoadDeviceIdDataToMap(const Key &key)
345 {
346 std::vector<uint8_t> value;
347 int errCode = GetMetadataFromDb(key, value);
348 if (errCode != E_OK) {
349 return errCode;
350 }
351 MetaDataValue metaValue;
352 std::string metaKey(key.begin(), key.end());
353 errCode = DeSerializeMetaData(value, metaValue);
354 if (errCode != E_OK) {
355 return errCode;
356 }
357 std::lock_guard<std::mutex> lockGuard(metadataLock_);
358 PutMetadataToMap(metaKey, metaValue);
359 return errCode;
360 }
361
GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)362 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash)
363 {
364 if (!isNeedHash) {
365 hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + deviceId;
366 return;
367 }
368 if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) {
369 hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
370 deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId));
371 } else {
372 hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId];
373 }
374 }
375
GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)376 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify,
377 const std::string &deviceId, WaterMark &waterMark)
378 {
379 QueryWaterMark queryWaterMark;
380 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
381 if (errCode != E_OK) {
382 return errCode;
383 }
384 WaterMark peerWaterMark;
385 GetPeerWaterMark(deviceId, peerWaterMark);
386 waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark);
387 return E_OK;
388 }
389
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)390 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify,
391 const std::string &deviceId, const WaterMark &waterMark)
392 {
393 return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
394 }
395
GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)396 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify,
397 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift)
398 {
399 QueryWaterMark queryWaterMark;
400 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
401 if (errCode != E_OK) {
402 return errCode;
403 }
404 if (isAutoLift) {
405 WaterMark localWaterMark;
406 GetLocalWaterMark(deviceId, localWaterMark);
407 waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark);
408 } else {
409 waterMark = queryWaterMark.sendWaterMark;
410 }
411 return E_OK;
412 }
413
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)414 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify,
415 const std::string &deviceId, const WaterMark &waterMark)
416 {
417 return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark);
418 }
419
GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)420 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp)
421 {
422 QueryWaterMark queryWaterMark;
423 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark);
424 if (errCode != E_OK) {
425 return errCode;
426 }
427 timestamp = queryWaterMark.lastQueryTime;
428 return E_OK;
429 }
430
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)431 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId,
432 const Timestamp ×tamp)
433 {
434 return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp);
435 }
436
GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)437 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift)
438 {
439 DeleteWaterMark deleteWaterMark;
440 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
441 if (errCode != E_OK) {
442 return errCode;
443 }
444 if (isAutoLift) {
445 WaterMark localWaterMark;
446 GetLocalWaterMark(deviceId, localWaterMark);
447 waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark);
448 } else {
449 waterMark = deleteWaterMark.sendWaterMark;
450 }
451 return E_OK;
452 }
453
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)454 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
455 {
456 return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark);
457 }
458
GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)459 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
460 {
461 DeleteWaterMark deleteWaterMark;
462 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark);
463 if (errCode != E_OK) {
464 return errCode;
465 }
466 WaterMark peerWaterMark;
467 GetPeerWaterMark(deviceId, peerWaterMark);
468 waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark);
469 return E_OK;
470 }
471
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)472 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash)
473 {
474 return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash);
475 }
476
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)477 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash)
478 {
479 return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash);
480 }
481
GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)482 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue)
483 {
484 MetaDataValue metadata;
485 std::lock_guard<std::mutex> lockGuard(metadataLock_);
486 DeviceID hashDeviceId;
487 GetHashDeviceId(deviceId, hashDeviceId, true);
488 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
489 metadata = metadataMap_[hashDeviceId];
490 outValue = metadata.dbCreateTime;
491 return;
492 }
493 outValue = 0;
494 LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId));
495 }
496
SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)497 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash)
498 {
499 MetaDataValue metadata;
500 std::lock_guard<std::mutex> lockGuard(metadataLock_);
501 DeviceID hashDeviceId;
502 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash);
503 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
504 metadata = metadataMap_[hashDeviceId];
505 if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) {
506 metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK;
507 LOGI("Metadata::SetDbCreateTime,set clear data mark,dev=%s,dbCreateTime=%" PRIu64,
508 STR_MASK(deviceId), inValue);
509 }
510 if (metadata.dbCreateTime == 0) {
511 LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue);
512 }
513 }
514 metadata.dbCreateTime = inValue;
515 return SaveMetaDataValue(deviceId, metadata);
516 }
517
ResetMetaDataAfterRemoveData(const DeviceID & deviceId)518 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId)
519 {
520 MetaDataValue metadata;
521 std::lock_guard<std::mutex> lockGuard(metadataLock_);
522 DeviceID hashDeviceId;
523 GetHashDeviceId(deviceId, hashDeviceId, true);
524 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
525 metadata = metadataMap_[hashDeviceId];
526 metadata.clearDeviceDataMark = 0; // clear mark
527 return SaveMetaDataValue(deviceId, metadata);
528 }
529 return -E_NOT_FOUND;
530 }
531
GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)532 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue)
533 {
534 MetaDataValue metadata;
535 std::lock_guard<std::mutex> lockGuard(metadataLock_);
536 DeviceID hashDeviceId;
537 GetHashDeviceId(deviceId, hashDeviceId, true);
538 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) {
539 metadata = metadataMap_[hashDeviceId];
540 outValue = metadata.clearDeviceDataMark;
541 return;
542 }
543 outValue = 0;
544 }
545
GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const546 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const
547 {
548 std::vector<uint8_t> key;
549 std::vector<uint8_t> value;
550 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
551 DBCommon::StringToVector(hashqueryId, key);
552 int errCode = GetMetadataFromDb(key, value);
553 std::lock_guard<std::mutex> lockGuard(metadataLock_);
554 if (errCode == -E_NOT_FOUND) {
555 auto iter = queryIdMap_.find(deviceId);
556 if (iter != queryIdMap_.end()) {
557 if (iter->second.find(hashqueryId) == iter->second.end()) {
558 iter->second.insert(hashqueryId);
559 return INT64_MAX;
560 }
561 return 0;
562 } else {
563 queryIdMap_[deviceId] = { hashqueryId };
564 return INT64_MAX;
565 }
566 }
567 auto iter = queryIdMap_.find(deviceId);
568 // while value is found in db, it can be found in db later when db is not closed
569 // so no need to record the hashqueryId in map
570 if (errCode == E_OK && iter != queryIdMap_.end()) {
571 iter->second.erase(hashqueryId);
572 }
573 return StringToLong(value);
574 }
575
RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)576 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId)
577 {
578 std::lock_guard<std::mutex> lockGuard(metadataLock_);
579 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId);
580 auto iter = queryIdMap_.find(deviceId);
581 if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) {
582 iter->second.erase(hashqueryId);
583 }
584 }
585
SaveClientId(const std::string & deviceId,const std::string & clientId)586 int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId)
587 {
588 {
589 // already save in cache
590 std::lock_guard<std::mutex> autoLock(clientIdLock_);
591 if (clientIdCache_[deviceId] == clientId) {
592 return E_OK;
593 }
594 }
595 std::string keyStr;
596 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
597 std::string valueStr = DBCommon::TransferHashString(deviceId);
598 Key key;
599 DBCommon::StringToVector(keyStr, key);
600 Value value;
601 DBCommon::StringToVector(valueStr, value);
602 int errCode = SetMetadataToDb(key, value);
603 if (errCode != E_OK) {
604 return errCode;
605 }
606 std::lock_guard<std::mutex> autoLock(clientIdLock_);
607 clientIdCache_[deviceId] = clientId;
608 return E_OK;
609 }
610
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const611 int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
612 {
613 // don't use cache here avoid invalid cache
614 std::string keyStr;
615 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId);
616 Key key;
617 DBCommon::StringToVector(keyStr, key);
618 Value value;
619 int errCode = GetMetadataFromDb(key, value);
620 if (errCode == -E_NOT_FOUND) {
621 LOGD("[Metadata] not found clientId by %.3s", clientId.c_str());
622 return -E_NOT_SUPPORT;
623 }
624 if (errCode != E_OK) {
625 LOGE("[Metadata] reload clientId failed %d", errCode);
626 return errCode;
627 }
628 DBCommon::VectorToString(value, hashDevId);
629 return E_OK;
630 }
631
LockWaterMark() const632 void Metadata::LockWaterMark() const
633 {
634 waterMarkMutex_.lock();
635 }
636
UnlockWaterMark() const637 void Metadata::UnlockWaterMark() const
638 {
639 waterMarkMutex_.unlock();
640 }
641
GetWaterMarkInfoFromDB(const std::string & dev,bool isNeedHash,WatermarkInfo & info)642 int Metadata::GetWaterMarkInfoFromDB(const std::string &dev, bool isNeedHash, WatermarkInfo &info)
643 {
644 Key devKey;
645 DeviceID hashDeviceId;
646 {
647 std::lock_guard<std::mutex> lockGuard(metadataLock_);
648 GetHashDeviceId(dev, hashDeviceId, isNeedHash);
649 DBCommon::StringToVector(hashDeviceId, devKey);
650 }
651 // read from db avoid watermark update in diff process
652 int errCode = LoadDeviceIdDataToMap(devKey);
653 if (errCode == -E_NOT_FOUND) {
654 LOGD("[Metadata] not found meta value");
655 return E_OK;
656 }
657 if (errCode != E_OK) {
658 LOGE("[Metadata] reload meta value failed %d", errCode);
659 return errCode;
660 }
661 {
662 std::lock_guard<std::mutex> lockGuard(metadataLock_);
663 MetaDataValue metadata;
664 GetMetadataFromMap(hashDeviceId, metadata);
665 info.sendMark = metadata.localWaterMark;
666 info.receiveMark = metadata.peerWaterMark;
667 }
668 return E_OK;
669 }
670
ClearAllAbilitySyncFinishMark()671 int Metadata::ClearAllAbilitySyncFinishMark()
672 {
673 return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK) |
674 static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION));
675 }
676
ClearAllTimeSyncFinishMark()677 int Metadata::ClearAllTimeSyncFinishMark()
678 {
679 return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK) |
680 static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET) |
681 static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK));
682 }
683
ClearAllMetaDataValue(uint32_t innerClearAction)684 int Metadata::ClearAllMetaDataValue(uint32_t innerClearAction)
685 {
686 int errCode = E_OK;
687 std::lock_guard<std::mutex> lockGuard(metadataLock_);
688 for (auto &[hashDev, metaDataValue] : metadataMap_) {
689 ClearMetaDataValue(innerClearAction, metaDataValue);
690 std::vector<uint8_t> value;
691 int innerErrCode = SerializeMetaData(metaDataValue, value);
692 // clear sync mark without transaction here
693 // just ignore error metadata value
694 if (innerErrCode != E_OK) {
695 LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " serialize meta data failed %d", innerClearAction,
696 innerErrCode);
697 errCode = errCode == E_OK ? innerErrCode : errCode;
698 continue;
699 }
700
701 std::vector<uint8_t> key;
702 DBCommon::StringToVector(hashDev, key);
703 innerErrCode = SetMetadataToDb(key, value);
704 if (innerErrCode != E_OK) {
705 LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " save meta data failed %d", innerClearAction,
706 innerErrCode);
707 errCode = errCode == E_OK ? innerErrCode : errCode;
708 }
709 }
710 if (errCode == E_OK) {
711 LOGD("[Metadata][ClearAllMetaDataValue] success clear action %" PRIu32, innerClearAction);
712 }
713 return errCode;
714 }
715
ClearMetaDataValue(uint32_t innerClearAction,MetaDataValue & metaDataValue)716 void Metadata::ClearMetaDataValue(uint32_t innerClearAction, MetaDataValue &metaDataValue)
717 {
718 auto mark = static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK);
719 if ((innerClearAction & mark) == mark) {
720 metaDataValue.syncMark =
721 DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_ABILITY_SYNC));
722 }
723 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK);
724 if ((innerClearAction & mark) == mark) {
725 metaDataValue.syncMark =
726 DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_SYNC));
727 }
728 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION);
729 if ((innerClearAction & mark) == mark) {
730 metaDataValue.remoteSchemaVersion = 0;
731 }
732 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET);
733 if ((innerClearAction & mark) == mark) {
734 metaDataValue.systemTimeOffset = 0;
735 }
736 mark = static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK);
737 if ((innerClearAction & mark) == mark) {
738 metaDataValue.syncMark |= static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE);
739 }
740 }
741
SetAbilitySyncFinishMark(const std::string & deviceId,bool finish)742 int Metadata::SetAbilitySyncFinishMark(const std::string &deviceId, bool finish)
743 {
744 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC, finish);
745 }
746
IsAbilitySyncFinish(const std::string & deviceId)747 bool Metadata::IsAbilitySyncFinish(const std::string &deviceId)
748 {
749 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC);
750 }
751
SetTimeSyncFinishMark(const std::string & deviceId,bool finish)752 int Metadata::SetTimeSyncFinishMark(const std::string &deviceId, bool finish)
753 {
754 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC, finish);
755 }
756
SetTimeChangeMark(const std::string & deviceId,bool change)757 int Metadata::SetTimeChangeMark(const std::string &deviceId, bool change)
758 {
759 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE, change);
760 }
761
IsTimeSyncFinish(const std::string & deviceId)762 bool Metadata::IsTimeSyncFinish(const std::string &deviceId)
763 {
764 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC);
765 }
766
IsTimeChange(const std::string & deviceId)767 bool Metadata::IsTimeChange(const std::string &deviceId)
768 {
769 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE);
770 }
771
SetSyncMark(const std::string & deviceId,SyncMark syncMark,bool finish)772 int Metadata::SetSyncMark(const std::string &deviceId, SyncMark syncMark, bool finish)
773 {
774 MetaDataValue metadata;
775 std::lock_guard<std::mutex> lockGuard(metadataLock_);
776 GetMetaDataValue(deviceId, metadata, true);
777 auto mark = static_cast<uint64_t>(syncMark);
778 if (finish) {
779 metadata.syncMark |= mark;
780 } else {
781 metadata.syncMark = DBCommon::EraseBit(metadata.syncMark, mark);
782 }
783 LOGD("[Metadata] Mark:%" PRIx64 " sync finish:%d sync mark:%" PRIu64, mark, static_cast<int>(finish),
784 metadata.syncMark);
785 return SaveMetaDataValue(deviceId, metadata);
786 }
787
IsContainSyncMark(const std::string & deviceId,SyncMark syncMark)788 bool Metadata::IsContainSyncMark(const std::string &deviceId, SyncMark syncMark)
789 {
790 MetaDataValue metadata;
791 std::lock_guard<std::mutex> lockGuard(metadataLock_);
792 GetMetaDataValue(deviceId, metadata, true);
793 auto mark = static_cast<uint64_t>(syncMark);
794 return (metadata.syncMark & mark) == mark;
795 }
796
SetRemoteSchemaVersion(const std::string & deviceId,uint64_t schemaVersion)797 int Metadata::SetRemoteSchemaVersion(const std::string &deviceId, uint64_t schemaVersion)
798 {
799 MetaDataValue metadata;
800 std::lock_guard<std::mutex> lockGuard(metadataLock_);
801 GetMetaDataValue(deviceId, metadata, true);
802 metadata.remoteSchemaVersion = schemaVersion;
803 LOGI("[Metadata] Set %.3s schema version %" PRIu64, deviceId.c_str(), schemaVersion);
804 int errCode = SaveMetaDataValue(deviceId, metadata);
805 if (errCode != E_OK) {
806 LOGW("[Metadata] Set remote schema version failed");
807 }
808 return errCode;
809 }
810
GetRemoteSchemaVersion(const std::string & deviceId)811 uint64_t Metadata::GetRemoteSchemaVersion(const std::string &deviceId)
812 {
813 MetaDataValue metadata;
814 std::lock_guard<std::mutex> lockGuard(metadataLock_);
815 GetMetaDataValue(deviceId, metadata, true);
816 LOGI("[Metadata] Get %.3s schema version %" PRIu64, deviceId.c_str(), metadata.remoteSchemaVersion);
817 return metadata.remoteSchemaVersion;
818 }
819
SetSystemTimeOffset(const std::string & deviceId,int64_t systemTimeOffset)820 int Metadata::SetSystemTimeOffset(const std::string &deviceId, int64_t systemTimeOffset)
821 {
822 MetaDataValue metadata;
823 std::lock_guard<std::mutex> lockGuard(metadataLock_);
824 GetMetaDataValue(deviceId, metadata, true);
825 metadata.systemTimeOffset = systemTimeOffset;
826 LOGI("[Metadata] Set %.3s systemTimeOffset %" PRId64, deviceId.c_str(), systemTimeOffset);
827 return SaveMetaDataValue(deviceId, metadata);
828 }
829
GetSystemTimeOffset(const std::string & deviceId)830 int64_t Metadata::GetSystemTimeOffset(const std::string &deviceId)
831 {
832 MetaDataValue metadata;
833 std::lock_guard<std::mutex> lockGuard(metadataLock_);
834 GetMetaDataValue(deviceId, metadata, true);
835 LOGI("[Metadata] Get %.3s systemTimeOffset %" PRId64, deviceId.c_str(), metadata.systemTimeOffset);
836 return metadata.systemTimeOffset;
837 }
838
GetLocalSchemaVersion()839 std::pair<int, uint64_t> Metadata::GetLocalSchemaVersion()
840 {
841 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
842 auto [errCode, localMetaData] = GetLocalMetaData();
843 if (errCode != E_OK) {
844 return {errCode, 0};
845 }
846 LOGI("[Metadata] Get local schema version %" PRIu64, localMetaData.localSchemaVersion);
847 return {errCode, localMetaData.localSchemaVersion};
848 }
849
SetLocalSchemaVersion(uint64_t schemaVersion)850 int Metadata::SetLocalSchemaVersion(uint64_t schemaVersion)
851 {
852 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
853 auto [errCode, localMetaData] = GetLocalMetaData();
854 if (errCode != E_OK) {
855 return errCode;
856 }
857 localMetaData.localSchemaVersion = schemaVersion;
858 LOGI("[Metadata] Set local schema version %" PRIu64, schemaVersion);
859 return SaveLocalMetaData(localMetaData);
860 }
861
SaveLocalMetaData(const LocalMetaData & localMetaData)862 int Metadata::SaveLocalMetaData(const LocalMetaData &localMetaData)
863 {
864 auto [errCode, value] = SerializeLocalMetaData(localMetaData);
865 if (errCode != E_OK) {
866 LOGE("[Metadata] Serialize local meta data failed %d", errCode);
867 return errCode;
868 }
869 std::string k(LOCAL_META_DATA_KEY);
870 Key key(k.begin(), k.end());
871 return SetMetadataToDb(key, value);
872 }
873
GetLocalMetaData()874 std::pair<int, LocalMetaData> Metadata::GetLocalMetaData()
875 {
876 std::string k(LOCAL_META_DATA_KEY);
877 Key key(k.begin(), k.end());
878 Value value;
879 int errCode = GetMetadataFromDb(key, value);
880 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
881 return {errCode, {}};
882 }
883 return DeSerializeLocalMetaData(value);
884 }
885
SerializeLocalMetaData(const LocalMetaData & localMetaData)886 std::pair<int, Value> Metadata::SerializeLocalMetaData(const LocalMetaData &localMetaData)
887 {
888 std::pair<int, Value> res = {E_OK, {}};
889 auto &[errCode, value] = res;
890 value.resize(CalculateLocalMetaDataLength());
891 Parcel parcel(value.data(), value.size());
892 (void)parcel.WriteUInt32(localMetaData.version);
893 parcel.EightByteAlign();
894 (void)parcel.WriteUInt64(localMetaData.localSchemaVersion);
895 if (parcel.IsError()) {
896 LOGE("[Metadata] Serialize localMetaData failed");
897 errCode = -E_SERIALIZE_ERROR;
898 value.clear();
899 return res;
900 }
901 return res;
902 }
903
DeSerializeLocalMetaData(const Value & value)904 std::pair<int, LocalMetaData> Metadata::DeSerializeLocalMetaData(const Value &value)
905 {
906 std::pair<int, LocalMetaData> res;
907 auto &[errCode, meta] = res;
908 if (value.empty()) {
909 errCode = E_OK;
910 return res;
911 }
912 Parcel parcel(const_cast<uint8_t *>(value.data()), value.size());
913 parcel.ReadUInt32(meta.version);
914 if (meta.version >= LOCAL_META_DATA_VERSION_V2) {
915 parcel.EightByteAlign();
916 }
917 parcel.ReadUInt64(meta.localSchemaVersion);
918 if (parcel.IsError()) {
919 LOGE("[Metadata] DeSerialize localMetaData failed");
920 errCode = -E_SERIALIZE_ERROR;
921 return res;
922 }
923 return res;
924 }
925
CalculateLocalMetaDataLength()926 uint64_t Metadata::CalculateLocalMetaDataLength()
927 {
928 uint64_t length = Parcel::GetUInt32Len(); // version
929 length = Parcel::GetEightByteAlign(length);
930 length += Parcel::GetUInt64Len(); // local schema version
931 return length;
932 }
933
MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)934 Metadata::MetaWaterMarkAutoLock::MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)
935 : metadataPtr_(std::move(metadata))
936 {
937 if (metadataPtr_ != nullptr) {
938 metadataPtr_->LockWaterMark();
939 }
940 }
941
~MetaWaterMarkAutoLock()942 Metadata::MetaWaterMarkAutoLock::~MetaWaterMarkAutoLock()
943 {
944 if (metadataPtr_ != nullptr) {
945 metadataPtr_->UnlockWaterMark();
946 }
947 }
948
InitLocalMetaData()949 int Metadata::InitLocalMetaData()
950 {
951 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_);
952 auto [errCode, localMetaData] = GetLocalMetaData();
953 if (errCode != E_OK) {
954 return errCode;
955 }
956 if (localMetaData.localSchemaVersion != 0) {
957 return E_OK;
958 }
959 uint64_t curTime = 0u;
960 errCode = OS::GetCurrentSysTimeInMicrosecond(curTime);
961 if (errCode != E_OK) {
962 LOGW("[Metadata] get system time failed when init schema version!");
963 localMetaData.localSchemaVersion += 1;
964 } else {
965 localMetaData.localSchemaVersion = curTime;
966 }
967 errCode = SaveLocalMetaData(localMetaData);
968 if (errCode != E_OK) {
969 LOGE("[Metadata] init local schema version failed:%d", errCode);
970 }
971 return errCode;
972 }
973 } // namespace DistributedDB