1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "PublishedDataSubscriberManager"
16
17 #include "published_data_subscriber_manager.h"
18
19 #include <cinttypes>
20
21 #include "ipc_skeleton.h"
22 #include "general/load_config_data_info_strategy.h"
23 #include "log_print.h"
24 #include "published_data.h"
25 #include "uri_utils.h"
26 #include "utils/anonymous.h"
27
28 namespace OHOS::DataShare {
GetInstance()29 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
30 {
31 static PublishedDataSubscriberManager manager;
32 return manager;
33 }
34
Add(const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer,uint32_t firstCallerTokenId)35 int PublishedDataSubscriberManager::Add(
36 const PublishedDataKey &key, const sptr<IDataProxyPublishedDataObserver> observer, uint32_t firstCallerTokenId)
37 {
38 publishedDataCache_.Compute(
39 key, [&observer, &firstCallerTokenId, this](const PublishedDataKey &key, std::vector<ObserverNode> &value) {
40 ZLOGI("add publish subscriber, uri %{public}s tokenId 0x%{public}x",
41 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
42 value.emplace_back(observer, firstCallerTokenId, IPCSkeleton::GetCallingTokenID(),
43 IPCSkeleton::GetCallingPid());
44 return true;
45 });
46 return E_OK;
47 }
48
Delete(const PublishedDataKey & key,uint32_t firstCallerTokenId)49 int PublishedDataSubscriberManager::Delete(const PublishedDataKey &key, uint32_t firstCallerTokenId)
50 {
51 auto result =
52 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
53 std::vector<ObserverNode> &value) {
54 for (auto it = value.begin(); it != value.end();) {
55 if (it->firstCallerTokenId == firstCallerTokenId) {
56 ZLOGI("delete publish subscriber, uri %{public}s tokenId 0x%{public}x",
57 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
58 it = value.erase(it);
59 } else {
60 it++;
61 }
62 }
63 return !value.empty();
64 });
65 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
66 }
67
Delete(uint32_t callerTokenId,uint32_t callerPid)68 void PublishedDataSubscriberManager::Delete(uint32_t callerTokenId, uint32_t callerPid)
69 {
70 publishedDataCache_.EraseIf([&callerTokenId, &callerPid](const auto &key, std::vector<ObserverNode> &value) {
71 for (auto it = value.begin(); it != value.end();) {
72 if (it->callerTokenId == callerTokenId && it->callerPid == callerPid) {
73 ZLOGI("erase start, uri is %{public}s, tokenId is 0x%{public}x, pid is %{public}d",
74 DistributedData::Anonymous::Change(key.key).c_str(), callerTokenId, callerPid);
75 it = value.erase(it);
76 } else {
77 it++;
78 }
79 }
80 return value.empty();
81 });
82 }
83
Disable(const PublishedDataKey & key,uint32_t firstCallerTokenId)84 int PublishedDataSubscriberManager::Disable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
85 {
86 auto result =
87 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
88 std::vector<ObserverNode> &value) {
89 for (auto it = value.begin(); it != value.end(); it++) {
90 if (it->firstCallerTokenId == firstCallerTokenId) {
91 it->enabled = false;
92 it->isNotifyOnEnabled = false;
93 }
94 }
95 return true;
96 });
97 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
98 }
99
Enable(const PublishedDataKey & key,uint32_t firstCallerTokenId)100 int PublishedDataSubscriberManager::Enable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
101 {
102 auto result =
103 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
104 std::vector<ObserverNode> &value) {
105 for (auto it = value.begin(); it != value.end(); it++) {
106 if (it->firstCallerTokenId == firstCallerTokenId) {
107 it->enabled = true;
108 }
109 }
110 return true;
111 });
112 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
113 }
114
Emit(const std::vector<PublishedDataKey> & keys,int32_t userId,const std::string & ownerBundleName,const sptr<IDataProxyPublishedDataObserver> observer)115 void PublishedDataSubscriberManager::Emit(const std::vector<PublishedDataKey> &keys, int32_t userId,
116 const std::string &ownerBundleName, const sptr<IDataProxyPublishedDataObserver> observer)
117 {
118 int32_t status;
119 // key is bundleName, value is change node
120 std::map<PublishedDataKey, PublishedDataNode::Data> publishedResult;
121 std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> callbacks;
122 publishedDataCache_.ForEach([&keys, &status, &observer, &publishedResult, &callbacks, &userId, this](
123 const PublishedDataKey &key, std::vector<ObserverNode> &val) {
124 for (auto &data : keys) {
125 if (key != data || publishedResult.count(key) != 0) {
126 continue;
127 }
128 status = PublishedData::Query(
129 Id(PublishedData::GenId(key.key, key.bundleName, key.subscriberId), userId), publishedResult[key]);
130 if (status != E_OK) {
131 ZLOGE("query fail %{public}s %{public}s %{public}" PRId64, data.bundleName.c_str(), data.key.c_str(),
132 data.subscriberId);
133 publishedResult.erase(key);
134 continue;
135 }
136 PutInto(callbacks, val, key, observer);
137 break;
138 }
139 return false;
140 });
141 PublishedDataChangeNode result;
142 for (auto &[callback, keys] : callbacks) {
143 result.datas_.clear();
144 for (auto &key : keys) {
145 if (publishedResult.count(key) != 0) {
146 result.datas_.emplace_back(key.key, key.subscriberId, PublishedDataNode::MoveTo(publishedResult[key]));
147 }
148 }
149 if (result.datas_.empty()) {
150 continue;
151 }
152 result.ownerBundleName_ = ownerBundleName;
153 callback->OnChangeFromPublishedData(result);
154 }
155 }
156
PutInto(std::map<sptr<IDataProxyPublishedDataObserver>,std::vector<PublishedDataKey>> & callbacks,const std::vector<ObserverNode> & val,const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer)157 void PublishedDataSubscriberManager::PutInto(
158 std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> &callbacks,
159 const std::vector<ObserverNode> &val, const PublishedDataKey &key,
160 const sptr<IDataProxyPublishedDataObserver> observer)
161 {
162 for (auto const &callback : val) {
163 if (callback.enabled && callback.observer != nullptr) {
164 // callback the observer, others do not call
165 if (observer != nullptr && callback.observer != observer) {
166 continue;
167 }
168 callbacks[callback.observer].emplace_back(key);
169 }
170 }
171 }
172
Clear()173 void PublishedDataSubscriberManager::Clear()
174 {
175 publishedDataCache_.Clear();
176 }
177
GetCount(const PublishedDataKey & key)178 int PublishedDataSubscriberManager::GetCount(const PublishedDataKey &key)
179 {
180 int count = 0;
181 publishedDataCache_.ComputeIfPresent(key, [&count](const auto &key, std::vector<ObserverNode> &value) {
182 count = static_cast<int>(value.size());
183 return true;
184 });
185 return count;
186 }
187
IsNotifyOnEnabled(const PublishedDataKey & key,uint32_t callerTokenId)188 bool PublishedDataSubscriberManager::IsNotifyOnEnabled(const PublishedDataKey &key, uint32_t callerTokenId)
189 {
190 auto pair = publishedDataCache_.Find(key);
191 if (!pair.first) {
192 return false;
193 }
194 for (const auto &value : pair.second) {
195 if (value.firstCallerTokenId == callerTokenId && value.isNotifyOnEnabled) {
196 return true;
197 }
198 }
199 return false;
200 }
201
SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> & keys)202 void PublishedDataSubscriberManager::SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> &keys)
203 {
204 for (const auto &pkey : keys) {
205 publishedDataCache_.ComputeIfPresent(pkey, [](const auto &key, std::vector<ObserverNode> &value) {
206 for (auto it = value.begin(); it != value.end(); it++) {
207 if (!it->enabled) {
208 it->isNotifyOnEnabled = true;
209 }
210 }
211 return true;
212 });
213 }
214 }
215
PublishedDataKey(const std::string & key,const std::string & bundle,const int64_t subscriberId)216 PublishedDataKey::PublishedDataKey(const std::string &key, const std::string &bundle, const int64_t subscriberId)
217 : key(key), bundleName(bundle), subscriberId(subscriberId)
218 {
219 /* private published data can use key as simple uri */
220 /* etc: datashareproxy://{bundleName}/meeting can use meeting replaced */
221 /* if key is normal uri, bundleName is from uri */
222 if (URIUtils::IsDataProxyURI(key)) {
223 URIUtils::GetBundleNameFromProxyURI(key, bundleName);
224 }
225 }
226
operator <(const PublishedDataKey & rhs) const227 bool PublishedDataKey::operator<(const PublishedDataKey &rhs) const
228 {
229 if (key < rhs.key) {
230 return true;
231 }
232 if (rhs.key < key) {
233 return false;
234 }
235 if (bundleName < rhs.bundleName) {
236 return true;
237 }
238 if (rhs.bundleName < bundleName) {
239 return false;
240 }
241 return subscriberId < rhs.subscriberId;
242 }
243
operator >(const PublishedDataKey & rhs) const244 bool PublishedDataKey::operator>(const PublishedDataKey &rhs) const
245 {
246 return rhs < *this;
247 }
248
operator <=(const PublishedDataKey & rhs) const249 bool PublishedDataKey::operator<=(const PublishedDataKey &rhs) const
250 {
251 return !(rhs < *this);
252 }
253
operator >=(const PublishedDataKey & rhs) const254 bool PublishedDataKey::operator>=(const PublishedDataKey &rhs) const
255 {
256 return !(*this < rhs);
257 }
258
operator ==(const PublishedDataKey & rhs) const259 bool PublishedDataKey::operator==(const PublishedDataKey &rhs) const
260 {
261 return key == rhs.key && bundleName == rhs.bundleName && subscriberId == rhs.subscriberId;
262 }
263
operator !=(const PublishedDataKey & rhs) const264 bool PublishedDataKey::operator!=(const PublishedDataKey &rhs) const
265 {
266 return !(rhs == *this);
267 }
268
ObserverNode(const sptr<IDataProxyPublishedDataObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId,uint32_t callerPid)269 PublishedDataSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyPublishedDataObserver> &observer,
270 uint32_t firstCallerTokenId, uint32_t callerTokenId, uint32_t callerPid)
271 : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId), callerPid(callerPid)
272 {
273 }
274 } // namespace OHOS::DataShare
275