1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "sqlite_single_ver_natural_store.h" 17 18 #include <algorithm> 19 #include <thread> 20 #include <chrono> 21 22 #include "data_compression.h" 23 #include "db_common.h" 24 #include "db_constant.h" 25 #include "db_dump_helper.h" 26 #include "db_dfx_adapter.h" 27 #include "db_errno.h" 28 #include "generic_single_ver_kv_entry.h" 29 #include "intercepted_data_impl.h" 30 #include "kvdb_utils.h" 31 #include "log_print.h" 32 #include "platform_specific.h" 33 #include "schema_object.h" 34 #include "single_ver_database_oper.h" 35 #include "single_ver_utils.h" 36 #include "storage_engine_manager.h" 37 #include "sqlite_single_ver_natural_store_connection.h" 38 #include "value_hash_calc.h" 39 40 namespace DistributedDB { GetDbPropertyForUpdate()41 KvDBProperties &SQLiteSingleVerNaturalStore::GetDbPropertyForUpdate() 42 { 43 return MyProp(); 44 } 45 HeartBeatForLifeCycle() const46 void SQLiteSingleVerNaturalStore::HeartBeatForLifeCycle() const 47 { 48 std::lock_guard<std::mutex> lock(lifeCycleMutex_); 49 int errCode = ResetLifeCycleTimer(); 50 if (errCode != E_OK) { 51 LOGE("Heart beat for life cycle failed:%d", errCode); 52 } 53 } 54 StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier) const55 int SQLiteSingleVerNaturalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier ¬ifier) const 56 { 57 auto runtimeCxt = RuntimeContext::GetInstance(); 58 if (runtimeCxt == nullptr) { 59 return -E_INVALID_ARGS; 60 } 61 RefObject::IncObjRef(this); 62 TimerId timerId = 0; 63 int errCode = runtimeCxt->SetTimer(autoLifeTime_, 64 [this](TimerId id) -> int { 65 std::lock_guard<std::mutex> lock(lifeCycleMutex_); 66 if (lifeCycleNotifier_) { 67 std::string identifier; 68 if (GetMyProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false)) { 69 identifier = GetMyProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA, ""); 70 } else { 71 identifier = GetMyProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, ""); 72 } 73 auto userId = GetMyProperties().GetStringProp(DBProperties::USER_ID, ""); 74 lifeCycleNotifier_(identifier, userId); 75 } 76 return 0; 77 }, 78 [this]() { 79 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { 80 RefObject::DecObjRef(this); 81 }); 82 if (ret != E_OK) { 83 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret); 84 } 85 }, 86 timerId); 87 if (errCode != E_OK) { 88 lifeTimerId_ = 0; 89 LOGE("SetTimer failed:%d", errCode); 90 RefObject::DecObjRef(this); 91 return errCode; 92 } 93 94 lifeCycleNotifier_ = notifier; 95 lifeTimerId_ = timerId; 96 return E_OK; 97 } 98 ResetLifeCycleTimer() const99 int SQLiteSingleVerNaturalStore::ResetLifeCycleTimer() const 100 { 101 if (lifeTimerId_ == 0) { 102 return E_OK; 103 } 104 auto lifeNotifier = lifeCycleNotifier_; 105 lifeCycleNotifier_ = nullptr; 106 int errCode = StopLifeCycleTimer(); 107 if (errCode != E_OK) { 108 LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode); 109 } 110 return StartLifeCycleTimer(lifeNotifier); 111 } 112 StopLifeCycleTimer() const113 int SQLiteSingleVerNaturalStore::StopLifeCycleTimer() const 114 { 115 auto runtimeCxt = RuntimeContext::GetInstance(); 116 if (runtimeCxt == nullptr) { 117 return -E_INVALID_ARGS; 118 } 119 if (lifeTimerId_ != 0) { 120 TimerId timerId = lifeTimerId_; 121 lifeTimerId_ = 0; 122 runtimeCxt->RemoveTimer(timerId, false); 123 } 124 return E_OK; 125 } 126 IsDataMigrating() const127 bool SQLiteSingleVerNaturalStore::IsDataMigrating() const 128 { 129 if (storageEngine_ == nullptr) { 130 return false; 131 } 132 133 if (storageEngine_->IsMigrating()) { 134 LOGD("Migrating now."); 135 return true; 136 } 137 return false; 138 } 139 SetConnectionFlag(bool isExisted) const140 void SQLiteSingleVerNaturalStore::SetConnectionFlag(bool isExisted) const 141 { 142 if (storageEngine_ != nullptr) { 143 storageEngine_->SetConnectionFlag(isExisted); 144 } 145 } 146 TriggerToMigrateData() const147 int SQLiteSingleVerNaturalStore::TriggerToMigrateData() const 148 { 149 SQLiteSingleVerStorageEngine *storageEngine = nullptr; 150 { 151 std::lock_guard<std::shared_mutex> autoLock(engineMutex_); 152 if (storageEngine_ == nullptr) { 153 return E_OK; 154 } 155 storageEngine = storageEngine_; 156 RefObject::IncObjRef(storageEngine); 157 } 158 RefObject::IncObjRef(this); 159 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, storageEngine]() { 160 AsyncDataMigration(storageEngine); 161 }); 162 if (errCode != E_OK) { 163 RefObject::DecObjRef(this); 164 RefObject::DecObjRef(storageEngine); 165 LOGE("[SingleVerNStore] Trigger to migrate data failed : %d.", errCode); 166 } 167 return errCode; 168 } 169 IsCacheDBMode() const170 bool SQLiteSingleVerNaturalStore::IsCacheDBMode() const 171 { 172 if (storageEngine_ == nullptr) { 173 LOGE("[SingleVerNStore] IsCacheDBMode storage engine is invalid."); 174 return false; 175 } 176 EngineState engineState = storageEngine_->GetEngineState(); 177 return (engineState == EngineState::CACHEDB); 178 } 179 IsExtendedCacheDBMode() const180 bool SQLiteSingleVerNaturalStore::IsExtendedCacheDBMode() const 181 { 182 if (storageEngine_ == nullptr) { 183 LOGE("[SingleVerNStore] storage engine is invalid."); 184 return false; 185 } 186 EngineState engineState = storageEngine_->GetEngineState(); 187 return (engineState == EngineState::CACHEDB || engineState == EngineState::MIGRATING || 188 engineState == EngineState::ATTACHING); 189 } 190 CheckReadDataControlled() const191 int SQLiteSingleVerNaturalStore::CheckReadDataControlled() const 192 { 193 if (IsExtendedCacheDBMode()) { 194 int err = IsCacheDBMode() ? -E_EKEYREVOKED : -E_BUSY; 195 LOGE("Existed cache database can not read data, errCode = [%d]!", err); 196 return err; 197 } 198 return E_OK; 199 } 200 IncreaseCacheRecordVersion() const201 void SQLiteSingleVerNaturalStore::IncreaseCacheRecordVersion() const 202 { 203 if (storageEngine_ == nullptr) { 204 LOGE("[SingleVerNStore] Increase cache version storage engine is invalid."); 205 return; 206 } 207 storageEngine_->IncreaseCacheRecordVersion(); 208 } 209 GetCacheRecordVersion() const210 uint64_t SQLiteSingleVerNaturalStore::GetCacheRecordVersion() const 211 { 212 if (storageEngine_ == nullptr) { 213 LOGE("[SingleVerNStore] Get cache version storage engine is invalid."); 214 return 0; 215 } 216 return storageEngine_->GetCacheRecordVersion(); 217 } 218 GetAndIncreaseCacheRecordVersion() const219 uint64_t SQLiteSingleVerNaturalStore::GetAndIncreaseCacheRecordVersion() const 220 { 221 if (storageEngine_ == nullptr) { 222 LOGE("[SingleVerNStore] Get and increase cache version storage engine is invalid."); 223 return 0; 224 } 225 return storageEngine_->GetAndIncreaseCacheRecordVersion(); 226 } 227 CheckAmendValueContentForSyncProcedure(std::vector<DataItem> & dataItems) const228 void SQLiteSingleVerNaturalStore::CheckAmendValueContentForSyncProcedure(std::vector<DataItem> &dataItems) const 229 { 230 const SchemaObject &schemaObjRef = MyProp().GetSchemaConstRef(); 231 if (!schemaObjRef.IsSchemaValid()) { 232 // Not a schema database, do not need to check more 233 return; 234 } 235 uint32_t deleteCount = 0; 236 uint32_t amendCount = 0; 237 uint32_t neglectCount = 0; 238 for (auto &eachItem : dataItems) { 239 if ((eachItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG || 240 (eachItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) { 241 // Delete record not concerned 242 deleteCount++; 243 continue; 244 } 245 bool useAmendValue = false; 246 int errCode = CheckValueAndAmendIfNeed(ValueSource::FROM_SYNC, eachItem.value, eachItem.value, useAmendValue); 247 if (errCode != E_OK) { 248 eachItem.neglect = true; 249 neglectCount++; 250 continue; 251 } 252 if (useAmendValue) { 253 amendCount++; 254 } 255 } 256 LOGI("[SqlSinStore][CheckAmendForSync] OriCount=%zu, DeleteCount=%u, AmendCount=%u, NeglectCount=%u", 257 dataItems.size(), deleteCount, amendCount, neglectCount); 258 } 259 NotifyRemotePushFinished(const std::string & targetId) const260 void SQLiteSingleVerNaturalStore::NotifyRemotePushFinished(const std::string &targetId) const 261 { 262 std::string identifier = DBCommon::VectorToHexString(GetIdentifier()); 263 LOGI("label:%.6s sourceTarget: %s{private} push finished", identifier.c_str(), targetId.c_str()); 264 NotifyRemotePushFinishedInner(targetId); 265 } 266 CheckIntegrity() const267 int SQLiteSingleVerNaturalStore::CheckIntegrity() const 268 { 269 int errCode = E_OK; 270 auto handle = GetHandle(true, errCode); 271 if (handle == nullptr) { 272 return errCode; 273 } 274 275 errCode = handle->CheckIntegrity(); 276 ReleaseHandle(handle); 277 return errCode; 278 } 279 SaveCreateDBTimeIfNotExisted()280 int SQLiteSingleVerNaturalStore::SaveCreateDBTimeIfNotExisted() 281 { 282 Timestamp createDBTime = 0; 283 int errCode = GetDatabaseCreateTimestamp(createDBTime); 284 if (errCode == -E_NOT_FOUND) { 285 errCode = SaveCreateDBTime(); 286 } 287 if (errCode != E_OK) { 288 LOGE("SaveCreateDBTimeIfNotExisted failed, errCode=%d.", errCode); 289 } 290 return errCode; 291 } 292 DeleteMetaDataByPrefixKey(const Key & keyPrefix) const293 int SQLiteSingleVerNaturalStore::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const 294 { 295 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) { 296 return -E_INVALID_ARGS; 297 } 298 299 int errCode = E_OK; 300 auto handle = GetHandle(true, errCode); 301 if (handle == nullptr) { 302 return errCode; 303 } 304 305 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix); 306 if (errCode != E_OK) { 307 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode); 308 } 309 310 ReleaseHandle(handle); 311 HeartBeatForLifeCycle(); 312 return errCode; 313 } 314 GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const315 int SQLiteSingleVerNaturalStore::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const 316 { 317 needCompressOnSync = GetDbProperties().GetBoolProp(KvDBProperties::COMPRESS_ON_SYNC, false); 318 compressionRate = GetDbProperties().GetIntProp(KvDBProperties::COMPRESSION_RATE, 319 DBConstant::DEFAULT_COMPTRESS_RATE); 320 return E_OK; 321 } 322 GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const323 int SQLiteSingleVerNaturalStore::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const 324 { 325 algorithmSet.clear(); 326 DataCompression::GetCompressionAlgo(algorithmSet); 327 return E_OK; 328 } 329 CheckAndInitQueryCondition(QueryObject & query) const330 int SQLiteSingleVerNaturalStore::CheckAndInitQueryCondition(QueryObject &query) const 331 { 332 const SchemaObject &localSchema = MyProp().GetSchemaConstRef(); 333 if (localSchema.GetSchemaType() != SchemaType::NONE && localSchema.GetSchemaType() != SchemaType::JSON) { 334 // Flatbuffer schema is not support subscribe 335 return -E_NOT_SUPPORT; 336 } 337 query.SetSchema(localSchema); 338 339 int errCode = E_OK; 340 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode); 341 if (handle == nullptr) { 342 return errCode; 343 } 344 345 errCode = handle->CheckQueryObjectLegal(query); 346 if (errCode != E_OK) { 347 LOGE("Check query condition failed [%d]!", errCode); 348 } 349 ReleaseHandle(handle); 350 return errCode; 351 } 352 SetSendDataInterceptor(const PushDataInterceptor & interceptor)353 void SQLiteSingleVerNaturalStore::SetSendDataInterceptor(const PushDataInterceptor &interceptor) 354 { 355 std::unique_lock<std::shared_mutex> lock(dataInterceptorMutex_); 356 pushDataInterceptor_ = interceptor; 357 } 358 InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const359 int SQLiteSingleVerNaturalStore::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID, 360 const std::string &targetID, bool isPush) const 361 { 362 PushDataInterceptor interceptor = nullptr; 363 { 364 std::shared_lock<std::shared_mutex> lock(dataInterceptorMutex_); 365 interceptor = isPush ? pushDataInterceptor_ : receiveDataInterceptor_; 366 if (interceptor == nullptr) { 367 return E_OK; 368 } 369 } 370 371 InterceptedDataImpl data(entries, [this](const Value &newValue) -> int { 372 bool useAmendValue = false; 373 Value amendValue = newValue; 374 return this->CheckValueAndAmendIfNeed(ValueSource::FROM_LOCAL, newValue, amendValue, useAmendValue); 375 } 376 ); 377 378 int errCode = interceptor(data, sourceID, targetID); 379 if (data.IsError()) { 380 if (isPush) { 381 // receive data release by syncer 382 SingleVerKvEntry::Release(entries); 383 } 384 LOGE("Intercept data failed:%d.", errCode); 385 return -E_INTERCEPT_DATA_FAIL; 386 } 387 return E_OK; 388 } 389 AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)390 int SQLiteSingleVerNaturalStore::AddSubscribe(const std::string &subscribeId, const QueryObject &query, 391 bool needCacheSubscribe) 392 { 393 if (IsSupportSubscribe() != E_OK) { 394 return -E_NOT_SUPPORT; 395 } 396 const SchemaObject &localSchema = MyProp().GetSchemaConstRef(); 397 QueryObject queryInner = query; 398 queryInner.SetSchema(localSchema); 399 if (IsExtendedCacheDBMode() && needCacheSubscribe) { // cache auto subscribe when engine state is in CACHEDB mode 400 LOGI("Cache subscribe query and return ok when in cacheDB."); 401 storageEngine_->CacheSubscribe(subscribeId, queryInner); 402 return E_OK; 403 } 404 405 int errCode = E_OK; 406 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode); 407 if (handle == nullptr) { 408 return errCode; 409 } 410 411 errCode = handle->StartTransaction(TransactType::IMMEDIATE); 412 if (errCode != E_OK) { 413 ReleaseHandle(handle); 414 return errCode; 415 } 416 417 errCode = handle->AddSubscribeTrigger(queryInner, subscribeId); 418 if (errCode != E_OK) { 419 LOGE("Add subscribe trigger failed: %d", errCode); 420 (void)handle->Rollback(); 421 } else { 422 errCode = handle->Commit(); 423 } 424 ReleaseHandle(handle); 425 return errCode; 426 } 427 SetMaxLogSize(uint64_t limit)428 int SQLiteSingleVerNaturalStore::SetMaxLogSize(uint64_t limit) 429 { 430 LOGI("Set the max log size to %" PRIu64, limit); 431 maxLogSize_.store(limit); 432 return E_OK; 433 } GetMaxLogSize() const434 uint64_t SQLiteSingleVerNaturalStore::GetMaxLogSize() const 435 { 436 return maxLogSize_.load(); 437 } 438 Dump(int fd)439 void SQLiteSingleVerNaturalStore::Dump(int fd) 440 { 441 std::string userId = MyProp().GetStringProp(DBProperties::USER_ID, ""); 442 std::string appId = MyProp().GetStringProp(DBProperties::APP_ID, ""); 443 std::string storeId = MyProp().GetStringProp(DBProperties::STORE_ID, ""); 444 std::string label = MyProp().GetStringProp(DBProperties::IDENTIFIER_DATA, ""); 445 label = DBCommon::TransferStringToHex(label); 446 DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n", 447 userId.c_str(), appId.c_str(), storeId.c_str(), label.c_str()); 448 SyncAbleKvDB::Dump(fd); 449 } 450 IsSupportSubscribe() const451 int SQLiteSingleVerNaturalStore::IsSupportSubscribe() const 452 { 453 const SchemaObject &localSchema = MyProp().GetSchemaConstRef(); 454 if (localSchema.GetSchemaType() != SchemaType::NONE && localSchema.GetSchemaType() != SchemaType::JSON) { 455 // Flatbuffer schema is not support subscribe 456 return -E_NOT_SUPPORT; 457 } 458 return E_OK; 459 } 460 RemoveDeviceDataInner(const std::string & hashDev,bool isNeedNotify)461 int SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev, bool isNeedNotify) 462 { 463 int errCode = E_OK; 464 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode); 465 if (handle == nullptr) { 466 LOGE("[SingleVerNStore] RemoveDeviceData get handle failed:%d", errCode); 467 return errCode; 468 } 469 uint64_t logFileSize = handle->GetLogFileSize(); 470 ReleaseHandle(handle); 471 if (logFileSize > GetMaxLogSize()) { 472 LOGW("[SingleVerNStore] RmDevData log size[%" PRIu64 "] over the limit", logFileSize); 473 return -E_LOG_OVER_LIMITS; 474 } 475 476 std::set<std::string> removeDevices; 477 if (hashDev.empty()) { 478 errCode = GetExistsDeviceList(removeDevices); 479 if (errCode != E_OK) { 480 LOGE("[SingleVerNStore] get remove device list failed:%d", errCode); 481 return errCode; 482 } 483 } else { 484 removeDevices.insert(hashDev); 485 } 486 487 LOGD("[SingleVerNStore] remove device data, size=%zu", removeDevices.size()); 488 for (const auto &iterDevice : removeDevices) { 489 // Call the syncer module to erase the water mark. 490 errCode = EraseDeviceWaterMark(iterDevice, false); 491 if (errCode != E_OK) { 492 LOGE("[SingleVerNStore] erase water mark failed:%d", errCode); 493 return errCode; 494 } 495 } 496 497 CleanAllWaterMark(); 498 if (IsExtendedCacheDBMode()) { 499 errCode = RemoveDeviceDataInCacheMode(hashDev, isNeedNotify); 500 } else { 501 errCode = RemoveDeviceDataNormally(hashDev, isNeedNotify); 502 } 503 if (errCode != E_OK) { 504 LOGE("[SingleVerNStore] RemoveDeviceData failed:%d", errCode); 505 } 506 507 return errCode; 508 } 509 RemoveDeviceDataInner(const std::string & hashDev,ClearMode mode)510 int SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev, ClearMode mode) 511 { 512 int errCode = E_OK; 513 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode); 514 if (handle == nullptr) { 515 LOGE("[SingleVerNStore] RemoveDeviceData with mode get handle failed:%d", errCode); 516 return errCode; 517 } 518 errCode = handle->StartTransaction(TransactType::IMMEDIATE); 519 if (errCode != E_OK) { 520 LOGE("Start transaction failed %d in RemoveDeviceData.", errCode); 521 ReleaseHandle(handle); 522 return errCode; 523 } 524 errCode = handle->RemoveDeviceData(hashDev, mode); 525 if (errCode != E_OK) { 526 LOGE("RemoveDeviceData failed: %d", errCode); 527 (void)handle->Rollback(); 528 } else { 529 errCode = handle->Commit(); 530 if (errCode != E_OK) { 531 LOGE("Transaction commit failed %d in RemoveDeviceData.", errCode); 532 } 533 } 534 ReleaseHandle(handle); 535 return errCode; 536 } 537 RemoveDeviceDataInner(const std::string & hashDev,const std::string & user,ClearMode mode)538 int SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev, const std::string &user, 539 ClearMode mode) 540 { 541 int errCode = E_OK; 542 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode); 543 if (handle == nullptr) { 544 LOGE("[SingleVerNStore] RemoveDeviceData with user and mode get handle failed:%d", errCode); 545 return errCode; 546 } 547 errCode = handle->StartTransaction(TransactType::IMMEDIATE); 548 if (errCode != E_OK) { 549 LOGE("Start transaction failed %d in RemoveDeviceData.", errCode); 550 ReleaseHandle(handle); 551 return errCode; 552 } 553 errCode = handle->RemoveDeviceData(hashDev, user, mode); 554 if (errCode != E_OK) { 555 LOGE("RemoveDeviceData failed: %d", errCode); 556 (void)handle->Rollback(); 557 } else { 558 errCode = handle->Commit(); 559 if (errCode != E_OK) { 560 LOGE("Transaction commit failed %d in RemoveDeviceData.", errCode); 561 } 562 } 563 ReleaseHandle(handle); 564 return errCode; 565 } 566 AbortHandle()567 void SQLiteSingleVerNaturalStore::AbortHandle() 568 { 569 std::unique_lock<std::shared_mutex> lock(abortHandleMutex_); 570 abortPerm_ = OperatePerm::RESTART_SYNC_PERM; 571 } 572 EnableHandle()573 void SQLiteSingleVerNaturalStore::EnableHandle() 574 { 575 std::unique_lock<std::shared_mutex> lock(abortHandleMutex_); 576 abortPerm_ = OperatePerm::NORMAL_PERM; 577 } 578 TryHandle() const579 int SQLiteSingleVerNaturalStore::TryHandle() const 580 { 581 std::unique_lock<std::shared_mutex> lock(abortHandleMutex_); 582 if (abortPerm_ == OperatePerm::RESTART_SYNC_PERM) { 583 LOGW("[SingleVerNStore] Restarting sync, handle id[%s] is busy", 584 DBCommon::TransferStringToHex(storageEngine_->GetIdentifier()).c_str()); 585 return -E_BUSY; 586 } 587 return E_OK; 588 } 589 GetStorageExecutor(bool isWrite)590 std::pair<int, SQLiteSingleVerStorageExecutor*> SQLiteSingleVerNaturalStore::GetStorageExecutor(bool isWrite) 591 { 592 int errCode = E_OK; 593 SQLiteSingleVerStorageExecutor *handle = GetHandle(isWrite, errCode); 594 return {errCode, handle}; 595 } 596 RecycleStorageExecutor(SQLiteSingleVerStorageExecutor * executor)597 void SQLiteSingleVerNaturalStore::RecycleStorageExecutor(SQLiteSingleVerStorageExecutor *executor) 598 { 599 ReleaseHandle(executor); 600 } 601 GetLocalTimeOffsetForCloud()602 TimeOffset SQLiteSingleVerNaturalStore::GetLocalTimeOffsetForCloud() 603 { 604 return GetLocalTimeOffset(); 605 } 606 RegisterObserverAction(const KvStoreObserver * observer,const ObserverAction & action)607 int SQLiteSingleVerNaturalStore::RegisterObserverAction(const KvStoreObserver *observer, const ObserverAction &action) 608 { 609 std::lock_guard<std::mutex> autoLock(cloudStoreMutex_); 610 if (sqliteCloudKvStore_ == nullptr) { 611 return -E_INTERNAL_ERROR; 612 } 613 sqliteCloudKvStore_->RegisterObserverAction(observer, action); 614 return E_OK; 615 } 616 UnRegisterObserverAction(const KvStoreObserver * observer)617 int SQLiteSingleVerNaturalStore::UnRegisterObserverAction(const KvStoreObserver *observer) 618 { 619 std::lock_guard<std::mutex> autoLock(cloudStoreMutex_); 620 if (sqliteCloudKvStore_ == nullptr) { 621 return -E_INTERNAL_ERROR; 622 } 623 sqliteCloudKvStore_->UnRegisterObserverAction(observer); 624 return E_OK; 625 } 626 GetCloudVersion(const std::string & device,std::map<std::string,std::string> & versionMap)627 int SQLiteSingleVerNaturalStore::GetCloudVersion(const std::string &device, 628 std::map<std::string, std::string> &versionMap) 629 { 630 std::lock_guard<std::mutex> autoLock(cloudStoreMutex_); 631 if (sqliteCloudKvStore_ == nullptr) { 632 return -E_INTERNAL_ERROR; 633 } 634 return sqliteCloudKvStore_->GetCloudVersion(device, versionMap); 635 } 636 SetReceiveDataInterceptor(const DataInterceptor & interceptor)637 void SQLiteSingleVerNaturalStore::SetReceiveDataInterceptor(const DataInterceptor &interceptor) 638 { 639 std::unique_lock<std::shared_mutex> lock(dataInterceptorMutex_); 640 receiveDataInterceptor_ = interceptor; 641 } 642 SetCloudSyncConfig(const CloudSyncConfig & config)643 int SQLiteSingleVerNaturalStore::SetCloudSyncConfig(const CloudSyncConfig &config) 644 { 645 std::lock_guard<std::mutex> autoLock(cloudStoreMutex_); 646 if (sqliteCloudKvStore_ == nullptr) { 647 LOGE("[SingleVerNStore] DB is null when set config"); 648 return -E_INTERNAL_ERROR; 649 } 650 sqliteCloudKvStore_->SetCloudSyncConfig(config); 651 return E_OK; 652 } 653 } 654