1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "virtual_cloud_syncer.h"

#include <utility>
17 
18 namespace DistributedDB {
VirtualCloudSyncer(std::shared_ptr<StorageProxy> storageProxy)19 VirtualCloudSyncer::VirtualCloudSyncer(std::shared_ptr<StorageProxy> storageProxy)
20     : CloudSyncer(storageProxy)
21 {
22 }
23 
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)24 int VirtualCloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
25 {
26     if (!doDownload_) {
27         LOGI("[VirtualCloudSyncer] download just return ok");
28         return E_OK;
29     }
30     if (downloadFunc_) {
31         return downloadFunc_();
32     }
33     return CloudSyncer::DoDownload(taskId, isFirstDownload);
34 }
35 
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)36 int VirtualCloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
37 {
38     if (!doDownload_) {
39         LOGI("[VirtualCloudSyncer] download just return ok");
40         return E_OK;
41     }
42     if (downloadInNeedFunc_) {
43         return downloadInNeedFunc_();
44     }
45     return CloudSyncer::DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
46 }
47 
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)48 int VirtualCloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
49 {
50     if (!doUpload_) {
51         LOGI("[VirtualCloudSyncer] upload just return ok");
52         return E_OK;
53     }
54     if (uploadFunc_) {
55         return uploadFunc_();
56     }
57     return CloudSyncer::DoUpload(taskId, lastTable, lockAction);
58 }
59 
SetSyncAction(bool doDownload,bool doUpload)60 void VirtualCloudSyncer::SetSyncAction(bool doDownload, bool doUpload)
61 {
62     doDownload_ = doDownload;
63     doUpload_ = doUpload;
64 }
65 
SetDownloadFunc(const std::function<int ()> & function)66 void VirtualCloudSyncer::SetDownloadFunc(const std::function<int()> &function)
67 {
68     downloadFunc_ = function;
69 }
70 
SetDownloadInNeedFunc(const std::function<int ()> & function)71 void VirtualCloudSyncer::SetDownloadInNeedFunc(const std::function<int()> &function)
72 {
73     downloadInNeedFunc_ = function;
74 }
75 
SetTaskNeedUpload()76 void VirtualCloudSyncer::SetTaskNeedUpload()
77 {
78     currentContext_.isNeedUpload = true;
79 }
80 
SetUploadFunc(const std::function<int ()> & function)81 void VirtualCloudSyncer::SetUploadFunc(const std::function<int()> &function)
82 {
83     uploadFunc_ = function;
84 }
85 
Notify(bool notifyIfError)86 void VirtualCloudSyncer::Notify(bool notifyIfError)
87 {
88     std::lock_guard<std::mutex> autoLock(dataLock_);
89     CloudTaskInfo taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
90     currentContext_.notifier->NotifyProcess(taskInfo, {}, notifyIfError);
91 }
92 
GetQueueCount()93 size_t VirtualCloudSyncer::GetQueueCount()
94 {
95     std::lock_guard<std::mutex> autoLock(dataLock_);
96     return taskQueue_.size();
97 }
98 
SetCurrentTaskInfo(const SyncProcessCallback & callback,CloudSyncer::TaskId taskId)99 void VirtualCloudSyncer::SetCurrentTaskInfo(const SyncProcessCallback &callback,
100     CloudSyncer::TaskId taskId)
101 {
102     std::lock_guard<std::mutex> autoLock(dataLock_);
103     currentContext_.currentTaskId = taskId;
104     currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
105     CloudTaskInfo taskInfo;
106     taskInfo.callback = callback;
107     cloudTaskInfos_[taskId] = taskInfo;
108 }
109 
CallTagStatusByStrategy(bool isExist,const DataInfoWithLog & localInfo,const LogInfo & cloudInfo,OpType & strategyOpResult)110 int VirtualCloudSyncer::CallTagStatusByStrategy(bool isExist, const DataInfoWithLog &localInfo,
111     const LogInfo &cloudInfo, OpType &strategyOpResult)
112 {
113     SyncParam param;
114     DataInfo dataInfo;
115     dataInfo.localInfo = localInfo;
116     dataInfo.cloudLogInfo = cloudInfo;
117     return CloudSyncer::TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
118 }
119 
PauseCurrentTask()120 void VirtualCloudSyncer::PauseCurrentTask()
121 {
122     std::lock_guard<std::mutex> autoLock(dataLock_);
123     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
124         return;
125     }
126     cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
127     LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
128 }
129 }