1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbSubscriberManager"
16 
17 #include "rdb_subscriber_manager.h"
18 
19 #include <cinttypes>
20 #include <utility>
21 
22 #include "ipc_skeleton.h"
23 #include "general/load_config_data_info_strategy.h"
24 #include "log_print.h"
25 #include "scheduler_manager.h"
26 #include "template_data.h"
27 #include "uri_utils.h"
28 #include "utils/anonymous.h"
29 
30 namespace OHOS::DataShare {
Get(const Key & key,int32_t userId,Template & tpl)31 bool TemplateManager::Get(const Key &key, int32_t userId, Template &tpl)
32 {
33     return TemplateData::Query(Id(TemplateData::GenId(key.uri, key.bundleName, key.subscriberId), userId), tpl) == E_OK;
34 }
35 
Add(const Key & key,int32_t userId,const Template & tpl)36 int32_t TemplateManager::Add(const Key &key, int32_t userId, const Template &tpl)
37 {
38     auto status = TemplateData::Add(key.uri, userId, key.bundleName, key.subscriberId, tpl);
39     if (!status) {
40         ZLOGE("Add failed, %{public}d", status);
41         return E_ERROR;
42     }
43     return E_OK;
44 }
45 
Delete(const Key & key,int32_t userId)46 int32_t TemplateManager::Delete(const Key &key, int32_t userId)
47 {
48     auto status = TemplateData::Delete(key.uri, userId, key.bundleName, key.subscriberId);
49     if (!status) {
50         ZLOGE("Delete failed, %{public}d", status);
51         return E_ERROR;
52     }
53     SchedulerManager::GetInstance().RemoveTimer(key);
54     return E_OK;
55 }
56 
Key(const std::string & uri,int64_t subscriberId,const std::string & bundleName)57 Key::Key(const std::string &uri, int64_t subscriberId, const std::string &bundleName)
58     : uri(uri), subscriberId(subscriberId), bundleName(bundleName)
59 {
60 }
61 
operator ==(const Key & rhs) const62 bool Key::operator==(const Key &rhs) const
63 {
64     return uri == rhs.uri && subscriberId == rhs.subscriberId && bundleName == rhs.bundleName;
65 }
66 
operator !=(const Key & rhs) const67 bool Key::operator!=(const Key &rhs) const
68 {
69     return !(rhs == *this);
70 }
operator <(const Key & rhs) const71 bool Key::operator<(const Key &rhs) const
72 {
73     if (uri < rhs.uri) {
74         return true;
75     }
76     if (rhs.uri < uri) {
77         return false;
78     }
79     if (subscriberId < rhs.subscriberId) {
80         return true;
81     }
82     if (rhs.subscriberId < subscriberId) {
83         return false;
84     }
85     return bundleName < rhs.bundleName;
86 }
operator >(const Key & rhs) const87 bool Key::operator>(const Key &rhs) const
88 {
89     return rhs < *this;
90 }
operator <=(const Key & rhs) const91 bool Key::operator<=(const Key &rhs) const
92 {
93     return !(rhs < *this);
94 }
operator >=(const Key & rhs) const95 bool Key::operator>=(const Key &rhs) const
96 {
97     return !(*this < rhs);
98 }
99 
TemplateManager()100 TemplateManager::TemplateManager() {}
101 
GetInstance()102 TemplateManager &TemplateManager::GetInstance()
103 {
104     static TemplateManager manager;
105     return manager;
106 }
107 
GetInstance()108 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
109 {
110     static RdbSubscriberManager manager;
111     return manager;
112 }
113 
Add(const Key & key,const sptr<IDataProxyRdbObserver> observer,std::shared_ptr<Context> context,std::shared_ptr<ExecutorPool> executorPool)114 int RdbSubscriberManager::Add(const Key &key, const sptr<IDataProxyRdbObserver> observer,
115     std::shared_ptr<Context> context, std::shared_ptr<ExecutorPool> executorPool)
116 {
117     int result = E_OK;
118     rdbCache_.Compute(key, [&observer, &context, executorPool, this](const auto &key, auto &value) {
119         ZLOGI("add subscriber, uri %{private}s tokenId 0x%{public}x", key.uri.c_str(), context->callerTokenId);
120         auto callerTokenId = IPCSkeleton::GetCallingTokenID();
121         auto callerPid = IPCSkeleton::GetCallingPid();
122         value.emplace_back(observer, context->callerTokenId, callerTokenId, callerPid);
123         std::vector<ObserverNode> node;
124         node.emplace_back(observer, context->callerTokenId, callerTokenId, callerPid);
125         ExecutorPool::Task task = [key, node, context, this]() {
126             LoadConfigDataInfoStrategy loadDataInfo;
127             if (!loadDataInfo(context)) {
128                 ZLOGE("loadDataInfo failed, uri %{public}s tokenId 0x%{public}x",
129                     DistributedData::Anonymous::Change(key.uri).c_str(), context->callerTokenId);
130                 return;
131             }
132             Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
133             if (GetEnableObserverCount(key) == 1) {
134                 SchedulerManager::GetInstance().Execute(
135                     key, context->currentUserId, context->calledSourceDir, context->version);
136             }
137         };
138         executorPool->Execute(task);
139         return true;
140     });
141     return result;
142 }
143 
Delete(const Key & key,uint32_t firstCallerTokenId)144 int RdbSubscriberManager::Delete(const Key &key, uint32_t firstCallerTokenId)
145 {
146     auto result =
147         rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
148             std::vector<ObserverNode> &value) {
149             ZLOGI("delete subscriber, uri %{public}s tokenId 0x%{public}x",
150                 DistributedData::Anonymous::Change(key.uri).c_str(), firstCallerTokenId);
151             for (auto it = value.begin(); it != value.end();) {
152                 if (it->firstCallerTokenId == firstCallerTokenId) {
153                     ZLOGI("erase start");
154                     it = value.erase(it);
155                 } else {
156                     it++;
157                 }
158             }
159             if (value.empty()) {
160                 SchedulerManager::GetInstance().RemoveTimer(key);
161             }
162             return !value.empty();
163         });
164     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
165 }
166 
Delete(uint32_t callerTokenId,uint32_t callerPid)167 void RdbSubscriberManager::Delete(uint32_t callerTokenId, uint32_t callerPid)
168 {
169     rdbCache_.EraseIf([&callerTokenId, &callerPid, this](const auto &key, std::vector<ObserverNode> &value) {
170         for (auto it = value.begin(); it != value.end();) {
171             if (it->callerTokenId == callerTokenId && it->callerPid == callerPid) {
172                 it = value.erase(it);
173             } else {
174                 it++;
175             }
176         }
177         if (value.empty()) {
178             ZLOGI("delete timer, subId %{public}" PRId64 ", bundleName %{public}s, tokenId %{public}x, uri %{public}s.",
179                 key.subscriberId, key.bundleName.c_str(), callerTokenId,
180                 DistributedData::Anonymous::Change(key.uri).c_str());
181             SchedulerManager::GetInstance().RemoveTimer(key);
182         }
183         return value.empty();
184     });
185 }
186 
Disable(const Key & key,uint32_t firstCallerTokenId)187 int RdbSubscriberManager::Disable(const Key &key, uint32_t firstCallerTokenId)
188 {
189     auto result =
190         rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
191             std::vector<ObserverNode> &value) {
192             for (auto it = value.begin(); it != value.end(); it++) {
193                 if (it->firstCallerTokenId == firstCallerTokenId) {
194                     it->enabled = false;
195                     it->isNotifyOnEnabled = false;
196                 }
197             }
198             return true;
199         });
200     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
201 }
202 
Enable(const Key & key,std::shared_ptr<Context> context)203 int RdbSubscriberManager::Enable(const Key &key, std::shared_ptr<Context> context)
204 {
205     auto result = rdbCache_.ComputeIfPresent(key, [&context, this](const auto &key, std::vector<ObserverNode> &value) {
206         for (auto it = value.begin(); it != value.end(); it++) {
207             if (it->firstCallerTokenId != context->callerTokenId) {
208                 continue;
209             }
210             it->enabled = true;
211             if (it->isNotifyOnEnabled) {
212                 std::vector<ObserverNode> node;
213                 node.emplace_back(it->observer, context->callerTokenId);
214                 LoadConfigDataInfoStrategy loadDataInfo;
215                 if (loadDataInfo(context)) {
216                     Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
217                 }
218             }
219         }
220         return true;
221     });
222     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
223 }
224 
Emit(const std::string & uri,std::shared_ptr<Context> context)225 void RdbSubscriberManager::Emit(const std::string &uri, std::shared_ptr<Context> context)
226 {
227     if (!URIUtils::IsDataProxyURI(uri)) {
228         return;
229     }
230     if (context->calledSourceDir.empty()) {
231         LoadConfigDataInfoStrategy loadDataInfo;
232         loadDataInfo(context);
233     }
234     rdbCache_.ForEach([&uri, &context, this](const Key &key, std::vector<ObserverNode> &val) {
235         if (key.uri != uri) {
236             return false;
237         }
238         Notify(key, context->currentUserId, val, context->calledSourceDir, context->version);
239         SetObserverNotifyOnEnabled(val);
240         return false;
241     });
242     SchedulerManager::GetInstance().Execute(
243         uri, context->currentUserId, context->calledSourceDir, context->version, context->calledBundleName);
244 }
245 
Emit(const std::string & uri,int32_t userId,DistributedData::StoreMetaData & metaData)246 void RdbSubscriberManager::Emit(const std::string &uri, int32_t userId,
247     DistributedData::StoreMetaData &metaData)
248 {
249     if (!URIUtils::IsDataProxyURI(uri)) {
250         return;
251     }
252     bool hasObserver = false;
253     rdbCache_.ForEach([&uri, &userId, &metaData, &hasObserver, this](const Key &key, std::vector<ObserverNode> &val) {
254         if (key.uri != uri) {
255             return false;
256         }
257         hasObserver = true;
258         Notify(key, userId, val, metaData.dataDir, metaData.version);
259         SetObserverNotifyOnEnabled(val);
260         return false;
261     });
262     if (!hasObserver) {
263         return;
264     }
265     SchedulerManager::GetInstance().Execute(
266         uri, userId, metaData.dataDir, metaData.version, metaData.bundleName);
267 }
268 
SetObserverNotifyOnEnabled(std::vector<ObserverNode> & nodes)269 void RdbSubscriberManager::SetObserverNotifyOnEnabled(std::vector<ObserverNode> &nodes)
270 {
271     for (auto &node : nodes) {
272         if (!node.enabled) {
273             node.isNotifyOnEnabled = true;
274         }
275     }
276 }
277 
GetKeysByUri(const std::string & uri)278 std::vector<Key> RdbSubscriberManager::GetKeysByUri(const std::string &uri)
279 {
280     std::vector<Key> results;
281     rdbCache_.ForEach([&uri, &results](const Key &key, std::vector<ObserverNode> &val) {
282         if (key.uri != uri) {
283             return false;
284         }
285         results.emplace_back(key);
286         return false;
287     });
288     return results;
289 }
290 
EmitByKey(const Key & key,int32_t userId,const std::string & rdbPath,int version)291 void RdbSubscriberManager::EmitByKey(const Key &key, int32_t userId, const std::string &rdbPath, int version)
292 {
293     if (!URIUtils::IsDataProxyURI(key.uri)) {
294         return;
295     }
296     rdbCache_.ComputeIfPresent(key, [&rdbPath, &version, &userId, this](const Key &key, auto &val) {
297         Notify(key, userId, val, rdbPath, version);
298         SetObserverNotifyOnEnabled(val);
299         return true;
300     });
301 }
302 
GetEnableObserverCount(const Key & key)303 int RdbSubscriberManager::GetEnableObserverCount(const Key &key)
304 {
305     auto pair = rdbCache_.Find(key);
306     if (!pair.first) {
307         return 0;
308     }
309     int count = 0;
310     for (const auto &observer : pair.second) {
311         if (observer.enabled) {
312             count++;
313         }
314     }
315     return count;
316 }
317 
Notify(const Key & key,int32_t userId,const std::vector<ObserverNode> & val,const std::string & rdbDir,int rdbVersion)318 int RdbSubscriberManager::Notify(const Key &key, int32_t userId, const std::vector<ObserverNode> &val,
319     const std::string &rdbDir, int rdbVersion)
320 {
321     Template tpl;
322     if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
323         ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
324             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
325         return E_TEMPLATE_NOT_EXIST;
326     }
327     DistributedData::StoreMetaData meta;
328     meta.dataDir = rdbDir;
329     meta.bundleName = key.bundleName;
330     auto delegate = DBDelegate::Create(meta, key.uri);
331     if (delegate == nullptr) {
332         ZLOGE("Create fail %{public}s %{public}s", DistributedData::Anonymous::Change(key.uri).c_str(),
333             key.bundleName.c_str());
334         return E_ERROR;
335     }
336     RdbChangeNode changeNode;
337     changeNode.uri_ = key.uri;
338     changeNode.templateId_.subscriberId_ = key.subscriberId;
339     changeNode.templateId_.bundleName_ = key.bundleName;
340     for (const auto &predicate : tpl.predicates_) {
341         std::string result = delegate->Query(predicate.selectSql_);
342         if (result.empty()) {
343             continue;
344         }
345         changeNode.data_.emplace_back("{\"" + predicate.key_ + "\":" + result + "}");
346     }
347 
348     ZLOGI("emit, valSize: %{public}zu, dataSize:%{public}zu, uri:%{public}s,",
349         val.size(), changeNode.data_.size(), DistributedData::Anonymous::Change(changeNode.uri_).c_str());
350     for (const auto &callback : val) {
351         if (callback.enabled && callback.observer != nullptr) {
352             callback.observer->OnChangeFromRdb(changeNode);
353         }
354     }
355     return E_OK;
356 }
357 
Clear()358 void RdbSubscriberManager::Clear()
359 {
360     rdbCache_.Clear();
361 }
362 
Emit(const std::string & uri,int64_t subscriberId,const std::string & bundleName,std::shared_ptr<Context> context)363 void RdbSubscriberManager::Emit(const std::string &uri, int64_t subscriberId,
364     const std::string &bundleName, std::shared_ptr<Context> context)
365 {
366     if (!URIUtils::IsDataProxyURI(uri)) {
367         return;
368     }
369     if (context->calledSourceDir.empty()) {
370         LoadConfigDataInfoStrategy loadDataInfo;
371         loadDataInfo(context);
372     }
373     rdbCache_.ForEach([&uri, &context, &subscriberId, this](const Key &key, std::vector<ObserverNode> &val) {
374         if (key.uri != uri || key.subscriberId != subscriberId) {
375             return false;
376         }
377         Notify(key, context->currentUserId, val, context->calledSourceDir, context->version);
378         SetObserverNotifyOnEnabled(val);
379         return false;
380     });
381     Key executeKey(uri, subscriberId, bundleName);
382     SchedulerManager::GetInstance().Execute(executeKey, context->currentUserId,
383         context->calledSourceDir, context->version);
384 }
ObserverNode(const sptr<IDataProxyRdbObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId,uint32_t callerPid)385 RdbSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyRdbObserver> &observer,
386     uint32_t firstCallerTokenId, uint32_t callerTokenId, uint32_t callerPid)
387     : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId), callerPid(callerPid)
388 {
389 }
390 } // namespace OHOS::DataShare
391 
392