1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "meta_data.h" 17 18 #include <openssl/rand.h> 19 20 #include "db_common.h" 21 #include "db_constant.h" 22 #include "db_errno.h" 23 #include "hash.h" 24 #include "log_print.h" 25 #include "platform_specific.h" 26 #include "securec.h" 27 #include "sync_types.h" 28 #include "time_helper.h" 29 #include "version.h" 30 31 namespace DistributedDB { 32 namespace { 33 // store local timeoffset;this is a special key; 34 constexpr const char *CLIENT_ID_PREFIX_KEY = "clientId"; 35 constexpr const char *LOCAL_META_DATA_KEY = "localMetaData"; 36 } 37 Metadata()38 Metadata::Metadata() 39 : localTimeOffset_(0), 40 naturalStoragePtr_(nullptr), 41 lastLocalTime_(0) 42 {} 43 ~Metadata()44 Metadata::~Metadata() 45 { 46 naturalStoragePtr_ = nullptr; 47 metadataMap_.clear(); 48 } 49 Initialize(ISyncInterface * storage)50 int Metadata::Initialize(ISyncInterface* storage) 51 { 52 naturalStoragePtr_ = storage; 53 std::vector<uint8_t> key; 54 std::vector<uint8_t> timeOffset; 55 DBCommon::StringToVector(std::string(DBConstant::LOCALTIME_OFFSET_KEY), key); 56 57 int errCode = GetMetadataFromDb(key, timeOffset); 58 if (errCode == -E_NOT_FOUND) { 59 int err = SaveLocalTimeOffset(TimeHelper::BASE_OFFSET); 60 if (err != E_OK) { 61 LOGD("[Metadata][Initialize]SaveLocalTimeOffset failed errCode:%d", err); 62 return err; 63 } 64 } else if (errCode == E_OK) { 65 localTimeOffset_ = StringToLong(timeOffset); 66 } else { 67 LOGE("Metadata::Initialize get meatadata from db failed,err=%d", errCode); 68 return errCode; 69 } 70 { 71 std::lock_guard<std::mutex> lockGuard(metadataLock_); 72 metadataMap_.clear(); 73 } 74 (void)querySyncWaterMarkHelper_.Initialize(storage); 75 return LoadAllMetadata(); 76 } 77 SaveTimeOffset(const DeviceID & deviceId,TimeOffset inValue)78 int Metadata::SaveTimeOffset(const DeviceID &deviceId, TimeOffset inValue) 79 { 80 MetaDataValue metadata; 81 std::lock_guard<std::mutex> lockGuard(metadataLock_); 82 GetMetaDataValue(deviceId, metadata, true); 83 metadata.timeOffset = inValue; 84 metadata.lastUpdateTime = TimeHelper::GetSysCurrentTime(); 85 LOGD("Metadata::SaveTimeOffset = %" PRId64 " dev %s", inValue, STR_MASK(deviceId)); 86 return SaveMetaDataValue(deviceId, metadata); 87 } 88 GetTimeOffset(const DeviceID & deviceId,TimeOffset & outValue)89 void Metadata::GetTimeOffset(const DeviceID &deviceId, TimeOffset &outValue) 90 { 91 MetaDataValue metadata; 92 std::lock_guard<std::mutex> lockGuard(metadataLock_); 93 GetMetaDataValue(deviceId, metadata, true); 94 outValue = metadata.timeOffset; 95 } 96 GetLocalWaterMark(const DeviceID & deviceId,uint64_t & outValue)97 void Metadata::GetLocalWaterMark(const DeviceID &deviceId, uint64_t &outValue) 98 { 99 MetaDataValue metadata; 100 std::lock_guard<std::mutex> lockGuard(metadataLock_); 101 GetMetaDataValue(deviceId, metadata, true); 102 outValue = metadata.localWaterMark; 103 } 104 SaveLocalWaterMark(const DeviceID & deviceId,uint64_t inValue)105 int Metadata::SaveLocalWaterMark(const DeviceID &deviceId, uint64_t inValue) 106 { 107 MetaDataValue metadata; 108 std::lock_guard<std::mutex> lockGuard(metadataLock_); 109 GetMetaDataValue(deviceId, metadata, true); 110 metadata.localWaterMark = inValue; 111 LOGD("Metadata::SaveLocalWaterMark = %" PRIu64, inValue); 112 return SaveMetaDataValue(deviceId, metadata); 113 } 114 GetPeerWaterMark(const DeviceID & deviceId,uint64_t & outValue,bool isNeedHash)115 void Metadata::GetPeerWaterMark(const DeviceID &deviceId, uint64_t &outValue, bool isNeedHash) 116 { 117 MetaDataValue metadata; 118 std::lock_guard<std::mutex> lockGuard(metadataLock_); 119 GetMetaDataValue(deviceId, metadata, isNeedHash); 120 outValue = metadata.peerWaterMark; 121 } 122 SavePeerWaterMark(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)123 int Metadata::SavePeerWaterMark(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash) 124 { 125 MetaDataValue metadata; 126 std::lock_guard<std::mutex> lockGuard(metadataLock_); 127 GetMetaDataValue(deviceId, metadata, isNeedHash); 128 metadata.peerWaterMark = inValue; 129 LOGD("Metadata::SavePeerWaterMark = %" PRIu64, inValue); 130 return SaveMetaDataValue(deviceId, metadata, isNeedHash); 131 } 132 SaveLocalTimeOffset(TimeOffset timeOffset)133 int Metadata::SaveLocalTimeOffset(TimeOffset timeOffset) 134 { 135 std::string timeOffsetString = std::to_string(timeOffset); 136 std::vector<uint8_t> timeOffsetValue(timeOffsetString.begin(), timeOffsetString.end()); 137 std::string keyStr(DBConstant::LOCALTIME_OFFSET_KEY); 138 std::vector<uint8_t> localTimeOffsetValue(keyStr.begin(), keyStr.end()); 139 140 std::lock_guard<std::mutex> lockGuard(localTimeOffsetLock_); 141 localTimeOffset_ = timeOffset; 142 LOGD("Metadata::SaveLocalTimeOffset offset = %" PRId64, timeOffset); 143 int errCode = SetMetadataToDb(localTimeOffsetValue, timeOffsetValue); 144 if (errCode != E_OK) { 145 LOGE("Metadata::SaveLocalTimeOffset SetMetadataToDb failed errCode:%d", errCode); 146 } 147 return errCode; 148 } 149 GetLocalTimeOffset() const150 TimeOffset Metadata::GetLocalTimeOffset() const 151 { 152 TimeOffset localTimeOffset = localTimeOffset_.load(std::memory_order_seq_cst); 153 return localTimeOffset; 154 } 155 EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)156 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash) 157 { 158 return EraseDeviceWaterMark(deviceId, isNeedHash, ""); 159 } 160 EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash,const std::string & tableName)161 int Metadata::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash, const std::string &tableName) 162 { 163 // reload meta data again 164 (void)LoadAllMetadata(); 165 std::lock_guard<std::recursive_mutex> autoLock(waterMarkMutex_); 166 // try to erase all the waterMark 167 // erase deleteSync recv waterMark 168 WaterMark waterMark = 0; 169 int errCodeDeleteSync = SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash); 170 if (errCodeDeleteSync != E_OK) { 171 LOGE("[Metadata] erase deleteWaterMark failed errCode:%d", errCodeDeleteSync); 172 return errCodeDeleteSync; 173 } 174 // erase querySync recv waterMark 175 int errCodeQuerySync = ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash); 176 if (errCodeQuerySync != E_OK) { 177 LOGE("[Metadata] erase queryWaterMark failed errCode:%d", errCodeQuerySync); 178 return errCodeQuerySync; 179 } 180 // peerWaterMark must be erased at last 181 int errCode = SavePeerWaterMark(deviceId, 0, isNeedHash); 182 if (errCode != E_OK) { 183 LOGE("[Metadata] erase peerWaterMark failed errCode:%d", errCode); 184 return errCode; 185 } 186 return E_OK; 187 } 188 SetLastLocalTime(Timestamp lastLocalTime)189 void Metadata::SetLastLocalTime(Timestamp lastLocalTime) 190 { 191 std::lock_guard<std::mutex> lock(lastLocalTimeLock_); 192 if (lastLocalTime > lastLocalTime_) { 193 lastLocalTime_ = lastLocalTime; 194 } 195 } 196 GetLastLocalTime() const197 Timestamp Metadata::GetLastLocalTime() const 198 { 199 std::lock_guard<std::mutex> lock(lastLocalTimeLock_); 200 return lastLocalTime_; 201 } 202 SaveMetaDataValue(const DeviceID & deviceId,const MetaDataValue & inValue,bool isNeedHash)203 int Metadata::SaveMetaDataValue(const DeviceID &deviceId, const MetaDataValue &inValue, bool isNeedHash) 204 { 205 std::vector<uint8_t> value; 206 int errCode = SerializeMetaData(inValue, value); 207 if (errCode != E_OK) { 208 return errCode; 209 } 210 211 DeviceID hashDeviceId; 212 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash); 213 std::vector<uint8_t> key; 214 DBCommon::StringToVector(hashDeviceId, key); 215 errCode = SetMetadataToDb(key, value); 216 if (errCode != E_OK) { 217 LOGE("Metadata::SetMetadataToDb failed errCode:%d", errCode); 218 return errCode; 219 } 220 PutMetadataToMap(hashDeviceId, inValue); 221 return E_OK; 222 } 223 GetMetaDataValue(const DeviceID & deviceId,MetaDataValue & outValue,bool isNeedHash)224 void Metadata::GetMetaDataValue(const DeviceID &deviceId, MetaDataValue &outValue, bool isNeedHash) 225 { 226 DeviceID hashDeviceId; 227 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash); 228 GetMetadataFromMap(hashDeviceId, outValue); 229 } 230 SerializeMetaData(const MetaDataValue & inValue,std::vector<uint8_t> & outValue)231 int Metadata::SerializeMetaData(const MetaDataValue &inValue, std::vector<uint8_t> &outValue) 232 { 233 outValue.resize(sizeof(MetaDataValue)); 234 errno_t err = memcpy_s(outValue.data(), outValue.size(), &inValue, sizeof(MetaDataValue)); 235 if (err != EOK) { 236 return -E_SECUREC_ERROR; 237 } 238 return E_OK; 239 } 240 DeSerializeMetaData(const std::vector<uint8_t> & inValue,MetaDataValue & outValue)241 int Metadata::DeSerializeMetaData(const std::vector<uint8_t> &inValue, MetaDataValue &outValue) 242 { 243 if (inValue.empty()) { 244 return -E_INVALID_ARGS; 245 } 246 errno_t err = memcpy_s(&outValue, sizeof(MetaDataValue), inValue.data(), inValue.size()); 247 if (err != EOK) { 248 return -E_SECUREC_ERROR; 249 } 250 return E_OK; 251 } 252 GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue) const253 int Metadata::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue) const 254 { 255 if (naturalStoragePtr_ == nullptr) { 256 return -E_INVALID_DB; 257 } 258 return naturalStoragePtr_->GetMetaData(key, outValue); 259 } 260 SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)261 int Metadata::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue) 262 { 263 if (naturalStoragePtr_ == nullptr) { 264 return -E_INVALID_DB; 265 } 266 return naturalStoragePtr_->PutMetaData(key, inValue, false); 267 } 268 PutMetadataToMap(const DeviceID & deviceId,const MetaDataValue & value)269 void Metadata::PutMetadataToMap(const DeviceID &deviceId, const MetaDataValue &value) 270 { 271 metadataMap_[deviceId] = value; 272 } 273 GetMetadataFromMap(const DeviceID & deviceId,MetaDataValue & outValue)274 void Metadata::GetMetadataFromMap(const DeviceID &deviceId, MetaDataValue &outValue) 275 { 276 if (metadataMap_.find(deviceId) == metadataMap_.end() && RuntimeContext::GetInstance()->IsTimeChanged()) { 277 metadataMap_[deviceId].syncMark = static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE); 278 } 279 outValue = metadataMap_[deviceId]; 280 } 281 StringToLong(const std::vector<uint8_t> & value) const282 int64_t Metadata::StringToLong(const std::vector<uint8_t> &value) const 283 { 284 std::string valueString(value.begin(), value.end()); 285 int64_t longData = std::strtoll(valueString.c_str(), nullptr, DBConstant::STR_TO_LL_BY_DEVALUE); 286 LOGD("Metadata::StringToLong longData = %" PRId64, longData); 287 return longData; 288 } 289 GetAllMetadataKey(std::vector<std::vector<uint8_t>> & keys)290 int Metadata::GetAllMetadataKey(std::vector<std::vector<uint8_t>> &keys) 291 { 292 if (naturalStoragePtr_ == nullptr) { 293 return -E_INVALID_DB; 294 } 295 return naturalStoragePtr_->GetAllMetaKeys(keys); 296 } 297 298 namespace { IsMetaDataKey(const Key & inKey,const std::string & expectPrefix)299 bool IsMetaDataKey(const Key &inKey, const std::string &expectPrefix) 300 { 301 if (inKey.size() < expectPrefix.size()) { 302 return false; 303 } 304 std::string prefixInKey(inKey.begin(), inKey.begin() + expectPrefix.size()); 305 if (prefixInKey != expectPrefix) { 306 return false; 307 } 308 return true; 309 } 310 } 311 LoadAllMetadata()312 int Metadata::LoadAllMetadata() 313 { 314 std::vector<std::vector<uint8_t>> metaDataKeys; 315 int errCode = GetAllMetadataKey(metaDataKeys); 316 if (errCode != E_OK) { 317 LOGE("[Metadata] get all metadata key failed err=%d", errCode); 318 return errCode; 319 } 320 321 std::vector<std::vector<uint8_t>> querySyncIds; 322 for (const auto &deviceId : metaDataKeys) { 323 if (IsMetaDataKey(deviceId, DBConstant::DEVICEID_PREFIX_KEY)) { 324 errCode = LoadDeviceIdDataToMap(deviceId); 325 if (errCode != E_OK) { 326 return errCode; 327 } 328 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey())) { 329 querySyncIds.push_back(deviceId); 330 } else if (IsMetaDataKey(deviceId, QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey())) { 331 errCode = querySyncWaterMarkHelper_.LoadDeleteSyncDataToCache(deviceId); 332 if (errCode != E_OK) { 333 return errCode; 334 } 335 } 336 } 337 errCode = querySyncWaterMarkHelper_.RemoveLeastUsedQuerySyncItems(querySyncIds); 338 if (errCode != E_OK) { 339 return errCode; 340 } 341 return InitLocalMetaData(); 342 } 343 LoadDeviceIdDataToMap(const Key & key)344 int Metadata::LoadDeviceIdDataToMap(const Key &key) 345 { 346 std::vector<uint8_t> value; 347 int errCode = GetMetadataFromDb(key, value); 348 if (errCode != E_OK) { 349 return errCode; 350 } 351 MetaDataValue metaValue; 352 std::string metaKey(key.begin(), key.end()); 353 errCode = DeSerializeMetaData(value, metaValue); 354 if (errCode != E_OK) { 355 return errCode; 356 } 357 std::lock_guard<std::mutex> lockGuard(metadataLock_); 358 PutMetadataToMap(metaKey, metaValue); 359 return errCode; 360 } 361 GetHashDeviceId(const DeviceID & deviceId,DeviceID & hashDeviceId,bool isNeedHash)362 void Metadata::GetHashDeviceId(const DeviceID &deviceId, DeviceID &hashDeviceId, bool isNeedHash) 363 { 364 if (!isNeedHash) { 365 hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + deviceId; 366 return; 367 } 368 if (deviceIdToHashDeviceIdMap_.count(deviceId) == 0) { 369 hashDeviceId = DBConstant::DEVICEID_PREFIX_KEY + DBCommon::TransferHashString(deviceId); 370 deviceIdToHashDeviceIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeviceId)); 371 } else { 372 hashDeviceId = deviceIdToHashDeviceIdMap_[deviceId]; 373 } 374 } 375 GetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark)376 int Metadata::GetRecvQueryWaterMark(const std::string &queryIdentify, 377 const std::string &deviceId, WaterMark &waterMark) 378 { 379 QueryWaterMark queryWaterMark; 380 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark); 381 if (errCode != E_OK) { 382 return errCode; 383 } 384 WaterMark peerWaterMark; 385 GetPeerWaterMark(deviceId, peerWaterMark); 386 waterMark = std::max(queryWaterMark.recvWaterMark, peerWaterMark); 387 return E_OK; 388 } 389 SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)390 int Metadata::SetRecvQueryWaterMark(const std::string &queryIdentify, 391 const std::string &deviceId, const WaterMark &waterMark) 392 { 393 return querySyncWaterMarkHelper_.SetRecvQueryWaterMark(queryIdentify, deviceId, waterMark); 394 } 395 GetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,WaterMark & waterMark,bool isAutoLift)396 int Metadata::GetSendQueryWaterMark(const std::string &queryIdentify, 397 const std::string &deviceId, WaterMark &waterMark, bool isAutoLift) 398 { 399 QueryWaterMark queryWaterMark; 400 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark); 401 if (errCode != E_OK) { 402 return errCode; 403 } 404 if (isAutoLift) { 405 WaterMark localWaterMark; 406 GetLocalWaterMark(deviceId, localWaterMark); 407 waterMark = std::max(queryWaterMark.sendWaterMark, localWaterMark); 408 } else { 409 waterMark = queryWaterMark.sendWaterMark; 410 } 411 return E_OK; 412 } 413 SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)414 int Metadata::SetSendQueryWaterMark(const std::string &queryIdentify, 415 const std::string &deviceId, const WaterMark &waterMark) 416 { 417 return querySyncWaterMarkHelper_.SetSendQueryWaterMark(queryIdentify, deviceId, waterMark); 418 } 419 GetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,Timestamp & timestamp)420 int Metadata::GetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, Timestamp ×tamp) 421 { 422 QueryWaterMark queryWaterMark; 423 int errCode = querySyncWaterMarkHelper_.GetQueryWaterMark(queryIdentify, deviceId, queryWaterMark); 424 if (errCode != E_OK) { 425 return errCode; 426 } 427 timestamp = queryWaterMark.lastQueryTime; 428 return E_OK; 429 } 430 SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)431 int Metadata::SetLastQueryTime(const std::string &queryIdentify, const std::string &deviceId, 432 const Timestamp ×tamp) 433 { 434 return querySyncWaterMarkHelper_.SetLastQueryTime(queryIdentify, deviceId, timestamp); 435 } 436 GetSendDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark,bool isAutoLift)437 int Metadata::GetSendDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark, bool isAutoLift) 438 { 439 DeleteWaterMark deleteWaterMark; 440 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark); 441 if (errCode != E_OK) { 442 return errCode; 443 } 444 if (isAutoLift) { 445 WaterMark localWaterMark; 446 GetLocalWaterMark(deviceId, localWaterMark); 447 waterMark = std::max(deleteWaterMark.sendWaterMark, localWaterMark); 448 } else { 449 waterMark = deleteWaterMark.sendWaterMark; 450 } 451 return E_OK; 452 } 453 SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)454 int Metadata::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark) 455 { 456 return querySyncWaterMarkHelper_.SetSendDeleteSyncWaterMark(deviceId, waterMark); 457 } 458 GetRecvDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)459 int Metadata::GetRecvDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark) 460 { 461 DeleteWaterMark deleteWaterMark; 462 int errCode = querySyncWaterMarkHelper_.GetDeleteSyncWaterMark(deviceId, deleteWaterMark); 463 if (errCode != E_OK) { 464 return errCode; 465 } 466 WaterMark peerWaterMark; 467 GetPeerWaterMark(deviceId, peerWaterMark); 468 waterMark = std::max(deleteWaterMark.recvWaterMark, peerWaterMark); 469 return E_OK; 470 } 471 SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)472 int Metadata::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark, bool isNeedHash) 473 { 474 return querySyncWaterMarkHelper_.SetRecvDeleteSyncWaterMark(deviceId, waterMark, isNeedHash); 475 } 476 ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)477 int Metadata::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash) 478 { 479 return querySyncWaterMarkHelper_.ResetRecvQueryWaterMark(deviceId, tableName, isNeedHash); 480 } 481 GetDbCreateTime(const DeviceID & deviceId,uint64_t & outValue)482 void Metadata::GetDbCreateTime(const DeviceID &deviceId, uint64_t &outValue) 483 { 484 MetaDataValue metadata; 485 std::lock_guard<std::mutex> lockGuard(metadataLock_); 486 DeviceID hashDeviceId; 487 GetHashDeviceId(deviceId, hashDeviceId, true); 488 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) { 489 metadata = metadataMap_[hashDeviceId]; 490 outValue = metadata.dbCreateTime; 491 return; 492 } 493 outValue = 0; 494 LOGI("Metadata::GetDbCreateTime, not found dev = %s dbCreateTime", STR_MASK(deviceId)); 495 } 496 SetDbCreateTime(const DeviceID & deviceId,uint64_t inValue,bool isNeedHash)497 int Metadata::SetDbCreateTime(const DeviceID &deviceId, uint64_t inValue, bool isNeedHash) 498 { 499 MetaDataValue metadata; 500 std::lock_guard<std::mutex> lockGuard(metadataLock_); 501 DeviceID hashDeviceId; 502 GetHashDeviceId(deviceId, hashDeviceId, isNeedHash); 503 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) { 504 metadata = metadataMap_[hashDeviceId]; 505 if (metadata.dbCreateTime != 0 && metadata.dbCreateTime != inValue) { 506 metadata.clearDeviceDataMark = REMOVE_DEVICE_DATA_MARK; 507 LOGI("Metadata::SetDbCreateTime,set clear data mark,dev=%s,dbCreateTime=%" PRIu64, 508 STR_MASK(deviceId), inValue); 509 } 510 if (metadata.dbCreateTime == 0) { 511 LOGI("Metadata::SetDbCreateTime,update dev=%s,dbCreateTime=%" PRIu64, STR_MASK(deviceId), inValue); 512 } 513 } 514 metadata.dbCreateTime = inValue; 515 return SaveMetaDataValue(deviceId, metadata); 516 } 517 ResetMetaDataAfterRemoveData(const DeviceID & deviceId)518 int Metadata::ResetMetaDataAfterRemoveData(const DeviceID &deviceId) 519 { 520 MetaDataValue metadata; 521 std::lock_guard<std::mutex> lockGuard(metadataLock_); 522 DeviceID hashDeviceId; 523 GetHashDeviceId(deviceId, hashDeviceId, true); 524 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) { 525 metadata = metadataMap_[hashDeviceId]; 526 metadata.clearDeviceDataMark = 0; // clear mark 527 return SaveMetaDataValue(deviceId, metadata); 528 } 529 return -E_NOT_FOUND; 530 } 531 GetRemoveDataMark(const DeviceID & deviceId,uint64_t & outValue)532 void Metadata::GetRemoveDataMark(const DeviceID &deviceId, uint64_t &outValue) 533 { 534 MetaDataValue metadata; 535 std::lock_guard<std::mutex> lockGuard(metadataLock_); 536 DeviceID hashDeviceId; 537 GetHashDeviceId(deviceId, hashDeviceId, true); 538 if (metadataMap_.find(hashDeviceId) != metadataMap_.end()) { 539 metadata = metadataMap_[hashDeviceId]; 540 outValue = metadata.clearDeviceDataMark; 541 return; 542 } 543 outValue = 0; 544 } 545 GetQueryLastTimestamp(const DeviceID & deviceId,const std::string & queryId) const546 uint64_t Metadata::GetQueryLastTimestamp(const DeviceID &deviceId, const std::string &queryId) const 547 { 548 std::vector<uint8_t> key; 549 std::vector<uint8_t> value; 550 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId); 551 DBCommon::StringToVector(hashqueryId, key); 552 int errCode = GetMetadataFromDb(key, value); 553 std::lock_guard<std::mutex> lockGuard(metadataLock_); 554 if (errCode == -E_NOT_FOUND) { 555 auto iter = queryIdMap_.find(deviceId); 556 if (iter != queryIdMap_.end()) { 557 if (iter->second.find(hashqueryId) == iter->second.end()) { 558 iter->second.insert(hashqueryId); 559 return INT64_MAX; 560 } 561 return 0; 562 } else { 563 queryIdMap_[deviceId] = { hashqueryId }; 564 return INT64_MAX; 565 } 566 } 567 auto iter = queryIdMap_.find(deviceId); 568 // while value is found in db, it can be found in db later when db is not closed 569 // so no need to record the hashqueryId in map 570 if (errCode == E_OK && iter != queryIdMap_.end()) { 571 iter->second.erase(hashqueryId); 572 } 573 return StringToLong(value); 574 } 575 RemoveQueryFromRecordSet(const DeviceID & deviceId,const std::string & queryId)576 void Metadata::RemoveQueryFromRecordSet(const DeviceID &deviceId, const std::string &queryId) 577 { 578 std::lock_guard<std::mutex> lockGuard(metadataLock_); 579 std::string hashqueryId = DBConstant::SUBSCRIBE_QUERY_PREFIX + DBCommon::TransferHashString(queryId); 580 auto iter = queryIdMap_.find(deviceId); 581 if (iter != queryIdMap_.end() && iter->second.find(hashqueryId) != iter->second.end()) { 582 iter->second.erase(hashqueryId); 583 } 584 } 585 SaveClientId(const std::string & deviceId,const std::string & clientId)586 int Metadata::SaveClientId(const std::string &deviceId, const std::string &clientId) 587 { 588 { 589 // already save in cache 590 std::lock_guard<std::mutex> autoLock(clientIdLock_); 591 if (clientIdCache_[deviceId] == clientId) { 592 return E_OK; 593 } 594 } 595 std::string keyStr; 596 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId); 597 std::string valueStr = DBCommon::TransferHashString(deviceId); 598 Key key; 599 DBCommon::StringToVector(keyStr, key); 600 Value value; 601 DBCommon::StringToVector(valueStr, value); 602 int errCode = SetMetadataToDb(key, value); 603 if (errCode != E_OK) { 604 return errCode; 605 } 606 std::lock_guard<std::mutex> autoLock(clientIdLock_); 607 clientIdCache_[deviceId] = clientId; 608 return E_OK; 609 } 610 GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const611 int Metadata::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const 612 { 613 // don't use cache here avoid invalid cache 614 std::string keyStr; 615 keyStr.append(CLIENT_ID_PREFIX_KEY).append(clientId); 616 Key key; 617 DBCommon::StringToVector(keyStr, key); 618 Value value; 619 int errCode = GetMetadataFromDb(key, value); 620 if (errCode == -E_NOT_FOUND) { 621 LOGD("[Metadata] not found clientId by %.3s", clientId.c_str()); 622 return -E_NOT_SUPPORT; 623 } 624 if (errCode != E_OK) { 625 LOGE("[Metadata] reload clientId failed %d", errCode); 626 return errCode; 627 } 628 DBCommon::VectorToString(value, hashDevId); 629 return E_OK; 630 } 631 LockWaterMark() const632 void Metadata::LockWaterMark() const 633 { 634 waterMarkMutex_.lock(); 635 } 636 UnlockWaterMark() const637 void Metadata::UnlockWaterMark() const 638 { 639 waterMarkMutex_.unlock(); 640 } 641 GetWaterMarkInfoFromDB(const std::string & dev,bool isNeedHash,WatermarkInfo & info)642 int Metadata::GetWaterMarkInfoFromDB(const std::string &dev, bool isNeedHash, WatermarkInfo &info) 643 { 644 Key devKey; 645 DeviceID hashDeviceId; 646 { 647 std::lock_guard<std::mutex> lockGuard(metadataLock_); 648 GetHashDeviceId(dev, hashDeviceId, isNeedHash); 649 DBCommon::StringToVector(hashDeviceId, devKey); 650 } 651 // read from db avoid watermark update in diff process 652 int errCode = LoadDeviceIdDataToMap(devKey); 653 if (errCode == -E_NOT_FOUND) { 654 LOGD("[Metadata] not found meta value"); 655 return E_OK; 656 } 657 if (errCode != E_OK) { 658 LOGE("[Metadata] reload meta value failed %d", errCode); 659 return errCode; 660 } 661 { 662 std::lock_guard<std::mutex> lockGuard(metadataLock_); 663 MetaDataValue metadata; 664 GetMetadataFromMap(hashDeviceId, metadata); 665 info.sendMark = metadata.localWaterMark; 666 info.receiveMark = metadata.peerWaterMark; 667 } 668 return E_OK; 669 } 670 ClearAllAbilitySyncFinishMark()671 int Metadata::ClearAllAbilitySyncFinishMark() 672 { 673 return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK) | 674 static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION)); 675 } 676 ClearAllTimeSyncFinishMark()677 int Metadata::ClearAllTimeSyncFinishMark() 678 { 679 return ClearAllMetaDataValue(static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK) | 680 static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET) | 681 static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK)); 682 } 683 ClearAllMetaDataValue(uint32_t innerClearAction)684 int Metadata::ClearAllMetaDataValue(uint32_t innerClearAction) 685 { 686 int errCode = E_OK; 687 std::lock_guard<std::mutex> lockGuard(metadataLock_); 688 for (auto &[hashDev, metaDataValue] : metadataMap_) { 689 ClearMetaDataValue(innerClearAction, metaDataValue); 690 std::vector<uint8_t> value; 691 int innerErrCode = SerializeMetaData(metaDataValue, value); 692 // clear sync mark without transaction here 693 // just ignore error metadata value 694 if (innerErrCode != E_OK) { 695 LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " serialize meta data failed %d", innerClearAction, 696 innerErrCode); 697 errCode = errCode == E_OK ? innerErrCode : errCode; 698 continue; 699 } 700 701 std::vector<uint8_t> key; 702 DBCommon::StringToVector(hashDev, key); 703 innerErrCode = SetMetadataToDb(key, value); 704 if (innerErrCode != E_OK) { 705 LOGW("[Metadata][ClearAllMetaDataValue] action %" PRIu32 " save meta data failed %d", innerClearAction, 706 innerErrCode); 707 errCode = errCode == E_OK ? innerErrCode : errCode; 708 } 709 } 710 if (errCode == E_OK) { 711 LOGD("[Metadata][ClearAllMetaDataValue] success clear action %" PRIu32, innerClearAction); 712 } 713 return errCode; 714 } 715 ClearMetaDataValue(uint32_t innerClearAction,MetaDataValue & metaDataValue)716 void Metadata::ClearMetaDataValue(uint32_t innerClearAction, MetaDataValue &metaDataValue) 717 { 718 auto mark = static_cast<uint32_t>(MetaValueAction::CLEAR_ABILITY_SYNC_MARK); 719 if ((innerClearAction & mark) == mark) { 720 metaDataValue.syncMark = 721 DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_ABILITY_SYNC)); 722 } 723 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_TIME_SYNC_MARK); 724 if ((innerClearAction & mark) == mark) { 725 metaDataValue.syncMark = 726 DBCommon::EraseBit(metaDataValue.syncMark, static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_SYNC)); 727 } 728 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_REMOTE_SCHEMA_VERSION); 729 if ((innerClearAction & mark) == mark) { 730 metaDataValue.remoteSchemaVersion = 0; 731 } 732 mark = static_cast<uint32_t>(MetaValueAction::CLEAR_SYSTEM_TIME_OFFSET); 733 if ((innerClearAction & mark) == mark) { 734 metaDataValue.systemTimeOffset = 0; 735 } 736 mark = static_cast<uint32_t>(MetaValueAction::SET_TIME_CHANGE_MARK); 737 if ((innerClearAction & mark) == mark) { 738 metaDataValue.syncMark |= static_cast<uint64_t>(SyncMark::SYNC_MARK_TIME_CHANGE); 739 } 740 } 741 SetAbilitySyncFinishMark(const std::string & deviceId,bool finish)742 int Metadata::SetAbilitySyncFinishMark(const std::string &deviceId, bool finish) 743 { 744 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC, finish); 745 } 746 IsAbilitySyncFinish(const std::string & deviceId)747 bool Metadata::IsAbilitySyncFinish(const std::string &deviceId) 748 { 749 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_ABILITY_SYNC); 750 } 751 SetTimeSyncFinishMark(const std::string & deviceId,bool finish)752 int Metadata::SetTimeSyncFinishMark(const std::string &deviceId, bool finish) 753 { 754 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC, finish); 755 } 756 SetTimeChangeMark(const std::string & deviceId,bool change)757 int Metadata::SetTimeChangeMark(const std::string &deviceId, bool change) 758 { 759 return SetSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE, change); 760 } 761 IsTimeSyncFinish(const std::string & deviceId)762 bool Metadata::IsTimeSyncFinish(const std::string &deviceId) 763 { 764 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_SYNC); 765 } 766 IsTimeChange(const std::string & deviceId)767 bool Metadata::IsTimeChange(const std::string &deviceId) 768 { 769 return IsContainSyncMark(deviceId, SyncMark::SYNC_MARK_TIME_CHANGE); 770 } 771 SetSyncMark(const std::string & deviceId,SyncMark syncMark,bool finish)772 int Metadata::SetSyncMark(const std::string &deviceId, SyncMark syncMark, bool finish) 773 { 774 MetaDataValue metadata; 775 std::lock_guard<std::mutex> lockGuard(metadataLock_); 776 GetMetaDataValue(deviceId, metadata, true); 777 auto mark = static_cast<uint64_t>(syncMark); 778 if (finish) { 779 metadata.syncMark |= mark; 780 } else { 781 metadata.syncMark = DBCommon::EraseBit(metadata.syncMark, mark); 782 } 783 LOGD("[Metadata] Mark:%" PRIx64 " sync finish:%d sync mark:%" PRIu64, mark, static_cast<int>(finish), 784 metadata.syncMark); 785 return SaveMetaDataValue(deviceId, metadata); 786 } 787 IsContainSyncMark(const std::string & deviceId,SyncMark syncMark)788 bool Metadata::IsContainSyncMark(const std::string &deviceId, SyncMark syncMark) 789 { 790 MetaDataValue metadata; 791 std::lock_guard<std::mutex> lockGuard(metadataLock_); 792 GetMetaDataValue(deviceId, metadata, true); 793 auto mark = static_cast<uint64_t>(syncMark); 794 return (metadata.syncMark & mark) == mark; 795 } 796 SetRemoteSchemaVersion(const std::string & deviceId,uint64_t schemaVersion)797 int Metadata::SetRemoteSchemaVersion(const std::string &deviceId, uint64_t schemaVersion) 798 { 799 MetaDataValue metadata; 800 std::lock_guard<std::mutex> lockGuard(metadataLock_); 801 GetMetaDataValue(deviceId, metadata, true); 802 metadata.remoteSchemaVersion = schemaVersion; 803 LOGI("[Metadata] Set %.3s schema version %" PRIu64, deviceId.c_str(), schemaVersion); 804 int errCode = SaveMetaDataValue(deviceId, metadata); 805 if (errCode != E_OK) { 806 LOGW("[Metadata] Set remote schema version failed"); 807 } 808 return errCode; 809 } 810 GetRemoteSchemaVersion(const std::string & deviceId)811 uint64_t Metadata::GetRemoteSchemaVersion(const std::string &deviceId) 812 { 813 MetaDataValue metadata; 814 std::lock_guard<std::mutex> lockGuard(metadataLock_); 815 GetMetaDataValue(deviceId, metadata, true); 816 LOGI("[Metadata] Get %.3s schema version %" PRIu64, deviceId.c_str(), metadata.remoteSchemaVersion); 817 return metadata.remoteSchemaVersion; 818 } 819 SetSystemTimeOffset(const std::string & deviceId,int64_t systemTimeOffset)820 int Metadata::SetSystemTimeOffset(const std::string &deviceId, int64_t systemTimeOffset) 821 { 822 MetaDataValue metadata; 823 std::lock_guard<std::mutex> lockGuard(metadataLock_); 824 GetMetaDataValue(deviceId, metadata, true); 825 metadata.systemTimeOffset = systemTimeOffset; 826 LOGI("[Metadata] Set %.3s systemTimeOffset %" PRId64, deviceId.c_str(), systemTimeOffset); 827 return SaveMetaDataValue(deviceId, metadata); 828 } 829 GetSystemTimeOffset(const std::string & deviceId)830 int64_t Metadata::GetSystemTimeOffset(const std::string &deviceId) 831 { 832 MetaDataValue metadata; 833 std::lock_guard<std::mutex> lockGuard(metadataLock_); 834 GetMetaDataValue(deviceId, metadata, true); 835 LOGI("[Metadata] Get %.3s systemTimeOffset %" PRId64, deviceId.c_str(), metadata.systemTimeOffset); 836 return metadata.systemTimeOffset; 837 } 838 GetLocalSchemaVersion()839 std::pair<int, uint64_t> Metadata::GetLocalSchemaVersion() 840 { 841 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_); 842 auto [errCode, localMetaData] = GetLocalMetaData(); 843 if (errCode != E_OK) { 844 return {errCode, 0}; 845 } 846 LOGI("[Metadata] Get local schema version %" PRIu64, localMetaData.localSchemaVersion); 847 return {errCode, localMetaData.localSchemaVersion}; 848 } 849 SetLocalSchemaVersion(uint64_t schemaVersion)850 int Metadata::SetLocalSchemaVersion(uint64_t schemaVersion) 851 { 852 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_); 853 auto [errCode, localMetaData] = GetLocalMetaData(); 854 if (errCode != E_OK) { 855 return errCode; 856 } 857 localMetaData.localSchemaVersion = schemaVersion; 858 LOGI("[Metadata] Set local schema version %" PRIu64, schemaVersion); 859 return SaveLocalMetaData(localMetaData); 860 } 861 SaveLocalMetaData(const LocalMetaData & localMetaData)862 int Metadata::SaveLocalMetaData(const LocalMetaData &localMetaData) 863 { 864 auto [errCode, value] = SerializeLocalMetaData(localMetaData); 865 if (errCode != E_OK) { 866 LOGE("[Metadata] Serialize local meta data failed %d", errCode); 867 return errCode; 868 } 869 std::string k(LOCAL_META_DATA_KEY); 870 Key key(k.begin(), k.end()); 871 return SetMetadataToDb(key, value); 872 } 873 GetLocalMetaData()874 std::pair<int, LocalMetaData> Metadata::GetLocalMetaData() 875 { 876 std::string k(LOCAL_META_DATA_KEY); 877 Key key(k.begin(), k.end()); 878 Value value; 879 int errCode = GetMetadataFromDb(key, value); 880 if (errCode != E_OK && errCode != -E_NOT_FOUND) { 881 return {errCode, {}}; 882 } 883 return DeSerializeLocalMetaData(value); 884 } 885 SerializeLocalMetaData(const LocalMetaData & localMetaData)886 std::pair<int, Value> Metadata::SerializeLocalMetaData(const LocalMetaData &localMetaData) 887 { 888 std::pair<int, Value> res = {E_OK, {}}; 889 auto &[errCode, value] = res; 890 value.resize(CalculateLocalMetaDataLength()); 891 Parcel parcel(value.data(), value.size()); 892 (void)parcel.WriteUInt32(localMetaData.version); 893 parcel.EightByteAlign(); 894 (void)parcel.WriteUInt64(localMetaData.localSchemaVersion); 895 if (parcel.IsError()) { 896 LOGE("[Metadata] Serialize localMetaData failed"); 897 errCode = -E_SERIALIZE_ERROR; 898 value.clear(); 899 return res; 900 } 901 return res; 902 } 903 DeSerializeLocalMetaData(const Value & value)904 std::pair<int, LocalMetaData> Metadata::DeSerializeLocalMetaData(const Value &value) 905 { 906 std::pair<int, LocalMetaData> res; 907 auto &[errCode, meta] = res; 908 if (value.empty()) { 909 errCode = E_OK; 910 return res; 911 } 912 Parcel parcel(const_cast<uint8_t *>(value.data()), value.size()); 913 parcel.ReadUInt32(meta.version); 914 if (meta.version >= LOCAL_META_DATA_VERSION_V2) { 915 parcel.EightByteAlign(); 916 } 917 parcel.ReadUInt64(meta.localSchemaVersion); 918 if (parcel.IsError()) { 919 LOGE("[Metadata] DeSerialize localMetaData failed"); 920 errCode = -E_SERIALIZE_ERROR; 921 return res; 922 } 923 return res; 924 } 925 CalculateLocalMetaDataLength()926 uint64_t Metadata::CalculateLocalMetaDataLength() 927 { 928 uint64_t length = Parcel::GetUInt32Len(); // version 929 length = Parcel::GetEightByteAlign(length); 930 length += Parcel::GetUInt64Len(); // local schema version 931 return length; 932 } 933 MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata)934 Metadata::MetaWaterMarkAutoLock::MetaWaterMarkAutoLock(std::shared_ptr<Metadata> metadata) 935 : metadataPtr_(std::move(metadata)) 936 { 937 if (metadataPtr_ != nullptr) { 938 metadataPtr_->LockWaterMark(); 939 } 940 } 941 ~MetaWaterMarkAutoLock()942 Metadata::MetaWaterMarkAutoLock::~MetaWaterMarkAutoLock() 943 { 944 if (metadataPtr_ != nullptr) { 945 metadataPtr_->UnlockWaterMark(); 946 } 947 } 948 InitLocalMetaData()949 int Metadata::InitLocalMetaData() 950 { 951 std::lock_guard<std::mutex> autoLock(localMetaDataMutex_); 952 auto [errCode, localMetaData] = GetLocalMetaData(); 953 if (errCode != E_OK) { 954 return errCode; 955 } 956 if (localMetaData.localSchemaVersion != 0) { 957 return E_OK; 958 } 959 uint64_t curTime = 0u; 960 errCode = OS::GetCurrentSysTimeInMicrosecond(curTime); 961 if (errCode != E_OK) { 962 LOGW("[Metadata] get system time failed when init schema version!"); 963 localMetaData.localSchemaVersion += 1; 964 } else { 965 localMetaData.localSchemaVersion = curTime; 966 } 967 errCode = SaveLocalMetaData(localMetaData); 968 if (errCode != E_OK) { 969 LOGE("[Metadata] init local schema version failed:%d", errCode); 970 } 971 return errCode; 972 } 973 } // namespace DistributedDB