1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_operation.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "performance_analysis.h"
21
22 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)23 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
24 int mode, const UserCallback &userCallback, bool isBlockSync)
25 : devices_(devices),
26 syncId_(syncId),
27 mode_(mode),
28 userCallback_(userCallback),
29 isBlockSync_(isBlockSync),
30 isAutoSync_(false),
31 isFinished_(false),
32 semaphore_(nullptr),
33 query_(QuerySyncObject()),
34 isQuerySync_(false),
35 isAutoSubscribe_(false)
36 {
37 }
38
~SyncOperation()39 SyncOperation::~SyncOperation()
40 {
41 RefObject::DecObjRef(context_);
42 LOGD("SyncOperation::~SyncOperation()");
43 Finalize();
44 }
45
Initialize()46 int SyncOperation::Initialize()
47 {
48 LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
49 AutoLock lockGuard(this);
50 for (const std::string &deviceId : devices_) {
51 statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
52 }
53
54 if (mode_ == AUTO_PUSH) {
55 mode_ = PUSH;
56 isAutoSync_ = true;
57 } else if (mode_ == AUTO_PULL) {
58 mode_ = PULL;
59 isAutoSync_ = true;
60 } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
61 mode_ = SUBSCRIBE_QUERY;
62 isAutoSubscribe_ = true;
63 }
64 if (isBlockSync_) {
65 semaphore_ = std::make_unique<SemaphoreUtils>(0);
66 }
67
68 return E_OK;
69 }
70
SetOnSyncFinalize(const OnSyncFinalize & callback)71 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
72 {
73 onFinalize_ = callback;
74 }
75
SetOnSyncFinished(const OnSyncFinished & callback)76 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
77 {
78 onFinished_ = callback;
79 }
80
SetStatus(const std::string & deviceId,int status,int commErrCode)81 void SyncOperation::SetStatus(const std::string &deviceId, int status, int commErrCode)
82 {
83 LOGD("[SyncOperation] SetStatus dev %s{private} status %d commErrCode %d", deviceId.c_str(), status, commErrCode);
84 AutoLock lockGuard(this);
85 if (IsKilled()) {
86 LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
87 return;
88 }
89 if (isFinished_) {
90 LOGI("[SyncOperation] SetStatus already finished");
91 return;
92 }
93
94 auto iter = statuses_.find(deviceId);
95 if (iter != statuses_.end()) {
96 if (iter->second >= OP_FINISHED_ALL) {
97 return;
98 }
99 iter->second = status;
100 if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
101 return;
102 }
103 commErrCodeMap_.insert(std::pair<std::string, int>(deviceId, commErrCode));
104 }
105 }
106
SetUnfinishedDevStatus(int status)107 void SyncOperation::SetUnfinishedDevStatus(int status)
108 {
109 LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
110 AutoLock lockGuard(this);
111 if (IsKilled()) {
112 LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
113 return;
114 }
115 if (isFinished_) {
116 LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
117 return;
118 }
119 for (auto &item : statuses_) {
120 if (item.second >= OP_FINISHED_ALL) {
121 continue;
122 }
123 item.second = status;
124 }
125 }
126
GetStatus(const std::string & deviceId) const127 int SyncOperation::GetStatus(const std::string &deviceId) const
128 {
129 AutoLock lockGuard(this);
130 auto iter = statuses_.find(deviceId);
131 if (iter != statuses_.end()) {
132 return iter->second;
133 }
134 return -E_INVALID_ARGS;
135 }
136
GetSyncId() const137 uint32_t SyncOperation::GetSyncId() const
138 {
139 return syncId_;
140 }
141
GetMode() const142 int SyncOperation::GetMode() const
143 {
144 return mode_;
145 }
146
ReplaceCommErrCode(std::map<std::string,int> & finishStatus)147 void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
148 {
149 for (auto &item : finishStatus) {
150 if ((item.second != OP_COMM_ABNORMAL) && (item.second != OP_TIMEOUT)) {
151 continue;
152 }
153 std::string deviceId = item.first;
154 auto iter = commErrCodeMap_.find(deviceId);
155 if (iter != commErrCodeMap_.end()) {
156 item.second = iter->second;
157 }
158 }
159 }
160
Finished()161 void SyncOperation::Finished()
162 {
163 std::map<std::string, int> tmpStatus;
164 {
165 AutoLock lockGuard(this);
166 if (IsKilled() || isFinished_) {
167 return;
168 }
169 isFinished_ = true;
170 tmpStatus = statuses_;
171 ReplaceCommErrCode(tmpStatus);
172 }
173 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
174 if (performance != nullptr) {
175 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
176 }
177 if (userCallback_) {
178 std::string msg = GetFinishDetailMsg(tmpStatus);
179 LOGI("[SyncOperation] SyncId=%d finished, %s", syncId_, msg.c_str());
180 if (IsBlockSync()) {
181 userCallback_(tmpStatus);
182 } else {
183 RefObject::IncObjRef(this);
184 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
185 userCallback_(tmpStatus);
186 RefObject::DecObjRef(this);
187 });
188 if (errCode != E_OK) {
189 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
190 RefObject::DecObjRef(this);
191 }
192 }
193 }
194 if (onFinished_) {
195 LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
196 onFinished_(syncId_);
197 }
198 }
199
GetDevices() const200 const std::vector<std::string> &SyncOperation::GetDevices() const
201 {
202 return devices_;
203 }
204
WaitIfNeed()205 void SyncOperation::WaitIfNeed()
206 {
207 if (isBlockSync_ && (semaphore_ != nullptr)) {
208 LOGD("[SyncOperation] Wait.");
209 semaphore_->WaitSemaphore();
210 }
211 }
212
NotifyIfNeed()213 void SyncOperation::NotifyIfNeed()
214 {
215 if (isBlockSync_ && (semaphore_ != nullptr)) {
216 LOGD("[SyncOperation] Notify.");
217 semaphore_->SendSemaphore();
218 }
219 }
220
IsAutoSync() const221 bool SyncOperation::IsAutoSync() const
222 {
223 return isAutoSync_;
224 }
225
IsBlockSync() const226 bool SyncOperation::IsBlockSync() const
227 {
228 return isBlockSync_;
229 }
230
IsAutoControlCmd() const231 bool SyncOperation::IsAutoControlCmd() const
232 {
233 return isAutoSubscribe_;
234 }
235
SetSyncContext(RefObject * context)236 void SyncOperation::SetSyncContext(RefObject *context)
237 {
238 RefObject::DecObjRef(context_);
239 context_ = context;
240 RefObject::IncObjRef(context);
241 }
242
CheckIsAllFinished() const243 bool SyncOperation::CheckIsAllFinished() const
244 {
245 AutoLock lockGuard(this);
246 for (const auto &iter : statuses_) {
247 if (iter.second < OP_FINISHED_ALL) {
248 return false;
249 }
250 }
251 return true;
252 }
253
Finalize()254 void SyncOperation::Finalize()
255 {
256 if ((syncId_ > 0) && onFinalize_) {
257 LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
258 onFinalize_();
259 }
260 }
261
SetQuery(const QuerySyncObject & query)262 void SyncOperation::SetQuery(const QuerySyncObject &query)
263 {
264 std::lock_guard<std::mutex> lock(queryMutex_);
265 query_ = query;
266 isQuerySync_ = true;
267 if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
268 mode_ += QUERY_SYNC_MODE_BASE;
269 }
270 }
271
GetQuery(QuerySyncObject & targetObject) const272 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
273 {
274 std::lock_guard<std::mutex> lock(queryMutex_);
275 targetObject = query_;
276 }
277
IsQuerySync() const278 bool SyncOperation::IsQuerySync() const
279 {
280 return isQuerySync_;
281 }
282
SetIdentifier(const std::vector<uint8_t> & identifier)283 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
284 {
285 identifier_.assign(identifier.begin(), identifier.end());
286 }
287
288 namespace {
289 struct SyncTypeNode {
290 int mode = static_cast<int>(SyncModeType::INVALID_MODE);
291 SyncType type = SyncType::INVALID_SYNC_TYPE;
292 };
293 struct SyncOperationStatusNode {
294 int operationStatus = 0;
295 DBStatus status = DBStatus::DB_ERROR;
296 };
297 }
298
GetSyncType(int mode)299 SyncType SyncOperation::GetSyncType(int mode)
300 {
301 static const SyncTypeNode syncTypeNodes[] = {
302 {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
303 {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
304 {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
305 {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
306 {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
307 {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
308 {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
309 {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
310 {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
311 };
312 const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
313 return node.mode == mode;
314 });
315 return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
316 }
317
TransferSyncMode(int mode)318 int SyncOperation::TransferSyncMode(int mode)
319 {
320 // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
321 // so for the three mode, it is no need to transferred.
322 if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
323 return (mode - QUERY_SYNC_MODE_BASE);
324 }
325 return mode;
326 }
327
GetQueryId() const328 std::string SyncOperation::GetQueryId() const
329 {
330 std::lock_guard<std::mutex> lock(queryMutex_);
331 return query_.GetIdentify();
332 }
333
DBStatusTrans(int operationStatus)334 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
335 {
336 static const SyncOperationStatusNode syncOperationStatusNodes[] = {
337 { static_cast<int>(OP_FINISHED_ALL), OK },
338 { static_cast<int>(OP_TIMEOUT), TIME_OUT },
339 { static_cast<int>(OP_PERMISSION_CHECK_FAILED), PERMISSION_CHECK_FORBID_SYNC },
340 { static_cast<int>(OP_COMM_ABNORMAL), COMM_FAILURE },
341 { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
342 { static_cast<int>(OP_EKEYREVOKED_FAILURE), EKEYREVOKED_ERROR },
343 { static_cast<int>(OP_SCHEMA_INCOMPATIBLE), SCHEMA_MISMATCH },
344 { static_cast<int>(OP_BUSY_FAILURE), BUSY },
345 { static_cast<int>(OP_QUERY_FORMAT_FAILURE), INVALID_QUERY_FORMAT },
346 { static_cast<int>(OP_QUERY_FIELD_FAILURE), INVALID_QUERY_FIELD },
347 { static_cast<int>(OP_NOT_SUPPORT), NOT_SUPPORT },
348 { static_cast<int>(OP_INTERCEPT_DATA_FAIL), INTERCEPT_DATA_FAIL },
349 { static_cast<int>(OP_MAX_LIMITS), OVER_MAX_LIMITS },
350 { static_cast<int>(OP_SCHEMA_CHANGED), DISTRIBUTED_SCHEMA_CHANGED },
351 { static_cast<int>(OP_INVALID_ARGS), INVALID_ARGS },
352 { static_cast<int>(OP_USER_CHANGED), USER_CHANGED },
353 { static_cast<int>(OP_DENIED_SQL), NO_PERMISSION },
354 { static_cast<int>(OP_NOTADB_OR_CORRUPTED), INVALID_PASSWD_OR_CORRUPTED_DB },
355 { static_cast<int>(OP_FAILED), DB_ERROR },
356 };
357 const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
358 [operationStatus](const auto &node) {
359 return node.operationStatus == operationStatus;
360 });
361 return result == std::end(syncOperationStatusNodes) ? static_cast<DBStatus>(operationStatus) : result->status;
362 }
363
GetFinishDetailMsg(const std::map<std::string,int> & finishStatus)364 std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
365 {
366 std::string msg = "Sync detail is:";
367 for (const auto &[dev, status]: finishStatus) {
368 msg += "dev=" + DBCommon::StringMasking(dev);
369 if ((status > static_cast<int>(OP_FINISHED_ALL)) || (status < E_OK)) {
370 msg += " sync failed, reason is " + std::to_string(status);
371 } else {
372 msg += " sync success";
373 }
374 msg += " ";
375 }
376 msg.pop_back();
377 return msg;
378 }
// Expands the DEFINE_OBJECT_TAG_FACILITIES macro for SyncOperation. The macro is defined outside
// this file; presumably it emits the object-tag helper definitions for the class -- TODO confirm.
DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
380 } // namespace DistributedDB