1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_subscriber_manager.h"
17
18 #include <cinttypes>
19
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23
24 namespace OHOS {
25 namespace DataShare {
GetInstance()26 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
27 {
28 static RdbSubscriberManager manager;
29 return manager;
30 }
31
RdbSubscriberManager()32 RdbSubscriberManager::RdbSubscriberManager()
33 {
34 serviceCallback_ = nullptr;
35 }
36
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)37 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
38 std::shared_ptr<DataShareServiceProxy> proxy,
39 const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
40 {
41 if (proxy == nullptr) {
42 LOG_ERROR("proxy is nullptr");
43 return std::vector<OperationResult>();
44 }
45 std::vector<Key> keys;
46 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
47 keys.emplace_back(uri, templateId);
48 });
49 return BaseCallbacks::AddObservers(
50 keys, subscriber, std::make_shared<Observer>(callback),
51 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
52 Emit(localRegisterKeys, observer);
53 },
54 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
55 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
56 std::vector<std::string> firstAddUris;
57 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
58 firstAddUris.emplace_back(result);
59 });
60 if (firstAddUris.empty()) {
61 return;
62 }
63
64 Init();
65 auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
66 std::vector<Key> failedKeys;
67 for (auto &subResult : subResults) {
68 opResult.emplace_back(subResult);
69 if (subResult.errCode_ != E_OK) {
70 failedKeys.emplace_back(subResult.key_, templateId);
71 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
72 }
73 }
74 if (!failedKeys.empty()) {
75 BaseCallbacks::DelObservers(failedKeys, subscriber);
76 }
77 Destroy();
78 });
79 }
80
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)81 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
82 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
83 {
84 if (proxy == nullptr) {
85 LOG_ERROR("proxy is nullptr");
86 return std::vector<OperationResult>();
87 }
88 if (uris.empty()) {
89 return DelObservers(subscriber, proxy);
90 }
91
92 std::vector<Key> keys;
93 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
94 keys.emplace_back(uri, templateId);
95 });
96 return BaseCallbacks::DelObservers(keys, subscriber,
97 [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
98 std::vector<std::string> lastDelUris;
99 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
100 lastDelUris.emplace_back(result);
101 lastChangeNodeMap_.erase(result);
102 });
103 if (lastDelUris.empty()) {
104 return;
105 }
106 auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
107 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
108 Destroy();
109 });
110 }
111
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)112 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
113 std::shared_ptr<DataShareServiceProxy> proxy)
114 {
115 if (proxy == nullptr) {
116 LOG_ERROR("proxy is nullptr");
117 return std::vector<OperationResult>();
118 }
119 return BaseCallbacks::DelObservers(subscriber,
120 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
121 // delete all obs by subscriber
122 for (const auto &key : lastDelKeys) {
123 lastChangeNodeMap_.erase(key);
124 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
125 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
126 }
127 Destroy();
128 });
129 }
130
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)131 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
132 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
133 {
134 if (proxy == nullptr) {
135 LOG_ERROR("proxy is nullptr");
136 return std::vector<OperationResult>();
137 }
138 std::vector<Key> keys;
139 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
140 keys.emplace_back(uri, templateId);
141 });
142 return BaseCallbacks::EnableObservers(keys, subscriber,
143 [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
144 EmitOnEnable(obsMap);
145 },
146 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
147 std::vector<OperationResult> &opResult) {
148 std::vector<std::string> firstAddUris;
149 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
150 firstAddUris.emplace_back(result);
151 });
152 if (firstAddUris.empty()) {
153 return;
154 }
155 auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
156 std::vector<Key> failedKeys;
157 for (auto &subResult : subResults) {
158 opResult.emplace_back(subResult);
159 if (subResult.errCode_ != E_OK) {
160 failedKeys.emplace_back(subResult.key_, templateId);
161 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
162 }
163 }
164 if (!failedKeys.empty()) {
165 BaseCallbacks::DisableObservers(failedKeys, subscriber);
166 }
167 });
168 }
169
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)170 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
171 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
172 {
173 if (proxy == nullptr) {
174 LOG_ERROR("proxy is nullptr");
175 return std::vector<OperationResult>();
176 }
177 std::vector<Key> keys;
178 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
179 keys.emplace_back(uri, templateId);
180 });
181 return BaseCallbacks::DisableObservers(keys, subscriber,
182 [&proxy, &templateId, this](const std::vector<Key> &lastDisabledKeys, std::vector<OperationResult> &opResult) {
183 std::vector<std::string> lastDisabledUris;
184 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
185 lastDisabledUris.emplace_back(result);
186 });
187 if (lastDisabledUris.empty()) {
188 return;
189 }
190
191 auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
192 opResult.insert(opResult.end(), results.begin(), results.end());
193 });
194 }
195
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)196 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
197 {
198 if (proxy == nullptr) {
199 LOG_ERROR("proxy is nullptr");
200 return;
201 }
202 std::map<TemplateId, std::vector<std::string>> keysMap;
203 std::vector<Key> keys = CallbacksManager::GetKeys();
204 for (const auto& key : keys) {
205 keysMap[key.templateId_].emplace_back(key.uri_);
206 }
207 for (const auto& [templateId, uris] : keysMap) {
208 auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
209 for (const auto& result : results) {
210 if (result.errCode_ != E_OK) {
211 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d", result.key_.c_str(),
212 result.errCode_);
213 }
214 }
215 }
216 }
217
Emit(const RdbChangeNode & changeNode)218 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
219 {
220 RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
221 lastChangeNodeMap_[key] = changeNode;
222 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
223 for (auto &obs : callbacks) {
224 if (obs != nullptr) {
225 LOG_INFO("Client send data to form, uri is %{public}s, subscriberId is %{public}" PRId64,
226 DataShareStringUtils::Anonymous(key.uri_).c_str(), key.templateId_.subscriberId_);
227 obs->OnChange(changeNode);
228 }
229 }
230 BaseCallbacks::SetObserversNotifiedOnEnabled(key);
231 }
232
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)233 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
234 {
235 for (auto const &key : keys) {
236 auto it = lastChangeNodeMap_.find(key);
237 if (it != lastChangeNodeMap_.end()) {
238 observer->OnChange(it->second);
239 }
240 }
241 }
242
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)243 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
244 {
245 for (auto &[key, obsVector] : obsMap) {
246 auto it = lastChangeNodeMap_.find(key);
247 if (it == lastChangeNodeMap_.end()) {
248 continue;
249 }
250 for (auto &obs : obsVector) {
251 if (obs.isNotifyOnEnabled_) {
252 obs.observer_->OnChange(it->second);
253 }
254 }
255 }
256 }
257
Init()258 bool RdbSubscriberManager::Init()
259 {
260 if (serviceCallback_ == nullptr) {
261 LOG_INFO("callback init");
262 serviceCallback_ = new RdbObserverStub([this](const RdbChangeNode &changeNode) {
263 Emit(changeNode);
264 });
265 }
266 return true;
267 }
268
Destroy()269 void RdbSubscriberManager::Destroy()
270 {
271 if (BaseCallbacks::GetAllSubscriberSize() == 0) {
272 if (serviceCallback_ != nullptr) {
273 serviceCallback_->ClearCallback();
274 }
275 LOG_INFO("no valid subscriber, delete callback");
276 serviceCallback_ = nullptr;
277 }
278 }
279
RdbObserver(const RdbCallback & callback)280 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
281
OnChange(const RdbChangeNode & changeNode)282 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
283 {
284 callback_(changeNode);
285 }
286
operator ==(const RdbObserver & rhs) const287 bool RdbObserver::operator==(const RdbObserver &rhs) const
288 {
289 return false;
290 }
291
operator !=(const RdbObserver & rhs) const292 bool RdbObserver::operator!=(const RdbObserver &rhs) const
293 {
294 return !(rhs == *this);
295 }
296 } // namespace DataShare
297 } // namespace OHOS
298