1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "process_notifier.h"
16
17 #include "db_errno.h"
18 #include "kv_store_errno.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
ProcessNotifier(ICloudSyncer * syncer)21 ProcessNotifier::ProcessNotifier(ICloudSyncer *syncer)
22 : syncer_(syncer)
23 {
24 RefObject::IncObjRef(syncer_);
25 }
26
~ProcessNotifier()27 ProcessNotifier::~ProcessNotifier()
28 {
29 RefObject::DecObjRef(syncer_);
30 }
31
Init(const std::vector<std::string> & tableName,const std::vector<std::string> & devices,const std::vector<std::string> & users)32 void ProcessNotifier::Init(const std::vector<std::string> &tableName,
33 const std::vector<std::string> &devices, const std::vector<std::string> &users)
34 {
35 std::lock_guard<std::mutex> autoLock(processMutex_);
36 InitSyncProcess(tableName, syncProcess_);
37 for (const auto &user : users) {
38 SyncProcess syncProcess;
39 InitSyncProcess(tableName, syncProcess);
40 multiSyncProcess_[user] = syncProcess;
41 }
42 devices_ = devices;
43 }
44
InitSyncProcess(const std::vector<std::string> & tableName,SyncProcess & syncProcess)45 void ProcessNotifier::InitSyncProcess(const std::vector<std::string> &tableName, SyncProcess &syncProcess)
46 {
47 syncProcess.errCode = OK;
48 syncProcess.process = ProcessStatus::PROCESSING;
49 for (const auto &table: tableName) {
50 TableProcessInfo tableInfo;
51 tableInfo.process = ProcessStatus::PREPARED;
52 syncProcess.tableProcess[table] = tableInfo;
53 }
54 }
55
UpdateProcess(const ICloudSyncer::InnerProcessInfo & process)56 void ProcessNotifier::UpdateProcess(const ICloudSyncer::InnerProcessInfo &process)
57 {
58 if (process.tableName.empty()) {
59 return;
60 }
61 std::lock_guard<std::mutex> autoLock(processMutex_);
62 auto &syncProcess = user_.empty() ? syncProcess_ : multiSyncProcess_[user_];
63 syncProcess.tableProcess[process.tableName].process = process.tableStatus;
64 if (process.downLoadInfo.batchIndex != 0u) {
65 LOGD("[ProcessNotifier] update download process index: %" PRIu32, process.downLoadInfo.batchIndex);
66 syncProcess.tableProcess[process.tableName].downLoadInfo = process.downLoadInfo;
67 }
68 if (process.upLoadInfo.batchIndex != 0u) {
69 LOGD("[ProcessNotifier] update upload process index: %" PRIu32, process.upLoadInfo.batchIndex);
70 syncProcess.tableProcess[process.tableName].upLoadInfo = process.upLoadInfo;
71 }
72 }
73
NotifyProcess(const ICloudSyncer::CloudTaskInfo & taskInfo,const ICloudSyncer::InnerProcessInfo & process,bool notifyWhenError)74 void ProcessNotifier::NotifyProcess(const ICloudSyncer::CloudTaskInfo &taskInfo,
75 const ICloudSyncer::InnerProcessInfo &process, bool notifyWhenError)
76 {
77 UpdateProcess(process);
78 std::map<std::string, SyncProcess> currentProcess;
79 {
80 std::lock_guard<std::mutex> autoLock(processMutex_);
81 if (!notifyWhenError && taskInfo.errCode != E_OK) {
82 LOGD("[ProcessNotifier] task has error, do not notify now");
83 return;
84 }
85 syncProcess_.errCode = TransferDBErrno(taskInfo.errCode, true);
86 syncProcess_.process = taskInfo.status;
87 multiSyncProcess_[user_].errCode = TransferDBErrno(taskInfo.errCode, true);
88 multiSyncProcess_[user_].process = taskInfo.status;
89 if (user_.empty()) {
90 for (const auto &device : devices_) {
91 // make sure only one device
92 currentProcess[device] = syncProcess_;
93 }
94 } else {
95 currentProcess = multiSyncProcess_;
96 }
97 }
98 SyncProcessCallback callback = taskInfo.callback;
99 if (!callback) {
100 LOGD("[ProcessNotifier] task hasn't callback");
101 return;
102 }
103 ICloudSyncer *syncer = syncer_;
104 if (syncer == nullptr) {
105 LOGW("[ProcessNotifier] cancel notify because syncer is nullptr");
106 return; // should not happen
107 }
108 RefObject::IncObjRef(syncer);
109 auto id = syncer->GetIdentify();
110 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(id, [callback, currentProcess, syncer]() {
111 LOGD("[ProcessNotifier] begin notify process");
112 if (syncer->IsClosed()) {
113 LOGI("[ProcessNotifier] db has closed, process return");
114 RefObject::DecObjRef(syncer);
115 return;
116 }
117 callback(currentProcess);
118 RefObject::DecObjRef(syncer);
119 LOGD("[ProcessNotifier] notify process finish");
120 });
121 if (errCode != E_OK) {
122 LOGW("[ProcessNotifier] schedule notify process failed %d", errCode);
123 }
124 }
125
GetDevices() const126 std::vector<std::string> ProcessNotifier::GetDevices() const
127 {
128 return devices_;
129 }
130
GetUploadBatchIndex(const std::string & tableName) const131 uint32_t ProcessNotifier::GetUploadBatchIndex(const std::string &tableName) const
132 {
133 std::lock_guard<std::mutex> autoLock(processMutex_);
134 auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
135 if (syncProcess.tableProcess.find(tableName) == syncProcess.tableProcess.end()) {
136 return 0u;
137 }
138 return syncProcess.tableProcess.at(tableName).upLoadInfo.batchIndex;
139 }
140
ResetUploadBatchIndex(const std::string & tableName)141 void ProcessNotifier::ResetUploadBatchIndex(const std::string &tableName)
142 {
143 if (tableName.empty()) {
144 return;
145 }
146 std::lock_guard<std::mutex> autoLock(processMutex_);
147 auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
148 if (syncProcess.tableProcess.find(tableName) == syncProcess.tableProcess.end()) {
149 LOGW("[ProcessNotifier] The specified table was not found when reset UploadBatchIndex");
150 return;
151 }
152 syncProcess.tableProcess[tableName].upLoadInfo.batchIndex = 0;
153 }
154
GetLastUploadSuccessCount(const std::string & tableName) const155 uint32_t ProcessNotifier::GetLastUploadSuccessCount(const std::string &tableName) const
156 {
157 std::lock_guard<std::mutex> autoLock(processMutex_);
158 auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
159 if (syncProcess.tableProcess.find(tableName) == syncProcess_.tableProcess.end()) {
160 return 0u;
161 }
162 return syncProcess.tableProcess.at(tableName).upLoadInfo.successCount;
163 }
164
GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo & process)165 void ProcessNotifier::GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo &process)
166 {
167 if (process.tableName.empty()) {
168 return;
169 }
170 std::lock_guard<std::mutex> autoLock(processMutex_);
171 SyncProcess syncProcess;
172 if (user_.empty()) {
173 syncProcess = syncProcess_;
174 } else {
175 syncProcess = multiSyncProcess_[user_];
176 }
177
178 if (syncProcess.tableProcess.find(process.tableName) != syncProcess.tableProcess.end()) {
179 process.downLoadInfo = syncProcess.tableProcess[process.tableName].downLoadInfo;
180 }
181 }
182
SetUser(const std::string & user)183 void ProcessNotifier::SetUser(const std::string &user)
184 {
185 user_ = user;
186 }
187
SetAllTableFinish()188 void ProcessNotifier::SetAllTableFinish()
189 {
190 std::lock_guard<std::mutex> autoLock(processMutex_);
191 for (auto &item : syncProcess_.tableProcess) {
192 item.second.process = ProcessStatus::FINISHED;
193 }
194 for (auto &syncProcess : multiSyncProcess_) {
195 for (auto &item : syncProcess.second.tableProcess) {
196 item.second.process = ProcessStatus::FINISHED;
197 }
198 }
199 }
200
IsMultiUser() const201 bool ProcessNotifier::IsMultiUser() const
202 {
203 return !user_.empty() && multiSyncProcess_.find(user_) != multiSyncProcess_.end();
204 }
205
GetCurrentTableProcess() const206 std::map<std::string, TableProcessInfo> ProcessNotifier::GetCurrentTableProcess() const
207 {
208 std::lock_guard<std::mutex> autoLock(processMutex_);
209 return syncProcess_.tableProcess;
210 }
211 }
212