1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_able_kvdb.h"
17 
18 #include "db_dump_helper.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "parcel.h"
22 #include "runtime_context.h"
23 #include "user_change_monitor.h"
24 
25 namespace DistributedDB {
// Event type that AddRemotePushFinishedNotify registers on notifyChain_ and that
// NotifyRemotePushFinishedInner raises towards the registered listeners.
const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1;
27 
SyncAbleKvDB()28 SyncAbleKvDB::SyncAbleKvDB()
29     : started_(false),
30       closed_(false),
31       isSyncModuleActiveCheck_(false),
32       isSyncNeedActive_(true),
33       notifyChain_(nullptr),
34       userChangeListener_(nullptr),
35       cloudSyncer_(nullptr)
36 {}
37 
~SyncAbleKvDB()38 SyncAbleKvDB::~SyncAbleKvDB()
39 {
40     if (notifyChain_ != nullptr) {
41         (void)notifyChain_->UnRegisterEventType(REMOTE_PUSH_FINISHED);
42         KillAndDecObjRef(notifyChain_);
43         notifyChain_ = nullptr;
44     }
45     if (userChangeListener_ != nullptr) {
46         userChangeListener_->Drop(true);
47         userChangeListener_ = nullptr;
48     }
49     std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
50     KillAndDecObjRef(cloudSyncer_);
51     cloudSyncer_ = nullptr;
52 }
53 
DelConnection(GenericKvDBConnection * connection)54 void SyncAbleKvDB::DelConnection(GenericKvDBConnection *connection)
55 {
56     auto realConnection = static_cast<SyncAbleKvDBConnection *>(connection);
57     if (realConnection != nullptr) {
58         KillAndDecObjRef(realConnection);
59         realConnection = nullptr;
60     }
61 }
62 
TriggerSync(int notifyEvent)63 void SyncAbleKvDB::TriggerSync(int notifyEvent)
64 {
65     if (!started_) {
66         StartSyncer();
67     }
68     if (started_) {
69         syncer_.LocalDataChanged(notifyEvent);
70     }
71 }
72 
CommitNotify(int notifyEvent,KvDBCommitNotifyFilterAbleData * data)73 void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData *data)
74 {
75     SyncAbleKvDB::TriggerSync(notifyEvent);
76 
77     GenericKvDB::CommitNotify(notifyEvent, data);
78 }
79 
Close()80 void SyncAbleKvDB::Close()
81 {
82     StopSyncer(true);
83 }
84 
85 // Start a sync action.
Sync(const ISyncer::SyncParma & parma,uint64_t connectionId)86 int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma, uint64_t connectionId)
87 {
88     if (!started_) {
89         int errCode = StartSyncer();
90         if (!started_) {
91             return errCode;
92         }
93     }
94     return syncer_.Sync(parma, connectionId);
95 }
96 
EnableAutoSync(bool enable)97 void SyncAbleKvDB::EnableAutoSync(bool enable)
98 {
99     if (NeedStartSyncer()) {
100         StartSyncer();
101     }
102     return syncer_.EnableAutoSync(enable);
103 }
104 
WakeUpSyncer()105 void SyncAbleKvDB::WakeUpSyncer()
106 {
107     if (NeedStartSyncer()) {
108         StartSyncer();
109     }
110 }
111 
112 // Stop a sync action in progress.
StopSync(uint64_t connectionId)113 void SyncAbleKvDB::StopSync(uint64_t connectionId)
114 {
115     if (started_) {
116         syncer_.StopSync(connectionId);
117     }
118 }
119 
SetSyncModuleActive()120 void SyncAbleKvDB::SetSyncModuleActive()
121 {
122     if (isSyncModuleActiveCheck_) {
123         return;
124     }
125     IKvDBSyncInterface *syncInterface = GetSyncInterface();
126     if (syncInterface == nullptr) {
127         LOGD("KvDB got null sync interface.");
128         return;
129     }
130     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
131         false);
132     if (!isSyncDualTupleMode) {
133         isSyncNeedActive_ = true;
134         isSyncModuleActiveCheck_ = true;
135         return;
136     }
137     isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
138     if (!isSyncNeedActive_) {
139         LOGI("syncer no need to active");
140     }
141     isSyncModuleActiveCheck_ = true;
142 }
143 
GetSyncModuleActive()144 bool SyncAbleKvDB::GetSyncModuleActive()
145 {
146     return isSyncNeedActive_;
147 }
148 
ReSetSyncModuleActive()149 void SyncAbleKvDB::ReSetSyncModuleActive()
150 {
151     isSyncModuleActiveCheck_ = false;
152     isSyncNeedActive_ = true;
153 }
154 
155 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)156 int SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
157 {
158     StartCloudSyncer();
159     int errCode = E_OK;
160     {
161         std::unique_lock<std::mutex> lock(syncerOperateLock_);
162         errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
163         closed_ = false;
164     }
165     UserChangeHandle();
166     return errCode;
167 }
168 
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)169 int SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
170 {
171     IKvDBSyncInterface *syncInterface = GetSyncInterface();
172     if (syncInterface == nullptr) {
173         LOGD("KvDB got null sync interface.");
174         return -E_INVALID_ARGS;
175     }
176     if (!isCheckSyncActive) {
177         SetSyncModuleActive();
178         isNeedActive = GetSyncModuleActive();
179     }
180     int errCode = syncer_.Initialize(syncInterface, isNeedActive);
181     if (errCode == E_OK) {
182         started_ = true;
183     }
184     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
185         false);
186     std::string label = syncInterface->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
187     if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
188         // active to non_active
189         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
190             [this](void *) { ChangeUserListener(); }, UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
191         LOGI("[KVDB] [StartSyncerWithNoLock] [%.3s] After RegisterUserChangedListener", label.c_str());
192     } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
193         EventType event = isNeedActive ?
194             UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
195         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
196             [this](void *) { UserChangeHandle(); }, event);
197         LOGI("[KVDB] [StartSyncerWithNoLock] [%.3s] After RegisterUserChangedListener event=%d", label.c_str(), event);
198     }
199     return errCode;
200 }
201 
202 // Stop syncer
StopSyncer(bool isClosedOperation,bool isStopTaskOnly)203 void SyncAbleKvDB::StopSyncer(bool isClosedOperation, bool isStopTaskOnly)
204 {
205     {
206         std::unique_lock<std::mutex> lock(cloudSyncerLock_);
207         if (cloudSyncer_ != nullptr) {
208             if (isStopTaskOnly) {
209                 cloudSyncer_->StopAllTasks();
210             } else if (isClosedOperation) {
211                 cloudSyncer_->Close();
212                 RefObject::KillAndDecObjRef(cloudSyncer_);
213                 cloudSyncer_ = nullptr;
214             }
215         }
216     }
217     NotificationChain::Listener *userChangeListener = nullptr;
218     {
219         std::unique_lock<std::mutex> lock(syncerOperateLock_);
220         StopSyncerWithNoLock(isClosedOperation);
221         userChangeListener = userChangeListener_;
222         userChangeListener_ = nullptr;
223     }
224     if (userChangeListener != nullptr) {
225         userChangeListener->Drop(true);
226         userChangeListener = nullptr;
227     }
228 }
229 
StopSyncerWithNoLock(bool isClosedOperation)230 void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosedOperation)
231 {
232     if (!isClosedOperation && userChangeListener_ != nullptr) {
233         std::unique_lock<std::mutex> lock(cloudSyncerLock_);
234         if (cloudSyncer_ != nullptr) {
235             cloudSyncer_->StopAllTasks();
236         }
237     }
238     ReSetSyncModuleActive();
239     syncer_.Close(isClosedOperation);
240     if (started_) {
241         started_ = false;
242     }
243     closed_ = isClosedOperation;
244     if (!isClosedOperation && userChangeListener_ != nullptr) {
245         userChangeListener_->Drop(false);
246         userChangeListener_ = nullptr;
247     }
248 }
249 
UserChangeHandle()250 void SyncAbleKvDB::UserChangeHandle()
251 {
252     bool isNeedChange;
253     bool isNeedActive = true;
254     IKvDBSyncInterface *syncInterface = GetSyncInterface();
255     if (syncInterface == nullptr) {
256         LOGD("KvDB got null sync interface.");
257         return;
258     }
259     bool isSyncDualTupleMode = syncInterface->GetDbProperties().
260         GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false);
261     if (!isSyncDualTupleMode) {
262         LOGD("[SyncAbleKvDB] no use syncDualTupleMode, abort userChange");
263         return;
264     }
265     std::unique_lock<std::mutex> lock(syncerOperateLock_);
266     if (closed_) {
267         LOGI("kvDB is already closed");
268         return;
269     }
270     isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
271     isNeedChange = (isNeedActive != isSyncNeedActive_);
272     // non_active to active or active to non_active
273     if (isNeedChange) {
274         StopSyncerWithNoLock(); // will drop userChangeListener
275         isSyncModuleActiveCheck_ = true;
276         isSyncNeedActive_ = isNeedActive;
277         StartSyncerWithNoLock(true, isNeedActive);
278     }
279 }
280 
ChangeUserListener()281 void SyncAbleKvDB::ChangeUserListener()
282 {
283     // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
284     if (userChangeListener_ != nullptr) {
285         userChangeListener_->Drop(false);
286         userChangeListener_ = nullptr;
287     }
288     if (userChangeListener_ == nullptr) {
289         userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
290             [this](void *) { UserChangeHandle(); }, UserChangeMonitor::USER_NON_ACTIVE_EVENT);
291         IKvDBSyncInterface *syncInterface = GetSyncInterface();
292         std::string label = syncInterface->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
293         LOGI("[KVDB] [ChangeUserListener] [%.3s] After RegisterUserChangedListener", label.c_str());
294     }
295 }
296 
GetTimestampFromDB()297 uint64_t SyncAbleKvDB::GetTimestampFromDB()
298 {
299     return 0; //default is 0
300 }
301 
302 // Get The current virtual timestamp
GetTimestamp(bool needStartSync)303 uint64_t SyncAbleKvDB::GetTimestamp(bool needStartSync)
304 {
305     if (NeedStartSyncer()) {
306         if (needStartSync) {
307             StartSyncer();
308         } else {
309             // if syncer not start, get offset time from database
310             return GetTimestampFromDB();
311         }
312     }
313     return syncer_.GetTimestamp();
314 }
315 
316 // Get the dataItem's append length
GetAppendedLen() const317 uint32_t SyncAbleKvDB::GetAppendedLen() const
318 {
319     return Parcel::GetAppendedLen();
320 }
321 
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)322 int SyncAbleKvDB::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
323 {
324     if (NeedStartSyncer()) {
325         int errCode = StartSyncer();
326         if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
327             return errCode;
328         }
329     }
330     return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash);
331 }
332 
GetQueuedSyncSize(int * queuedSyncSize) const333 int SyncAbleKvDB::GetQueuedSyncSize(int *queuedSyncSize) const
334 {
335     return syncer_.GetQueuedSyncSize(queuedSyncSize);
336 }
337 
SetQueuedSyncLimit(const int * queuedSyncLimit)338 int SyncAbleKvDB::SetQueuedSyncLimit(const int *queuedSyncLimit)
339 {
340     return syncer_.SetQueuedSyncLimit(queuedSyncLimit);
341 }
342 
GetQueuedSyncLimit(int * queuedSyncLimit) const343 int SyncAbleKvDB::GetQueuedSyncLimit(int *queuedSyncLimit) const
344 {
345     return syncer_.GetQueuedSyncLimit(queuedSyncLimit);
346 }
347 
DisableManualSync(void)348 int SyncAbleKvDB::DisableManualSync(void)
349 {
350     return syncer_.DisableManualSync();
351 }
352 
EnableManualSync(void)353 int SyncAbleKvDB::EnableManualSync(void)
354 {
355     return syncer_.EnableManualSync();
356 }
357 
GetLocalIdentity(std::string & outTarget) const358 int SyncAbleKvDB::GetLocalIdentity(std::string &outTarget) const
359 {
360     return syncer_.GetLocalIdentity(outTarget);
361 }
362 
SetStaleDataWipePolicy(WipePolicy policy)363 int SyncAbleKvDB::SetStaleDataWipePolicy(WipePolicy policy)
364 {
365     return syncer_.SetStaleDataWipePolicy(policy);
366 }
367 
RegisterEventType(EventType type)368 int SyncAbleKvDB::RegisterEventType(EventType type)
369 {
370     if (notifyChain_ == nullptr) {
371         notifyChain_ = new (std::nothrow) NotificationChain;
372         if (notifyChain_ == nullptr) {
373             return -E_OUT_OF_MEMORY;
374         }
375     }
376 
377     int errCode = notifyChain_->RegisterEventType(type);
378     if (errCode == -E_ALREADY_REGISTER) {
379         return E_OK;
380     }
381     if (errCode != E_OK) {
382         LOGE("[SyncAbleKvDB] Register event type %u failed! err %d", type, errCode);
383         KillAndDecObjRef(notifyChain_);
384         notifyChain_ = nullptr;
385     }
386     return errCode;
387 }
388 
AddRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier,int & errCode)389 NotificationChain::Listener *SyncAbleKvDB::AddRemotePushFinishedNotify(const RemotePushFinishedNotifier &notifier,
390     int &errCode)
391 {
392     std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
393     errCode = RegisterEventType(REMOTE_PUSH_FINISHED);
394     if (errCode != E_OK) {
395         return nullptr;
396     }
397 
398     auto listener = notifyChain_->RegisterListener(REMOTE_PUSH_FINISHED,
399         [notifier](void *arg) {
400             if (arg == nullptr) {
401                 LOGE("PragmaRemotePushNotify is null.");
402                 return;
403             }
404             notifier(*static_cast<RemotePushNotifyInfo *>(arg));
405         }, nullptr, errCode);
406     if (errCode != E_OK) {
407         LOGE("[SyncAbleKvDB] Add remote push finished notifier failed! err %d", errCode);
408     }
409     return listener;
410 }
411 
NotifyRemotePushFinishedInner(const std::string & targetId) const412 void SyncAbleKvDB::NotifyRemotePushFinishedInner(const std::string &targetId) const
413 {
414     NotificationChain *notify = nullptr;
415     {
416         std::shared_lock<std::shared_mutex> lock(notifyChainLock_);
417         if (notifyChain_ == nullptr) {
418             return;
419         }
420         notify = notifyChain_;
421         RefObject::IncObjRef(notify);
422     }
423     RemotePushNotifyInfo info;
424     info.deviceId = targetId;
425     notify->NotifyEvent(REMOTE_PUSH_FINISHED, static_cast<void *>(&info));
426     RefObject::DecObjRef(notify);
427 }
428 
SetSyncRetry(bool isRetry)429 int SyncAbleKvDB::SetSyncRetry(bool isRetry)
430 {
431     IKvDBSyncInterface *syncInterface = GetSyncInterface();
432     if (syncInterface == nullptr) {
433         LOGD("KvDB got null sync interface.");
434         return -E_INVALID_DB;
435     }
436     bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
437     if (localOnly) {
438         return -E_NOT_SUPPORT;
439     }
440     if (NeedStartSyncer()) {
441         StartSyncer();
442     }
443     return syncer_.SetSyncRetry(isRetry);
444 }
445 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)446 int SyncAbleKvDB::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
447 {
448     if (NeedStartSyncer()) {
449         StartSyncer();
450     }
451     return syncer_.SetEqualIdentifier(identifier, targets);
452 }
453 
Dump(int fd)454 void SyncAbleKvDB::Dump(int fd)
455 {
456     SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
457     DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
458         basicInfo.isAutoSync);
459     if (basicInfo.isSyncActive) {
460         DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
461         syncer_.Dump(fd);
462     }
463 }
464 
GetSyncDataSize(const std::string & device,size_t & size) const465 int SyncAbleKvDB::GetSyncDataSize(const std::string &device, size_t &size) const
466 {
467     return syncer_.GetSyncDataSize(device, size);
468 }
469 
NeedStartSyncer() const470 bool SyncAbleKvDB::NeedStartSyncer() const
471 {
472     if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
473         return false;
474     }
475     // don't start when check callback got not active
476     // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
477     return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
478 }
479 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)480 int SyncAbleKvDB::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
481 {
482     if (!NeedStartSyncer()) {
483         return syncer_.GetHashDeviceId(clientId, hashDevId);
484     }
485     int errCode = StartSyncer();
486     if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
487         return errCode;
488     }
489     return syncer_.GetHashDeviceId(clientId, hashDevId);
490 }
491 
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)492 int SyncAbleKvDB::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
493 {
494     if (NeedStartSyncer()) {
495         StartSyncer();
496     }
497     return syncer_.GetWatermarkInfo(device, info);
498 }
499 
UpgradeSchemaVerInMeta()500 int SyncAbleKvDB::UpgradeSchemaVerInMeta()
501 {
502     return syncer_.UpgradeSchemaVerInMeta();
503 }
504 
ResetSyncStatus()505 void SyncAbleKvDB::ResetSyncStatus()
506 {
507     syncer_.ResetSyncStatus();
508 }
509 
GetICloudSyncInterface() const510 ICloudSyncStorageInterface *SyncAbleKvDB::GetICloudSyncInterface() const
511 {
512     return nullptr;
513 }
514 
StartCloudSyncer()515 void SyncAbleKvDB::StartCloudSyncer()
516 {
517     auto cloudStorage = GetICloudSyncInterface();
518     if (cloudStorage == nullptr) {
519         return;
520     }
521     int conflictType = MyProp().GetIntProp(KvDBProperties::CONFLICT_RESOLVE_POLICY,
522         static_cast<int>(SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN));
523     {
524         std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
525         if (cloudSyncer_ != nullptr) {
526             return;
527         }
528         cloudSyncer_ = new (std::nothrow) CloudSyncer(
529             StorageProxy::GetCloudDb(cloudStorage), true, static_cast<SingleVerConflictResolvePolicy>(conflictType));
530         if (cloudSyncer_ == nullptr) {
531             LOGW("[SyncAbleKvDB][StartCloudSyncer] start cloud syncer and cloud syncer was not initialized");
532         }
533     }
534 }
535 
GetLocalTimeOffset()536 TimeOffset SyncAbleKvDB::GetLocalTimeOffset()
537 {
538     if (NeedStartSyncer()) {
539         StartSyncer();
540     }
541     return syncer_.GetLocalTimeOffset();
542 }
543 
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)544 void SyncAbleKvDB::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
545     CloudSyncer::CloudTaskInfo &info)
546 {
547     QuerySyncObject query(Query::Select());
548     query.SetTableName(CloudDbConstant::CLOUD_KV_TABLE_NAME);
549     info.queryList.push_back(query);
550     info.table.push_back(CloudDbConstant::CLOUD_KV_TABLE_NAME);
551     info.callback = onProcess;
552     info.devices = option.devices;
553     info.mode = option.mode;
554     std::set<std::string> userSet(option.users.begin(), option.users.end());
555     info.users = std::vector<std::string>(userSet.begin(), userSet.end());
556     info.lockAction = option.lockAction;
557     info.storeId = MyProp().GetStringProp(DBProperties::STORE_ID, "");
558     info.merge = option.merge;
559     info.prepareTraceId = option.prepareTraceId;
560 }
561 
CheckSyncOption(const CloudSyncOption & option,const CloudSyncer & syncer)562 int SyncAbleKvDB::CheckSyncOption(const CloudSyncOption &option, const CloudSyncer &syncer)
563 {
564     if (option.users.empty()) {
565         LOGE("[SyncAbleKvDB][Sync] no user in sync option");
566         return -E_INVALID_ARGS;
567     }
568     const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs = syncer.GetCloudDB();
569     if (cloudDBs.empty()) {
570         LOGE("[SyncAbleKvDB][Sync] not set cloud db");
571         return -E_CLOUD_ERROR;
572     }
573     auto schemas = GetDataBaseSchemas();
574     if (schemas.empty()) {
575         LOGE("[SyncAbleKvDB][Sync] not set cloud schema");
576         return -E_SCHEMA_MISMATCH;
577     }
578     for (const auto &user : option.users) {
579         if (cloudDBs.find(user) == cloudDBs.end()) {
580             LOGE("[SyncAbleKvDB][Sync] cloud db with invalid user: %s", user.c_str());
581             return -E_INVALID_ARGS;
582         }
583         if (schemas.find(user) == schemas.end()) {
584             LOGE("[SyncAbleKvDB][Sync] cloud schema with invalid user: %s", user.c_str());
585             return -E_SCHEMA_MISMATCH;
586         }
587     }
588     if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
589         LOGE("[SyncAbleKvDB][Sync] invalid wait time of sync option: %lld", option.waitTime);
590         return -E_INVALID_ARGS;
591     }
592     if (!CheckSchemaSupportForCloudSync()) {
593         return -E_NOT_SUPPORT;
594     }
595     return E_OK;
596 }
597 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)598 int SyncAbleKvDB::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
599 {
600     auto syncer = GetAndIncCloudSyncer();
601     if (syncer == nullptr) {
602         LOGE("[SyncAbleKvDB][Sync] cloud syncer was not initialized");
603         return -E_INVALID_DB;
604     }
605     int errCode = CheckSyncOption(option, *syncer);
606     if (errCode != E_OK) {
607         RefObject::DecObjRef(syncer);
608         return errCode;
609     }
610     CloudSyncer::CloudTaskInfo info;
611     FillSyncInfo(option, onProcess, info);
612     errCode = syncer->Sync(info);
613     RefObject::DecObjRef(syncer);
614     return errCode;
615 }
616 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)617 int SyncAbleKvDB::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
618 {
619     auto syncer = GetAndIncCloudSyncer();
620     if (syncer == nullptr) {
621         LOGE("[SyncAbleKvDB][SetCloudDB] cloud syncer was not initialized");
622         return -E_INVALID_DB;
623     }
624     int errCode = syncer->SetCloudDB(cloudDBs);
625     RefObject::DecObjRef(syncer);
626     return errCode;
627 }
628 
CleanAllWaterMark()629 int SyncAbleKvDB::CleanAllWaterMark()
630 {
631     auto syncer = GetAndIncCloudSyncer();
632     if (syncer == nullptr) {
633         LOGE("[SyncAbleKvDB][CleanAllWaterMark] cloud syncer was not initialized");
634         return -E_INVALID_DB;
635     }
636     syncer->CleanAllWaterMark();
637     RefObject::DecObjRef(syncer);
638     return E_OK;
639 }
640 
GetTaskCount()641 int32_t SyncAbleKvDB::GetTaskCount()
642 {
643     int32_t taskCount = 0;
644     auto cloudSyncer = GetAndIncCloudSyncer();
645     if (cloudSyncer != nullptr) {
646         taskCount += cloudSyncer->GetCloudSyncTaskCount();
647         RefObject::DecObjRef(cloudSyncer);
648     }
649     if (NeedStartSyncer()) {
650         return taskCount;
651     }
652     taskCount += syncer_.GetTaskCount();
653     return taskCount;
654 }
655 
GetAndIncCloudSyncer()656 CloudSyncer *SyncAbleKvDB::GetAndIncCloudSyncer()
657 {
658     std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
659     if (cloudSyncer_ == nullptr) {
660         return nullptr;
661     }
662     RefObject::IncObjRef(cloudSyncer_);
663     return cloudSyncer_;
664 }
665 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)666 void SyncAbleKvDB::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
667 {
668     auto cloudSyncer = GetAndIncCloudSyncer();
669     if (cloudSyncer == nullptr) {
670         LOGE("[SyncAbleKvDB][SetGenCloudVersionCallback] cloud syncer was not initialized");
671         return;
672     }
673     cloudSyncer->SetGenCloudVersionCallback(callback);
674     RefObject::DecObjRef(cloudSyncer);
675 }
676 
GetDataBaseSchemas()677 std::map<std::string, DataBaseSchema> SyncAbleKvDB::GetDataBaseSchemas()
678 {
679     return {};
680 }
681 
CheckSchemaSupportForCloudSync() const682 bool SyncAbleKvDB::CheckSchemaSupportForCloudSync() const
683 {
684     return true; // default is valid
685 }
686 }
687