1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_natural_store.h"
17
18 #include <algorithm>
19 #include <thread>
20 #include <chrono>
21
22 #include "data_compression.h"
23 #include "db_common.h"
24 #include "db_constant.h"
25 #include "db_dump_helper.h"
26 #include "db_dfx_adapter.h"
27 #include "db_errno.h"
28 #include "generic_single_ver_kv_entry.h"
29 #include "intercepted_data_impl.h"
30 #include "kvdb_utils.h"
31 #include "log_print.h"
32 #include "platform_specific.h"
33 #include "schema_object.h"
34 #include "single_ver_database_oper.h"
35 #include "single_ver_utils.h"
36 #include "storage_engine_manager.h"
37 #include "sqlite_single_ver_natural_store_connection.h"
38 #include "value_hash_calc.h"
39
40 namespace DistributedDB {
GetDbPropertyForUpdate()41 KvDBProperties &SQLiteSingleVerNaturalStore::GetDbPropertyForUpdate()
42 {
43 return MyProp();
44 }
45
HeartBeatForLifeCycle() const46 void SQLiteSingleVerNaturalStore::HeartBeatForLifeCycle() const
47 {
48 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
49 int errCode = ResetLifeCycleTimer();
50 if (errCode != E_OK) {
51 LOGE("Heart beat for life cycle failed:%d", errCode);
52 }
53 }
54
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier) const55 int SQLiteSingleVerNaturalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier ¬ifier) const
56 {
57 auto runtimeCxt = RuntimeContext::GetInstance();
58 if (runtimeCxt == nullptr) {
59 return -E_INVALID_ARGS;
60 }
61 RefObject::IncObjRef(this);
62 TimerId timerId = 0;
63 int errCode = runtimeCxt->SetTimer(autoLifeTime_,
64 [this](TimerId id) -> int {
65 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
66 if (lifeCycleNotifier_) {
67 std::string identifier;
68 if (GetMyProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
69 identifier = GetMyProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
70 } else {
71 identifier = GetMyProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
72 }
73 auto userId = GetMyProperties().GetStringProp(DBProperties::USER_ID, "");
74 lifeCycleNotifier_(identifier, userId);
75 }
76 return 0;
77 },
78 [this]() {
79 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() {
80 RefObject::DecObjRef(this);
81 });
82 if (ret != E_OK) {
83 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
84 }
85 },
86 timerId);
87 if (errCode != E_OK) {
88 lifeTimerId_ = 0;
89 LOGE("SetTimer failed:%d", errCode);
90 RefObject::DecObjRef(this);
91 return errCode;
92 }
93
94 lifeCycleNotifier_ = notifier;
95 lifeTimerId_ = timerId;
96 return E_OK;
97 }
98
ResetLifeCycleTimer() const99 int SQLiteSingleVerNaturalStore::ResetLifeCycleTimer() const
100 {
101 if (lifeTimerId_ == 0) {
102 return E_OK;
103 }
104 auto lifeNotifier = lifeCycleNotifier_;
105 lifeCycleNotifier_ = nullptr;
106 int errCode = StopLifeCycleTimer();
107 if (errCode != E_OK) {
108 LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
109 }
110 return StartLifeCycleTimer(lifeNotifier);
111 }
112
StopLifeCycleTimer() const113 int SQLiteSingleVerNaturalStore::StopLifeCycleTimer() const
114 {
115 auto runtimeCxt = RuntimeContext::GetInstance();
116 if (runtimeCxt == nullptr) {
117 return -E_INVALID_ARGS;
118 }
119 if (lifeTimerId_ != 0) {
120 TimerId timerId = lifeTimerId_;
121 lifeTimerId_ = 0;
122 runtimeCxt->RemoveTimer(timerId, false);
123 }
124 return E_OK;
125 }
126
IsDataMigrating() const127 bool SQLiteSingleVerNaturalStore::IsDataMigrating() const
128 {
129 if (storageEngine_ == nullptr) {
130 return false;
131 }
132
133 if (storageEngine_->IsMigrating()) {
134 LOGD("Migrating now.");
135 return true;
136 }
137 return false;
138 }
139
SetConnectionFlag(bool isExisted) const140 void SQLiteSingleVerNaturalStore::SetConnectionFlag(bool isExisted) const
141 {
142 if (storageEngine_ != nullptr) {
143 storageEngine_->SetConnectionFlag(isExisted);
144 }
145 }
146
TriggerToMigrateData() const147 int SQLiteSingleVerNaturalStore::TriggerToMigrateData() const
148 {
149 SQLiteSingleVerStorageEngine *storageEngine = nullptr;
150 {
151 std::lock_guard<std::shared_mutex> autoLock(engineMutex_);
152 if (storageEngine_ == nullptr) {
153 return E_OK;
154 }
155 storageEngine = storageEngine_;
156 RefObject::IncObjRef(storageEngine);
157 }
158 RefObject::IncObjRef(this);
159 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, storageEngine]() {
160 AsyncDataMigration(storageEngine);
161 });
162 if (errCode != E_OK) {
163 RefObject::DecObjRef(this);
164 RefObject::DecObjRef(storageEngine);
165 LOGE("[SingleVerNStore] Trigger to migrate data failed : %d.", errCode);
166 }
167 return errCode;
168 }
169
IsCacheDBMode() const170 bool SQLiteSingleVerNaturalStore::IsCacheDBMode() const
171 {
172 if (storageEngine_ == nullptr) {
173 LOGE("[SingleVerNStore] IsCacheDBMode storage engine is invalid.");
174 return false;
175 }
176 EngineState engineState = storageEngine_->GetEngineState();
177 return (engineState == EngineState::CACHEDB);
178 }
179
IsExtendedCacheDBMode() const180 bool SQLiteSingleVerNaturalStore::IsExtendedCacheDBMode() const
181 {
182 if (storageEngine_ == nullptr) {
183 LOGE("[SingleVerNStore] storage engine is invalid.");
184 return false;
185 }
186 EngineState engineState = storageEngine_->GetEngineState();
187 return (engineState == EngineState::CACHEDB || engineState == EngineState::MIGRATING ||
188 engineState == EngineState::ATTACHING);
189 }
190
CheckReadDataControlled() const191 int SQLiteSingleVerNaturalStore::CheckReadDataControlled() const
192 {
193 if (IsExtendedCacheDBMode()) {
194 int err = IsCacheDBMode() ? -E_EKEYREVOKED : -E_BUSY;
195 LOGE("Existed cache database can not read data, errCode = [%d]!", err);
196 return err;
197 }
198 return E_OK;
199 }
200
IncreaseCacheRecordVersion() const201 void SQLiteSingleVerNaturalStore::IncreaseCacheRecordVersion() const
202 {
203 if (storageEngine_ == nullptr) {
204 LOGE("[SingleVerNStore] Increase cache version storage engine is invalid.");
205 return;
206 }
207 storageEngine_->IncreaseCacheRecordVersion();
208 }
209
GetCacheRecordVersion() const210 uint64_t SQLiteSingleVerNaturalStore::GetCacheRecordVersion() const
211 {
212 if (storageEngine_ == nullptr) {
213 LOGE("[SingleVerNStore] Get cache version storage engine is invalid.");
214 return 0;
215 }
216 return storageEngine_->GetCacheRecordVersion();
217 }
218
GetAndIncreaseCacheRecordVersion() const219 uint64_t SQLiteSingleVerNaturalStore::GetAndIncreaseCacheRecordVersion() const
220 {
221 if (storageEngine_ == nullptr) {
222 LOGE("[SingleVerNStore] Get and increase cache version storage engine is invalid.");
223 return 0;
224 }
225 return storageEngine_->GetAndIncreaseCacheRecordVersion();
226 }
227
CheckAmendValueContentForSyncProcedure(std::vector<DataItem> & dataItems) const228 void SQLiteSingleVerNaturalStore::CheckAmendValueContentForSyncProcedure(std::vector<DataItem> &dataItems) const
229 {
230 const SchemaObject &schemaObjRef = MyProp().GetSchemaConstRef();
231 if (!schemaObjRef.IsSchemaValid()) {
232 // Not a schema database, do not need to check more
233 return;
234 }
235 uint32_t deleteCount = 0;
236 uint32_t amendCount = 0;
237 uint32_t neglectCount = 0;
238 for (auto &eachItem : dataItems) {
239 if ((eachItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
240 (eachItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
241 // Delete record not concerned
242 deleteCount++;
243 continue;
244 }
245 bool useAmendValue = false;
246 int errCode = CheckValueAndAmendIfNeed(ValueSource::FROM_SYNC, eachItem.value, eachItem.value, useAmendValue);
247 if (errCode != E_OK) {
248 eachItem.neglect = true;
249 neglectCount++;
250 continue;
251 }
252 if (useAmendValue) {
253 amendCount++;
254 }
255 }
256 LOGI("[SqlSinStore][CheckAmendForSync] OriCount=%zu, DeleteCount=%u, AmendCount=%u, NeglectCount=%u",
257 dataItems.size(), deleteCount, amendCount, neglectCount);
258 }
259
NotifyRemotePushFinished(const std::string & targetId) const260 void SQLiteSingleVerNaturalStore::NotifyRemotePushFinished(const std::string &targetId) const
261 {
262 std::string identifier = DBCommon::VectorToHexString(GetIdentifier());
263 LOGI("label:%.6s sourceTarget: %s{private} push finished", identifier.c_str(), targetId.c_str());
264 NotifyRemotePushFinishedInner(targetId);
265 }
266
CheckIntegrity() const267 int SQLiteSingleVerNaturalStore::CheckIntegrity() const
268 {
269 int errCode = E_OK;
270 auto handle = GetHandle(true, errCode);
271 if (handle == nullptr) {
272 return errCode;
273 }
274
275 errCode = handle->CheckIntegrity();
276 ReleaseHandle(handle);
277 return errCode;
278 }
279
SaveCreateDBTimeIfNotExisted()280 int SQLiteSingleVerNaturalStore::SaveCreateDBTimeIfNotExisted()
281 {
282 Timestamp createDBTime = 0;
283 int errCode = GetDatabaseCreateTimestamp(createDBTime);
284 if (errCode == -E_NOT_FOUND) {
285 errCode = SaveCreateDBTime();
286 }
287 if (errCode != E_OK) {
288 LOGE("SaveCreateDBTimeIfNotExisted failed, errCode=%d.", errCode);
289 }
290 return errCode;
291 }
292
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const293 int SQLiteSingleVerNaturalStore::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
294 {
295 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
296 return -E_INVALID_ARGS;
297 }
298
299 int errCode = E_OK;
300 auto handle = GetHandle(true, errCode);
301 if (handle == nullptr) {
302 return errCode;
303 }
304
305 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
306 if (errCode != E_OK) {
307 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
308 }
309
310 ReleaseHandle(handle);
311 HeartBeatForLifeCycle();
312 return errCode;
313 }
314
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const315 int SQLiteSingleVerNaturalStore::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
316 {
317 needCompressOnSync = GetDbProperties().GetBoolProp(KvDBProperties::COMPRESS_ON_SYNC, false);
318 compressionRate = GetDbProperties().GetIntProp(KvDBProperties::COMPRESSION_RATE,
319 DBConstant::DEFAULT_COMPTRESS_RATE);
320 return E_OK;
321 }
322
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const323 int SQLiteSingleVerNaturalStore::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
324 {
325 algorithmSet.clear();
326 DataCompression::GetCompressionAlgo(algorithmSet);
327 return E_OK;
328 }
329
CheckAndInitQueryCondition(QueryObject & query) const330 int SQLiteSingleVerNaturalStore::CheckAndInitQueryCondition(QueryObject &query) const
331 {
332 const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
333 if (localSchema.GetSchemaType() != SchemaType::NONE && localSchema.GetSchemaType() != SchemaType::JSON) {
334 // Flatbuffer schema is not support subscribe
335 return -E_NOT_SUPPORT;
336 }
337 query.SetSchema(localSchema);
338
339 int errCode = E_OK;
340 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
341 if (handle == nullptr) {
342 return errCode;
343 }
344
345 errCode = handle->CheckQueryObjectLegal(query);
346 if (errCode != E_OK) {
347 LOGE("Check query condition failed [%d]!", errCode);
348 }
349 ReleaseHandle(handle);
350 return errCode;
351 }
352
SetSendDataInterceptor(const PushDataInterceptor & interceptor)353 void SQLiteSingleVerNaturalStore::SetSendDataInterceptor(const PushDataInterceptor &interceptor)
354 {
355 std::unique_lock<std::shared_mutex> lock(dataInterceptorMutex_);
356 pushDataInterceptor_ = interceptor;
357 }
358
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const359 int SQLiteSingleVerNaturalStore::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
360 const std::string &targetID, bool isPush) const
361 {
362 PushDataInterceptor interceptor = nullptr;
363 {
364 std::shared_lock<std::shared_mutex> lock(dataInterceptorMutex_);
365 interceptor = isPush ? pushDataInterceptor_ : receiveDataInterceptor_;
366 if (interceptor == nullptr) {
367 return E_OK;
368 }
369 }
370
371 InterceptedDataImpl data(entries, [this](const Value &newValue) -> int {
372 bool useAmendValue = false;
373 Value amendValue = newValue;
374 return this->CheckValueAndAmendIfNeed(ValueSource::FROM_LOCAL, newValue, amendValue, useAmendValue);
375 }
376 );
377
378 int errCode = interceptor(data, sourceID, targetID);
379 if (data.IsError()) {
380 if (isPush) {
381 // receive data release by syncer
382 SingleVerKvEntry::Release(entries);
383 }
384 LOGE("Intercept data failed:%d.", errCode);
385 return -E_INTERCEPT_DATA_FAIL;
386 }
387 return E_OK;
388 }
389
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)390 int SQLiteSingleVerNaturalStore::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
391 bool needCacheSubscribe)
392 {
393 if (IsSupportSubscribe() != E_OK) {
394 return -E_NOT_SUPPORT;
395 }
396 const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
397 QueryObject queryInner = query;
398 queryInner.SetSchema(localSchema);
399 if (IsExtendedCacheDBMode() && needCacheSubscribe) { // cache auto subscribe when engine state is in CACHEDB mode
400 LOGI("Cache subscribe query and return ok when in cacheDB.");
401 storageEngine_->CacheSubscribe(subscribeId, queryInner);
402 return E_OK;
403 }
404
405 int errCode = E_OK;
406 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
407 if (handle == nullptr) {
408 return errCode;
409 }
410
411 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
412 if (errCode != E_OK) {
413 ReleaseHandle(handle);
414 return errCode;
415 }
416
417 errCode = handle->AddSubscribeTrigger(queryInner, subscribeId);
418 if (errCode != E_OK) {
419 LOGE("Add subscribe trigger failed: %d", errCode);
420 (void)handle->Rollback();
421 } else {
422 errCode = handle->Commit();
423 }
424 ReleaseHandle(handle);
425 return errCode;
426 }
427
SetMaxLogSize(uint64_t limit)428 int SQLiteSingleVerNaturalStore::SetMaxLogSize(uint64_t limit)
429 {
430 LOGI("Set the max log size to %" PRIu64, limit);
431 maxLogSize_.store(limit);
432 return E_OK;
433 }
GetMaxLogSize() const434 uint64_t SQLiteSingleVerNaturalStore::GetMaxLogSize() const
435 {
436 return maxLogSize_.load();
437 }
438
Dump(int fd)439 void SQLiteSingleVerNaturalStore::Dump(int fd)
440 {
441 std::string userId = MyProp().GetStringProp(DBProperties::USER_ID, "");
442 std::string appId = MyProp().GetStringProp(DBProperties::APP_ID, "");
443 std::string storeId = MyProp().GetStringProp(DBProperties::STORE_ID, "");
444 std::string label = MyProp().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
445 label = DBCommon::TransferStringToHex(label);
446 DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n",
447 userId.c_str(), appId.c_str(), storeId.c_str(), label.c_str());
448 SyncAbleKvDB::Dump(fd);
449 }
450
IsSupportSubscribe() const451 int SQLiteSingleVerNaturalStore::IsSupportSubscribe() const
452 {
453 const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
454 if (localSchema.GetSchemaType() != SchemaType::NONE && localSchema.GetSchemaType() != SchemaType::JSON) {
455 // Flatbuffer schema is not support subscribe
456 return -E_NOT_SUPPORT;
457 }
458 return E_OK;
459 }
460
RemoveDeviceDataInner(const std::string & hashDev,bool isNeedNotify)461 int SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev, bool isNeedNotify)
462 {
463 int errCode = E_OK;
464 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
465 if (handle == nullptr) {
466 LOGE("[SingleVerNStore] RemoveDeviceData get handle failed:%d", errCode);
467 return errCode;
468 }
469 uint64_t logFileSize = handle->GetLogFileSize();
470 ReleaseHandle(handle);
471 if (logFileSize > GetMaxLogSize()) {
472 LOGW("[SingleVerNStore] RmDevData log size[%" PRIu64 "] over the limit", logFileSize);
473 return -E_LOG_OVER_LIMITS;
474 }
475
476 std::set<std::string> removeDevices;
477 if (hashDev.empty()) {
478 errCode = GetExistsDeviceList(removeDevices);
479 if (errCode != E_OK) {
480 LOGE("[SingleVerNStore] get remove device list failed:%d", errCode);
481 return errCode;
482 }
483 } else {
484 removeDevices.insert(hashDev);
485 }
486
487 LOGD("[SingleVerNStore] remove device data, size=%zu", removeDevices.size());
488 for (const auto &iterDevice : removeDevices) {
489 // Call the syncer module to erase the water mark.
490 errCode = EraseDeviceWaterMark(iterDevice, false);
491 if (errCode != E_OK) {
492 LOGE("[SingleVerNStore] erase water mark failed:%d", errCode);
493 return errCode;
494 }
495 }
496
497 CleanAllWaterMark();
498 if (IsExtendedCacheDBMode()) {
499 errCode = RemoveDeviceDataInCacheMode(hashDev, isNeedNotify);
500 } else {
501 errCode = RemoveDeviceDataNormally(hashDev, isNeedNotify);
502 }
503 if (errCode != E_OK) {
504 LOGE("[SingleVerNStore] RemoveDeviceData failed:%d", errCode);
505 }
506
507 return errCode;
508 }
509
RemoveDeviceDataInner(const std::string & hashDev,ClearMode mode)510 int SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev, ClearMode mode)
511 {
512 int errCode = E_OK;
513 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
514 if (handle == nullptr) {
515 LOGE("[SingleVerNStore] RemoveDeviceData with mode get handle failed:%d", errCode);
516 return errCode;
517 }
518 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
519 if (errCode != E_OK) {
520 LOGE("Start transaction failed %d in RemoveDeviceData.", errCode);
521 ReleaseHandle(handle);
522 return errCode;
523 }
524 errCode = handle->RemoveDeviceData(hashDev, mode);
525 if (errCode != E_OK) {
526 LOGE("RemoveDeviceData failed: %d", errCode);
527 (void)handle->Rollback();
528 } else {
529 errCode = handle->Commit();
530 if (errCode != E_OK) {
531 LOGE("Transaction commit failed %d in RemoveDeviceData.", errCode);
532 }
533 }
534 ReleaseHandle(handle);
535 return errCode;
536 }
537
RemoveDeviceDataInner(const std::string & hashDev,const std::string & user,ClearMode mode)538 int SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev, const std::string &user,
539 ClearMode mode)
540 {
541 int errCode = E_OK;
542 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
543 if (handle == nullptr) {
544 LOGE("[SingleVerNStore] RemoveDeviceData with user and mode get handle failed:%d", errCode);
545 return errCode;
546 }
547 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
548 if (errCode != E_OK) {
549 LOGE("Start transaction failed %d in RemoveDeviceData.", errCode);
550 ReleaseHandle(handle);
551 return errCode;
552 }
553 errCode = handle->RemoveDeviceData(hashDev, user, mode);
554 if (errCode != E_OK) {
555 LOGE("RemoveDeviceData failed: %d", errCode);
556 (void)handle->Rollback();
557 } else {
558 errCode = handle->Commit();
559 if (errCode != E_OK) {
560 LOGE("Transaction commit failed %d in RemoveDeviceData.", errCode);
561 }
562 }
563 ReleaseHandle(handle);
564 return errCode;
565 }
566
AbortHandle()567 void SQLiteSingleVerNaturalStore::AbortHandle()
568 {
569 std::unique_lock<std::shared_mutex> lock(abortHandleMutex_);
570 abortPerm_ = OperatePerm::RESTART_SYNC_PERM;
571 }
572
EnableHandle()573 void SQLiteSingleVerNaturalStore::EnableHandle()
574 {
575 std::unique_lock<std::shared_mutex> lock(abortHandleMutex_);
576 abortPerm_ = OperatePerm::NORMAL_PERM;
577 }
578
TryHandle() const579 int SQLiteSingleVerNaturalStore::TryHandle() const
580 {
581 std::unique_lock<std::shared_mutex> lock(abortHandleMutex_);
582 if (abortPerm_ == OperatePerm::RESTART_SYNC_PERM) {
583 LOGW("[SingleVerNStore] Restarting sync, handle id[%s] is busy",
584 DBCommon::TransferStringToHex(storageEngine_->GetIdentifier()).c_str());
585 return -E_BUSY;
586 }
587 return E_OK;
588 }
589
GetStorageExecutor(bool isWrite)590 std::pair<int, SQLiteSingleVerStorageExecutor*> SQLiteSingleVerNaturalStore::GetStorageExecutor(bool isWrite)
591 {
592 int errCode = E_OK;
593 SQLiteSingleVerStorageExecutor *handle = GetHandle(isWrite, errCode);
594 return {errCode, handle};
595 }
596
RecycleStorageExecutor(SQLiteSingleVerStorageExecutor * executor)597 void SQLiteSingleVerNaturalStore::RecycleStorageExecutor(SQLiteSingleVerStorageExecutor *executor)
598 {
599 ReleaseHandle(executor);
600 }
601
GetLocalTimeOffsetForCloud()602 TimeOffset SQLiteSingleVerNaturalStore::GetLocalTimeOffsetForCloud()
603 {
604 return GetLocalTimeOffset();
605 }
606
RegisterObserverAction(const KvStoreObserver * observer,const ObserverAction & action)607 int SQLiteSingleVerNaturalStore::RegisterObserverAction(const KvStoreObserver *observer, const ObserverAction &action)
608 {
609 std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
610 if (sqliteCloudKvStore_ == nullptr) {
611 return -E_INTERNAL_ERROR;
612 }
613 sqliteCloudKvStore_->RegisterObserverAction(observer, action);
614 return E_OK;
615 }
616
UnRegisterObserverAction(const KvStoreObserver * observer)617 int SQLiteSingleVerNaturalStore::UnRegisterObserverAction(const KvStoreObserver *observer)
618 {
619 std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
620 if (sqliteCloudKvStore_ == nullptr) {
621 return -E_INTERNAL_ERROR;
622 }
623 sqliteCloudKvStore_->UnRegisterObserverAction(observer);
624 return E_OK;
625 }
626
GetCloudVersion(const std::string & device,std::map<std::string,std::string> & versionMap)627 int SQLiteSingleVerNaturalStore::GetCloudVersion(const std::string &device,
628 std::map<std::string, std::string> &versionMap)
629 {
630 std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
631 if (sqliteCloudKvStore_ == nullptr) {
632 return -E_INTERNAL_ERROR;
633 }
634 return sqliteCloudKvStore_->GetCloudVersion(device, versionMap);
635 }
636
SetReceiveDataInterceptor(const DataInterceptor & interceptor)637 void SQLiteSingleVerNaturalStore::SetReceiveDataInterceptor(const DataInterceptor &interceptor)
638 {
639 std::unique_lock<std::shared_mutex> lock(dataInterceptorMutex_);
640 receiveDataInterceptor_ = interceptor;
641 }
642
SetCloudSyncConfig(const CloudSyncConfig & config)643 int SQLiteSingleVerNaturalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
644 {
645 std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
646 if (sqliteCloudKvStore_ == nullptr) {
647 LOGE("[SingleVerNStore] DB is null when set config");
648 return -E_INTERNAL_ERROR;
649 }
650 sqliteCloudKvStore_->SetCloudSyncConfig(config);
651 return E_OK;
652 }
653 }
654