1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_able_kvdb.h"
17
18 #include "db_dump_helper.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "parcel.h"
22 #include "runtime_context.h"
23 #include "user_change_monitor.h"
24
25 namespace DistributedDB {
26 const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1;
27
SyncAbleKvDB()28 SyncAbleKvDB::SyncAbleKvDB()
29 : started_(false),
30 closed_(false),
31 isSyncModuleActiveCheck_(false),
32 isSyncNeedActive_(true),
33 notifyChain_(nullptr),
34 userChangeListener_(nullptr),
35 cloudSyncer_(nullptr)
36 {}
37
~SyncAbleKvDB()38 SyncAbleKvDB::~SyncAbleKvDB()
39 {
40 if (notifyChain_ != nullptr) {
41 (void)notifyChain_->UnRegisterEventType(REMOTE_PUSH_FINISHED);
42 KillAndDecObjRef(notifyChain_);
43 notifyChain_ = nullptr;
44 }
45 if (userChangeListener_ != nullptr) {
46 userChangeListener_->Drop(true);
47 userChangeListener_ = nullptr;
48 }
49 std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
50 KillAndDecObjRef(cloudSyncer_);
51 cloudSyncer_ = nullptr;
52 }
53
DelConnection(GenericKvDBConnection * connection)54 void SyncAbleKvDB::DelConnection(GenericKvDBConnection *connection)
55 {
56 auto realConnection = static_cast<SyncAbleKvDBConnection *>(connection);
57 if (realConnection != nullptr) {
58 KillAndDecObjRef(realConnection);
59 realConnection = nullptr;
60 }
61 }
62
TriggerSync(int notifyEvent)63 void SyncAbleKvDB::TriggerSync(int notifyEvent)
64 {
65 if (!started_) {
66 StartSyncer();
67 }
68 if (started_) {
69 syncer_.LocalDataChanged(notifyEvent);
70 }
71 }
72
CommitNotify(int notifyEvent,KvDBCommitNotifyFilterAbleData * data)73 void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData *data)
74 {
75 SyncAbleKvDB::TriggerSync(notifyEvent);
76
77 GenericKvDB::CommitNotify(notifyEvent, data);
78 }
79
Close()80 void SyncAbleKvDB::Close()
81 {
82 StopSyncer(true);
83 }
84
85 // Start a sync action.
Sync(const ISyncer::SyncParma & parma,uint64_t connectionId)86 int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma, uint64_t connectionId)
87 {
88 if (!started_) {
89 int errCode = StartSyncer();
90 if (!started_) {
91 return errCode;
92 }
93 }
94 return syncer_.Sync(parma, connectionId);
95 }
96
EnableAutoSync(bool enable)97 void SyncAbleKvDB::EnableAutoSync(bool enable)
98 {
99 if (NeedStartSyncer()) {
100 StartSyncer();
101 }
102 return syncer_.EnableAutoSync(enable);
103 }
104
WakeUpSyncer()105 void SyncAbleKvDB::WakeUpSyncer()
106 {
107 if (NeedStartSyncer()) {
108 StartSyncer();
109 }
110 }
111
112 // Stop a sync action in progress.
StopSync(uint64_t connectionId)113 void SyncAbleKvDB::StopSync(uint64_t connectionId)
114 {
115 if (started_) {
116 syncer_.StopSync(connectionId);
117 }
118 }
119
SetSyncModuleActive()120 void SyncAbleKvDB::SetSyncModuleActive()
121 {
122 if (isSyncModuleActiveCheck_) {
123 return;
124 }
125 IKvDBSyncInterface *syncInterface = GetSyncInterface();
126 if (syncInterface == nullptr) {
127 LOGD("KvDB got null sync interface.");
128 return;
129 }
130 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
131 false);
132 if (!isSyncDualTupleMode) {
133 isSyncNeedActive_ = true;
134 isSyncModuleActiveCheck_ = true;
135 return;
136 }
137 isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
138 if (!isSyncNeedActive_) {
139 LOGI("syncer no need to active");
140 }
141 isSyncModuleActiveCheck_ = true;
142 }
143
GetSyncModuleActive()144 bool SyncAbleKvDB::GetSyncModuleActive()
145 {
146 return isSyncNeedActive_;
147 }
148
ReSetSyncModuleActive()149 void SyncAbleKvDB::ReSetSyncModuleActive()
150 {
151 isSyncModuleActiveCheck_ = false;
152 isSyncNeedActive_ = true;
153 }
154
155 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)156 int SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
157 {
158 StartCloudSyncer();
159 int errCode = E_OK;
160 {
161 std::unique_lock<std::mutex> lock(syncerOperateLock_);
162 errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
163 closed_ = false;
164 }
165 UserChangeHandle();
166 return errCode;
167 }
168
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)169 int SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
170 {
171 IKvDBSyncInterface *syncInterface = GetSyncInterface();
172 if (syncInterface == nullptr) {
173 LOGD("KvDB got null sync interface.");
174 return -E_INVALID_ARGS;
175 }
176 if (!isCheckSyncActive) {
177 SetSyncModuleActive();
178 isNeedActive = GetSyncModuleActive();
179 }
180 int errCode = syncer_.Initialize(syncInterface, isNeedActive);
181 if (errCode == E_OK) {
182 started_ = true;
183 }
184 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
185 false);
186 std::string label = syncInterface->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
187 if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
188 // active to non_active
189 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
190 [this](void *) { ChangeUserListener(); }, UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
191 LOGI("[KVDB] [StartSyncerWithNoLock] [%.3s] After RegisterUserChangedListener", label.c_str());
192 } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
193 EventType event = isNeedActive ?
194 UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
195 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
196 [this](void *) { UserChangeHandle(); }, event);
197 LOGI("[KVDB] [StartSyncerWithNoLock] [%.3s] After RegisterUserChangedListener event=%d", label.c_str(), event);
198 }
199 return errCode;
200 }
201
202 // Stop syncer
StopSyncer(bool isClosedOperation,bool isStopTaskOnly)203 void SyncAbleKvDB::StopSyncer(bool isClosedOperation, bool isStopTaskOnly)
204 {
205 {
206 std::unique_lock<std::mutex> lock(cloudSyncerLock_);
207 if (cloudSyncer_ != nullptr) {
208 if (isStopTaskOnly) {
209 cloudSyncer_->StopAllTasks();
210 } else if (isClosedOperation) {
211 cloudSyncer_->Close();
212 RefObject::KillAndDecObjRef(cloudSyncer_);
213 cloudSyncer_ = nullptr;
214 }
215 }
216 }
217 NotificationChain::Listener *userChangeListener = nullptr;
218 {
219 std::unique_lock<std::mutex> lock(syncerOperateLock_);
220 StopSyncerWithNoLock(isClosedOperation);
221 userChangeListener = userChangeListener_;
222 userChangeListener_ = nullptr;
223 }
224 if (userChangeListener != nullptr) {
225 userChangeListener->Drop(true);
226 userChangeListener = nullptr;
227 }
228 }
229
StopSyncerWithNoLock(bool isClosedOperation)230 void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosedOperation)
231 {
232 if (!isClosedOperation && userChangeListener_ != nullptr) {
233 std::unique_lock<std::mutex> lock(cloudSyncerLock_);
234 if (cloudSyncer_ != nullptr) {
235 cloudSyncer_->StopAllTasks();
236 }
237 }
238 ReSetSyncModuleActive();
239 syncer_.Close(isClosedOperation);
240 if (started_) {
241 started_ = false;
242 }
243 closed_ = isClosedOperation;
244 if (!isClosedOperation && userChangeListener_ != nullptr) {
245 userChangeListener_->Drop(false);
246 userChangeListener_ = nullptr;
247 }
248 }
249
UserChangeHandle()250 void SyncAbleKvDB::UserChangeHandle()
251 {
252 bool isNeedChange;
253 bool isNeedActive = true;
254 IKvDBSyncInterface *syncInterface = GetSyncInterface();
255 if (syncInterface == nullptr) {
256 LOGD("KvDB got null sync interface.");
257 return;
258 }
259 bool isSyncDualTupleMode = syncInterface->GetDbProperties().
260 GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false);
261 if (!isSyncDualTupleMode) {
262 LOGD("[SyncAbleKvDB] no use syncDualTupleMode, abort userChange");
263 return;
264 }
265 std::unique_lock<std::mutex> lock(syncerOperateLock_);
266 if (closed_) {
267 LOGI("kvDB is already closed");
268 return;
269 }
270 isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
271 isNeedChange = (isNeedActive != isSyncNeedActive_);
272 // non_active to active or active to non_active
273 if (isNeedChange) {
274 StopSyncerWithNoLock(); // will drop userChangeListener
275 isSyncModuleActiveCheck_ = true;
276 isSyncNeedActive_ = isNeedActive;
277 StartSyncerWithNoLock(true, isNeedActive);
278 }
279 }
280
ChangeUserListener()281 void SyncAbleKvDB::ChangeUserListener()
282 {
283 // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
284 if (userChangeListener_ != nullptr) {
285 userChangeListener_->Drop(false);
286 userChangeListener_ = nullptr;
287 }
288 if (userChangeListener_ == nullptr) {
289 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
290 [this](void *) { UserChangeHandle(); }, UserChangeMonitor::USER_NON_ACTIVE_EVENT);
291 IKvDBSyncInterface *syncInterface = GetSyncInterface();
292 std::string label = syncInterface->GetDbProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
293 LOGI("[KVDB] [ChangeUserListener] [%.3s] After RegisterUserChangedListener", label.c_str());
294 }
295 }
296
GetTimestampFromDB()297 uint64_t SyncAbleKvDB::GetTimestampFromDB()
298 {
299 return 0; //default is 0
300 }
301
302 // Get The current virtual timestamp
GetTimestamp(bool needStartSync)303 uint64_t SyncAbleKvDB::GetTimestamp(bool needStartSync)
304 {
305 if (NeedStartSyncer()) {
306 if (needStartSync) {
307 StartSyncer();
308 } else {
309 // if syncer not start, get offset time from database
310 return GetTimestampFromDB();
311 }
312 }
313 return syncer_.GetTimestamp();
314 }
315
316 // Get the dataItem's append length
GetAppendedLen() const317 uint32_t SyncAbleKvDB::GetAppendedLen() const
318 {
319 return Parcel::GetAppendedLen();
320 }
321
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)322 int SyncAbleKvDB::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
323 {
324 if (NeedStartSyncer()) {
325 int errCode = StartSyncer();
326 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
327 return errCode;
328 }
329 }
330 return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash);
331 }
332
GetQueuedSyncSize(int * queuedSyncSize) const333 int SyncAbleKvDB::GetQueuedSyncSize(int *queuedSyncSize) const
334 {
335 return syncer_.GetQueuedSyncSize(queuedSyncSize);
336 }
337
SetQueuedSyncLimit(const int * queuedSyncLimit)338 int SyncAbleKvDB::SetQueuedSyncLimit(const int *queuedSyncLimit)
339 {
340 return syncer_.SetQueuedSyncLimit(queuedSyncLimit);
341 }
342
GetQueuedSyncLimit(int * queuedSyncLimit) const343 int SyncAbleKvDB::GetQueuedSyncLimit(int *queuedSyncLimit) const
344 {
345 return syncer_.GetQueuedSyncLimit(queuedSyncLimit);
346 }
347
DisableManualSync(void)348 int SyncAbleKvDB::DisableManualSync(void)
349 {
350 return syncer_.DisableManualSync();
351 }
352
EnableManualSync(void)353 int SyncAbleKvDB::EnableManualSync(void)
354 {
355 return syncer_.EnableManualSync();
356 }
357
GetLocalIdentity(std::string & outTarget) const358 int SyncAbleKvDB::GetLocalIdentity(std::string &outTarget) const
359 {
360 return syncer_.GetLocalIdentity(outTarget);
361 }
362
SetStaleDataWipePolicy(WipePolicy policy)363 int SyncAbleKvDB::SetStaleDataWipePolicy(WipePolicy policy)
364 {
365 return syncer_.SetStaleDataWipePolicy(policy);
366 }
367
RegisterEventType(EventType type)368 int SyncAbleKvDB::RegisterEventType(EventType type)
369 {
370 if (notifyChain_ == nullptr) {
371 notifyChain_ = new (std::nothrow) NotificationChain;
372 if (notifyChain_ == nullptr) {
373 return -E_OUT_OF_MEMORY;
374 }
375 }
376
377 int errCode = notifyChain_->RegisterEventType(type);
378 if (errCode == -E_ALREADY_REGISTER) {
379 return E_OK;
380 }
381 if (errCode != E_OK) {
382 LOGE("[SyncAbleKvDB] Register event type %u failed! err %d", type, errCode);
383 KillAndDecObjRef(notifyChain_);
384 notifyChain_ = nullptr;
385 }
386 return errCode;
387 }
388
AddRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier,int & errCode)389 NotificationChain::Listener *SyncAbleKvDB::AddRemotePushFinishedNotify(const RemotePushFinishedNotifier ¬ifier,
390 int &errCode)
391 {
392 std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
393 errCode = RegisterEventType(REMOTE_PUSH_FINISHED);
394 if (errCode != E_OK) {
395 return nullptr;
396 }
397
398 auto listener = notifyChain_->RegisterListener(REMOTE_PUSH_FINISHED,
399 [notifier](void *arg) {
400 if (arg == nullptr) {
401 LOGE("PragmaRemotePushNotify is null.");
402 return;
403 }
404 notifier(*static_cast<RemotePushNotifyInfo *>(arg));
405 }, nullptr, errCode);
406 if (errCode != E_OK) {
407 LOGE("[SyncAbleKvDB] Add remote push finished notifier failed! err %d", errCode);
408 }
409 return listener;
410 }
411
NotifyRemotePushFinishedInner(const std::string & targetId) const412 void SyncAbleKvDB::NotifyRemotePushFinishedInner(const std::string &targetId) const
413 {
414 NotificationChain *notify = nullptr;
415 {
416 std::shared_lock<std::shared_mutex> lock(notifyChainLock_);
417 if (notifyChain_ == nullptr) {
418 return;
419 }
420 notify = notifyChain_;
421 RefObject::IncObjRef(notify);
422 }
423 RemotePushNotifyInfo info;
424 info.deviceId = targetId;
425 notify->NotifyEvent(REMOTE_PUSH_FINISHED, static_cast<void *>(&info));
426 RefObject::DecObjRef(notify);
427 }
428
SetSyncRetry(bool isRetry)429 int SyncAbleKvDB::SetSyncRetry(bool isRetry)
430 {
431 IKvDBSyncInterface *syncInterface = GetSyncInterface();
432 if (syncInterface == nullptr) {
433 LOGD("KvDB got null sync interface.");
434 return -E_INVALID_DB;
435 }
436 bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
437 if (localOnly) {
438 return -E_NOT_SUPPORT;
439 }
440 if (NeedStartSyncer()) {
441 StartSyncer();
442 }
443 return syncer_.SetSyncRetry(isRetry);
444 }
445
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)446 int SyncAbleKvDB::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
447 {
448 if (NeedStartSyncer()) {
449 StartSyncer();
450 }
451 return syncer_.SetEqualIdentifier(identifier, targets);
452 }
453
Dump(int fd)454 void SyncAbleKvDB::Dump(int fd)
455 {
456 SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
457 DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
458 basicInfo.isAutoSync);
459 if (basicInfo.isSyncActive) {
460 DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
461 syncer_.Dump(fd);
462 }
463 }
464
GetSyncDataSize(const std::string & device,size_t & size) const465 int SyncAbleKvDB::GetSyncDataSize(const std::string &device, size_t &size) const
466 {
467 return syncer_.GetSyncDataSize(device, size);
468 }
469
NeedStartSyncer() const470 bool SyncAbleKvDB::NeedStartSyncer() const
471 {
472 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
473 return false;
474 }
475 // don't start when check callback got not active
476 // equivalent to !(!isSyncNeedActive_ && isSyncModuleActiveCheck_)
477 return !started_ && (isSyncNeedActive_ || !isSyncModuleActiveCheck_);
478 }
479
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)480 int SyncAbleKvDB::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
481 {
482 if (!NeedStartSyncer()) {
483 return syncer_.GetHashDeviceId(clientId, hashDevId);
484 }
485 int errCode = StartSyncer();
486 if (errCode != E_OK && errCode != -E_NO_NEED_ACTIVE) {
487 return errCode;
488 }
489 return syncer_.GetHashDeviceId(clientId, hashDevId);
490 }
491
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)492 int SyncAbleKvDB::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
493 {
494 if (NeedStartSyncer()) {
495 StartSyncer();
496 }
497 return syncer_.GetWatermarkInfo(device, info);
498 }
499
UpgradeSchemaVerInMeta()500 int SyncAbleKvDB::UpgradeSchemaVerInMeta()
501 {
502 return syncer_.UpgradeSchemaVerInMeta();
503 }
504
ResetSyncStatus()505 void SyncAbleKvDB::ResetSyncStatus()
506 {
507 syncer_.ResetSyncStatus();
508 }
509
GetICloudSyncInterface() const510 ICloudSyncStorageInterface *SyncAbleKvDB::GetICloudSyncInterface() const
511 {
512 return nullptr;
513 }
514
StartCloudSyncer()515 void SyncAbleKvDB::StartCloudSyncer()
516 {
517 auto cloudStorage = GetICloudSyncInterface();
518 if (cloudStorage == nullptr) {
519 return;
520 }
521 int conflictType = MyProp().GetIntProp(KvDBProperties::CONFLICT_RESOLVE_POLICY,
522 static_cast<int>(SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN));
523 {
524 std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
525 if (cloudSyncer_ != nullptr) {
526 return;
527 }
528 cloudSyncer_ = new (std::nothrow) CloudSyncer(
529 StorageProxy::GetCloudDb(cloudStorage), true, static_cast<SingleVerConflictResolvePolicy>(conflictType));
530 if (cloudSyncer_ == nullptr) {
531 LOGW("[SyncAbleKvDB][StartCloudSyncer] start cloud syncer and cloud syncer was not initialized");
532 }
533 }
534 }
535
GetLocalTimeOffset()536 TimeOffset SyncAbleKvDB::GetLocalTimeOffset()
537 {
538 if (NeedStartSyncer()) {
539 StartSyncer();
540 }
541 return syncer_.GetLocalTimeOffset();
542 }
543
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)544 void SyncAbleKvDB::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
545 CloudSyncer::CloudTaskInfo &info)
546 {
547 QuerySyncObject query(Query::Select());
548 query.SetTableName(CloudDbConstant::CLOUD_KV_TABLE_NAME);
549 info.queryList.push_back(query);
550 info.table.push_back(CloudDbConstant::CLOUD_KV_TABLE_NAME);
551 info.callback = onProcess;
552 info.devices = option.devices;
553 info.mode = option.mode;
554 std::set<std::string> userSet(option.users.begin(), option.users.end());
555 info.users = std::vector<std::string>(userSet.begin(), userSet.end());
556 info.lockAction = option.lockAction;
557 info.storeId = MyProp().GetStringProp(DBProperties::STORE_ID, "");
558 info.merge = option.merge;
559 info.prepareTraceId = option.prepareTraceId;
560 }
561
CheckSyncOption(const CloudSyncOption & option,const CloudSyncer & syncer)562 int SyncAbleKvDB::CheckSyncOption(const CloudSyncOption &option, const CloudSyncer &syncer)
563 {
564 if (option.users.empty()) {
565 LOGE("[SyncAbleKvDB][Sync] no user in sync option");
566 return -E_INVALID_ARGS;
567 }
568 const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs = syncer.GetCloudDB();
569 if (cloudDBs.empty()) {
570 LOGE("[SyncAbleKvDB][Sync] not set cloud db");
571 return -E_CLOUD_ERROR;
572 }
573 auto schemas = GetDataBaseSchemas();
574 if (schemas.empty()) {
575 LOGE("[SyncAbleKvDB][Sync] not set cloud schema");
576 return -E_SCHEMA_MISMATCH;
577 }
578 for (const auto &user : option.users) {
579 if (cloudDBs.find(user) == cloudDBs.end()) {
580 LOGE("[SyncAbleKvDB][Sync] cloud db with invalid user: %s", user.c_str());
581 return -E_INVALID_ARGS;
582 }
583 if (schemas.find(user) == schemas.end()) {
584 LOGE("[SyncAbleKvDB][Sync] cloud schema with invalid user: %s", user.c_str());
585 return -E_SCHEMA_MISMATCH;
586 }
587 }
588 if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
589 LOGE("[SyncAbleKvDB][Sync] invalid wait time of sync option: %lld", option.waitTime);
590 return -E_INVALID_ARGS;
591 }
592 if (!CheckSchemaSupportForCloudSync()) {
593 return -E_NOT_SUPPORT;
594 }
595 return E_OK;
596 }
597
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)598 int SyncAbleKvDB::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
599 {
600 auto syncer = GetAndIncCloudSyncer();
601 if (syncer == nullptr) {
602 LOGE("[SyncAbleKvDB][Sync] cloud syncer was not initialized");
603 return -E_INVALID_DB;
604 }
605 int errCode = CheckSyncOption(option, *syncer);
606 if (errCode != E_OK) {
607 RefObject::DecObjRef(syncer);
608 return errCode;
609 }
610 CloudSyncer::CloudTaskInfo info;
611 FillSyncInfo(option, onProcess, info);
612 errCode = syncer->Sync(info);
613 RefObject::DecObjRef(syncer);
614 return errCode;
615 }
616
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)617 int SyncAbleKvDB::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
618 {
619 auto syncer = GetAndIncCloudSyncer();
620 if (syncer == nullptr) {
621 LOGE("[SyncAbleKvDB][SetCloudDB] cloud syncer was not initialized");
622 return -E_INVALID_DB;
623 }
624 int errCode = syncer->SetCloudDB(cloudDBs);
625 RefObject::DecObjRef(syncer);
626 return errCode;
627 }
628
CleanAllWaterMark()629 int SyncAbleKvDB::CleanAllWaterMark()
630 {
631 auto syncer = GetAndIncCloudSyncer();
632 if (syncer == nullptr) {
633 LOGE("[SyncAbleKvDB][CleanAllWaterMark] cloud syncer was not initialized");
634 return -E_INVALID_DB;
635 }
636 syncer->CleanAllWaterMark();
637 RefObject::DecObjRef(syncer);
638 return E_OK;
639 }
640
GetTaskCount()641 int32_t SyncAbleKvDB::GetTaskCount()
642 {
643 int32_t taskCount = 0;
644 auto cloudSyncer = GetAndIncCloudSyncer();
645 if (cloudSyncer != nullptr) {
646 taskCount += cloudSyncer->GetCloudSyncTaskCount();
647 RefObject::DecObjRef(cloudSyncer);
648 }
649 if (NeedStartSyncer()) {
650 return taskCount;
651 }
652 taskCount += syncer_.GetTaskCount();
653 return taskCount;
654 }
655
GetAndIncCloudSyncer()656 CloudSyncer *SyncAbleKvDB::GetAndIncCloudSyncer()
657 {
658 std::lock_guard<std::mutex> autoLock(cloudSyncerLock_);
659 if (cloudSyncer_ == nullptr) {
660 return nullptr;
661 }
662 RefObject::IncObjRef(cloudSyncer_);
663 return cloudSyncer_;
664 }
665
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)666 void SyncAbleKvDB::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
667 {
668 auto cloudSyncer = GetAndIncCloudSyncer();
669 if (cloudSyncer == nullptr) {
670 LOGE("[SyncAbleKvDB][SetGenCloudVersionCallback] cloud syncer was not initialized");
671 return;
672 }
673 cloudSyncer->SetGenCloudVersionCallback(callback);
674 RefObject::DecObjRef(cloudSyncer);
675 }
676
GetDataBaseSchemas()677 std::map<std::string, DataBaseSchema> SyncAbleKvDB::GetDataBaseSchemas()
678 {
679 return {};
680 }
681
CheckSchemaSupportForCloudSync() const682 bool SyncAbleKvDB::CheckSchemaSupportForCloudSync() const
683 {
684 return true; // default is valid
685 }
686 }
687