1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "PublishedDataSubscriberManager"
16 
17 #include "published_data_subscriber_manager.h"
18 
19 #include <cinttypes>
20 
21 #include "ipc_skeleton.h"
22 #include "general/load_config_data_info_strategy.h"
23 #include "log_print.h"
24 #include "published_data.h"
25 #include "uri_utils.h"
26 #include "utils/anonymous.h"
27 
28 namespace OHOS::DataShare {
GetInstance()29 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
30 {
31     static PublishedDataSubscriberManager manager;
32     return manager;
33 }
34 
Add(const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer,uint32_t firstCallerTokenId)35 int PublishedDataSubscriberManager::Add(
36     const PublishedDataKey &key, const sptr<IDataProxyPublishedDataObserver> observer, uint32_t firstCallerTokenId)
37 {
38     publishedDataCache_.Compute(
39         key, [&observer, &firstCallerTokenId, this](const PublishedDataKey &key, std::vector<ObserverNode> &value) {
40             ZLOGI("add publish subscriber, uri %{public}s tokenId 0x%{public}x",
41                 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
42             value.emplace_back(observer, firstCallerTokenId, IPCSkeleton::GetCallingTokenID(),
43                 IPCSkeleton::GetCallingPid());
44             return true;
45         });
46     return E_OK;
47 }
48 
Delete(const PublishedDataKey & key,uint32_t firstCallerTokenId)49 int PublishedDataSubscriberManager::Delete(const PublishedDataKey &key, uint32_t firstCallerTokenId)
50 {
51     auto result =
52         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
53             std::vector<ObserverNode> &value) {
54             for (auto it = value.begin(); it != value.end();) {
55                 if (it->firstCallerTokenId == firstCallerTokenId) {
56                     ZLOGI("delete publish subscriber, uri %{public}s tokenId 0x%{public}x",
57                         DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
58                     it = value.erase(it);
59                 } else {
60                     it++;
61                 }
62             }
63             return !value.empty();
64         });
65     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
66 }
67 
Delete(uint32_t callerTokenId,uint32_t callerPid)68 void PublishedDataSubscriberManager::Delete(uint32_t callerTokenId, uint32_t callerPid)
69 {
70     publishedDataCache_.EraseIf([&callerTokenId, &callerPid](const auto &key, std::vector<ObserverNode> &value) {
71         for (auto it = value.begin(); it != value.end();) {
72             if (it->callerTokenId == callerTokenId && it->callerPid == callerPid) {
73                 ZLOGI("erase start, uri is %{public}s, tokenId is 0x%{public}x, pid is %{public}d",
74                     DistributedData::Anonymous::Change(key.key).c_str(), callerTokenId, callerPid);
75                 it = value.erase(it);
76             } else {
77                 it++;
78             }
79         }
80         return value.empty();
81     });
82 }
83 
Disable(const PublishedDataKey & key,uint32_t firstCallerTokenId)84 int PublishedDataSubscriberManager::Disable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
85 {
86     auto result =
87         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
88             std::vector<ObserverNode> &value) {
89             for (auto it = value.begin(); it != value.end(); it++) {
90                 if (it->firstCallerTokenId == firstCallerTokenId) {
91                     it->enabled = false;
92                     it->isNotifyOnEnabled = false;
93                 }
94             }
95             return true;
96         });
97     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
98 }
99 
Enable(const PublishedDataKey & key,uint32_t firstCallerTokenId)100 int PublishedDataSubscriberManager::Enable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
101 {
102     auto result =
103         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
104             std::vector<ObserverNode> &value) {
105             for (auto it = value.begin(); it != value.end(); it++) {
106                 if (it->firstCallerTokenId == firstCallerTokenId) {
107                     it->enabled = true;
108                 }
109             }
110             return true;
111         });
112     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
113 }
114 
Emit(const std::vector<PublishedDataKey> & keys,int32_t userId,const std::string & ownerBundleName,const sptr<IDataProxyPublishedDataObserver> observer)115 void PublishedDataSubscriberManager::Emit(const std::vector<PublishedDataKey> &keys, int32_t userId,
116     const std::string &ownerBundleName, const sptr<IDataProxyPublishedDataObserver> observer)
117 {
118     int32_t status;
119     // key is bundleName, value is change node
120     std::map<PublishedDataKey, PublishedDataNode::Data> publishedResult;
121     std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> callbacks;
122     publishedDataCache_.ForEach([&keys, &status, &observer, &publishedResult, &callbacks, &userId, this](
123         const PublishedDataKey &key, std::vector<ObserverNode> &val) {
124         for (auto &data : keys) {
125             if (key != data || publishedResult.count(key) != 0) {
126                 continue;
127             }
128             status = PublishedData::Query(
129                 Id(PublishedData::GenId(key.key, key.bundleName, key.subscriberId), userId), publishedResult[key]);
130             if (status != E_OK) {
131                 ZLOGE("query fail %{public}s %{public}s %{public}" PRId64, data.bundleName.c_str(), data.key.c_str(),
132                     data.subscriberId);
133                 publishedResult.erase(key);
134                 continue;
135             }
136             PutInto(callbacks, val, key, observer);
137             break;
138         }
139         return false;
140     });
141     PublishedDataChangeNode result;
142     for (auto &[callback, keys] : callbacks) {
143         result.datas_.clear();
144         for (auto &key : keys) {
145             if (publishedResult.count(key) != 0) {
146                 result.datas_.emplace_back(key.key, key.subscriberId, PublishedDataNode::MoveTo(publishedResult[key]));
147             }
148         }
149         if (result.datas_.empty()) {
150             continue;
151         }
152         result.ownerBundleName_ = ownerBundleName;
153         callback->OnChangeFromPublishedData(result);
154     }
155 }
156 
PutInto(std::map<sptr<IDataProxyPublishedDataObserver>,std::vector<PublishedDataKey>> & callbacks,const std::vector<ObserverNode> & val,const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer)157 void PublishedDataSubscriberManager::PutInto(
158     std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> &callbacks,
159     const std::vector<ObserverNode> &val, const PublishedDataKey &key,
160     const sptr<IDataProxyPublishedDataObserver> observer)
161 {
162     for (auto const &callback : val) {
163         if (callback.enabled && callback.observer != nullptr) {
164             // callback the observer, others do not call
165             if (observer != nullptr && callback.observer != observer) {
166                 continue;
167             }
168             callbacks[callback.observer].emplace_back(key);
169         }
170     }
171 }
172 
Clear()173 void PublishedDataSubscriberManager::Clear()
174 {
175     publishedDataCache_.Clear();
176 }
177 
GetCount(const PublishedDataKey & key)178 int PublishedDataSubscriberManager::GetCount(const PublishedDataKey &key)
179 {
180     int count = 0;
181     publishedDataCache_.ComputeIfPresent(key, [&count](const auto &key, std::vector<ObserverNode> &value) {
182         count = static_cast<int>(value.size());
183         return true;
184     });
185     return count;
186 }
187 
IsNotifyOnEnabled(const PublishedDataKey & key,uint32_t callerTokenId)188 bool PublishedDataSubscriberManager::IsNotifyOnEnabled(const PublishedDataKey &key, uint32_t callerTokenId)
189 {
190     auto pair = publishedDataCache_.Find(key);
191     if (!pair.first) {
192         return false;
193     }
194     for (const auto &value : pair.second) {
195         if (value.firstCallerTokenId == callerTokenId && value.isNotifyOnEnabled) {
196             return true;
197         }
198     }
199     return false;
200 }
201 
SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> & keys)202 void PublishedDataSubscriberManager::SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> &keys)
203 {
204     for (const auto &pkey : keys) {
205         publishedDataCache_.ComputeIfPresent(pkey, [](const auto &key, std::vector<ObserverNode> &value) {
206             for (auto it = value.begin(); it != value.end(); it++) {
207                 if (!it->enabled) {
208                     it->isNotifyOnEnabled = true;
209                 }
210             }
211             return true;
212         });
213     }
214 }
215 
PublishedDataKey(const std::string & key,const std::string & bundle,const int64_t subscriberId)216 PublishedDataKey::PublishedDataKey(const std::string &key, const std::string &bundle, const int64_t subscriberId)
217     : key(key), bundleName(bundle), subscriberId(subscriberId)
218 {
219     /* private published data can use key as simple uri */
220     /* etc: datashareproxy://{bundleName}/meeting can use meeting replaced */
221     /* if key is normal uri, bundleName is from uri */
222     if (URIUtils::IsDataProxyURI(key)) {
223         URIUtils::GetBundleNameFromProxyURI(key, bundleName);
224     }
225 }
226 
operator <(const PublishedDataKey & rhs) const227 bool PublishedDataKey::operator<(const PublishedDataKey &rhs) const
228 {
229     if (key < rhs.key) {
230         return true;
231     }
232     if (rhs.key < key) {
233         return false;
234     }
235     if (bundleName < rhs.bundleName) {
236         return true;
237     }
238     if (rhs.bundleName < bundleName) {
239         return false;
240     }
241     return subscriberId < rhs.subscriberId;
242 }
243 
operator >(const PublishedDataKey & rhs) const244 bool PublishedDataKey::operator>(const PublishedDataKey &rhs) const
245 {
246     return rhs < *this;
247 }
248 
operator <=(const PublishedDataKey & rhs) const249 bool PublishedDataKey::operator<=(const PublishedDataKey &rhs) const
250 {
251     return !(rhs < *this);
252 }
253 
operator >=(const PublishedDataKey & rhs) const254 bool PublishedDataKey::operator>=(const PublishedDataKey &rhs) const
255 {
256     return !(*this < rhs);
257 }
258 
operator ==(const PublishedDataKey & rhs) const259 bool PublishedDataKey::operator==(const PublishedDataKey &rhs) const
260 {
261     return key == rhs.key && bundleName == rhs.bundleName && subscriberId == rhs.subscriberId;
262 }
263 
operator !=(const PublishedDataKey & rhs) const264 bool PublishedDataKey::operator!=(const PublishedDataKey &rhs) const
265 {
266     return !(rhs == *this);
267 }
268 
ObserverNode(const sptr<IDataProxyPublishedDataObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId,uint32_t callerPid)269 PublishedDataSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyPublishedDataObserver> &observer,
270     uint32_t firstCallerTokenId, uint32_t callerTokenId, uint32_t callerPid)
271     : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId), callerPid(callerPid)
272 {
273 }
274 } // namespace OHOS::DataShare
275