1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_able_kvdb_connection.h"
17
18 #include "log_print.h"
19 #include "db_errno.h"
20 #include "db_constant.h"
21 #include "kvdb_pragma.h"
22 #include "performance_analysis.h"
23 #include "runtime_context.h"
24 #include "sync_able_kvdb.h"
25
26 namespace DistributedDB {
SyncAbleKvDBConnection(SyncAbleKvDB * kvDB)27 SyncAbleKvDBConnection::SyncAbleKvDBConnection(SyncAbleKvDB *kvDB)
28 : GenericKvDBConnection(kvDB),
29 remotePushFinishedListener_(nullptr)
30 {
31 OnKill([this]() {
32 auto *db = GetDB<SyncAbleKvDB>();
33 if (db == nullptr) {
34 return;
35 }
36 // Drop the lock before we call RemoveSyncOperation().
37 UnlockObj();
38 db->StopSync(GetConnectionId());
39 LockObj();
40 });
41 }
42
~SyncAbleKvDBConnection()43 SyncAbleKvDBConnection::~SyncAbleKvDBConnection()
44 {
45 if (remotePushFinishedListener_ != nullptr) {
46 remotePushFinishedListener_->Drop(true);
47 }
48 remotePushFinishedListener_ = nullptr;
49 }
50
InitPragmaFunc()51 void SyncAbleKvDBConnection::InitPragmaFunc()
52 {
53 if (!pragmaFunc_.empty()) {
54 return;
55 }
56 pragmaFunc_ = {
57 {PRAGMA_SYNC_DEVICES, [this](void *parameter, int &errCode) {
58 errCode = PragmaSyncAction(static_cast<PragmaSync *>(parameter)); }},
59 {PRAGMA_AUTO_SYNC, [this](void *parameter, int &errCode) {
60 errCode = EnableAutoSync(*(static_cast<bool *>(parameter))); }},
61 {PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT, [](void *parameter, int &errCode) {
62 *(static_cast<std::string *>(parameter)) = PerformanceAnalysis::GetInstance()->GetStatistics(); }},
63 {PRAGMA_PERFORMANCE_ANALYSIS_OPEN, [](void *parameter, int &errCode) {
64 PerformanceAnalysis::GetInstance()->OpenPerformanceAnalysis(); }},
65 {PRAGMA_PERFORMANCE_ANALYSIS_CLOSE, [](void *parameter, int &errCode) {
66 PerformanceAnalysis::GetInstance()->ClosePerformanceAnalysis(); }},
67 {PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME, [](void *parameter, int &errCode) {
68 PerformanceAnalysis::GetInstance()->SetFileName(*(static_cast<std::string *>(parameter))); }},
69 {PRAGMA_GET_QUEUED_SYNC_SIZE, [this](void *parameter, int &errCode) {
70 errCode = GetQueuedSyncSize(static_cast<int *>(parameter)); }},
71 {PRAGMA_SET_QUEUED_SYNC_LIMIT, [this](void *parameter, int &errCode) {
72 errCode = SetQueuedSyncLimit(static_cast<int *>(parameter)); }},
73 {PRAGMA_GET_QUEUED_SYNC_LIMIT, [this](void *parameter, int &errCode) {
74 errCode = GetQueuedSyncLimit(static_cast<int *>(parameter)); }},
75 {PRAGMA_SET_WIPE_POLICY, [this](void *parameter, int &errCode) {
76 errCode = SetStaleDataWipePolicy(static_cast<WipePolicy *>(parameter)); }},
77 {PRAGMA_REMOTE_PUSH_FINISHED_NOTIFY, [this](void *parameter, int &errCode) {
78 errCode = SetRemotePushFinishedNotify(static_cast<PragmaRemotePushNotify *>(parameter)); }},
79 {PRAGMA_SET_SYNC_RETRY, [this](void *parameter, int &errCode) {
80 errCode = SetSyncRetry(*(static_cast<bool *>(parameter))); }},
81 {PRAGMA_ADD_EQUAL_IDENTIFIER, [this](void *parameter, int &errCode) {
82 errCode = SetEqualIdentifier(static_cast<PragmaSetEqualIdentifier *>(parameter)); }},
83 {PRAGMA_INTERCEPT_SYNC_DATA, [this](void *parameter, int &errCode) {
84 errCode = SetPushDataInterceptor(*static_cast<PushDataInterceptor *>(parameter)); }},
85 {PRAGMA_SUBSCRIBE_QUERY, [this](void *parameter, int &errCode) {
86 errCode = PragmaSyncAction(static_cast<PragmaSync *>(parameter)); }},
87 };
88 }
89
Pragma(int cmd,void * parameter)90 int SyncAbleKvDBConnection::Pragma(int cmd, void *parameter)
91 {
92 int errCode = PragmaParamCheck(cmd, parameter);
93 if (errCode != E_OK) {
94 return -E_INVALID_ARGS;
95 }
96
97 InitPragmaFunc();
98 auto iter = pragmaFunc_.find(cmd);
99 if (iter != pragmaFunc_.end()) {
100 iter->second(parameter, errCode);
101 return errCode;
102 }
103
104 // Call Pragma() of super class.
105 return GenericKvDBConnection::Pragma(cmd, parameter);
106 }
107
PragmaParamCheck(int cmd,const void * parameter)108 int SyncAbleKvDBConnection::PragmaParamCheck(int cmd, const void *parameter)
109 {
110 switch (cmd) {
111 case PRAGMA_AUTO_SYNC:
112 case PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT:
113 case PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME:
114 if (parameter == nullptr) {
115 return -E_INVALID_ARGS;
116 }
117 return E_OK;
118 default:
119 return E_OK;
120 }
121 }
122
PragmaSyncAction(const PragmaSync * syncParameter)123 int SyncAbleKvDBConnection::PragmaSyncAction(const PragmaSync *syncParameter)
124 {
125 if (syncParameter == nullptr) {
126 return -E_INVALID_ARGS;
127 }
128 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
129 if (kvDB == nullptr) {
130 return -E_INVALID_CONNECTION;
131 }
132
133 if (isExclusive_.load()) {
134 return -E_BUSY;
135 }
136 {
137 AutoLock lockGuard(this);
138 if (IsKilled()) {
139 // If this happens, users are using a closed connection.
140 LOGE("Pragma sync on a closed connection.");
141 return -E_STALE;
142 }
143 IncObjRef(this);
144 }
145
146 ISyncer::SyncParma syncParam;
147 syncParam.devices = syncParameter->devices_;
148 syncParam.mode = syncParameter->mode_;
149 syncParam.wait = syncParameter->wait_;
150 syncParam.isQuerySync = syncParameter->isQuerySync_;
151 syncParam.syncQuery = syncParameter->query_;
152 syncParam.onFinalize = [this]() { DecObjRef(this); };
153 syncParam.onComplete = std::bind(&SyncAbleKvDBConnection::OnSyncComplete, this, std::placeholders::_1,
154 syncParameter->onComplete_, syncParameter->wait_);
155 int errCode = kvDB->Sync(syncParam, GetConnectionId());
156 if (errCode != E_OK) {
157 DecObjRef(this);
158 }
159 return errCode;
160 }
161
EnableAutoSync(bool enable)162 int SyncAbleKvDBConnection::EnableAutoSync(bool enable)
163 {
164 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
165 if (kvDB == nullptr) {
166 return -E_INVALID_CONNECTION;
167 }
168 kvDB->EnableAutoSync(enable);
169 return E_OK;
170 }
171
OnSyncComplete(const std::map<std::string,int> & statuses,const std::function<void (const std::map<std::string,int> & devicesMap)> & onComplete,bool wait)172 void SyncAbleKvDBConnection::OnSyncComplete(const std::map<std::string, int> &statuses,
173 const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete, bool wait)
174 {
175 AutoLock lockGuard(this);
176 if (!IsKilled() && onComplete) {
177 // Drop the lock before invoking the callback.
178 // Do pragma-sync again in the prev sync callback is supported.
179 UnlockObj();
180 // The connection may be closed after UnlockObj().
181 // RACE: 'KillObj()' against 'onComplete()'.
182 if (!IsKilled()) {
183 onComplete(statuses);
184 }
185 LockObj();
186 }
187 }
188
GetQueuedSyncSize(int * queuedSyncSize) const189 int SyncAbleKvDBConnection::GetQueuedSyncSize(int *queuedSyncSize) const
190 {
191 if (queuedSyncSize == nullptr) {
192 return -E_INVALID_ARGS;
193 }
194 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
195 if (kvDB == nullptr) {
196 return -E_INVALID_CONNECTION;
197 }
198 return kvDB->GetQueuedSyncSize(queuedSyncSize);
199 }
200
SetQueuedSyncLimit(const int * queuedSyncLimit)201 int SyncAbleKvDBConnection::SetQueuedSyncLimit(const int *queuedSyncLimit)
202 {
203 if (queuedSyncLimit == nullptr) {
204 return -E_INVALID_ARGS;
205 }
206 if ((*queuedSyncLimit > DBConstant::QUEUED_SYNC_LIMIT_MAX) ||
207 (*queuedSyncLimit < DBConstant::QUEUED_SYNC_LIMIT_MIN)) {
208 return -E_INVALID_ARGS;
209 }
210 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
211 if (kvDB == nullptr) {
212 return -E_INVALID_CONNECTION;
213 }
214 return kvDB->SetQueuedSyncLimit(queuedSyncLimit);
215 }
216
GetQueuedSyncLimit(int * queuedSyncLimit) const217 int SyncAbleKvDBConnection::GetQueuedSyncLimit(int *queuedSyncLimit) const
218 {
219 if (queuedSyncLimit == nullptr) {
220 return -E_INVALID_ARGS;
221 }
222 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
223 if (kvDB == nullptr) {
224 return -E_INVALID_CONNECTION;
225 }
226 return kvDB->GetQueuedSyncLimit(queuedSyncLimit);
227 }
228
DisableManualSync(void)229 int SyncAbleKvDBConnection::DisableManualSync(void)
230 {
231 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
232 if (kvDB == nullptr) {
233 return -E_INVALID_CONNECTION;
234 }
235 return kvDB->DisableManualSync();
236 }
237
EnableManualSync(void)238 int SyncAbleKvDBConnection::EnableManualSync(void)
239 {
240 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
241 if (kvDB == nullptr) {
242 return -E_INVALID_CONNECTION;
243 }
244 return kvDB->EnableManualSync();
245 }
246
SetStaleDataWipePolicy(const WipePolicy * policy)247 int SyncAbleKvDBConnection::SetStaleDataWipePolicy(const WipePolicy *policy)
248 {
249 if (policy == nullptr) {
250 return -E_INVALID_ARGS;
251 }
252 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
253 if (kvDB == nullptr) {
254 return -E_INVALID_CONNECTION;
255 }
256 return kvDB->SetStaleDataWipePolicy(*policy);
257 }
258
SetRemotePushFinishedNotify(PragmaRemotePushNotify * notifyParma)259 int SyncAbleKvDBConnection::SetRemotePushFinishedNotify(PragmaRemotePushNotify *notifyParma)
260 {
261 if (notifyParma == nullptr) {
262 return -E_INVALID_ARGS;
263 }
264
265 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
266 if (kvDB == nullptr) {
267 return -E_INVALID_CONNECTION;
268 }
269
270 int errCode = E_OK;
271 NotificationChain::Listener *tmpListener = nullptr;
272 if (notifyParma->notifier_ != nullptr) {
273 tmpListener = kvDB->AddRemotePushFinishedNotify(notifyParma->notifier_, errCode);
274 if (tmpListener == nullptr) {
275 return errCode;
276 }
277 }
278
279 std::lock_guard<std::mutex> lock(remotePushFinishedListenerLock_);
280 // Drop old listener and set the new listener
281 if (remotePushFinishedListener_ != nullptr) {
282 errCode = remotePushFinishedListener_->Drop();
283 if (errCode != E_OK) {
284 LOGE("[SyncAbleConnection] Drop Remote push finished listener failed %d", errCode);
285 if (tmpListener != nullptr) {
286 tmpListener->Drop();
287 }
288 return errCode;
289 }
290 }
291 remotePushFinishedListener_ = tmpListener;
292 return errCode;
293 }
294
SetSyncRetry(bool isRetry)295 int SyncAbleKvDBConnection::SetSyncRetry(bool isRetry)
296 {
297 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
298 if (kvDB == nullptr) {
299 return -E_INVALID_CONNECTION;
300 }
301 return kvDB->SetSyncRetry(isRetry);
302 }
303
SetEqualIdentifier(const PragmaSetEqualIdentifier * param)304 int SyncAbleKvDBConnection::SetEqualIdentifier(const PragmaSetEqualIdentifier *param)
305 {
306 if (param == nullptr) {
307 return -E_INVALID_ARGS;
308 }
309
310 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
311 if (kvDB == nullptr) {
312 return -E_INVALID_CONNECTION;
313 }
314 return kvDB->SetEqualIdentifier(param->identifier_, param->targets_);
315 }
316
SetPushDataInterceptor(const PushDataInterceptor & interceptor)317 int SyncAbleKvDBConnection::SetPushDataInterceptor(const PushDataInterceptor &interceptor)
318 {
319 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
320 if (kvDB == nullptr) {
321 return -E_INVALID_CONNECTION;
322 }
323 kvDB->SetSendDataInterceptor(interceptor);
324 return E_OK;
325 }
326
GetSyncDataSize(const std::string & device,size_t & size) const327 int SyncAbleKvDBConnection::GetSyncDataSize(const std::string &device, size_t &size) const
328 {
329 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
330 if (kvDB == nullptr) {
331 return -E_INVALID_CONNECTION;
332 }
333 return kvDB->GetSyncDataSize(device, size);
334 }
335
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)336 int SyncAbleKvDBConnection::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
337 {
338 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
339 if (kvDB == nullptr) {
340 return -E_INVALID_CONNECTION;
341 }
342 return kvDB->GetWatermarkInfo(device, info);
343 }
344
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)345 int SyncAbleKvDBConnection::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
346 {
347 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
348 if (kvDB == nullptr) {
349 return -E_INVALID_CONNECTION;
350 }
351
352 int securityLabel = INVALID_SEC_LABEL;
353 int securityFlag = INVALID_SEC_FLAG;
354 GetSecurityOption(securityLabel, securityFlag);
355 if (securityLabel == S4) {
356 LOGE("The current data does not support synchronization.");
357 return -E_SECURITY_OPTION_CHECK_ERROR;
358 }
359 return kvDB->Sync(option, onProcess);
360 }
361
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)362 int SyncAbleKvDBConnection::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
363 {
364 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
365 if (kvDB == nullptr) {
366 return -E_INVALID_CONNECTION;
367 }
368 return kvDB->SetCloudDB(cloudDBs);
369 }
370
GetTaskCount()371 int32_t SyncAbleKvDBConnection::GetTaskCount()
372 {
373 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
374 if (kvDB == nullptr) {
375 LOGW("[SyncAbleKvDBConnection] Get task count with null db");
376 return -1;
377 }
378 return kvDB->GetTaskCount();
379 }
380
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)381 void SyncAbleKvDBConnection::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
382 {
383 auto *kvDB = GetDB<SyncAbleKvDB>();
384 if (kvDB == nullptr) {
385 LOGW("[SyncAbleKvDBConnection] Set generate cloud version callback with null db");
386 return;
387 }
388 kvDB->SetGenCloudVersionCallback(callback);
389 }
390
SetReceiveDataInterceptor(const DataInterceptor & interceptor)391 int SyncAbleKvDBConnection::SetReceiveDataInterceptor(const DataInterceptor &interceptor)
392 {
393 auto kvDB = GetDB<SyncAbleKvDB>();
394 if (kvDB == nullptr) {
395 return -E_INVALID_CONNECTION;
396 }
397 kvDB->SetReceiveDataInterceptor(interceptor);
398 return E_OK;
399 }
400 }
401