1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_able_kvdb_connection.h"
17 
18 #include "log_print.h"
19 #include "db_errno.h"
20 #include "db_constant.h"
21 #include "kvdb_pragma.h"
22 #include "performance_analysis.h"
23 #include "runtime_context.h"
24 #include "sync_able_kvdb.h"
25 
26 namespace DistributedDB {
SyncAbleKvDBConnection(SyncAbleKvDB * kvDB)27 SyncAbleKvDBConnection::SyncAbleKvDBConnection(SyncAbleKvDB *kvDB)
28     : GenericKvDBConnection(kvDB),
29       remotePushFinishedListener_(nullptr)
30 {
31     OnKill([this]() {
32         auto *db = GetDB<SyncAbleKvDB>();
33         if (db == nullptr) {
34             return;
35         }
36         // Drop the lock before we call RemoveSyncOperation().
37         UnlockObj();
38         db->StopSync(GetConnectionId());
39         LockObj();
40     });
41 }
42 
~SyncAbleKvDBConnection()43 SyncAbleKvDBConnection::~SyncAbleKvDBConnection()
44 {
45     if (remotePushFinishedListener_ != nullptr) {
46         remotePushFinishedListener_->Drop(true);
47     }
48     remotePushFinishedListener_ = nullptr;
49 }
50 
InitPragmaFunc()51 void SyncAbleKvDBConnection::InitPragmaFunc()
52 {
53     if (!pragmaFunc_.empty()) {
54         return;
55     }
56     pragmaFunc_ = {
57         {PRAGMA_SYNC_DEVICES, [this](void *parameter, int &errCode) {
58             errCode = PragmaSyncAction(static_cast<PragmaSync *>(parameter)); }},
59         {PRAGMA_AUTO_SYNC, [this](void *parameter, int &errCode) {
60             errCode = EnableAutoSync(*(static_cast<bool *>(parameter))); }},
61         {PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT, [](void *parameter, int &errCode) {
62             *(static_cast<std::string *>(parameter)) = PerformanceAnalysis::GetInstance()->GetStatistics(); }},
63         {PRAGMA_PERFORMANCE_ANALYSIS_OPEN, [](void *parameter, int &errCode) {
64             PerformanceAnalysis::GetInstance()->OpenPerformanceAnalysis(); }},
65         {PRAGMA_PERFORMANCE_ANALYSIS_CLOSE, [](void *parameter, int &errCode) {
66             PerformanceAnalysis::GetInstance()->ClosePerformanceAnalysis(); }},
67         {PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME,  [](void *parameter, int &errCode) {
68             PerformanceAnalysis::GetInstance()->SetFileName(*(static_cast<std::string *>(parameter))); }},
69         {PRAGMA_GET_QUEUED_SYNC_SIZE, [this](void *parameter, int &errCode) {
70             errCode = GetQueuedSyncSize(static_cast<int *>(parameter)); }},
71         {PRAGMA_SET_QUEUED_SYNC_LIMIT, [this](void *parameter, int &errCode) {
72             errCode = SetQueuedSyncLimit(static_cast<int *>(parameter)); }},
73         {PRAGMA_GET_QUEUED_SYNC_LIMIT, [this](void *parameter, int &errCode) {
74             errCode = GetQueuedSyncLimit(static_cast<int *>(parameter)); }},
75         {PRAGMA_SET_WIPE_POLICY, [this](void *parameter, int &errCode) {
76             errCode = SetStaleDataWipePolicy(static_cast<WipePolicy *>(parameter)); }},
77         {PRAGMA_REMOTE_PUSH_FINISHED_NOTIFY, [this](void *parameter, int &errCode) {
78             errCode = SetRemotePushFinishedNotify(static_cast<PragmaRemotePushNotify *>(parameter)); }},
79         {PRAGMA_SET_SYNC_RETRY, [this](void *parameter, int &errCode) {
80             errCode = SetSyncRetry(*(static_cast<bool *>(parameter))); }},
81         {PRAGMA_ADD_EQUAL_IDENTIFIER, [this](void *parameter, int &errCode) {
82             errCode = SetEqualIdentifier(static_cast<PragmaSetEqualIdentifier *>(parameter)); }},
83         {PRAGMA_INTERCEPT_SYNC_DATA, [this](void *parameter, int &errCode) {
84             errCode = SetPushDataInterceptor(*static_cast<PushDataInterceptor *>(parameter)); }},
85         {PRAGMA_SUBSCRIBE_QUERY, [this](void *parameter, int &errCode) {
86             errCode = PragmaSyncAction(static_cast<PragmaSync *>(parameter)); }},
87     };
88 }
89 
Pragma(int cmd,void * parameter)90 int SyncAbleKvDBConnection::Pragma(int cmd, void *parameter)
91 {
92     int errCode = PragmaParamCheck(cmd, parameter);
93     if (errCode != E_OK) {
94         return -E_INVALID_ARGS;
95     }
96 
97     InitPragmaFunc();
98     auto iter = pragmaFunc_.find(cmd);
99     if (iter != pragmaFunc_.end()) {
100         iter->second(parameter, errCode);
101         return errCode;
102     }
103 
104     // Call Pragma() of super class.
105     return GenericKvDBConnection::Pragma(cmd, parameter);
106 }
107 
PragmaParamCheck(int cmd,const void * parameter)108 int SyncAbleKvDBConnection::PragmaParamCheck(int cmd, const void *parameter)
109 {
110     switch (cmd) {
111         case PRAGMA_AUTO_SYNC:
112         case PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT:
113         case PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME:
114             if (parameter == nullptr) {
115                 return -E_INVALID_ARGS;
116             }
117             return E_OK;
118         default:
119             return E_OK;
120     }
121 }
122 
PragmaSyncAction(const PragmaSync * syncParameter)123 int SyncAbleKvDBConnection::PragmaSyncAction(const PragmaSync *syncParameter)
124 {
125     if (syncParameter == nullptr) {
126         return -E_INVALID_ARGS;
127     }
128     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
129     if (kvDB == nullptr) {
130         return -E_INVALID_CONNECTION;
131     }
132 
133     if (isExclusive_.load()) {
134         return -E_BUSY;
135     }
136     {
137         AutoLock lockGuard(this);
138         if (IsKilled()) {
139             // If this happens, users are using a closed connection.
140             LOGE("Pragma sync on a closed connection.");
141             return -E_STALE;
142         }
143         IncObjRef(this);
144     }
145 
146     ISyncer::SyncParma syncParam;
147     syncParam.devices = syncParameter->devices_;
148     syncParam.mode = syncParameter->mode_;
149     syncParam.wait = syncParameter->wait_;
150     syncParam.isQuerySync = syncParameter->isQuerySync_;
151     syncParam.syncQuery = syncParameter->query_;
152     syncParam.onFinalize =  [this]() { DecObjRef(this); };
153     syncParam.onComplete = std::bind(&SyncAbleKvDBConnection::OnSyncComplete, this, std::placeholders::_1,
154         syncParameter->onComplete_, syncParameter->wait_);
155     int errCode = kvDB->Sync(syncParam, GetConnectionId());
156     if (errCode != E_OK) {
157         DecObjRef(this);
158     }
159     return errCode;
160 }
161 
EnableAutoSync(bool enable)162 int SyncAbleKvDBConnection::EnableAutoSync(bool enable)
163 {
164     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
165     if (kvDB == nullptr) {
166         return -E_INVALID_CONNECTION;
167     }
168     kvDB->EnableAutoSync(enable);
169     return E_OK;
170 }
171 
OnSyncComplete(const std::map<std::string,int> & statuses,const std::function<void (const std::map<std::string,int> & devicesMap)> & onComplete,bool wait)172 void SyncAbleKvDBConnection::OnSyncComplete(const std::map<std::string, int> &statuses,
173     const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete, bool wait)
174 {
175     AutoLock lockGuard(this);
176     if (!IsKilled() && onComplete) {
177         // Drop the lock before invoking the callback.
178         // Do pragma-sync again in the prev sync callback is supported.
179         UnlockObj();
180         // The connection may be closed after UnlockObj().
181         // RACE: 'KillObj()' against 'onComplete()'.
182         if (!IsKilled()) {
183             onComplete(statuses);
184         }
185         LockObj();
186     }
187 }
188 
GetQueuedSyncSize(int * queuedSyncSize) const189 int SyncAbleKvDBConnection::GetQueuedSyncSize(int *queuedSyncSize) const
190 {
191     if (queuedSyncSize == nullptr) {
192         return -E_INVALID_ARGS;
193     }
194     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
195     if (kvDB == nullptr) {
196         return -E_INVALID_CONNECTION;
197     }
198     return kvDB->GetQueuedSyncSize(queuedSyncSize);
199 }
200 
SetQueuedSyncLimit(const int * queuedSyncLimit)201 int SyncAbleKvDBConnection::SetQueuedSyncLimit(const int *queuedSyncLimit)
202 {
203     if (queuedSyncLimit == nullptr) {
204         return -E_INVALID_ARGS;
205     }
206     if ((*queuedSyncLimit > DBConstant::QUEUED_SYNC_LIMIT_MAX) ||
207         (*queuedSyncLimit < DBConstant::QUEUED_SYNC_LIMIT_MIN)) {
208         return -E_INVALID_ARGS;
209     }
210     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
211     if (kvDB == nullptr) {
212         return -E_INVALID_CONNECTION;
213     }
214     return kvDB->SetQueuedSyncLimit(queuedSyncLimit);
215 }
216 
GetQueuedSyncLimit(int * queuedSyncLimit) const217 int SyncAbleKvDBConnection::GetQueuedSyncLimit(int *queuedSyncLimit) const
218 {
219     if (queuedSyncLimit == nullptr) {
220         return -E_INVALID_ARGS;
221     }
222     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
223     if (kvDB == nullptr) {
224         return -E_INVALID_CONNECTION;
225     }
226     return kvDB->GetQueuedSyncLimit(queuedSyncLimit);
227 }
228 
DisableManualSync(void)229 int SyncAbleKvDBConnection::DisableManualSync(void)
230 {
231     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
232     if (kvDB == nullptr) {
233         return -E_INVALID_CONNECTION;
234     }
235     return kvDB->DisableManualSync();
236 }
237 
EnableManualSync(void)238 int SyncAbleKvDBConnection::EnableManualSync(void)
239 {
240     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
241     if (kvDB == nullptr) {
242         return -E_INVALID_CONNECTION;
243     }
244     return kvDB->EnableManualSync();
245 }
246 
SetStaleDataWipePolicy(const WipePolicy * policy)247 int SyncAbleKvDBConnection::SetStaleDataWipePolicy(const WipePolicy *policy)
248 {
249     if (policy == nullptr) {
250         return -E_INVALID_ARGS;
251     }
252     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
253     if (kvDB == nullptr) {
254         return -E_INVALID_CONNECTION;
255     }
256     return kvDB->SetStaleDataWipePolicy(*policy);
257 }
258 
SetRemotePushFinishedNotify(PragmaRemotePushNotify * notifyParma)259 int SyncAbleKvDBConnection::SetRemotePushFinishedNotify(PragmaRemotePushNotify *notifyParma)
260 {
261     if (notifyParma == nullptr) {
262         return -E_INVALID_ARGS;
263     }
264 
265     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
266     if (kvDB == nullptr) {
267         return -E_INVALID_CONNECTION;
268     }
269 
270     int errCode = E_OK;
271     NotificationChain::Listener *tmpListener = nullptr;
272     if (notifyParma->notifier_ != nullptr) {
273         tmpListener = kvDB->AddRemotePushFinishedNotify(notifyParma->notifier_, errCode);
274         if (tmpListener == nullptr) {
275             return errCode;
276         }
277     }
278 
279     std::lock_guard<std::mutex> lock(remotePushFinishedListenerLock_);
280     // Drop old listener and set the new listener
281     if (remotePushFinishedListener_ != nullptr) {
282         errCode = remotePushFinishedListener_->Drop();
283         if (errCode != E_OK) {
284             LOGE("[SyncAbleConnection] Drop Remote push finished listener failed %d", errCode);
285             if (tmpListener != nullptr) {
286                 tmpListener->Drop();
287             }
288             return errCode;
289         }
290     }
291     remotePushFinishedListener_ = tmpListener;
292     return errCode;
293 }
294 
SetSyncRetry(bool isRetry)295 int SyncAbleKvDBConnection::SetSyncRetry(bool isRetry)
296 {
297     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
298     if (kvDB == nullptr) {
299         return -E_INVALID_CONNECTION;
300     }
301     return kvDB->SetSyncRetry(isRetry);
302 }
303 
SetEqualIdentifier(const PragmaSetEqualIdentifier * param)304 int SyncAbleKvDBConnection::SetEqualIdentifier(const PragmaSetEqualIdentifier *param)
305 {
306     if (param == nullptr) {
307         return -E_INVALID_ARGS;
308     }
309 
310     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
311     if (kvDB == nullptr) {
312         return -E_INVALID_CONNECTION;
313     }
314     return kvDB->SetEqualIdentifier(param->identifier_, param->targets_);
315 }
316 
SetPushDataInterceptor(const PushDataInterceptor & interceptor)317 int SyncAbleKvDBConnection::SetPushDataInterceptor(const PushDataInterceptor &interceptor)
318 {
319     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
320     if (kvDB == nullptr) {
321         return -E_INVALID_CONNECTION;
322     }
323     kvDB->SetSendDataInterceptor(interceptor);
324     return E_OK;
325 }
326 
GetSyncDataSize(const std::string & device,size_t & size) const327 int SyncAbleKvDBConnection::GetSyncDataSize(const std::string &device, size_t &size) const
328 {
329     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
330     if (kvDB == nullptr) {
331         return -E_INVALID_CONNECTION;
332     }
333     return kvDB->GetSyncDataSize(device, size);
334 }
335 
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)336 int SyncAbleKvDBConnection::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
337 {
338     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
339     if (kvDB == nullptr) {
340         return -E_INVALID_CONNECTION;
341     }
342     return kvDB->GetWatermarkInfo(device, info);
343 }
344 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)345 int SyncAbleKvDBConnection::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
346 {
347     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
348     if (kvDB == nullptr) {
349         return -E_INVALID_CONNECTION;
350     }
351 
352     int securityLabel = INVALID_SEC_LABEL;
353     int securityFlag = INVALID_SEC_FLAG;
354     GetSecurityOption(securityLabel, securityFlag);
355     if (securityLabel == S4) {
356         LOGE("The current data does not support synchronization.");
357         return -E_SECURITY_OPTION_CHECK_ERROR;
358     }
359     return kvDB->Sync(option, onProcess);
360 }
361 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)362 int SyncAbleKvDBConnection::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
363 {
364     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
365     if (kvDB == nullptr) {
366         return -E_INVALID_CONNECTION;
367     }
368     return kvDB->SetCloudDB(cloudDBs);
369 }
370 
GetTaskCount()371 int32_t SyncAbleKvDBConnection::GetTaskCount()
372 {
373     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
374     if (kvDB == nullptr) {
375         LOGW("[SyncAbleKvDBConnection] Get task count with null db");
376         return -1;
377     }
378     return kvDB->GetTaskCount();
379 }
380 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)381 void SyncAbleKvDBConnection::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
382 {
383     auto *kvDB = GetDB<SyncAbleKvDB>();
384     if (kvDB == nullptr) {
385         LOGW("[SyncAbleKvDBConnection] Set generate cloud version callback with null db");
386         return;
387     }
388     kvDB->SetGenCloudVersionCallback(callback);
389 }
390 
SetReceiveDataInterceptor(const DataInterceptor & interceptor)391 int SyncAbleKvDBConnection::SetReceiveDataInterceptor(const DataInterceptor &interceptor)
392 {
393     auto kvDB = GetDB<SyncAbleKvDB>();
394     if (kvDB == nullptr) {
395         return -E_INVALID_CONNECTION;
396     }
397     kvDB->SetReceiveDataInterceptor(interceptor);
398     return E_OK;
399 }
400 }
401