1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_cloud_syncer.h"
17
18 namespace DistributedDB {
VirtualCloudSyncer(std::shared_ptr<StorageProxy> storageProxy)19 VirtualCloudSyncer::VirtualCloudSyncer(std::shared_ptr<StorageProxy> storageProxy)
20 : CloudSyncer(storageProxy)
21 {
22 }
23
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)24 int VirtualCloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
25 {
26 if (!doDownload_) {
27 LOGI("[VirtualCloudSyncer] download just return ok");
28 return E_OK;
29 }
30 if (downloadFunc_) {
31 return downloadFunc_();
32 }
33 return CloudSyncer::DoDownload(taskId, isFirstDownload);
34 }
35
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)36 int VirtualCloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
37 {
38 if (!doDownload_) {
39 LOGI("[VirtualCloudSyncer] download just return ok");
40 return E_OK;
41 }
42 if (downloadInNeedFunc_) {
43 return downloadInNeedFunc_();
44 }
45 return CloudSyncer::DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
46 }
47
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)48 int VirtualCloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
49 {
50 if (!doUpload_) {
51 LOGI("[VirtualCloudSyncer] upload just return ok");
52 return E_OK;
53 }
54 if (uploadFunc_) {
55 return uploadFunc_();
56 }
57 return CloudSyncer::DoUpload(taskId, lastTable, lockAction);
58 }
59
SetSyncAction(bool doDownload,bool doUpload)60 void VirtualCloudSyncer::SetSyncAction(bool doDownload, bool doUpload)
61 {
62 doDownload_ = doDownload;
63 doUpload_ = doUpload;
64 }
65
SetDownloadFunc(const std::function<int ()> & function)66 void VirtualCloudSyncer::SetDownloadFunc(const std::function<int()> &function)
67 {
68 downloadFunc_ = function;
69 }
70
SetDownloadInNeedFunc(const std::function<int ()> & function)71 void VirtualCloudSyncer::SetDownloadInNeedFunc(const std::function<int()> &function)
72 {
73 downloadInNeedFunc_ = function;
74 }
75
SetTaskNeedUpload()76 void VirtualCloudSyncer::SetTaskNeedUpload()
77 {
78 currentContext_.isNeedUpload = true;
79 }
80
SetUploadFunc(const std::function<int ()> & function)81 void VirtualCloudSyncer::SetUploadFunc(const std::function<int()> &function)
82 {
83 uploadFunc_ = function;
84 }
85
Notify(bool notifyIfError)86 void VirtualCloudSyncer::Notify(bool notifyIfError)
87 {
88 std::lock_guard<std::mutex> autoLock(dataLock_);
89 CloudTaskInfo taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
90 currentContext_.notifier->NotifyProcess(taskInfo, {}, notifyIfError);
91 }
92
GetQueueCount()93 size_t VirtualCloudSyncer::GetQueueCount()
94 {
95 std::lock_guard<std::mutex> autoLock(dataLock_);
96 return taskQueue_.size();
97 }
98
SetCurrentTaskInfo(const SyncProcessCallback & callback,CloudSyncer::TaskId taskId)99 void VirtualCloudSyncer::SetCurrentTaskInfo(const SyncProcessCallback &callback,
100 CloudSyncer::TaskId taskId)
101 {
102 std::lock_guard<std::mutex> autoLock(dataLock_);
103 currentContext_.currentTaskId = taskId;
104 currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
105 CloudTaskInfo taskInfo;
106 taskInfo.callback = callback;
107 cloudTaskInfos_[taskId] = taskInfo;
108 }
109
CallTagStatusByStrategy(bool isExist,const DataInfoWithLog & localInfo,const LogInfo & cloudInfo,OpType & strategyOpResult)110 int VirtualCloudSyncer::CallTagStatusByStrategy(bool isExist, const DataInfoWithLog &localInfo,
111 const LogInfo &cloudInfo, OpType &strategyOpResult)
112 {
113 SyncParam param;
114 DataInfo dataInfo;
115 dataInfo.localInfo = localInfo;
116 dataInfo.cloudLogInfo = cloudInfo;
117 return CloudSyncer::TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
118 }
119
PauseCurrentTask()120 void VirtualCloudSyncer::PauseCurrentTask()
121 {
122 std::lock_guard<std::mutex> autoLock(dataLock_);
123 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
124 return;
125 }
126 cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
127 LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
128 }
129 }