1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "process_notifier.h"
16 
17 #include "db_errno.h"
18 #include "kv_store_errno.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
ProcessNotifier(ICloudSyncer * syncer)21 ProcessNotifier::ProcessNotifier(ICloudSyncer *syncer)
22     : syncer_(syncer)
23 {
24     RefObject::IncObjRef(syncer_);
25 }
26 
~ProcessNotifier()27 ProcessNotifier::~ProcessNotifier()
28 {
29     RefObject::DecObjRef(syncer_);
30 }
31 
Init(const std::vector<std::string> & tableName,const std::vector<std::string> & devices,const std::vector<std::string> & users)32 void ProcessNotifier::Init(const std::vector<std::string> &tableName,
33     const std::vector<std::string> &devices, const std::vector<std::string> &users)
34 {
35     std::lock_guard<std::mutex> autoLock(processMutex_);
36     InitSyncProcess(tableName, syncProcess_);
37     for (const auto &user : users) {
38         SyncProcess syncProcess;
39         InitSyncProcess(tableName, syncProcess);
40         multiSyncProcess_[user] = syncProcess;
41     }
42     devices_ = devices;
43 }
44 
InitSyncProcess(const std::vector<std::string> & tableName,SyncProcess & syncProcess)45 void ProcessNotifier::InitSyncProcess(const std::vector<std::string> &tableName, SyncProcess &syncProcess)
46 {
47     syncProcess.errCode = OK;
48     syncProcess.process = ProcessStatus::PROCESSING;
49     for (const auto &table: tableName) {
50         TableProcessInfo tableInfo;
51         tableInfo.process = ProcessStatus::PREPARED;
52         syncProcess.tableProcess[table] = tableInfo;
53     }
54 }
55 
UpdateProcess(const ICloudSyncer::InnerProcessInfo & process)56 void ProcessNotifier::UpdateProcess(const ICloudSyncer::InnerProcessInfo &process)
57 {
58     if (process.tableName.empty()) {
59         return;
60     }
61     std::lock_guard<std::mutex> autoLock(processMutex_);
62     auto &syncProcess = user_.empty() ? syncProcess_ : multiSyncProcess_[user_];
63     syncProcess.tableProcess[process.tableName].process = process.tableStatus;
64     if (process.downLoadInfo.batchIndex != 0u) {
65         LOGD("[ProcessNotifier] update download process index: %" PRIu32, process.downLoadInfo.batchIndex);
66         syncProcess.tableProcess[process.tableName].downLoadInfo = process.downLoadInfo;
67     }
68     if (process.upLoadInfo.batchIndex != 0u) {
69         LOGD("[ProcessNotifier] update upload process index: %" PRIu32, process.upLoadInfo.batchIndex);
70         syncProcess.tableProcess[process.tableName].upLoadInfo = process.upLoadInfo;
71     }
72 }
73 
NotifyProcess(const ICloudSyncer::CloudTaskInfo & taskInfo,const ICloudSyncer::InnerProcessInfo & process,bool notifyWhenError)74 void ProcessNotifier::NotifyProcess(const ICloudSyncer::CloudTaskInfo &taskInfo,
75     const ICloudSyncer::InnerProcessInfo &process, bool notifyWhenError)
76 {
77     UpdateProcess(process);
78     std::map<std::string, SyncProcess> currentProcess;
79     {
80         std::lock_guard<std::mutex> autoLock(processMutex_);
81         if (!notifyWhenError && taskInfo.errCode != E_OK) {
82             LOGD("[ProcessNotifier] task has error, do not notify now");
83             return;
84         }
85         syncProcess_.errCode = TransferDBErrno(taskInfo.errCode, true);
86         syncProcess_.process = taskInfo.status;
87         multiSyncProcess_[user_].errCode = TransferDBErrno(taskInfo.errCode, true);
88         multiSyncProcess_[user_].process = taskInfo.status;
89         if (user_.empty()) {
90             for (const auto &device : devices_) {
91                 // make sure only one device
92                 currentProcess[device] = syncProcess_;
93             }
94         } else {
95             currentProcess = multiSyncProcess_;
96         }
97     }
98     SyncProcessCallback callback = taskInfo.callback;
99     if (!callback) {
100         LOGD("[ProcessNotifier] task hasn't callback");
101         return;
102     }
103     ICloudSyncer *syncer = syncer_;
104     if (syncer == nullptr) {
105         LOGW("[ProcessNotifier] cancel notify because syncer is nullptr");
106         return; // should not happen
107     }
108     RefObject::IncObjRef(syncer);
109     auto id = syncer->GetIdentify();
110     int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(id, [callback, currentProcess, syncer]() {
111         LOGD("[ProcessNotifier] begin notify process");
112         if (syncer->IsClosed()) {
113             LOGI("[ProcessNotifier] db has closed, process return");
114             RefObject::DecObjRef(syncer);
115             return;
116         }
117         callback(currentProcess);
118         RefObject::DecObjRef(syncer);
119         LOGD("[ProcessNotifier] notify process finish");
120     });
121     if (errCode != E_OK) {
122         LOGW("[ProcessNotifier] schedule notify process failed %d", errCode);
123     }
124 }
125 
GetDevices() const126 std::vector<std::string> ProcessNotifier::GetDevices() const
127 {
128     return devices_;
129 }
130 
GetUploadBatchIndex(const std::string & tableName) const131 uint32_t ProcessNotifier::GetUploadBatchIndex(const std::string &tableName) const
132 {
133     std::lock_guard<std::mutex> autoLock(processMutex_);
134     auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
135     if (syncProcess.tableProcess.find(tableName) == syncProcess.tableProcess.end()) {
136         return 0u;
137     }
138     return syncProcess.tableProcess.at(tableName).upLoadInfo.batchIndex;
139 }
140 
ResetUploadBatchIndex(const std::string & tableName)141 void ProcessNotifier::ResetUploadBatchIndex(const std::string &tableName)
142 {
143     if (tableName.empty()) {
144         return;
145     }
146     std::lock_guard<std::mutex> autoLock(processMutex_);
147     auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
148     if (syncProcess.tableProcess.find(tableName) == syncProcess.tableProcess.end()) {
149         LOGW("[ProcessNotifier] The specified table was not found when reset UploadBatchIndex");
150         return;
151     }
152     syncProcess.tableProcess[tableName].upLoadInfo.batchIndex = 0;
153 }
154 
GetLastUploadSuccessCount(const std::string & tableName) const155 uint32_t ProcessNotifier::GetLastUploadSuccessCount(const std::string &tableName) const
156 {
157     std::lock_guard<std::mutex> autoLock(processMutex_);
158     auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
159     if (syncProcess.tableProcess.find(tableName) == syncProcess_.tableProcess.end()) {
160         return 0u;
161     }
162     return syncProcess.tableProcess.at(tableName).upLoadInfo.successCount;
163 }
164 
GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo & process)165 void ProcessNotifier::GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo &process)
166 {
167     if (process.tableName.empty()) {
168         return;
169     }
170     std::lock_guard<std::mutex> autoLock(processMutex_);
171     SyncProcess syncProcess;
172     if (user_.empty()) {
173         syncProcess = syncProcess_;
174     } else {
175         syncProcess = multiSyncProcess_[user_];
176     }
177 
178     if (syncProcess.tableProcess.find(process.tableName) != syncProcess.tableProcess.end()) {
179         process.downLoadInfo = syncProcess.tableProcess[process.tableName].downLoadInfo;
180     }
181 }
182 
SetUser(const std::string & user)183 void ProcessNotifier::SetUser(const std::string &user)
184 {
185     user_ = user;
186 }
187 
SetAllTableFinish()188 void ProcessNotifier::SetAllTableFinish()
189 {
190     std::lock_guard<std::mutex> autoLock(processMutex_);
191     for (auto &item : syncProcess_.tableProcess) {
192         item.second.process = ProcessStatus::FINISHED;
193     }
194     for (auto &syncProcess : multiSyncProcess_) {
195         for (auto &item : syncProcess.second.tableProcess) {
196             item.second.process = ProcessStatus::FINISHED;
197         }
198     }
199 }
200 
IsMultiUser() const201 bool ProcessNotifier::IsMultiUser() const
202 {
203     return !user_.empty() && multiSyncProcess_.find(user_) != multiSyncProcess_.end();
204 }
205 
GetCurrentTableProcess() const206 std::map<std::string, TableProcessInfo> ProcessNotifier::GetCurrentTableProcess() const
207 {
208     std::lock_guard<std::mutex> autoLock(processMutex_);
209     return syncProcess_.tableProcess;
210 }
211 }
212