1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_operation.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "performance_analysis.h"
21 
22 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)23 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
24     int mode, const UserCallback &userCallback, bool isBlockSync)
25     : devices_(devices),
26       syncId_(syncId),
27       mode_(mode),
28       userCallback_(userCallback),
29       isBlockSync_(isBlockSync),
30       isAutoSync_(false),
31       isFinished_(false),
32       semaphore_(nullptr),
33       query_(QuerySyncObject()),
34       isQuerySync_(false),
35       isAutoSubscribe_(false)
36 {
37 }
38 
~SyncOperation()39 SyncOperation::~SyncOperation()
40 {
41     RefObject::DecObjRef(context_);
42     LOGD("SyncOperation::~SyncOperation()");
43     Finalize();
44 }
45 
Initialize()46 int SyncOperation::Initialize()
47 {
48     LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
49     AutoLock lockGuard(this);
50     for (const std::string &deviceId : devices_) {
51         statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
52     }
53 
54     if (mode_ == AUTO_PUSH) {
55         mode_ = PUSH;
56         isAutoSync_ = true;
57     } else if (mode_ == AUTO_PULL) {
58         mode_ = PULL;
59         isAutoSync_ = true;
60     } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
61         mode_ = SUBSCRIBE_QUERY;
62         isAutoSubscribe_ = true;
63     }
64     if (isBlockSync_) {
65         semaphore_ = std::make_unique<SemaphoreUtils>(0);
66     }
67 
68     return E_OK;
69 }
70 
SetOnSyncFinalize(const OnSyncFinalize & callback)71 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
72 {
73     onFinalize_ = callback;
74 }
75 
SetOnSyncFinished(const OnSyncFinished & callback)76 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
77 {
78     onFinished_ = callback;
79 }
80 
SetStatus(const std::string & deviceId,int status,int commErrCode)81 void SyncOperation::SetStatus(const std::string &deviceId, int status, int commErrCode)
82 {
83     LOGD("[SyncOperation] SetStatus dev %s{private} status %d commErrCode %d", deviceId.c_str(), status, commErrCode);
84     AutoLock lockGuard(this);
85     if (IsKilled()) {
86         LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
87         return;
88     }
89     if (isFinished_) {
90         LOGI("[SyncOperation] SetStatus already finished");
91         return;
92     }
93 
94     auto iter = statuses_.find(deviceId);
95     if (iter != statuses_.end()) {
96         if (iter->second >= OP_FINISHED_ALL) {
97             return;
98         }
99         iter->second = status;
100         if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
101             return;
102         }
103         commErrCodeMap_.insert(std::pair<std::string, int>(deviceId, commErrCode));
104     }
105 }
106 
SetUnfinishedDevStatus(int status)107 void SyncOperation::SetUnfinishedDevStatus(int status)
108 {
109     LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
110     AutoLock lockGuard(this);
111     if (IsKilled()) {
112         LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
113         return;
114     }
115     if (isFinished_) {
116         LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
117         return;
118     }
119     for (auto &item : statuses_) {
120         if (item.second >= OP_FINISHED_ALL) {
121             continue;
122         }
123         item.second = status;
124     }
125 }
126 
GetStatus(const std::string & deviceId) const127 int SyncOperation::GetStatus(const std::string &deviceId) const
128 {
129     AutoLock lockGuard(this);
130     auto iter = statuses_.find(deviceId);
131     if (iter != statuses_.end()) {
132         return iter->second;
133     }
134     return -E_INVALID_ARGS;
135 }
136 
GetSyncId() const137 uint32_t SyncOperation::GetSyncId() const
138 {
139     return syncId_;
140 }
141 
GetMode() const142 int SyncOperation::GetMode() const
143 {
144     return mode_;
145 }
146 
ReplaceCommErrCode(std::map<std::string,int> & finishStatus)147 void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
148 {
149     for (auto &item : finishStatus) {
150         if ((item.second != OP_COMM_ABNORMAL) && (item.second != OP_TIMEOUT)) {
151             continue;
152         }
153         std::string deviceId = item.first;
154         auto iter = commErrCodeMap_.find(deviceId);
155         if (iter != commErrCodeMap_.end()) {
156             item.second = iter->second;
157         }
158     }
159 }
160 
Finished()161 void SyncOperation::Finished()
162 {
163     std::map<std::string, int> tmpStatus;
164     {
165         AutoLock lockGuard(this);
166         if (IsKilled() || isFinished_) {
167             return;
168         }
169         isFinished_ = true;
170         tmpStatus = statuses_;
171         ReplaceCommErrCode(tmpStatus);
172     }
173     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
174     if (performance != nullptr) {
175         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
176     }
177     if (userCallback_) {
178         std::string msg = GetFinishDetailMsg(tmpStatus);
179         LOGI("[SyncOperation] SyncId=%d finished, %s", syncId_, msg.c_str());
180         if (IsBlockSync()) {
181             userCallback_(tmpStatus);
182         } else {
183             RefObject::IncObjRef(this);
184             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
185                 userCallback_(tmpStatus);
186                 RefObject::DecObjRef(this);
187             });
188             if (errCode != E_OK) {
189                 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
190                 RefObject::DecObjRef(this);
191             }
192         }
193     }
194     if (onFinished_) {
195         LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
196         onFinished_(syncId_);
197     }
198 }
199 
GetDevices() const200 const std::vector<std::string> &SyncOperation::GetDevices() const
201 {
202     return devices_;
203 }
204 
WaitIfNeed()205 void SyncOperation::WaitIfNeed()
206 {
207     if (isBlockSync_ && (semaphore_ != nullptr)) {
208         LOGD("[SyncOperation] Wait.");
209         semaphore_->WaitSemaphore();
210     }
211 }
212 
NotifyIfNeed()213 void SyncOperation::NotifyIfNeed()
214 {
215     if (isBlockSync_ && (semaphore_ != nullptr)) {
216         LOGD("[SyncOperation] Notify.");
217         semaphore_->SendSemaphore();
218     }
219 }
220 
IsAutoSync() const221 bool SyncOperation::IsAutoSync() const
222 {
223     return isAutoSync_;
224 }
225 
IsBlockSync() const226 bool SyncOperation::IsBlockSync() const
227 {
228     return isBlockSync_;
229 }
230 
IsAutoControlCmd() const231 bool SyncOperation::IsAutoControlCmd() const
232 {
233     return isAutoSubscribe_;
234 }
235 
SetSyncContext(RefObject * context)236 void SyncOperation::SetSyncContext(RefObject *context)
237 {
238     RefObject::DecObjRef(context_);
239     context_ = context;
240     RefObject::IncObjRef(context);
241 }
242 
CheckIsAllFinished() const243 bool SyncOperation::CheckIsAllFinished() const
244 {
245     AutoLock lockGuard(this);
246     for (const auto &iter : statuses_) {
247         if (iter.second < OP_FINISHED_ALL) {
248             return false;
249         }
250     }
251     return true;
252 }
253 
Finalize()254 void SyncOperation::Finalize()
255 {
256     if ((syncId_ > 0) && onFinalize_) {
257         LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
258         onFinalize_();
259     }
260 }
261 
SetQuery(const QuerySyncObject & query)262 void SyncOperation::SetQuery(const QuerySyncObject &query)
263 {
264     std::lock_guard<std::mutex> lock(queryMutex_);
265     query_ = query;
266     isQuerySync_ = true;
267     if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
268         mode_ += QUERY_SYNC_MODE_BASE;
269     }
270 }
271 
GetQuery(QuerySyncObject & targetObject) const272 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
273 {
274     std::lock_guard<std::mutex> lock(queryMutex_);
275     targetObject = query_;
276 }
277 
IsQuerySync() const278 bool SyncOperation::IsQuerySync() const
279 {
280     return isQuerySync_;
281 }
282 
SetIdentifier(const std::vector<uint8_t> & identifier)283 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
284 {
285     identifier_.assign(identifier.begin(), identifier.end());
286 }
287 
288 namespace {
289 struct SyncTypeNode {
290     int mode = static_cast<int>(SyncModeType::INVALID_MODE);
291     SyncType type = SyncType::INVALID_SYNC_TYPE;
292 };
293 struct SyncOperationStatusNode {
294     int operationStatus = 0;
295     DBStatus status = DBStatus::DB_ERROR;
296 };
297 }
298 
GetSyncType(int mode)299 SyncType SyncOperation::GetSyncType(int mode)
300 {
301     static const SyncTypeNode syncTypeNodes[] = {
302         {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
303         {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
304         {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
305         {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
306         {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
307         {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
308         {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
309         {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
310         {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
311     };
312     const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
313         return node.mode == mode;
314     });
315     return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
316 }
317 
TransferSyncMode(int mode)318 int SyncOperation::TransferSyncMode(int mode)
319 {
320     // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
321     // so for the three mode, it is no need to transferred.
322     if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
323         return (mode - QUERY_SYNC_MODE_BASE);
324     }
325     return mode;
326 }
327 
GetQueryId() const328 std::string SyncOperation::GetQueryId() const
329 {
330     std::lock_guard<std::mutex> lock(queryMutex_);
331     return query_.GetIdentify();
332 }
333 
DBStatusTrans(int operationStatus)334 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
335 {
336     static const SyncOperationStatusNode syncOperationStatusNodes[] = {
337         { static_cast<int>(OP_FINISHED_ALL),                  OK },
338         { static_cast<int>(OP_TIMEOUT),                       TIME_OUT },
339         { static_cast<int>(OP_PERMISSION_CHECK_FAILED),       PERMISSION_CHECK_FORBID_SYNC },
340         { static_cast<int>(OP_COMM_ABNORMAL),                 COMM_FAILURE },
341         { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
342         { static_cast<int>(OP_EKEYREVOKED_FAILURE),           EKEYREVOKED_ERROR },
343         { static_cast<int>(OP_SCHEMA_INCOMPATIBLE),           SCHEMA_MISMATCH },
344         { static_cast<int>(OP_BUSY_FAILURE),                  BUSY },
345         { static_cast<int>(OP_QUERY_FORMAT_FAILURE),          INVALID_QUERY_FORMAT },
346         { static_cast<int>(OP_QUERY_FIELD_FAILURE),           INVALID_QUERY_FIELD },
347         { static_cast<int>(OP_NOT_SUPPORT),                   NOT_SUPPORT },
348         { static_cast<int>(OP_INTERCEPT_DATA_FAIL),           INTERCEPT_DATA_FAIL },
349         { static_cast<int>(OP_MAX_LIMITS),                    OVER_MAX_LIMITS },
350         { static_cast<int>(OP_SCHEMA_CHANGED),                DISTRIBUTED_SCHEMA_CHANGED },
351         { static_cast<int>(OP_INVALID_ARGS),                  INVALID_ARGS },
352         { static_cast<int>(OP_USER_CHANGED),                  USER_CHANGED },
353         { static_cast<int>(OP_DENIED_SQL),                    NO_PERMISSION },
354         { static_cast<int>(OP_NOTADB_OR_CORRUPTED),           INVALID_PASSWD_OR_CORRUPTED_DB },
355         { static_cast<int>(OP_FAILED),                        DB_ERROR },
356     };
357     const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
358         [operationStatus](const auto &node) {
359             return node.operationStatus == operationStatus;
360         });
361     return result == std::end(syncOperationStatusNodes) ? static_cast<DBStatus>(operationStatus) : result->status;
362 }
363 
GetFinishDetailMsg(const std::map<std::string,int> & finishStatus)364 std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
365 {
366     std::string msg = "Sync detail is:";
367     for (const auto &[dev, status]: finishStatus) {
368         msg += "dev=" + DBCommon::StringMasking(dev);
369         if ((status > static_cast<int>(OP_FINISHED_ALL)) || (status < E_OK)) {
370             msg += " sync failed, reason is " + std::to_string(status);
371         } else {
372             msg += " sync success";
373         }
374         msg += " ";
375     }
376     msg.pop_back();
377     return msg;
378 }
379 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
380 } // namespace DistributedDB