1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "generic_syncer.h"

#include <limits>

#include "db_common.h"
#include "db_errno.h"
#include "log_print.h"
#include "ref_object.h"
#include "sqlite_single_ver_natural_store.h"
#include "time_sync.h"
#include "single_ver_data_sync.h"
#ifndef OMIT_MULTI_VER
#include "commit_history_sync.h"
#include "multi_ver_data_sync.h"
#include "value_slice_sync.h"
#endif
#include "device_manager.h"
#include "db_constant.h"
#include "ability_sync.h"
#include "generic_single_ver_kv_entry.h"
#include "single_ver_serialize_manager.h"
35 
36 namespace DistributedDB {
namespace {
    // 1024 * 1024 bytes; presumably the default MTU used by the sync module (not referenced in this part of the file).
    constexpr uint32_t DEFAULT_MTU_SIZE = 1024u * 1024u; // 1M
}
// Smallest sync id handed out; GenerateSyncId() resets to this value when the counter wraps.
const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
// Serializes the one-time registration performed by SyncModuleInit().
std::mutex GenericSyncer::moduleInitLock_;
// Last sync id returned by GenerateSyncId(); a static member, so shared by every GenericSyncer.
int GenericSyncer::currentSyncId_ = 0;
// Guards currentSyncId_ as well as syncIdMap_ and connectionIdMap_.
std::mutex GenericSyncer::syncIdLock_;
// Construct a syncer in its "not initialized" state: no engine, storage, time helper or metadata,
// manual sync enabled with the default queue limit, and no pending close.
// Initialize() must succeed before the syncer can start a sync.
GenericSyncer::GenericSyncer()
    : syncEngine_(nullptr),
      syncInterface_(nullptr),
      timeHelper_(nullptr),
      metadata_(nullptr),
      initialized_(false),
      queuedManualSyncSize_(0),
      queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
      manualSyncEnable_(true),
      closing_(false),
      engineFinalize_(false),
      timeChangeListenerFinalize_(true),
      timeChangedListener_(nullptr)
{
}
59 
// Tear down the syncer: kill and release the sync engine, wait a bounded time for it to finalize,
// then release the remaining inner resources and forget the storage interface.
GenericSyncer::~GenericSyncer()
{
    LOGD("[GenericSyncer] ~GenericSyncer!");
    if (syncEngine_ != nullptr) {
        // Register Close() as the kill action, then kill the engine and drop this object's reference.
        syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
        RefObject::KillAndDecObjRef(syncEngine_);
        // waiting for all threads to exit
        std::unique_lock<std::mutex> cvLock(engineMutex_);
        // engineFinalize_ is set by the OnLastRef callback installed in BuildSyncEngine();
        // wait at most DBConstant::MIN_TIMEOUT milliseconds for it, then continue regardless.
        bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
            [this]() { return engineFinalize_; });
        if (!engineFinalize) {
            LOGW("syncer finalize before engine finalize!");
        }
        syncEngine_ = nullptr;
    }
    ReleaseInnerResource();
    // syncInterface_ is read under syncerLock_ elsewhere, so clear it under the same lock.
    std::lock_guard<std::mutex> lock(syncerLock_);
    syncInterface_ = nullptr;
}
79 
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82     if (syncInterface == nullptr) {
83         LOGE("[Syncer] Init failed, the syncInterface is null!");
84         return -E_INVALID_ARGS;
85     }
86 
87     {
88         std::lock_guard<std::mutex> lock(syncerLock_);
89         if (initialized_) {
90             return E_OK;
91         }
92         if (closing_) {
93             LOGE("[Syncer] Syncer is closing, return!");
94             return -E_BUSY;
95         }
96         std::vector<uint8_t> label = syncInterface->GetIdentifier();
97         label_ = DBCommon::StringMasking(DBCommon::VectorToHexString(label));
98 
99         int errCode = InitStorageResource(syncInterface);
100         if (errCode != E_OK) {
101             return errCode;
102         }
103         // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
104         // It will be clear in destructor.
105         int errCodeTimeChangedListener = InitTimeChangedListener();
106         if (errCodeTimeChangedListener != E_OK) {
107             return -E_INTERNAL_ERROR;
108         }
109         errCode = CheckSyncActive(syncInterface, isNeedActive);
110         if (errCode != E_OK) {
111             return errCode;
112         }
113 
114         if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
115             return -E_NOT_INIT;
116         }
117 
118         errCode = SyncModuleInit();
119         if (errCode != E_OK) {
120             LOGE("[Syncer] Sync ModuleInit ERR!");
121             return -E_INTERNAL_ERROR;
122         }
123 
124         errCode = InitSyncEngine(syncInterface);
125         if (errCode != E_OK) {
126             return errCode;
127         }
128         syncEngine_->SetEqualIdentifier();
129         initialized_ = true;
130     }
131 
132     // StartCommunicator may start an auto sync, this function can not in syncerLock_
133     syncEngine_->StartCommunicator();
134     if (RuntimeContext::GetInstance()->CheckDBTimeChange(syncInterface_->GetIdentifier())) {
135         ResetTimeSyncMarkByTimeChange(metadata_, *syncInterface_);
136     }
137     return E_OK;
138 }
139 
Close(bool isClosedOperation)140 int GenericSyncer::Close(bool isClosedOperation)
141 {
142     int errCode = CloseInner(isClosedOperation);
143     if (errCode != -E_BUSY && isClosedOperation) {
144         ReleaseInnerResource();
145     }
146     return errCode;
147 }
148 
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)149 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
150     const std::function<void(const std::map<std::string, int> &)> &onComplete,
151     const std::function<void(void)> &onFinalize, bool wait = false)
152 {
153     SyncParma param;
154     param.devices = devices;
155     param.mode = mode;
156     param.onComplete = onComplete;
157     param.onFinalize = onFinalize;
158     param.wait = wait;
159     return Sync(param);
160 }
161 
Sync(const InternalSyncParma & param)162 int GenericSyncer::Sync(const InternalSyncParma &param)
163 {
164     SyncParma syncParam;
165     syncParam.devices = param.devices;
166     syncParam.mode = param.mode;
167     syncParam.isQuerySync = param.isQuerySync;
168     syncParam.syncQuery = param.syncQuery;
169     return Sync(syncParam);
170 }
171 
Sync(const SyncParma & param)172 int GenericSyncer::Sync(const SyncParma &param)
173 {
174     return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
175 }
176 
Sync(const SyncParma & param,uint64_t connectionId)177 int GenericSyncer::Sync(const SyncParma &param, uint64_t connectionId)
178 {
179     int errCode = SyncPreCheck(param);
180     if (errCode != E_OK) {
181         return errCode;
182     }
183     errCode = AddQueuedManualSyncSize(param.mode, param.wait);
184     if (errCode != E_OK) {
185         return errCode;
186     }
187 
188     uint32_t syncId = GenerateSyncId();
189     errCode = PrepareSync(param, syncId, connectionId);
190     if (errCode != E_OK) {
191         LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
192         return errCode;
193     }
194     PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
195     return E_OK;
196 }
197 
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)198 int GenericSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId)
199 {
200     auto *operation =
201         new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
202     if (operation == nullptr) {
203         SubQueuedSyncSize();
204         return -E_OUT_OF_MEMORY;
205     }
206     ISyncEngine *engine = nullptr;
207     {
208         std::lock_guard<std::mutex> autoLock(syncerLock_);
209         PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
210         InitSyncOperation(operation, param);
211         LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
212             param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
213         engine = syncEngine_;
214         RefObject::IncObjRef(engine);
215     }
216     AddSyncOperation(engine, operation);
217     RefObject::DecObjRef(engine);
218     PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
219     if (connectionId != DBConstant::IGNORE_CONNECTION_ID) {
220         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
221         connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
222         syncIdMap_[static_cast<int>(syncId)] = connectionId;
223     }
224     if (operation->CheckIsAllFinished()) {
225         operation->Finished();
226         RefObject::KillAndDecObjRef(operation);
227     } else {
228         operation->WaitIfNeed();
229         RefObject::DecObjRef(operation);
230     }
231     return E_OK;
232 }
233 
RemoveSyncOperation(int syncId)234 int GenericSyncer::RemoveSyncOperation(int syncId)
235 {
236     SyncOperation *operation = nullptr;
237     std::unique_lock<std::mutex> lock(operationMapLock_);
238     auto iter = syncOperationMap_.find(syncId);
239     if (iter != syncOperationMap_.end()) {
240         LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
241         operation = iter->second;
242         syncOperationMap_.erase(syncId);
243         lock.unlock();
244         if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
245             SubQueuedSyncSize();
246         }
247         operation->NotifyIfNeed();
248         RefObject::KillAndDecObjRef(operation);
249         operation = nullptr;
250         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
251         if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
252             return E_OK;
253         }
254         uint64_t connectionId = syncIdMap_[syncId];
255         if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
256             connectionIdMap_[connectionId].remove(syncId);
257         }
258         syncIdMap_.erase(syncId);
259         return E_OK;
260     }
261     return -E_INVALID_ARGS;
262 }
263 
StopSync(uint64_t connectionId)264 int GenericSyncer::StopSync(uint64_t connectionId)
265 {
266     std::list<int> syncIdList;
267     {
268         std::lock_guard<std::mutex> lockGuard(syncIdLock_);
269         if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
270             return E_OK;
271         }
272         syncIdList = connectionIdMap_[connectionId];
273         connectionIdMap_.erase(connectionId);
274     }
275     for (auto syncId : syncIdList) {
276         RemoveSyncOperation(syncId);
277         if (syncEngine_ != nullptr) {
278             syncEngine_->AbortMachineIfNeed(syncId);
279         }
280     }
281     if (syncEngine_ != nullptr) {
282         syncEngine_->NotifyConnectionClosed(connectionId);
283     }
284     return E_OK;
285 }
286 
GetTimestamp()287 uint64_t GenericSyncer::GetTimestamp()
288 {
289     std::shared_ptr<TimeHelper> timeHelper = nullptr;
290     ISyncInterface *storage = nullptr;
291     {
292         std::lock_guard<std::mutex> lock(syncerLock_);
293         timeHelper = timeHelper_;
294         if (syncInterface_ != nullptr) {
295             storage = syncInterface_;
296             storage->IncRefCount();
297         }
298     }
299     if (storage == nullptr) {
300         return TimeHelper::GetSysCurrentTime();
301     }
302     if (timeHelper == nullptr) {
303         storage->DecRefCount();
304         return TimeHelper::GetSysCurrentTime();
305     }
306     uint64_t timestamp = timeHelper->GetTime();
307     storage->DecRefCount();
308     return timestamp;
309 }
310 
QueryAutoSync(const InternalSyncParma & param)311 void GenericSyncer::QueryAutoSync(const InternalSyncParma &param)
312 {
313     if (!initialized_) {
314         LOGW("[Syncer] Syncer has not Init");
315         return;
316     }
317     LOGI("[GenericSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
318     ISyncInterface *syncInterface = nullptr;
319     ISyncEngine *engine = nullptr;
320     {
321         std::lock_guard<std::mutex> lock(syncerLock_);
322         if (syncInterface_ == nullptr || syncEngine_ == nullptr) {
323             LOGW("[Syncer] Syncer has not Init");
324             return;
325         }
326         syncInterface = syncInterface_;
327         engine = syncEngine_;
328         syncInterface->IncRefCount();
329         RefObject::IncObjRef(engine);
330     }
331     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param, engine, syncInterface] {
332         int errCode = Sync(param);
333         if (errCode != E_OK) {
334             LOGE("[GenericSyncer] sync start by QueryAutoSync failed err %d", errCode);
335         }
336         RefObject::DecObjRef(engine);
337         syncInterface->DecRefCount();
338     });
339     if (retCode != E_OK) {
340         LOGE("[GenericSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
341         RefObject::DecObjRef(engine);
342         syncInterface->DecRefCount();
343     }
344 }
345 
AddSyncOperation(ISyncEngine * engine,SyncOperation * operation)346 void GenericSyncer::AddSyncOperation(ISyncEngine *engine, SyncOperation *operation)
347 {
348     if (operation == nullptr) {
349         return;
350     }
351 
352     LOGD("[Syncer] AddSyncOperation.");
353     engine->AddSyncOperation(operation);
354 
355     if (operation->CheckIsAllFinished()) {
356         return;
357     }
358 
359     std::lock_guard<std::mutex> lock(operationMapLock_);
360     syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
361     // To make sure operation alive before WaitIfNeed out
362     RefObject::IncObjRef(operation);
363 }
364 
SyncOperationKillCallbackInner(int syncId)365 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
366 {
367     if (syncEngine_ != nullptr) {
368         LOGI("[Syncer] Operation on kill id = %d", syncId);
369         syncEngine_->RemoveSyncOperation(syncId);
370     }
371 }
372 
SyncOperationKillCallback(int syncId)373 void GenericSyncer::SyncOperationKillCallback(int syncId)
374 {
375     SyncOperationKillCallbackInner(syncId);
376 }
377 
InitMetaData(ISyncInterface * syncInterface)378 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
379 {
380     if (metadata_ != nullptr) {
381         return E_OK;
382     }
383 
384     metadata_ = std::make_shared<Metadata>();
385     if (metadata_ == nullptr) {
386         LOGE("[Syncer] metadata make shared failed");
387         return -E_OUT_OF_MEMORY;
388     }
389     int errCode = metadata_->Initialize(syncInterface);
390     if (errCode != E_OK) {
391         LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
392         metadata_ = nullptr;
393     }
394     syncInterface_ = syncInterface;
395     return errCode;
396 }
397 
InitTimeHelper(ISyncInterface * syncInterface)398 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
399 {
400     if (timeHelper_ != nullptr) {
401         return E_OK;
402     }
403 
404     timeHelper_ = std::make_shared<TimeHelper>();
405     if (timeHelper_ == nullptr) {
406         return -E_OUT_OF_MEMORY;
407     }
408     int errCode = timeHelper_->Initialize(syncInterface, metadata_);
409     if (errCode != E_OK) {
410         LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
411         timeHelper_ = nullptr;
412     }
413     return errCode;
414 }
415 
InitSyncEngine(ISyncInterface * syncInterface)416 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
417 {
418     if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
419         LOGI("[Syncer] syncEngine is active");
420         return E_OK;
421     }
422     int errCode = BuildSyncEngine();
423     if (errCode != E_OK) {
424         return errCode;
425     }
426     const std::function<void(std::string)> onlineFunc = [this](const std::string &device) {
427         RemoteDataChanged(device);
428     };
429     const std::function<void(std::string)> offlineFunc = [this](const std::string &device) {
430         RemoteDeviceOffline(device);
431     };
432     const std::function<void(const InternalSyncParma &param)> queryAutoSyncFunc =
433         [this](const InternalSyncParma &syncParam) { QueryAutoSync(syncParam); };
434     const ISyncEngine::InitCallbackParam param = { onlineFunc, offlineFunc, queryAutoSyncFunc };
435     errCode = syncEngine_->Initialize(syncInterface, metadata_, param);
436     if (errCode == E_OK) {
437         syncInterface->IncRefCount();
438         label_ = syncEngine_->GetLabel();
439         return E_OK;
440     } else {
441         LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
442         RefObject::KillAndDecObjRef(syncEngine_);
443         syncEngine_ = nullptr;
444         return errCode;
445     }
446 }
447 
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)448 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
449 {
450     bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
451         false);
452     if (!isSyncDualTupleMode || isNeedActive) {
453         return E_OK;
454     }
455     LOGI("[Syncer] syncer no need to active");
456     int errCode = BuildSyncEngine();
457     if (errCode != E_OK) {
458         return errCode;
459     }
460     return -E_NO_NEED_ACTIVE;
461 }
462 
GenerateSyncId()463 uint32_t GenericSyncer::GenerateSyncId()
464 {
465     std::lock_guard<std::mutex> lock(syncIdLock_);
466     currentSyncId_++;
467     // if overflow, reset to 1
468     if (currentSyncId_ <= 0) {
469         currentSyncId_ = MIN_VALID_SYNC_ID;
470     }
471     return currentSyncId_;
472 }
473 
IsValidMode(int mode) const474 bool GenericSyncer::IsValidMode(int mode) const
475 {
476     if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
477         LOGE("[Syncer] Sync mode is not valid!");
478         return false;
479     }
480     return true;
481 }
482 
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const483 int GenericSyncer::SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine, ISyncInterface *storage) const
484 {
485     (void)param;
486     (void)engine;
487     (void)storage;
488     return E_OK;
489 }
490 
IsValidDevices(const std::vector<std::string> & devices) const491 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
492 {
493     if (devices.empty()) {
494         LOGE("[Syncer] devices is empty!");
495         return false;
496     }
497     return true;
498 }
499 
ClearSyncOperations(bool isClosedOperation)500 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
501 {
502     std::vector<SyncOperation *> syncOperation;
503     {
504         std::lock_guard<std::mutex> lock(operationMapLock_);
505         for (auto &item : syncOperationMap_) {
506             bool isBlockSync = item.second->IsBlockSync();
507             if (isBlockSync || !isClosedOperation) {
508                 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
509                 item.second->SetUnfinishedDevStatus(status);
510                 RefObject::IncObjRef(item.second);
511                 syncOperation.push_back(item.second);
512             }
513         }
514     }
515 
516     if (!isClosedOperation) { // means user changed
517         syncEngine_->NotifyUserChange();
518     }
519 
520     for (auto &operation : syncOperation) {
521         // block sync operation or userChange will trigger remove sync operation
522         // caller won't blocked for block sync
523         // caller won't blocked for userChange operation no mater it is block or non-block sync
524         TriggerSyncFinished(operation);
525         RefObject::DecObjRef(operation);
526     }
527     ClearInnerResource(isClosedOperation);
528 }
529 
ClearInnerResource(bool isClosedOperation)530 void GenericSyncer::ClearInnerResource(bool isClosedOperation)
531 {
532     {
533         std::lock_guard<std::mutex> lock(operationMapLock_);
534         for (auto &iter : syncOperationMap_) {
535             RefObject::KillAndDecObjRef(iter.second);
536             iter.second = nullptr;
537         }
538         syncOperationMap_.clear();
539     }
540     {
541         std::lock_guard<std::mutex> lock(syncIdLock_);
542         if (isClosedOperation) {
543             connectionIdMap_.clear();
544         } else { // only need to clear syncid when user change
545             for (auto &item : connectionIdMap_) {
546                 item.second.clear();
547             }
548         }
549         syncIdMap_.clear();
550     }
551 }
552 
TriggerSyncFinished(SyncOperation * operation)553 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
554 {
555     if (operation != nullptr && operation->CheckIsAllFinished()) { // LCOV_EXCL_BR_LINE
556         operation->Finished();
557     }
558 }
559 
OnSyncFinished(int syncId)560 void GenericSyncer::OnSyncFinished(int syncId)
561 {
562     (void)(RemoveSyncOperation(syncId));
563 }
564 
SyncModuleInit()565 int GenericSyncer::SyncModuleInit()
566 {
567     static bool isInit = false;
568     std::lock_guard<std::mutex> lock(moduleInitLock_);
569     if (!isInit) {
570         int errCode = SyncResourceInit();
571         if (errCode != E_OK) {
572             return errCode;
573         }
574         isInit = true;
575         return E_OK;
576     }
577     return E_OK;
578 }
579 
SyncResourceInit()580 int GenericSyncer::SyncResourceInit()
581 {
582     int errCode = TimeSync::RegisterTransformFunc();
583     if (errCode != E_OK) {
584         LOGE("Register timesync message transform func ERR!");
585         return errCode;
586     }
587     errCode = SingleVerSerializeManager::RegisterTransformFunc();
588     if (errCode != E_OK) {
589         LOGE("Register SingleVerDataSync message transform func ERR!");
590         return errCode;
591     }
592 #ifndef OMIT_MULTI_VER
593     errCode = CommitHistorySync::RegisterTransformFunc();
594     if (errCode != E_OK) {
595         LOGE("Register CommitHistorySync message transform func ERR!");
596         return errCode;
597     }
598     errCode = MultiVerDataSync::RegisterTransformFunc();
599     if (errCode != E_OK) {
600         LOGE("Register MultiVerDataSync message transform func ERR!");
601         return errCode;
602     }
603     errCode = ValueSliceSync::RegisterTransformFunc();
604     if (errCode != E_OK) {
605         LOGE("Register ValueSliceSync message transform func ERR!");
606         return errCode;
607     }
608 #endif
609     errCode = DeviceManager::RegisterTransformFunc();
610     if (errCode != E_OK) {
611         LOGE("Register DeviceManager message transform func ERR!");
612         return errCode;
613     }
614     errCode = AbilitySync::RegisterTransformFunc();
615     if (errCode != E_OK) {
616         LOGE("Register AbilitySync message transform func ERR!");
617         return errCode;
618     }
619     return E_OK;
620 }
621 
GetQueuedSyncSize(int * queuedSyncSize) const622 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
623 {
624     if (queuedSyncSize == nullptr) {
625         return -E_INVALID_ARGS;
626     }
627     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
628     *queuedSyncSize = queuedManualSyncSize_;
629     LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
630     return E_OK;
631 }
632 
SetQueuedSyncLimit(const int * queuedSyncLimit)633 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
634 {
635     if (queuedSyncLimit == nullptr) {
636         return -E_INVALID_ARGS;
637     }
638     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
639     queuedManualSyncLimit_ = *queuedSyncLimit;
640     LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
641     return E_OK;
642 }
643 
GetQueuedSyncLimit(int * queuedSyncLimit) const644 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
645 {
646     if (queuedSyncLimit == nullptr) {
647         return -E_INVALID_ARGS;
648     }
649     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
650     *queuedSyncLimit = queuedManualSyncLimit_;
651     LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
652     return E_OK;
653 }
654 
IsManualSync(int inMode) const655 bool GenericSyncer::IsManualSync(int inMode) const
656 {
657     int mode = SyncOperation::TransferSyncMode(inMode);
658     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
659         (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
660         return true;
661     }
662     return false;
663 }
664 
AddQueuedManualSyncSize(int mode,bool wait)665 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
666 {
667     if (IsManualSync(mode) && (!wait)) {
668         std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
669         if (!manualSyncEnable_) {
670             LOGI("[GenericSyncer] manualSyncEnable is Disable");
671             return -E_BUSY;
672         }
673         queuedManualSyncSize_++;
674     }
675     return E_OK;
676 }
677 
IsQueuedManualSyncFull(int mode,bool wait) const678 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
679 {
680     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
681     if (IsManualSync(mode) && (!manualSyncEnable_)) { // LCOV_EXCL_BR_LINE
682         LOGI("[GenericSyncer] manualSyncEnable_:false");
683         return true;
684     }
685     if (IsManualSync(mode) && (!wait)) { // LCOV_EXCL_BR_LINE
686         if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
687             return false;
688         } else {
689             LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
690                 queuedManualSyncLimit_);
691             return true;
692         }
693     } else {
694         return false;
695     }
696 }
697 
SubQueuedSyncSize(void)698 void GenericSyncer::SubQueuedSyncSize(void)
699 {
700     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
701     queuedManualSyncSize_--;
702     if (queuedManualSyncSize_ < 0) {
703         LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
704         queuedManualSyncSize_ = 0;
705     }
706 }
707 
DisableManualSync(void)708 int GenericSyncer::DisableManualSync(void)
709 {
710     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
711     if (queuedManualSyncSize_ > 0) {
712         LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
713         return -E_BUSY;
714     }
715     manualSyncEnable_ = false;
716     LOGD("[GenericSyncer] DisableManualSync ok");
717     return E_OK;
718 }
719 
EnableManualSync(void)720 int GenericSyncer::EnableManualSync(void)
721 {
722     std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
723     manualSyncEnable_ = true;
724     LOGD("[GenericSyncer] EnableManualSync ok");
725     return E_OK;
726 }
727 
GetLocalIdentity(std::string & outTarget) const728 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
729 {
730     std::string deviceId;
731     int errCode =  RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
732     if (errCode != E_OK) {
733         LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
734         return errCode;
735     }
736     outTarget = DBCommon::TransferHashString(deviceId);
737     return E_OK;
738 }
739 
GetOnlineDevices(std::vector<std::string> & devices) const740 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
741 {
742     std::string identifier;
743     {
744         std::lock_guard<std::mutex> lock(syncerLock_);
745         // Get devices from AutoLaunch first.
746         if (syncInterface_ == nullptr) {
747             LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
748             return;
749         }
750         bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
751             false);
752         if (isSyncDualTupleMode) {
753             identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA,
754                 "");
755         } else {
756             identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
757         }
758     }
759     RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
760     if (!devices.empty()) {
761         return;
762     }
763     std::lock_guard<std::mutex> lock(syncerLock_);
764     if (closing_) {
765         LOGW("[Syncer] Syncer is closing, return!");
766         return;
767     }
768     if (syncEngine_ != nullptr) {
769         syncEngine_->GetOnlineDevices(devices);
770     }
771 }
772 
SetSyncRetry(bool isRetry)773 int GenericSyncer::SetSyncRetry(bool isRetry)
774 {
775     if (syncEngine_ == nullptr) {
776         return -E_NOT_INIT;
777     }
778     syncEngine_->SetSyncRetry(isRetry);
779     return E_OK;
780 }
781 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)782 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
783 {
784     std::lock_guard<std::mutex> lock(syncerLock_);
785     if (syncEngine_ == nullptr) {
786         return -E_NOT_INIT;
787     }
788     int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
789     if (errCode == E_OK) {
790         syncEngine_->SetEqualIdentifierMap(identifier, targets);
791     }
792     return errCode;
793 }
794 
GetSyncDevicesStr(const std::vector<std::string> & devices) const795 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
796 {
797     std::string syncDevices;
798     for (const auto &dev:devices) {
799         syncDevices += DBCommon::StringMasking(dev);
800         syncDevices += ",";
801     }
802     if (syncDevices.empty()) {
803         return "";
804     }
805     return syncDevices.substr(0, syncDevices.size() - 1);
806 }
807 
StatusCheck() const808 int GenericSyncer::StatusCheck() const
809 {
810     if (!initialized_) {
811         LOGE("[Syncer] Syncer is not initialized, return!");
812         return -E_BUSY;
813     }
814     if (closing_) {
815         LOGW("[Syncer] Syncer is closing, return!");
816         return -E_BUSY;
817     }
818     return E_OK;
819 }
820 
SyncPreCheck(const SyncParma & param) const821 int GenericSyncer::SyncPreCheck(const SyncParma &param) const
822 {
823     ISyncEngine *engine = nullptr;
824     ISyncInterface *storage = nullptr;
825     int errCode = E_OK;
826     {
827         std::lock_guard<std::mutex> lock(syncerLock_);
828         errCode = StatusCheck();
829         if (errCode != E_OK) {
830             return errCode;
831         }
832         if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
833             return -E_INVALID_ARGS;
834         }
835         if (IsQueuedManualSyncFull(param.mode, param.wait)) { // LCOV_EXCL_BR_LINE
836             LOGE("[Syncer] -E_BUSY");
837             return -E_BUSY;
838         }
839         storage = syncInterface_;
840         engine = syncEngine_;
841         if (storage == nullptr || engine == nullptr) {
842             return -E_BUSY;
843         }
844         storage->IncRefCount();
845         RefObject::IncObjRef(engine);
846     }
847     errCode = SyncConditionCheck(param, engine, storage);
848     storage->DecRefCount();
849     RefObject::DecObjRef(engine);
850     return errCode;
851 }
852 
InitSyncOperation(SyncOperation * operation,const SyncParma & param)853 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParma &param)
854 {
855     operation->SetIdentifier(syncInterface_->GetIdentifier());
856     operation->Initialize();
857     operation->OnKill(std::bind(&GenericSyncer::SyncOperationKillCallback, this, operation->GetSyncId()));
858     std::function<void(int)> onFinished = std::bind(&GenericSyncer::OnSyncFinished, this, std::placeholders::_1);
859     operation->SetOnSyncFinished(onFinished);
860     operation->SetOnSyncFinalize(param.onFinalize);
861     if (param.isQuerySync) {
862         operation->SetQuery(param.syncQuery);
863     }
864 }
865 
BuildSyncEngine()866 int GenericSyncer::BuildSyncEngine()
867 {
868     if (syncEngine_ != nullptr) {
869         return E_OK;
870     }
871     syncEngine_ = CreateSyncEngine();
872     if (syncEngine_ == nullptr) {
873         return -E_OUT_OF_MEMORY;
874     }
875     syncEngine_->OnLastRef([this]() {
876         LOGD("[Syncer] SyncEngine finalized");
877         {
878             std::lock_guard<std::mutex> cvLock(engineMutex_);
879             engineFinalize_ = true;
880         }
881         engineFinalizeCv_.notify_all();
882     });
883     return E_OK;
884 }
885 
Dump(int fd)886 void GenericSyncer::Dump(int fd)
887 {
888     if (syncEngine_ == nullptr) {
889         return;
890     }
891     RefObject::IncObjRef(syncEngine_);
892     syncEngine_->Dump(fd);
893     RefObject::DecObjRef(syncEngine_);
894 }
895 
DumpSyncerBasicInfo()896 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
897 {
898     SyncerBasicInfo baseInfo;
899     if (syncEngine_ == nullptr) {
900         return baseInfo;
901     }
902     RefObject::IncObjRef(syncEngine_);
903     baseInfo.isSyncActive = syncEngine_->IsEngineActive();
904     RefObject::DecObjRef(syncEngine_);
905     return baseInfo;
906 }
907 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)908 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
909     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
910 {
911     ISyncEngine *syncEngine = nullptr;
912     {
913         std::lock_guard<std::mutex> lock(syncerLock_);
914         int errCode = StatusCheck();
915         if (errCode != E_OK) {
916             return errCode;
917         }
918         syncEngine = syncEngine_;
919         RefObject::IncObjRef(syncEngine);
920     }
921     if (syncEngine == nullptr) {
922         return -E_NOT_INIT;
923     }
924     int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
925     RefObject::DecObjRef(syncEngine);
926     return errCode;
927 }
928 
InitTimeChangedListener()929 int GenericSyncer::InitTimeChangedListener()
930 {
931     int errCode = E_OK;
932     if (timeChangedListener_ != nullptr) {
933         return errCode;
934     }
935     timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
936         [this](void *changedOffset) {
937             RecordTimeChangeOffset(changedOffset);
938         },
939         [this]() {
940             {
941                 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
942                 timeChangeListenerFinalize_ = true;
943             }
944             timeChangeCv_.notify_all();
945         }, errCode);
946     if (timeChangedListener_ == nullptr) {
947         LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
948         return errCode;
949     }
950     {
951         std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
952         timeChangeListenerFinalize_ = false;
953     }
954     return E_OK;
955 }
956 
ReleaseInnerResource()957 void GenericSyncer::ReleaseInnerResource()
958 {
959     NotificationChain::Listener *timeChangedListener = nullptr;
960     {
961         std::lock_guard<std::mutex> lock(syncerLock_);
962         if (timeChangedListener_ != nullptr) {
963             timeChangedListener = timeChangedListener_;
964             timeChangedListener_ = nullptr;
965         }
966         timeHelper_ = nullptr;
967         metadata_ = nullptr;
968     }
969     if (timeChangedListener != nullptr) {
970         timeChangedListener->Drop(true);
971         RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
972     }
973     std::unique_lock<std::mutex> uniqueLock(timeChangeListenerMutex_);
974     LOGD("[GenericSyncer] Begin wait time change listener finalize");
975     timeChangeCv_.wait(uniqueLock, [this]() {
976         return timeChangeListenerFinalize_;
977     });
978     LOGD("[GenericSyncer] End wait time change listener finalize");
979 }
980 
RecordTimeChangeOffset(void * changedOffset)981 void GenericSyncer::RecordTimeChangeOffset(void *changedOffset)
982 {
983     std::shared_ptr<Metadata> metadata = nullptr;
984     ISyncInterface *storage = nullptr;
985     {
986         std::lock_guard<std::mutex> lock(syncerLock_);
987         if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
988             return;
989         }
990         storage = syncInterface_;
991         metadata = metadata_;
992         storage->IncRefCount();
993     }
994     TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
995         static_cast<TimeOffset>(TimeHelper::TO_100_NS);
996     TimeOffset orgOffset = metadata->GetLocalTimeOffset() - changedTimeOffset;
997     TimeOffset currentSysTime = static_cast<TimeOffset>(TimeHelper::GetSysCurrentTime());
998     Timestamp maxItemTime = 0;
999     storage->GetMaxTimestamp(maxItemTime);
1000     if ((orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
1001         orgOffset = TimeHelper::BUFFER_VALID_TIME -
1002             currentSysTime + static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS);
1003     }
1004     if ((currentSysTime + orgOffset) <= static_cast<TimeOffset>(maxItemTime)) {
1005         orgOffset = static_cast<TimeOffset>(maxItemTime) - currentSysTime +
1006             static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS); // 1ms
1007     }
1008     metadata->SaveLocalTimeOffset(orgOffset);
1009     ResetTimeSyncMarkByTimeChange(metadata, *storage);
1010     storage->DecRefCount();
1011 }
1012 
CloseInner(bool isClosedOperation)1013 int GenericSyncer::CloseInner(bool isClosedOperation)
1014 {
1015     {
1016         std::lock_guard<std::mutex> lock(syncerLock_);
1017         if (!initialized_) {
1018             LOGW("[Syncer] Syncer[%s] don't need to close, because it has not been init", label_.c_str());
1019             return -E_NOT_INIT;
1020         }
1021         initialized_ = false;
1022         if (closing_) {
1023             LOGE("[Syncer] Syncer is closing, return!");
1024             return -E_BUSY;
1025         }
1026         closing_ = true;
1027     }
1028     ClearSyncOperations(isClosedOperation);
1029     if (syncEngine_ != nullptr) {
1030         syncEngine_->Close();
1031         LOGD("[Syncer] Close SyncEngine!");
1032         std::lock_guard<std::mutex> lock(syncerLock_);
1033         closing_ = false;
1034     }
1035     return E_OK;
1036 }
1037 
GetSyncDataSize(const std::string & device,size_t & size) const1038 int GenericSyncer::GetSyncDataSize(const std::string &device, size_t &size) const
1039 {
1040     uint64_t localWaterMark = 0;
1041     std::shared_ptr<Metadata> metadata = nullptr;
1042     {
1043         std::lock_guard<std::mutex> lock(syncerLock_);
1044         if (metadata_ == nullptr || syncInterface_ == nullptr) {
1045             return -E_INTERNAL_ERROR;
1046         }
1047         if (closing_) {
1048             LOGE("[Syncer] Syncer is closing, return!");
1049             return -E_BUSY;
1050         }
1051         int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->TryHandle();
1052         if (errCode != E_OK) {
1053             LOGE("[Syncer] syncer is restarting, return!");
1054             return errCode;
1055         }
1056         syncInterface_->IncRefCount();
1057         metadata = metadata_;
1058     }
1059     metadata->GetLocalWaterMark(device, localWaterMark);
1060     uint32_t expectedMtuSize = DEFAULT_MTU_SIZE;
1061     DataSizeSpecInfo syncDataSizeInfo = {expectedMtuSize, static_cast<size_t>(MAX_TIMESTAMP)};
1062     std::vector<SendDataItem> outData;
1063     ContinueToken token = nullptr;
1064     int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->GetSyncData(localWaterMark, MAX_TIMESTAMP,
1065         outData, token, syncDataSizeInfo);
1066     if (token != nullptr) {
1067         static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token);
1068         token = nullptr;
1069     }
1070     if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
1071         LOGE("calculate sync data size failed %d", errCode);
1072         syncInterface_->DecRefCount();
1073         return errCode;
1074     }
1075     uint32_t totalLen = 0;
1076     if (errCode == -E_UNFINISHED) {
1077         totalLen = expectedMtuSize;
1078     } else {
1079         totalLen = GenericSingleVerKvEntry::CalculateLens(outData, SOFTWARE_VERSION_CURRENT);
1080     }
1081     for (auto &entry : outData) {
1082         delete entry;
1083         entry = nullptr;
1084     }
1085     syncInterface_->DecRefCount();
1086     // if larger than 1M, return 1M
1087     size = (totalLen >= expectedMtuSize) ? expectedMtuSize : totalLen;
1088     return E_OK;
1089 }
1090 
IsNeedActive(ISyncInterface * syncInterface)1091 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
1092 {
1093     bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
1094     if (localOnly) {
1095         LOGD("[Syncer] Local only db, don't need active syncer");
1096         return false;
1097     }
1098     return true;
1099 }
1100 
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const1101 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
1102 {
1103     (void)clientId;
1104     (void)hashDevId;
1105     return -E_NOT_SUPPORT;
1106 }
1107 
InitStorageResource(ISyncInterface * syncInterface)1108 int GenericSyncer::InitStorageResource(ISyncInterface *syncInterface)
1109 {
1110     // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
1111     // It will be clear in destructor.
1112     int errCode = InitMetaData(syncInterface);
1113     if (errCode != E_OK) {
1114         return errCode;
1115     }
1116 
1117     // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
1118     // It will be clear in destructor.
1119     errCode = InitTimeHelper(syncInterface);
1120     if (errCode != E_OK) {
1121         return errCode;
1122     }
1123 
1124     if (!IsNeedActive(syncInterface)) {
1125         return -E_NO_NEED_ACTIVE;
1126     }
1127     return errCode;
1128 }
1129 
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)1130 int GenericSyncer::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
1131 {
1132     std::shared_ptr<Metadata> metadata = nullptr;
1133     {
1134         std::lock_guard<std::mutex> autoLock(syncerLock_);
1135         metadata = metadata_;
1136     }
1137     if (metadata == nullptr) {
1138         LOGE("[Syncer] Metadata is not init");
1139         return -E_NOT_INIT;
1140     }
1141     std::string dev;
1142     bool devNeedHash = true;
1143     int errCode = metadata->GetHashDeviceId(device, dev);
1144     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1145         LOGE("[Syncer] Get watermark info failed %d", errCode);
1146         return errCode;
1147     } else if (errCode == E_OK) {
1148         devNeedHash = false;
1149     } else {
1150         dev = device;
1151     }
1152     return metadata->GetWaterMarkInfoFromDB(dev, devNeedHash, info);
1153 }
1154 
UpgradeSchemaVerInMeta()1155 int GenericSyncer::UpgradeSchemaVerInMeta()
1156 {
1157     std::shared_ptr<Metadata> metadata = nullptr;
1158     {
1159         std::lock_guard<std::mutex> autoLock(syncerLock_);
1160         metadata = metadata_;
1161     }
1162     if (metadata == nullptr) {
1163         LOGE("[Syncer] metadata is not init");
1164         return -E_NOT_INIT;
1165     }
1166     int errCode = metadata->ClearAllAbilitySyncFinishMark();
1167     if (errCode != E_OK) {
1168         LOGE("[Syncer] clear ability mark failed:%d", errCode);
1169         return errCode;
1170     }
1171     auto [err, localSchemaVer] = metadata->GetLocalSchemaVersion();
1172     if (err != E_OK) {
1173         LOGE("[Syncer] get local schema version failed:%d", err);
1174         return err;
1175     }
1176     errCode = metadata->SetLocalSchemaVersion(localSchemaVer + 1);
1177     if (errCode != E_OK) {
1178         LOGE("[Syncer] increase local schema version failed:%d", errCode);
1179     }
1180     return errCode;
1181 }
1182 
ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> & metadata,ISyncInterface & storage)1183 void GenericSyncer::ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage)
1184 {
1185     if (syncEngine_ != nullptr) {
1186         syncEngine_->TimeChange();
1187     }
1188     int errCode = metadata->ClearAllTimeSyncFinishMark();
1189     if (errCode != E_OK) {
1190         LOGW("[GenericSyncer] %s clear time sync finish mark failed %d", label_.c_str(), errCode);
1191     } else {
1192         LOGD("[GenericSyncer] ClearAllTimeSyncFinishMark finish");
1193         RuntimeContext::GetInstance()->ResetDBTimeChangeStatus(storage.GetIdentifier());
1194     }
1195 }
1196 
ResetSyncStatus()1197 void GenericSyncer::ResetSyncStatus()
1198 {
1199     std::shared_ptr<Metadata> metadata = nullptr;
1200     {
1201         std::lock_guard<std::mutex> lock(syncerLock_);
1202         if (metadata_ == nullptr) {
1203             return;
1204         }
1205         metadata = metadata_;
1206     }
1207     metadata->ClearAllAbilitySyncFinishMark();
1208 }
1209 
GetLocalTimeOffset()1210 int64_t GenericSyncer::GetLocalTimeOffset()
1211 {
1212     std::shared_ptr<Metadata> metadata = nullptr;
1213     {
1214         std::lock_guard<std::mutex> lock(syncerLock_);
1215         if (metadata_ == nullptr) {
1216             return 0;
1217         }
1218         metadata = metadata_;
1219     }
1220     return metadata->GetLocalTimeOffset();
1221 }
1222 
GetTaskCount()1223 int32_t GenericSyncer::GetTaskCount()
1224 {
1225     int32_t count = 0;
1226     {
1227         std::lock_guard<std::mutex> autoLock(operationMapLock_);
1228         count += static_cast<int32_t>(syncOperationMap_.size());
1229     }
1230     ISyncEngine *syncEngine = nullptr;
1231     {
1232         std::lock_guard<std::mutex> lock(syncerLock_);
1233         if (syncEngine_ == nullptr) {
1234             return count;
1235         }
1236         syncEngine = syncEngine_;
1237         RefObject::IncObjRef(syncEngine);
1238     }
1239     count += syncEngine->GetResponseTaskCount();
1240     RefObject::DecObjRef(syncEngine);
1241     return count;
1242 }
1243 } // namespace DistributedDB
1244