1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbServiceProxy"
16 #include "rdb_service_proxy.h"
17 
18 #include "itypes_util.h"
19 #include "logger.h"
20 #include "sqlite_utils.h"
21 #include "result_set_proxy.h"
22 
23 namespace OHOS::DistributedRdb {
24 using namespace OHOS::Rdb;
25 using SqliteUtils = OHOS::NativeRdb::SqliteUtils;
26 using RdbServiceCode = OHOS::DistributedRdb::RelationalStore::RdbServiceInterfaceCode;
27 
// IPC_SEND(code, reply, args...) sends one request to the remote service and evaluates to an int32_t status.
//
// It is a GNU statement expression, so it can be used on the right-hand side of an assignment.
// Steps:
//   1. fail with RDB_ERROR when remote_ is null (nothing to send to);
//   2. write the interface token and marshal every variadic argument into a fresh request parcel;
//   3. send the request synchronously through remote_ with a default MessageOption;
//   4. read the leading status value from `reply`.
// Any failure in steps 1-4 yields RDB_ERROR. In particular, a reply from which no status can be read is
// reported as RDB_ERROR instead of silently keeping the initial RDB_OK.
// After a successful call, `reply` is positioned just after the status, so callers may continue
// unmarshalling additional results from it.
// No comments are placed inside the macro body because of the line continuations.
#define IPC_SEND(code, reply, ...)                                          \
({                                                                          \
    int32_t __status = RDB_OK;                                              \
    do {                                                                    \
        if (remote_ == nullptr) {                                           \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
        MessageParcel request;                                              \
        if (!request.WriteInterfaceToken(GetDescriptor())) {                \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
        if (!ITypesUtil::Marshal(request, ##__VA_ARGS__)) {                 \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
        MessageOption option;                                               \
        auto result = remote_->SendRequest((code), request, reply, option); \
        if (result != 0) {                                                  \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
                                                                            \
        if (!ITypesUtil::Unmarshal(reply, __status)) {                      \
            __status = RDB_ERROR;                                           \
        }                                                                   \
    } while (0);                                                            \
    __status;                                                               \
})
52 
RdbServiceProxy(const sptr<IRemoteObject> & object)53 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
54     : IRemoteProxy<IRdbService>(object)
55 {
56     remote_ = Remote();
57 }
58 
ObtainDistributedTableName(const std::string & device,const std::string & table)59 std::string RdbServiceProxy::ObtainDistributedTableName(const std::string &device, const std::string &table)
60 {
61     return "";
62 }
63 
InitNotifier(const RdbSyncerParam & param)64 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param)
65 {
66     notifier_ = new (std::nothrow) RdbNotifierStub(
67         [this] (uint32_t seqNum, Details &&result) {
68             OnSyncComplete(seqNum, std::move(result));
69         },
70         [this] (std::string storeName, Details &&result) {
71             OnSyncComplete(storeName, std::move(result));
72         },
73         [this](const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) {
74             OnDataChange(origin, primaries, std::move(changeInfo));
75         });
76     if (notifier_ == nullptr) {
77         LOG_ERROR("create notifier failed.");
78         return RDB_ERROR;
79     }
80 
81     if (InitNotifier(param, notifier_->AsObject()) != RDB_OK) {
82         notifier_ = nullptr;
83         LOG_ERROR("init notifier error.");
84         return RDB_ERROR;
85     }
86 
87     return RDB_OK;
88 }
89 
InitNotifier(const RdbSyncerParam & param,sptr<IRemoteObject> notifier)90 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param, sptr<IRemoteObject> notifier)
91 {
92     MessageParcel reply;
93     int32_t status = IPC_SEND(
94         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_INIT_NOTIFIER), reply, param, notifier);
95     if (status != RDB_OK) {
96         LOG_ERROR("status:%{public}d, bundleName:%{public}s", status, param.bundleName_.c_str());
97     }
98     return status;
99 }
100 
GetSeqNum()101 uint32_t RdbServiceProxy::GetSeqNum()
102 {
103     uint32_t value = ++seqNum_;
104     if (value == 0) {
105         value = ++seqNum_;
106     }
107     return value;
108 }
109 
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)110 std::pair<int32_t, Details> RdbServiceProxy::DoSync(const RdbSyncerParam& param, const Option &option,
111     const PredicatesMemo &predicates)
112 {
113     std::pair<int32_t, Details> result{RDB_ERROR, {}};
114     MessageParcel reply;
115     auto &[status, details] = result;
116     status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SYNC), reply, param, option, predicates);
117     if (status != RDB_OK) {
118         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s",
119             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
120         return result;
121     }
122 
123     if (!ITypesUtil::Unmarshal(reply, details)) {
124         LOG_ERROR("read result failed.");
125         status = RDB_ERROR;
126         return result;
127     }
128     return result;
129 }
130 
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)131 int32_t RdbServiceProxy::DoSync(const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates,
132     const AsyncDetail &async)
133 {
134     auto [status, details] = DoSync(param, option, predicates);
135     if (status != RDB_OK) {
136         LOG_INFO("failed.");
137         return RDB_ERROR;
138     }
139     LOG_INFO("success.");
140     if (async != nullptr) {
141         async(std::move(details));
142     }
143     return RDB_OK;
144 }
145 
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)146 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates)
147 {
148     MessageParcel reply;
149     int32_t status = IPC_SEND(
150         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ASYNC), reply, param, option, predicates);
151     if (status != RDB_OK) {
152         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, seqNum:%{public}u", status,
153             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), option.seqNum);
154     }
155     return status;
156 }
157 
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & callback)158 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam& param, const Option &option,
159                                  const PredicatesMemo &predicates, const AsyncDetail & callback)
160 {
161     Option asyncOption = option;
162     if (callback != nullptr) {
163         asyncOption.seqNum = GetSeqNum();
164         if (!syncCallbacks_.Insert(asyncOption.seqNum, callback)) {
165             LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, insert callback failed", param.bundleName_.c_str(),
166                 SqliteUtils::Anonymous(param.storeName_).c_str());
167             return RDB_ERROR;
168         }
169     }
170     LOG_INFO("bundleName:%{public}s, storeName:%{public}s, num=%{public}u, start DoAsync", param.bundleName_.c_str(),
171         SqliteUtils::Anonymous(param.storeName_).c_str(), asyncOption.seqNum);
172     if (DoAsync(param, asyncOption, predicates) != RDB_OK) {
173         syncCallbacks_.Erase(asyncOption.seqNum);
174         return RDB_ERROR;
175     }
176     return RDB_OK;
177 }
178 
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables,const std::vector<Reference> & references,bool isRebuild,int32_t type)179 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam& param, const std::vector<std::string> &tables,
180     const std::vector<Reference> &references, bool isRebuild, int32_t type)
181 {
182     MessageParcel reply;
183     int32_t status = IPC_SEND(
184         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_DIST_TABLE), reply, param, tables, references,
185             type, isRebuild);
186     if (status != RDB_OK) {
187         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, type:%{public}d",
188             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), type);
189     }
190     return status;
191 }
192 
Sync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)193 int32_t RdbServiceProxy::Sync(const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates,
194                               const AsyncDetail &async)
195 {
196     if (option.isAsync) {
197         return DoAsync(param, option, predicates, async);
198     }
199     return DoSync(param, option, predicates, async);
200 }
201 
RemoveSuffix(const std::string & name)202 std::string RdbServiceProxy::RemoveSuffix(const std::string& name)
203 {
204     std::string suffix(".db");
205     auto pos = name.rfind(suffix);
206     if (pos == std::string::npos || pos < name.length() - suffix.length()) {
207         return name;
208     }
209     return { name, 0, pos };
210 }
211 
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)212 int32_t RdbServiceProxy::Subscribe(const RdbSyncerParam &param, const SubscribeOption &option,
213                                    RdbStoreObserver *observer)
214 {
215     if (observer == nullptr) {
216         return RDB_ERROR;
217     }
218     if (option.mode < SubscribeMode::REMOTE || option.mode >= SUBSCRIBE_MODE_MAX) {
219         LOG_ERROR("subscribe mode invalid.");
220         return RDB_ERROR;
221     }
222     if (DoSubscribe(param, option) != RDB_OK) {
223         return RDB_ERROR;
224     }
225     auto name = RemoveSuffix(param.storeName_);
226     observers_.Compute(name, [observer, &param, &option](const auto &key, std::list<ObserverParam> &value) {
227         for (const auto &element : value) {
228             if (element.observer == observer) {
229                 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(key).c_str());
230                 return true;
231             }
232         }
233         value.push_back({ observer, param.bundleName_, option });
234         return true;
235     });
236     return RDB_OK;
237 }
238 
DoSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)239 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam &param, const SubscribeOption &option)
240 {
241     MessageParcel reply;
242     int32_t status = IPC_SEND(
243         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SUBSCRIBE), reply, param, option);
244     if (status != RDB_OK) {
245         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s",
246             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
247     }
248     return status;
249 }
250 
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)251 int32_t RdbServiceProxy::UnSubscribe(const RdbSyncerParam &param, const SubscribeOption &option,
252                                      RdbStoreObserver *observer)
253 {
254     if (observer == nullptr) {
255         LOG_ERROR("observer is null.");
256         return RDB_ERROR;
257     }
258     if (DoUnSubscribe(param, option) != RDB_OK) {
259         return RDB_ERROR;
260     }
261     auto name = RemoveSuffix(param.storeName_);
262     observers_.ComputeIfPresent(name, [observer](const auto &key, std::list<ObserverParam> &value) {
263         LOG_INFO("before remove size=%{public}d", static_cast<int>(value.size()));
264         value.remove_if([observer](const ObserverParam &param) {
265             return param.observer == observer;
266         });
267         LOG_INFO("after  remove size=%{public}d", static_cast<int>(value.size()));
268         return !(value.empty());
269     });
270     return RDB_OK;
271 }
272 
DoUnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)273 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam &param, const SubscribeOption &option)
274 {
275     MessageParcel reply;
276     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNSUBSCRIBE), reply, param, option);
277     if (status != RDB_OK) {
278         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
279             SqliteUtils::Anonymous(param.storeName_).c_str());
280     }
281     return status;
282 }
283 
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs)284 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::RemoteQuery(
285     const RdbSyncerParam &param, const std::string &device, const std::string &sql,
286     const std::vector<std::string> &selectionArgs)
287 {
288     MessageParcel reply;
289     int32_t status = IPC_SEND(
290         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REMOTE_QUERY), reply, param, device, sql, selectionArgs);
291     if (status != RDB_OK) {
292         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s", status,
293             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
294             SqliteUtils::Anonymous(device).c_str());
295         return { status, nullptr };
296     }
297 
298     sptr<IRemoteObject> remote = reply.ReadRemoteObject();
299     if (remote == nullptr) {
300         LOG_ERROR("read remote object is null.");
301         return { RDB_ERROR, nullptr };
302     }
303     sptr<NativeRdb::ResultSetProxy> instance = new NativeRdb::ResultSetProxy(remote);
304     return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [holder = instance](const auto *) {}) };
305 }
306 
ExportObservers()307 RdbServiceProxy::Observers RdbServiceProxy::ExportObservers()
308 {
309     return observers_;
310 }
311 
ImportObservers(Observers & observers)312 void RdbServiceProxy::ImportObservers(Observers &observers)
313 {
314     observers.ForEach([this](const std::string &key, const std::list<ObserverParam> &value) {
315         RdbSyncerParam syncerParam;
316         for (const auto &param : value) {
317             syncerParam.bundleName_ = param.bundleName;
318             syncerParam.storeName_ = key;
319             Subscribe(syncerParam, param.subscribeOption, param.observer);
320         }
321         return false;
322     });
323 }
324 
BeforeOpen(RdbSyncerParam & param)325 int32_t RdbServiceProxy::BeforeOpen(RdbSyncerParam &param)
326 {
327     MessageParcel reply;
328     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_BEFORE_OPEN), reply, param);
329     if (status != RDB_OK) {
330         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
331             SqliteUtils::Anonymous(param.storeName_).c_str());
332         return status;
333     }
334     if (!ITypesUtil::Unmarshal(reply, param)) {
335         LOG_ERROR("read result failed.");
336         status = RDB_ERROR;
337     }
338     return status;
339 }
340 
AfterOpen(const RdbSyncerParam & param)341 int32_t RdbServiceProxy::AfterOpen(const RdbSyncerParam &param)
342 {
343     MessageParcel reply;
344     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_AFTER_OPEN), reply, param);
345     if (status != RDB_OK) {
346         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
347             SqliteUtils::Anonymous(param.storeName_).c_str());
348     }
349     return status;
350 }
351 
Delete(const RdbSyncerParam & param)352 int32_t RdbServiceProxy::Delete(const RdbSyncerParam &param)
353 {
354     MessageParcel reply;
355     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DELETE), reply, param);
356     if (status != RDB_OK) {
357         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
358             SqliteUtils::Anonymous(param.storeName_).c_str());
359     }
360     return status;
361 }
362 
QuerySharingResource(const RdbSyncerParam & param,const PredicatesMemo & predicates,const std::vector<std::string> & columns)363 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::QuerySharingResource(
364     const RdbSyncerParam &param, const PredicatesMemo &predicates, const std::vector<std::string> &columns)
365 {
366     MessageParcel reply;
367     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_QUERY_SHARING_RESOURCE), reply,
368         param, predicates, columns);
369     sptr<IRemoteObject> remote;
370     bool success = ITypesUtil::Unmarshal(reply, remote);
371     if (status != RDB_OK || !success || remote == nullptr) {
372         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, success:%{public}d, remote is "
373                   "%{public}s nullptr",
374             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), success,
375             remote != nullptr ? "not" : "");
376         return { RDB_ERROR, {} };
377     }
378     sptr<NativeRdb::ResultSetProxy> instance = new NativeRdb::ResultSetProxy(remote);
379     return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [instance](const auto *) {}) };
380 }
381 
RegisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)382 int32_t RdbServiceProxy::RegisterAutoSyncCallback(
383     const RdbSyncerParam &param, std::shared_ptr<DetailProgressObserver> observer)
384 {
385     if (observer == nullptr || param.storeName_.empty()) {
386         LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
387             SqliteUtils::Anonymous(param.storeName_).c_str());
388         return RDB_ERROR;
389     }
390     int32_t status = RDB_OK;
391     auto name = RemoveSuffix(param.storeName_);
392     syncObservers_.Compute(name, [this, &param, &status, observer](const auto &store, auto &observers) {
393         for (const auto &element : observers) {
394             if (element.get() == observer.get()) {
395                 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(store).c_str());
396                 return true;
397             }
398         }
399         status = DoRegister(param);
400         if (status == RDB_OK) {
401             observers.push_back(observer);
402         }
403         return !observers.empty();
404     });
405     return status;
406 }
407 
DoRegister(const RdbSyncerParam & param)408 int32_t RdbServiceProxy::DoRegister(const RdbSyncerParam &param)
409 {
410     MessageParcel reply;
411     int32_t status = IPC_SEND(
412         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
413     if (status != RDB_OK) {
414         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
415             SqliteUtils::Anonymous(param.storeName_).c_str());
416     }
417     return status;
418 }
419 
UnregisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)420 int32_t RdbServiceProxy::UnregisterAutoSyncCallback(
421     const RdbSyncerParam &param, std::shared_ptr<DetailProgressObserver> observer)
422 {
423     if (observer == nullptr || param.storeName_.empty()) {
424         LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
425             SqliteUtils::Anonymous(param.storeName_).c_str());
426         return RDB_ERROR;
427     }
428     int32_t status = RDB_OK;
429     auto name = RemoveSuffix(param.storeName_);
430     syncObservers_.ComputeIfPresent(name, [this, &param, &status, observer](const auto &storeName, auto &observers) {
431         for (auto it = observers.begin(); it != observers.end();) {
432             if (it->get() != observer.get()) {
433                 ++it;
434                 continue;
435             }
436             status = DoUnRegister(param);
437             if (status == RDB_OK) {
438                 it = observers.erase(it);
439             }
440         }
441         return !observers.empty();
442     });
443     return status;
444 }
445 
DoUnRegister(const RdbSyncerParam & param)446 int32_t RdbServiceProxy::DoUnRegister(const RdbSyncerParam &param)
447 {
448     MessageParcel reply;
449     int32_t status = IPC_SEND(
450         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNREGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
451     if (status != RDB_OK) {
452         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
453             SqliteUtils::Anonymous(param.storeName_).c_str());
454     }
455     return status;
456 }
457 
OnDataChange(const Origin & origin,const RdbServiceProxy::PrimaryFields & primaries,RdbServiceProxy::ChangeInfo && changeInfo)458 void RdbServiceProxy::OnDataChange(
459     const Origin &origin, const RdbServiceProxy::PrimaryFields &primaries, RdbServiceProxy::ChangeInfo &&changeInfo)
460 {
461     LOG_DEBUG("store:%{public}s data change from :%{public}s, dataType:%{public}d, origin:%{public}d.",
462         SqliteUtils::Anonymous(origin.store).c_str(),
463         origin.id.empty() ? "empty" : SqliteUtils::Anonymous(*origin.id.begin()).c_str(),
464         origin.dataType, origin.origin);
465     auto name = RdbServiceProxy::RemoveSuffix(origin.store);
466     observers_.ComputeIfPresent(name, [&origin, &primaries, info = std::move(changeInfo)](
467                                          const auto &key, const std::list<ObserverParam> &value) mutable {
468         auto size = value.size();
469         for (const auto &params : value) {
470             params.observer->OnChange(origin, primaries, --size > 0 ? ChangeInfo(info) : std::move(info));
471         }
472         return !value.empty();
473     });
474 }
475 
OnSyncComplete(uint32_t seqNum,Details && result)476 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, Details &&result)
477 {
478     syncCallbacks_.ComputeIfPresent(seqNum, [&result] (const auto& key, const AsyncDetail& callback) {
479         auto finished = result.empty() || (result.begin()->second.progress == SYNC_FINISH);
480         LOG_DEBUG("Sync complete, seqNum%{public}d, result size:%{public}zu", key, result.size());
481         if (callback!=nullptr) {
482             callback(std::move(result));
483         }
484         return !finished;
485     });
486 }
487 
OnSyncComplete(const std::string & storeName,Details && result)488 void RdbServiceProxy::OnSyncComplete(const std::string &storeName, Details &&result)
489 {
490     syncObservers_.ComputeIfPresent(storeName, [&result](const auto &key, const auto &observers) {
491         LOG_DEBUG("Sync complete, storeName%{public}s, result size:%{public}zu", SqliteUtils::Anonymous(key).c_str(),
492             result.size());
493         for (const auto &observer : observers) {
494             if (observer != nullptr) {
495                 observer->ProgressNotification(result);
496             }
497         }
498         return true;
499     });
500 }
501 
SetSearchable(const RdbSyncerParam & param,bool isSearchable)502 int32_t RdbServiceProxy::SetSearchable(const RdbSyncerParam& param, bool isSearchable)
503 {
504     MessageParcel reply;
505     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_SEARCHABLE),
506         reply, param, isSearchable);
507     if (status != RDB_OK) {
508         LOG_ERROR("RdbServiceProxy SetSearchable fail, status:%{public}d, "
509                   "bundleName:%{public}s, storeName:%{public}s",
510             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
511     }
512     return status;
513 }
514 
NotifyDataChange(const RdbSyncerParam & param,const RdbChangedData & rdbChangedData,const RdbNotifyConfig & rdbNotifyConfig)515 int32_t RdbServiceProxy::NotifyDataChange(const RdbSyncerParam &param, const RdbChangedData &rdbChangedData,
516     const RdbNotifyConfig &rdbNotifyConfig)
517 {
518     MessageParcel reply;
519     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_NOTIFY_DATA_CHANGE),
520         reply, param, rdbChangedData, rdbNotifyConfig);
521     if (status != RDB_OK) {
522         LOG_ERROR("RdbServiceProxy NotifyDataChange fail, status:%{public}d, "
523                   "bundleName:%{public}s, storeName:%{public}s",
524             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
525     }
526     return status;
527 }
528 
Disable(const RdbSyncerParam & param)529 int32_t RdbServiceProxy::Disable(const RdbSyncerParam& param)
530 {
531     MessageParcel reply;
532     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DISABLE), reply, param);
533     if (status != RDB_OK) {
534         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
535             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
536     }
537     return status;
538 }
539 
Enable(const RdbSyncerParam & param)540 int32_t RdbServiceProxy::Enable(const RdbSyncerParam& param)
541 {
542     MessageParcel reply;
543     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ENABLE), reply, param);
544     if (status != RDB_OK) {
545         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
546             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
547     }
548     return status;
549 }
550 
GetPassword(const RdbSyncerParam & param,std::vector<uint8_t> & key)551 int32_t RdbServiceProxy::GetPassword(const RdbSyncerParam &param, std::vector<uint8_t> &key)
552 {
553     MessageParcel reply;
554     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_PASSWORD), reply, param);
555     if (status != RDB_OK) {
556         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
557             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
558         return status;
559     }
560     if (!ITypesUtil::Unmarshal(reply, key)) {
561         LOG_ERROR("unmarshal key failed.");
562         status = RDB_ERROR;
563     }
564     return status;
565 }
566 
LockCloudContainer(const RdbSyncerParam & param)567 std::pair<int32_t, uint32_t> RdbServiceProxy::LockCloudContainer(const RdbSyncerParam& param)
568 {
569     MessageParcel reply;
570     uint32_t expiredTime = 0;
571     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_LOCK_CLOUD_CONTAINER), reply,
572         param, expiredTime);
573     if (status != RDB_OK) {
574         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
575             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
576         return { status, expiredTime };
577     }
578     if (!ITypesUtil::Unmarshal(reply, expiredTime)) {
579         LOG_ERROR("Unmarshal failed");
580         status = RDB_ERROR;
581     }
582     return { status, expiredTime };
583 }
584 
UnlockCloudContainer(const RdbSyncerParam & param)585 int32_t RdbServiceProxy::UnlockCloudContainer(const RdbSyncerParam& param)
586 {
587     MessageParcel reply;
588     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNLOCK_CLOUD_CONTAINER),
589         reply, param);
590     if (status != RDB_OK) {
591         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
592             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
593     }
594     return status;
595 }
596 
GetDebugInfo(const RdbSyncerParam & param,std::map<std::string,RdbDebugInfo> & debugInfo)597 int32_t RdbServiceProxy::GetDebugInfo(const RdbSyncerParam &param, std::map<std::string, RdbDebugInfo> &debugInfo)
598 {
599     MessageParcel reply;
600     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DEBUG_INFO), reply, param);
601     if (status != RDB_OK) {
602         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
603             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
604     }
605     if (!ITypesUtil::Unmarshal(reply, debugInfo)) {
606         LOG_ERROR("Unmarshal failed");
607         status = RDB_ERROR;
608     }
609     return status;
610 }
611 } // namespace OHOS::DistributedRdb
612