1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "generic_syncer.h"
17
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "ref_object.h"
22 #include "sqlite_single_ver_natural_store.h"
23 #include "time_sync.h"
24 #include "single_ver_data_sync.h"
25 #ifndef OMIT_MULTI_VER
26 #include "commit_history_sync.h"
27 #include "multi_ver_data_sync.h"
28 #include "value_slice_sync.h"
29 #endif
30 #include "device_manager.h"
31 #include "db_constant.h"
32 #include "ability_sync.h"
33 #include "generic_single_ver_kv_entry.h"
34 #include "single_ver_serialize_manager.h"
35
36 namespace DistributedDB {
37 namespace {
38 constexpr uint32_t DEFAULT_MTU_SIZE = 1024u * 1024u; // 1M
39 }
40 const int GenericSyncer::MIN_VALID_SYNC_ID = 1;
41 std::mutex GenericSyncer::moduleInitLock_;
42 int GenericSyncer::currentSyncId_ = 0;
43 std::mutex GenericSyncer::syncIdLock_;
GenericSyncer()44 GenericSyncer::GenericSyncer()
45 : syncEngine_(nullptr),
46 syncInterface_(nullptr),
47 timeHelper_(nullptr),
48 metadata_(nullptr),
49 initialized_(false),
50 queuedManualSyncSize_(0),
51 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
52 manualSyncEnable_(true),
53 closing_(false),
54 engineFinalize_(false),
55 timeChangeListenerFinalize_(true),
56 timeChangedListener_(nullptr)
57 {
58 }
59
~GenericSyncer()60 GenericSyncer::~GenericSyncer()
61 {
62 LOGD("[GenericSyncer] ~GenericSyncer!");
63 if (syncEngine_ != nullptr) {
64 syncEngine_->OnKill([this]() { this->syncEngine_->Close(); });
65 RefObject::KillAndDecObjRef(syncEngine_);
66 // waiting all thread exist
67 std::unique_lock<std::mutex> cvLock(engineMutex_);
68 bool engineFinalize = engineFinalizeCv_.wait_for(cvLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT),
69 [this]() { return engineFinalize_; });
70 if (!engineFinalize) {
71 LOGW("syncer finalize before engine finalize!");
72 }
73 syncEngine_ = nullptr;
74 }
75 ReleaseInnerResource();
76 std::lock_guard<std::mutex> lock(syncerLock_);
77 syncInterface_ = nullptr;
78 }
79
Initialize(ISyncInterface * syncInterface,bool isNeedActive)80 int GenericSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
81 {
82 if (syncInterface == nullptr) {
83 LOGE("[Syncer] Init failed, the syncInterface is null!");
84 return -E_INVALID_ARGS;
85 }
86
87 {
88 std::lock_guard<std::mutex> lock(syncerLock_);
89 if (initialized_) {
90 return E_OK;
91 }
92 if (closing_) {
93 LOGE("[Syncer] Syncer is closing, return!");
94 return -E_BUSY;
95 }
96 std::vector<uint8_t> label = syncInterface->GetIdentifier();
97 label_ = DBCommon::StringMasking(DBCommon::VectorToHexString(label));
98
99 int errCode = InitStorageResource(syncInterface);
100 if (errCode != E_OK) {
101 return errCode;
102 }
103 // As timeChangedListener_ will record time change, it should not be clear even if engine init failed.
104 // It will be clear in destructor.
105 int errCodeTimeChangedListener = InitTimeChangedListener();
106 if (errCodeTimeChangedListener != E_OK) {
107 return -E_INTERNAL_ERROR;
108 }
109 errCode = CheckSyncActive(syncInterface, isNeedActive);
110 if (errCode != E_OK) {
111 return errCode;
112 }
113
114 if (!RuntimeContext::GetInstance()->IsCommunicatorAggregatorValid()) {
115 return -E_NOT_INIT;
116 }
117
118 errCode = SyncModuleInit();
119 if (errCode != E_OK) {
120 LOGE("[Syncer] Sync ModuleInit ERR!");
121 return -E_INTERNAL_ERROR;
122 }
123
124 errCode = InitSyncEngine(syncInterface);
125 if (errCode != E_OK) {
126 return errCode;
127 }
128 syncEngine_->SetEqualIdentifier();
129 initialized_ = true;
130 }
131
132 // StartCommunicator may start an auto sync, this function can not in syncerLock_
133 syncEngine_->StartCommunicator();
134 if (RuntimeContext::GetInstance()->CheckDBTimeChange(syncInterface_->GetIdentifier())) {
135 ResetTimeSyncMarkByTimeChange(metadata_, *syncInterface_);
136 }
137 return E_OK;
138 }
139
Close(bool isClosedOperation)140 int GenericSyncer::Close(bool isClosedOperation)
141 {
142 int errCode = CloseInner(isClosedOperation);
143 if (errCode != -E_BUSY && isClosedOperation) {
144 ReleaseInnerResource();
145 }
146 return errCode;
147 }
148
Sync(const std::vector<std::string> & devices,int mode,const std::function<void (const std::map<std::string,int> &)> & onComplete,const std::function<void (void)> & onFinalize,bool wait=false)149 int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
150 const std::function<void(const std::map<std::string, int> &)> &onComplete,
151 const std::function<void(void)> &onFinalize, bool wait = false)
152 {
153 SyncParma param;
154 param.devices = devices;
155 param.mode = mode;
156 param.onComplete = onComplete;
157 param.onFinalize = onFinalize;
158 param.wait = wait;
159 return Sync(param);
160 }
161
Sync(const InternalSyncParma & param)162 int GenericSyncer::Sync(const InternalSyncParma ¶m)
163 {
164 SyncParma syncParam;
165 syncParam.devices = param.devices;
166 syncParam.mode = param.mode;
167 syncParam.isQuerySync = param.isQuerySync;
168 syncParam.syncQuery = param.syncQuery;
169 return Sync(syncParam);
170 }
171
Sync(const SyncParma & param)172 int GenericSyncer::Sync(const SyncParma ¶m)
173 {
174 return Sync(param, DBConstant::IGNORE_CONNECTION_ID);
175 }
176
Sync(const SyncParma & param,uint64_t connectionId)177 int GenericSyncer::Sync(const SyncParma ¶m, uint64_t connectionId)
178 {
179 int errCode = SyncPreCheck(param);
180 if (errCode != E_OK) {
181 return errCode;
182 }
183 errCode = AddQueuedManualSyncSize(param.mode, param.wait);
184 if (errCode != E_OK) {
185 return errCode;
186 }
187
188 uint32_t syncId = GenerateSyncId();
189 errCode = PrepareSync(param, syncId, connectionId);
190 if (errCode != E_OK) {
191 LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode);
192 return errCode;
193 }
194 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
195 return E_OK;
196 }
197
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)198 int GenericSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId)
199 {
200 auto *operation =
201 new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
202 if (operation == nullptr) {
203 SubQueuedSyncSize();
204 return -E_OUT_OF_MEMORY;
205 }
206 ISyncEngine *engine = nullptr;
207 {
208 std::lock_guard<std::mutex> autoLock(syncerLock_);
209 PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
210 InitSyncOperation(operation, param);
211 LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
212 param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
213 engine = syncEngine_;
214 RefObject::IncObjRef(engine);
215 }
216 AddSyncOperation(engine, operation);
217 RefObject::DecObjRef(engine);
218 PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
219 if (connectionId != DBConstant::IGNORE_CONNECTION_ID) {
220 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
221 connectionIdMap_[connectionId].push_back(static_cast<int>(syncId));
222 syncIdMap_[static_cast<int>(syncId)] = connectionId;
223 }
224 if (operation->CheckIsAllFinished()) {
225 operation->Finished();
226 RefObject::KillAndDecObjRef(operation);
227 } else {
228 operation->WaitIfNeed();
229 RefObject::DecObjRef(operation);
230 }
231 return E_OK;
232 }
233
RemoveSyncOperation(int syncId)234 int GenericSyncer::RemoveSyncOperation(int syncId)
235 {
236 SyncOperation *operation = nullptr;
237 std::unique_lock<std::mutex> lock(operationMapLock_);
238 auto iter = syncOperationMap_.find(syncId);
239 if (iter != syncOperationMap_.end()) {
240 LOGD("[Syncer] RemoveSyncOperation id:%d.", syncId);
241 operation = iter->second;
242 syncOperationMap_.erase(syncId);
243 lock.unlock();
244 if ((!operation->IsAutoSync()) && (!operation->IsBlockSync()) && (!operation->IsAutoControlCmd())) {
245 SubQueuedSyncSize();
246 }
247 operation->NotifyIfNeed();
248 RefObject::KillAndDecObjRef(operation);
249 operation = nullptr;
250 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
251 if (syncIdMap_.find(syncId) == syncIdMap_.end()) {
252 return E_OK;
253 }
254 uint64_t connectionId = syncIdMap_[syncId];
255 if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) {
256 connectionIdMap_[connectionId].remove(syncId);
257 }
258 syncIdMap_.erase(syncId);
259 return E_OK;
260 }
261 return -E_INVALID_ARGS;
262 }
263
StopSync(uint64_t connectionId)264 int GenericSyncer::StopSync(uint64_t connectionId)
265 {
266 std::list<int> syncIdList;
267 {
268 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
269 if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) {
270 return E_OK;
271 }
272 syncIdList = connectionIdMap_[connectionId];
273 connectionIdMap_.erase(connectionId);
274 }
275 for (auto syncId : syncIdList) {
276 RemoveSyncOperation(syncId);
277 if (syncEngine_ != nullptr) {
278 syncEngine_->AbortMachineIfNeed(syncId);
279 }
280 }
281 if (syncEngine_ != nullptr) {
282 syncEngine_->NotifyConnectionClosed(connectionId);
283 }
284 return E_OK;
285 }
286
GetTimestamp()287 uint64_t GenericSyncer::GetTimestamp()
288 {
289 std::shared_ptr<TimeHelper> timeHelper = nullptr;
290 ISyncInterface *storage = nullptr;
291 {
292 std::lock_guard<std::mutex> lock(syncerLock_);
293 timeHelper = timeHelper_;
294 if (syncInterface_ != nullptr) {
295 storage = syncInterface_;
296 storage->IncRefCount();
297 }
298 }
299 if (storage == nullptr) {
300 return TimeHelper::GetSysCurrentTime();
301 }
302 if (timeHelper == nullptr) {
303 storage->DecRefCount();
304 return TimeHelper::GetSysCurrentTime();
305 }
306 uint64_t timestamp = timeHelper->GetTime();
307 storage->DecRefCount();
308 return timestamp;
309 }
310
QueryAutoSync(const InternalSyncParma & param)311 void GenericSyncer::QueryAutoSync(const InternalSyncParma ¶m)
312 {
313 if (!initialized_) {
314 LOGW("[Syncer] Syncer has not Init");
315 return;
316 }
317 LOGI("[GenericSyncer] trigger query syncmode=%u,dev=%s", param.mode, GetSyncDevicesStr(param.devices).c_str());
318 ISyncInterface *syncInterface = nullptr;
319 ISyncEngine *engine = nullptr;
320 {
321 std::lock_guard<std::mutex> lock(syncerLock_);
322 if (syncInterface_ == nullptr || syncEngine_ == nullptr) {
323 LOGW("[Syncer] Syncer has not Init");
324 return;
325 }
326 syncInterface = syncInterface_;
327 engine = syncEngine_;
328 syncInterface->IncRefCount();
329 RefObject::IncObjRef(engine);
330 }
331 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, param, engine, syncInterface] {
332 int errCode = Sync(param);
333 if (errCode != E_OK) {
334 LOGE("[GenericSyncer] sync start by QueryAutoSync failed err %d", errCode);
335 }
336 RefObject::DecObjRef(engine);
337 syncInterface->DecRefCount();
338 });
339 if (retCode != E_OK) {
340 LOGE("[GenericSyncer] QueryAutoSync triggler sync retCode:%d", retCode);
341 RefObject::DecObjRef(engine);
342 syncInterface->DecRefCount();
343 }
344 }
345
AddSyncOperation(ISyncEngine * engine,SyncOperation * operation)346 void GenericSyncer::AddSyncOperation(ISyncEngine *engine, SyncOperation *operation)
347 {
348 if (operation == nullptr) {
349 return;
350 }
351
352 LOGD("[Syncer] AddSyncOperation.");
353 engine->AddSyncOperation(operation);
354
355 if (operation->CheckIsAllFinished()) {
356 return;
357 }
358
359 std::lock_guard<std::mutex> lock(operationMapLock_);
360 syncOperationMap_.insert(std::pair<int, SyncOperation *>(operation->GetSyncId(), operation));
361 // To make sure operation alive before WaitIfNeed out
362 RefObject::IncObjRef(operation);
363 }
364
SyncOperationKillCallbackInner(int syncId)365 void GenericSyncer::SyncOperationKillCallbackInner(int syncId)
366 {
367 if (syncEngine_ != nullptr) {
368 LOGI("[Syncer] Operation on kill id = %d", syncId);
369 syncEngine_->RemoveSyncOperation(syncId);
370 }
371 }
372
SyncOperationKillCallback(int syncId)373 void GenericSyncer::SyncOperationKillCallback(int syncId)
374 {
375 SyncOperationKillCallbackInner(syncId);
376 }
377
InitMetaData(ISyncInterface * syncInterface)378 int GenericSyncer::InitMetaData(ISyncInterface *syncInterface)
379 {
380 if (metadata_ != nullptr) {
381 return E_OK;
382 }
383
384 metadata_ = std::make_shared<Metadata>();
385 if (metadata_ == nullptr) {
386 LOGE("[Syncer] metadata make shared failed");
387 return -E_OUT_OF_MEMORY;
388 }
389 int errCode = metadata_->Initialize(syncInterface);
390 if (errCode != E_OK) {
391 LOGE("[Syncer] metadata Initializeate failed! err %d.", errCode);
392 metadata_ = nullptr;
393 }
394 syncInterface_ = syncInterface;
395 return errCode;
396 }
397
InitTimeHelper(ISyncInterface * syncInterface)398 int GenericSyncer::InitTimeHelper(ISyncInterface *syncInterface)
399 {
400 if (timeHelper_ != nullptr) {
401 return E_OK;
402 }
403
404 timeHelper_ = std::make_shared<TimeHelper>();
405 if (timeHelper_ == nullptr) {
406 return -E_OUT_OF_MEMORY;
407 }
408 int errCode = timeHelper_->Initialize(syncInterface, metadata_);
409 if (errCode != E_OK) {
410 LOGE("[Syncer] TimeHelper init failed! err:%d.", errCode);
411 timeHelper_ = nullptr;
412 }
413 return errCode;
414 }
415
InitSyncEngine(ISyncInterface * syncInterface)416 int GenericSyncer::InitSyncEngine(ISyncInterface *syncInterface)
417 {
418 if (syncEngine_ != nullptr && syncEngine_->IsEngineActive()) {
419 LOGI("[Syncer] syncEngine is active");
420 return E_OK;
421 }
422 int errCode = BuildSyncEngine();
423 if (errCode != E_OK) {
424 return errCode;
425 }
426 const std::function<void(std::string)> onlineFunc = [this](const std::string &device) {
427 RemoteDataChanged(device);
428 };
429 const std::function<void(std::string)> offlineFunc = [this](const std::string &device) {
430 RemoteDeviceOffline(device);
431 };
432 const std::function<void(const InternalSyncParma ¶m)> queryAutoSyncFunc =
433 [this](const InternalSyncParma &syncParam) { QueryAutoSync(syncParam); };
434 const ISyncEngine::InitCallbackParam param = { onlineFunc, offlineFunc, queryAutoSyncFunc };
435 errCode = syncEngine_->Initialize(syncInterface, metadata_, param);
436 if (errCode == E_OK) {
437 syncInterface->IncRefCount();
438 label_ = syncEngine_->GetLabel();
439 return E_OK;
440 } else {
441 LOGE("[Syncer] SyncEngine init failed! err:%d.", errCode);
442 RefObject::KillAndDecObjRef(syncEngine_);
443 syncEngine_ = nullptr;
444 return errCode;
445 }
446 }
447
CheckSyncActive(ISyncInterface * syncInterface,bool isNeedActive)448 int GenericSyncer::CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive)
449 {
450 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE,
451 false);
452 if (!isSyncDualTupleMode || isNeedActive) {
453 return E_OK;
454 }
455 LOGI("[Syncer] syncer no need to active");
456 int errCode = BuildSyncEngine();
457 if (errCode != E_OK) {
458 return errCode;
459 }
460 return -E_NO_NEED_ACTIVE;
461 }
462
GenerateSyncId()463 uint32_t GenericSyncer::GenerateSyncId()
464 {
465 std::lock_guard<std::mutex> lock(syncIdLock_);
466 currentSyncId_++;
467 // if overflow, reset to 1
468 if (currentSyncId_ <= 0) {
469 currentSyncId_ = MIN_VALID_SYNC_ID;
470 }
471 return currentSyncId_;
472 }
473
IsValidMode(int mode) const474 bool GenericSyncer::IsValidMode(int mode) const
475 {
476 if ((mode >= SyncModeType::INVALID_MODE) || (mode < SyncModeType::PUSH)) {
477 LOGE("[Syncer] Sync mode is not valid!");
478 return false;
479 }
480 return true;
481 }
482
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const483 int GenericSyncer::SyncConditionCheck(const SyncParma ¶m, const ISyncEngine *engine, ISyncInterface *storage) const
484 {
485 (void)param;
486 (void)engine;
487 (void)storage;
488 return E_OK;
489 }
490
IsValidDevices(const std::vector<std::string> & devices) const491 bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
492 {
493 if (devices.empty()) {
494 LOGE("[Syncer] devices is empty!");
495 return false;
496 }
497 return true;
498 }
499
ClearSyncOperations(bool isClosedOperation)500 void GenericSyncer::ClearSyncOperations(bool isClosedOperation)
501 {
502 std::vector<SyncOperation *> syncOperation;
503 {
504 std::lock_guard<std::mutex> lock(operationMapLock_);
505 for (auto &item : syncOperationMap_) {
506 bool isBlockSync = item.second->IsBlockSync();
507 if (isBlockSync || !isClosedOperation) {
508 int status = (!isClosedOperation) ? SyncOperation::OP_USER_CHANGED : SyncOperation::OP_FAILED;
509 item.second->SetUnfinishedDevStatus(status);
510 RefObject::IncObjRef(item.second);
511 syncOperation.push_back(item.second);
512 }
513 }
514 }
515
516 if (!isClosedOperation) { // means user changed
517 syncEngine_->NotifyUserChange();
518 }
519
520 for (auto &operation : syncOperation) {
521 // block sync operation or userChange will trigger remove sync operation
522 // caller won't blocked for block sync
523 // caller won't blocked for userChange operation no mater it is block or non-block sync
524 TriggerSyncFinished(operation);
525 RefObject::DecObjRef(operation);
526 }
527 ClearInnerResource(isClosedOperation);
528 }
529
ClearInnerResource(bool isClosedOperation)530 void GenericSyncer::ClearInnerResource(bool isClosedOperation)
531 {
532 {
533 std::lock_guard<std::mutex> lock(operationMapLock_);
534 for (auto &iter : syncOperationMap_) {
535 RefObject::KillAndDecObjRef(iter.second);
536 iter.second = nullptr;
537 }
538 syncOperationMap_.clear();
539 }
540 {
541 std::lock_guard<std::mutex> lock(syncIdLock_);
542 if (isClosedOperation) {
543 connectionIdMap_.clear();
544 } else { // only need to clear syncid when user change
545 for (auto &item : connectionIdMap_) {
546 item.second.clear();
547 }
548 }
549 syncIdMap_.clear();
550 }
551 }
552
TriggerSyncFinished(SyncOperation * operation)553 void GenericSyncer::TriggerSyncFinished(SyncOperation *operation)
554 {
555 if (operation != nullptr && operation->CheckIsAllFinished()) { // LCOV_EXCL_BR_LINE
556 operation->Finished();
557 }
558 }
559
OnSyncFinished(int syncId)560 void GenericSyncer::OnSyncFinished(int syncId)
561 {
562 (void)(RemoveSyncOperation(syncId));
563 }
564
SyncModuleInit()565 int GenericSyncer::SyncModuleInit()
566 {
567 static bool isInit = false;
568 std::lock_guard<std::mutex> lock(moduleInitLock_);
569 if (!isInit) {
570 int errCode = SyncResourceInit();
571 if (errCode != E_OK) {
572 return errCode;
573 }
574 isInit = true;
575 return E_OK;
576 }
577 return E_OK;
578 }
579
SyncResourceInit()580 int GenericSyncer::SyncResourceInit()
581 {
582 int errCode = TimeSync::RegisterTransformFunc();
583 if (errCode != E_OK) {
584 LOGE("Register timesync message transform func ERR!");
585 return errCode;
586 }
587 errCode = SingleVerSerializeManager::RegisterTransformFunc();
588 if (errCode != E_OK) {
589 LOGE("Register SingleVerDataSync message transform func ERR!");
590 return errCode;
591 }
592 #ifndef OMIT_MULTI_VER
593 errCode = CommitHistorySync::RegisterTransformFunc();
594 if (errCode != E_OK) {
595 LOGE("Register CommitHistorySync message transform func ERR!");
596 return errCode;
597 }
598 errCode = MultiVerDataSync::RegisterTransformFunc();
599 if (errCode != E_OK) {
600 LOGE("Register MultiVerDataSync message transform func ERR!");
601 return errCode;
602 }
603 errCode = ValueSliceSync::RegisterTransformFunc();
604 if (errCode != E_OK) {
605 LOGE("Register ValueSliceSync message transform func ERR!");
606 return errCode;
607 }
608 #endif
609 errCode = DeviceManager::RegisterTransformFunc();
610 if (errCode != E_OK) {
611 LOGE("Register DeviceManager message transform func ERR!");
612 return errCode;
613 }
614 errCode = AbilitySync::RegisterTransformFunc();
615 if (errCode != E_OK) {
616 LOGE("Register AbilitySync message transform func ERR!");
617 return errCode;
618 }
619 return E_OK;
620 }
621
GetQueuedSyncSize(int * queuedSyncSize) const622 int GenericSyncer::GetQueuedSyncSize(int *queuedSyncSize) const
623 {
624 if (queuedSyncSize == nullptr) {
625 return -E_INVALID_ARGS;
626 }
627 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
628 *queuedSyncSize = queuedManualSyncSize_;
629 LOGI("[GenericSyncer] GetQueuedSyncSize:%d", queuedManualSyncSize_);
630 return E_OK;
631 }
632
SetQueuedSyncLimit(const int * queuedSyncLimit)633 int GenericSyncer::SetQueuedSyncLimit(const int *queuedSyncLimit)
634 {
635 if (queuedSyncLimit == nullptr) {
636 return -E_INVALID_ARGS;
637 }
638 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
639 queuedManualSyncLimit_ = *queuedSyncLimit;
640 LOGI("[GenericSyncer] SetQueuedSyncLimit:%d", queuedManualSyncLimit_);
641 return E_OK;
642 }
643
GetQueuedSyncLimit(int * queuedSyncLimit) const644 int GenericSyncer::GetQueuedSyncLimit(int *queuedSyncLimit) const
645 {
646 if (queuedSyncLimit == nullptr) {
647 return -E_INVALID_ARGS;
648 }
649 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
650 *queuedSyncLimit = queuedManualSyncLimit_;
651 LOGI("[GenericSyncer] GetQueuedSyncLimit:%d", queuedManualSyncLimit_);
652 return E_OK;
653 }
654
IsManualSync(int inMode) const655 bool GenericSyncer::IsManualSync(int inMode) const
656 {
657 int mode = SyncOperation::TransferSyncMode(inMode);
658 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH) || (mode == SyncModeType::PUSH_AND_PULL) ||
659 (mode == SyncModeType::SUBSCRIBE_QUERY) || (mode == SyncModeType::UNSUBSCRIBE_QUERY)) {
660 return true;
661 }
662 return false;
663 }
664
AddQueuedManualSyncSize(int mode,bool wait)665 int GenericSyncer::AddQueuedManualSyncSize(int mode, bool wait)
666 {
667 if (IsManualSync(mode) && (!wait)) {
668 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
669 if (!manualSyncEnable_) {
670 LOGI("[GenericSyncer] manualSyncEnable is Disable");
671 return -E_BUSY;
672 }
673 queuedManualSyncSize_++;
674 }
675 return E_OK;
676 }
677
IsQueuedManualSyncFull(int mode,bool wait) const678 bool GenericSyncer::IsQueuedManualSyncFull(int mode, bool wait) const
679 {
680 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
681 if (IsManualSync(mode) && (!manualSyncEnable_)) { // LCOV_EXCL_BR_LINE
682 LOGI("[GenericSyncer] manualSyncEnable_:false");
683 return true;
684 }
685 if (IsManualSync(mode) && (!wait)) { // LCOV_EXCL_BR_LINE
686 if (queuedManualSyncSize_ < queuedManualSyncLimit_) {
687 return false;
688 } else {
689 LOGD("[GenericSyncer] queuedManualSyncSize_:%d < queuedManualSyncLimit_:%d", queuedManualSyncSize_,
690 queuedManualSyncLimit_);
691 return true;
692 }
693 } else {
694 return false;
695 }
696 }
697
SubQueuedSyncSize(void)698 void GenericSyncer::SubQueuedSyncSize(void)
699 {
700 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
701 queuedManualSyncSize_--;
702 if (queuedManualSyncSize_ < 0) {
703 LOGE("[GenericSyncer] queuedManualSyncSize_ < 0!");
704 queuedManualSyncSize_ = 0;
705 }
706 }
707
DisableManualSync(void)708 int GenericSyncer::DisableManualSync(void)
709 {
710 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
711 if (queuedManualSyncSize_ > 0) {
712 LOGD("[GenericSyncer] DisableManualSync fail, queuedManualSyncSize_:%d", queuedManualSyncSize_);
713 return -E_BUSY;
714 }
715 manualSyncEnable_ = false;
716 LOGD("[GenericSyncer] DisableManualSync ok");
717 return E_OK;
718 }
719
EnableManualSync(void)720 int GenericSyncer::EnableManualSync(void)
721 {
722 std::lock_guard<std::mutex> lock(queuedManualSyncLock_);
723 manualSyncEnable_ = true;
724 LOGD("[GenericSyncer] EnableManualSync ok");
725 return E_OK;
726 }
727
GetLocalIdentity(std::string & outTarget) const728 int GenericSyncer::GetLocalIdentity(std::string &outTarget) const
729 {
730 std::string deviceId;
731 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(deviceId);
732 if (errCode != E_OK) {
733 LOGE("[GenericSyncer] GetLocalIdentity fail errCode:%d", errCode);
734 return errCode;
735 }
736 outTarget = DBCommon::TransferHashString(deviceId);
737 return E_OK;
738 }
739
GetOnlineDevices(std::vector<std::string> & devices) const740 void GenericSyncer::GetOnlineDevices(std::vector<std::string> &devices) const
741 {
742 std::string identifier;
743 {
744 std::lock_guard<std::mutex> lock(syncerLock_);
745 // Get devices from AutoLaunch first.
746 if (syncInterface_ == nullptr) {
747 LOGI("[Syncer] GetOnlineDevices syncInterface_ is nullptr");
748 return;
749 }
750 bool isSyncDualTupleMode = syncInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
751 false);
752 if (isSyncDualTupleMode) {
753 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA,
754 "");
755 } else {
756 identifier = syncInterface_->GetDbProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
757 }
758 }
759 RuntimeContext::GetInstance()->GetAutoLaunchSyncDevices(identifier, devices);
760 if (!devices.empty()) {
761 return;
762 }
763 std::lock_guard<std::mutex> lock(syncerLock_);
764 if (closing_) {
765 LOGW("[Syncer] Syncer is closing, return!");
766 return;
767 }
768 if (syncEngine_ != nullptr) {
769 syncEngine_->GetOnlineDevices(devices);
770 }
771 }
772
SetSyncRetry(bool isRetry)773 int GenericSyncer::SetSyncRetry(bool isRetry)
774 {
775 if (syncEngine_ == nullptr) {
776 return -E_NOT_INIT;
777 }
778 syncEngine_->SetSyncRetry(isRetry);
779 return E_OK;
780 }
781
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)782 int GenericSyncer::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
783 {
784 std::lock_guard<std::mutex> lock(syncerLock_);
785 if (syncEngine_ == nullptr) {
786 return -E_NOT_INIT;
787 }
788 int errCode = syncEngine_->SetEqualIdentifier(identifier, targets);
789 if (errCode == E_OK) {
790 syncEngine_->SetEqualIdentifierMap(identifier, targets);
791 }
792 return errCode;
793 }
794
GetSyncDevicesStr(const std::vector<std::string> & devices) const795 std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &devices) const
796 {
797 std::string syncDevices;
798 for (const auto &dev:devices) {
799 syncDevices += DBCommon::StringMasking(dev);
800 syncDevices += ",";
801 }
802 if (syncDevices.empty()) {
803 return "";
804 }
805 return syncDevices.substr(0, syncDevices.size() - 1);
806 }
807
StatusCheck() const808 int GenericSyncer::StatusCheck() const
809 {
810 if (!initialized_) {
811 LOGE("[Syncer] Syncer is not initialized, return!");
812 return -E_BUSY;
813 }
814 if (closing_) {
815 LOGW("[Syncer] Syncer is closing, return!");
816 return -E_BUSY;
817 }
818 return E_OK;
819 }
820
SyncPreCheck(const SyncParma & param) const821 int GenericSyncer::SyncPreCheck(const SyncParma ¶m) const
822 {
823 ISyncEngine *engine = nullptr;
824 ISyncInterface *storage = nullptr;
825 int errCode = E_OK;
826 {
827 std::lock_guard<std::mutex> lock(syncerLock_);
828 errCode = StatusCheck();
829 if (errCode != E_OK) {
830 return errCode;
831 }
832 if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
833 return -E_INVALID_ARGS;
834 }
835 if (IsQueuedManualSyncFull(param.mode, param.wait)) { // LCOV_EXCL_BR_LINE
836 LOGE("[Syncer] -E_BUSY");
837 return -E_BUSY;
838 }
839 storage = syncInterface_;
840 engine = syncEngine_;
841 if (storage == nullptr || engine == nullptr) {
842 return -E_BUSY;
843 }
844 storage->IncRefCount();
845 RefObject::IncObjRef(engine);
846 }
847 errCode = SyncConditionCheck(param, engine, storage);
848 storage->DecRefCount();
849 RefObject::DecObjRef(engine);
850 return errCode;
851 }
852
InitSyncOperation(SyncOperation * operation,const SyncParma & param)853 void GenericSyncer::InitSyncOperation(SyncOperation *operation, const SyncParma ¶m)
854 {
855 operation->SetIdentifier(syncInterface_->GetIdentifier());
856 operation->Initialize();
857 operation->OnKill(std::bind(&GenericSyncer::SyncOperationKillCallback, this, operation->GetSyncId()));
858 std::function<void(int)> onFinished = std::bind(&GenericSyncer::OnSyncFinished, this, std::placeholders::_1);
859 operation->SetOnSyncFinished(onFinished);
860 operation->SetOnSyncFinalize(param.onFinalize);
861 if (param.isQuerySync) {
862 operation->SetQuery(param.syncQuery);
863 }
864 }
865
BuildSyncEngine()866 int GenericSyncer::BuildSyncEngine()
867 {
868 if (syncEngine_ != nullptr) {
869 return E_OK;
870 }
871 syncEngine_ = CreateSyncEngine();
872 if (syncEngine_ == nullptr) {
873 return -E_OUT_OF_MEMORY;
874 }
875 syncEngine_->OnLastRef([this]() {
876 LOGD("[Syncer] SyncEngine finalized");
877 {
878 std::lock_guard<std::mutex> cvLock(engineMutex_);
879 engineFinalize_ = true;
880 }
881 engineFinalizeCv_.notify_all();
882 });
883 return E_OK;
884 }
885
Dump(int fd)886 void GenericSyncer::Dump(int fd)
887 {
888 if (syncEngine_ == nullptr) {
889 return;
890 }
891 RefObject::IncObjRef(syncEngine_);
892 syncEngine_->Dump(fd);
893 RefObject::DecObjRef(syncEngine_);
894 }
895
DumpSyncerBasicInfo()896 SyncerBasicInfo GenericSyncer::DumpSyncerBasicInfo()
897 {
898 SyncerBasicInfo baseInfo;
899 if (syncEngine_ == nullptr) {
900 return baseInfo;
901 }
902 RefObject::IncObjRef(syncEngine_);
903 baseInfo.isSyncActive = syncEngine_->IsEngineActive();
904 RefObject::DecObjRef(syncEngine_);
905 return baseInfo;
906 }
907
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)908 int GenericSyncer::RemoteQuery(const std::string &device, const RemoteCondition &condition,
909 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
910 {
911 ISyncEngine *syncEngine = nullptr;
912 {
913 std::lock_guard<std::mutex> lock(syncerLock_);
914 int errCode = StatusCheck();
915 if (errCode != E_OK) {
916 return errCode;
917 }
918 syncEngine = syncEngine_;
919 RefObject::IncObjRef(syncEngine);
920 }
921 if (syncEngine == nullptr) {
922 return -E_NOT_INIT;
923 }
924 int errCode = syncEngine->RemoteQuery(device, condition, timeout, connectionId, result);
925 RefObject::DecObjRef(syncEngine);
926 return errCode;
927 }
928
InitTimeChangedListener()929 int GenericSyncer::InitTimeChangedListener()
930 {
931 int errCode = E_OK;
932 if (timeChangedListener_ != nullptr) {
933 return errCode;
934 }
935 timeChangedListener_ = RuntimeContext::GetInstance()->RegisterTimeChangedLister(
936 [this](void *changedOffset) {
937 RecordTimeChangeOffset(changedOffset);
938 },
939 [this]() {
940 {
941 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
942 timeChangeListenerFinalize_ = true;
943 }
944 timeChangeCv_.notify_all();
945 }, errCode);
946 if (timeChangedListener_ == nullptr) {
947 LOGE("[GenericSyncer] Init RegisterTimeChangedLister failed");
948 return errCode;
949 }
950 {
951 std::lock_guard<std::mutex> autoLock(timeChangeListenerMutex_);
952 timeChangeListenerFinalize_ = false;
953 }
954 return E_OK;
955 }
956
ReleaseInnerResource()957 void GenericSyncer::ReleaseInnerResource()
958 {
959 NotificationChain::Listener *timeChangedListener = nullptr;
960 {
961 std::lock_guard<std::mutex> lock(syncerLock_);
962 if (timeChangedListener_ != nullptr) {
963 timeChangedListener = timeChangedListener_;
964 timeChangedListener_ = nullptr;
965 }
966 timeHelper_ = nullptr;
967 metadata_ = nullptr;
968 }
969 if (timeChangedListener != nullptr) {
970 timeChangedListener->Drop(true);
971 RuntimeContext::GetInstance()->StopTimeTickMonitorIfNeed();
972 }
973 std::unique_lock<std::mutex> uniqueLock(timeChangeListenerMutex_);
974 LOGD("[GenericSyncer] Begin wait time change listener finalize");
975 timeChangeCv_.wait(uniqueLock, [this]() {
976 return timeChangeListenerFinalize_;
977 });
978 LOGD("[GenericSyncer] End wait time change listener finalize");
979 }
980
RecordTimeChangeOffset(void * changedOffset)981 void GenericSyncer::RecordTimeChangeOffset(void *changedOffset)
982 {
983 std::shared_ptr<Metadata> metadata = nullptr;
984 ISyncInterface *storage = nullptr;
985 {
986 std::lock_guard<std::mutex> lock(syncerLock_);
987 if (changedOffset == nullptr || metadata_ == nullptr || syncInterface_ == nullptr) {
988 return;
989 }
990 storage = syncInterface_;
991 metadata = metadata_;
992 storage->IncRefCount();
993 }
994 TimeOffset changedTimeOffset = *(reinterpret_cast<TimeOffset *>(changedOffset)) *
995 static_cast<TimeOffset>(TimeHelper::TO_100_NS);
996 TimeOffset orgOffset = metadata->GetLocalTimeOffset() - changedTimeOffset;
997 TimeOffset currentSysTime = static_cast<TimeOffset>(TimeHelper::GetSysCurrentTime());
998 Timestamp maxItemTime = 0;
999 storage->GetMaxTimestamp(maxItemTime);
1000 if ((orgOffset + currentSysTime) > TimeHelper::BUFFER_VALID_TIME) {
1001 orgOffset = TimeHelper::BUFFER_VALID_TIME -
1002 currentSysTime + static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS);
1003 }
1004 if ((currentSysTime + orgOffset) <= static_cast<TimeOffset>(maxItemTime)) {
1005 orgOffset = static_cast<TimeOffset>(maxItemTime) - currentSysTime +
1006 static_cast<TimeOffset>(TimeHelper::MS_TO_100_NS); // 1ms
1007 }
1008 metadata->SaveLocalTimeOffset(orgOffset);
1009 ResetTimeSyncMarkByTimeChange(metadata, *storage);
1010 storage->DecRefCount();
1011 }
1012
CloseInner(bool isClosedOperation)1013 int GenericSyncer::CloseInner(bool isClosedOperation)
1014 {
1015 {
1016 std::lock_guard<std::mutex> lock(syncerLock_);
1017 if (!initialized_) {
1018 LOGW("[Syncer] Syncer[%s] don't need to close, because it has not been init", label_.c_str());
1019 return -E_NOT_INIT;
1020 }
1021 initialized_ = false;
1022 if (closing_) {
1023 LOGE("[Syncer] Syncer is closing, return!");
1024 return -E_BUSY;
1025 }
1026 closing_ = true;
1027 }
1028 ClearSyncOperations(isClosedOperation);
1029 if (syncEngine_ != nullptr) {
1030 syncEngine_->Close();
1031 LOGD("[Syncer] Close SyncEngine!");
1032 std::lock_guard<std::mutex> lock(syncerLock_);
1033 closing_ = false;
1034 }
1035 return E_OK;
1036 }
1037
GetSyncDataSize(const std::string & device,size_t & size) const1038 int GenericSyncer::GetSyncDataSize(const std::string &device, size_t &size) const
1039 {
1040 uint64_t localWaterMark = 0;
1041 std::shared_ptr<Metadata> metadata = nullptr;
1042 {
1043 std::lock_guard<std::mutex> lock(syncerLock_);
1044 if (metadata_ == nullptr || syncInterface_ == nullptr) {
1045 return -E_INTERNAL_ERROR;
1046 }
1047 if (closing_) {
1048 LOGE("[Syncer] Syncer is closing, return!");
1049 return -E_BUSY;
1050 }
1051 int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->TryHandle();
1052 if (errCode != E_OK) {
1053 LOGE("[Syncer] syncer is restarting, return!");
1054 return errCode;
1055 }
1056 syncInterface_->IncRefCount();
1057 metadata = metadata_;
1058 }
1059 metadata->GetLocalWaterMark(device, localWaterMark);
1060 uint32_t expectedMtuSize = DEFAULT_MTU_SIZE;
1061 DataSizeSpecInfo syncDataSizeInfo = {expectedMtuSize, static_cast<size_t>(MAX_TIMESTAMP)};
1062 std::vector<SendDataItem> outData;
1063 ContinueToken token = nullptr;
1064 int errCode = static_cast<SyncGenericInterface *>(syncInterface_)->GetSyncData(localWaterMark, MAX_TIMESTAMP,
1065 outData, token, syncDataSizeInfo);
1066 if (token != nullptr) {
1067 static_cast<SyncGenericInterface *>(syncInterface_)->ReleaseContinueToken(token);
1068 token = nullptr;
1069 }
1070 if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
1071 LOGE("calculate sync data size failed %d", errCode);
1072 syncInterface_->DecRefCount();
1073 return errCode;
1074 }
1075 uint32_t totalLen = 0;
1076 if (errCode == -E_UNFINISHED) {
1077 totalLen = expectedMtuSize;
1078 } else {
1079 totalLen = GenericSingleVerKvEntry::CalculateLens(outData, SOFTWARE_VERSION_CURRENT);
1080 }
1081 for (auto &entry : outData) {
1082 delete entry;
1083 entry = nullptr;
1084 }
1085 syncInterface_->DecRefCount();
1086 // if larger than 1M, return 1M
1087 size = (totalLen >= expectedMtuSize) ? expectedMtuSize : totalLen;
1088 return E_OK;
1089 }
1090
IsNeedActive(ISyncInterface * syncInterface)1091 bool GenericSyncer::IsNeedActive(ISyncInterface *syncInterface)
1092 {
1093 bool localOnly = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::LOCAL_ONLY, false);
1094 if (localOnly) {
1095 LOGD("[Syncer] Local only db, don't need active syncer");
1096 return false;
1097 }
1098 return true;
1099 }
1100
GetHashDeviceId(const std::string & clientId,std::string & hashDevId) const1101 int GenericSyncer::GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const
1102 {
1103 (void)clientId;
1104 (void)hashDevId;
1105 return -E_NOT_SUPPORT;
1106 }
1107
InitStorageResource(ISyncInterface * syncInterface)1108 int GenericSyncer::InitStorageResource(ISyncInterface *syncInterface)
1109 {
1110 // As metadata_ will be used in EraseDeviceWaterMark, it should not be clear even if engine init failed.
1111 // It will be clear in destructor.
1112 int errCode = InitMetaData(syncInterface);
1113 if (errCode != E_OK) {
1114 return errCode;
1115 }
1116
1117 // As timeHelper_ will be used in GetTimestamp, it should not be clear even if engine init failed.
1118 // It will be clear in destructor.
1119 errCode = InitTimeHelper(syncInterface);
1120 if (errCode != E_OK) {
1121 return errCode;
1122 }
1123
1124 if (!IsNeedActive(syncInterface)) {
1125 return -E_NO_NEED_ACTIVE;
1126 }
1127 return errCode;
1128 }
1129
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)1130 int GenericSyncer::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
1131 {
1132 std::shared_ptr<Metadata> metadata = nullptr;
1133 {
1134 std::lock_guard<std::mutex> autoLock(syncerLock_);
1135 metadata = metadata_;
1136 }
1137 if (metadata == nullptr) {
1138 LOGE("[Syncer] Metadata is not init");
1139 return -E_NOT_INIT;
1140 }
1141 std::string dev;
1142 bool devNeedHash = true;
1143 int errCode = metadata->GetHashDeviceId(device, dev);
1144 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1145 LOGE("[Syncer] Get watermark info failed %d", errCode);
1146 return errCode;
1147 } else if (errCode == E_OK) {
1148 devNeedHash = false;
1149 } else {
1150 dev = device;
1151 }
1152 return metadata->GetWaterMarkInfoFromDB(dev, devNeedHash, info);
1153 }
1154
UpgradeSchemaVerInMeta()1155 int GenericSyncer::UpgradeSchemaVerInMeta()
1156 {
1157 std::shared_ptr<Metadata> metadata = nullptr;
1158 {
1159 std::lock_guard<std::mutex> autoLock(syncerLock_);
1160 metadata = metadata_;
1161 }
1162 if (metadata == nullptr) {
1163 LOGE("[Syncer] metadata is not init");
1164 return -E_NOT_INIT;
1165 }
1166 int errCode = metadata->ClearAllAbilitySyncFinishMark();
1167 if (errCode != E_OK) {
1168 LOGE("[Syncer] clear ability mark failed:%d", errCode);
1169 return errCode;
1170 }
1171 auto [err, localSchemaVer] = metadata->GetLocalSchemaVersion();
1172 if (err != E_OK) {
1173 LOGE("[Syncer] get local schema version failed:%d", err);
1174 return err;
1175 }
1176 errCode = metadata->SetLocalSchemaVersion(localSchemaVer + 1);
1177 if (errCode != E_OK) {
1178 LOGE("[Syncer] increase local schema version failed:%d", errCode);
1179 }
1180 return errCode;
1181 }
1182
ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> & metadata,ISyncInterface & storage)1183 void GenericSyncer::ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage)
1184 {
1185 if (syncEngine_ != nullptr) {
1186 syncEngine_->TimeChange();
1187 }
1188 int errCode = metadata->ClearAllTimeSyncFinishMark();
1189 if (errCode != E_OK) {
1190 LOGW("[GenericSyncer] %s clear time sync finish mark failed %d", label_.c_str(), errCode);
1191 } else {
1192 LOGD("[GenericSyncer] ClearAllTimeSyncFinishMark finish");
1193 RuntimeContext::GetInstance()->ResetDBTimeChangeStatus(storage.GetIdentifier());
1194 }
1195 }
1196
ResetSyncStatus()1197 void GenericSyncer::ResetSyncStatus()
1198 {
1199 std::shared_ptr<Metadata> metadata = nullptr;
1200 {
1201 std::lock_guard<std::mutex> lock(syncerLock_);
1202 if (metadata_ == nullptr) {
1203 return;
1204 }
1205 metadata = metadata_;
1206 }
1207 metadata->ClearAllAbilitySyncFinishMark();
1208 }
1209
GetLocalTimeOffset()1210 int64_t GenericSyncer::GetLocalTimeOffset()
1211 {
1212 std::shared_ptr<Metadata> metadata = nullptr;
1213 {
1214 std::lock_guard<std::mutex> lock(syncerLock_);
1215 if (metadata_ == nullptr) {
1216 return 0;
1217 }
1218 metadata = metadata_;
1219 }
1220 return metadata->GetLocalTimeOffset();
1221 }
1222
GetTaskCount()1223 int32_t GenericSyncer::GetTaskCount()
1224 {
1225 int32_t count = 0;
1226 {
1227 std::lock_guard<std::mutex> autoLock(operationMapLock_);
1228 count += static_cast<int32_t>(syncOperationMap_.size());
1229 }
1230 ISyncEngine *syncEngine = nullptr;
1231 {
1232 std::lock_guard<std::mutex> lock(syncerLock_);
1233 if (syncEngine_ == nullptr) {
1234 return count;
1235 }
1236 syncEngine = syncEngine_;
1237 RefObject::IncObjRef(syncEngine);
1238 }
1239 count += syncEngine->GetResponseTaskCount();
1240 RefObject::DecObjRef(syncEngine);
1241 return count;
1242 }
1243 } // namespace DistributedDB
1244