1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "napi_subscriber_manager.h"
17
18 #include "datashare_log.h"
19
20 namespace OHOS {
21 namespace DataShare {
AddObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,const TemplateId & templateId)22 std::vector<OperationResult> NapiRdbSubscriberManager::AddObservers(napi_env env, napi_value callback,
23 const std::vector<std::string> &uris, const TemplateId &templateId)
24 {
25 auto datashareHelper = dataShareHelper_.lock();
26 if (datashareHelper == nullptr) {
27 LOG_ERROR("datashareHelper is nullptr");
28 return std::vector<OperationResult>();
29 }
30
31 std::vector<Key> keys;
32 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
33 keys.emplace_back(uri, templateId);
34 });
35 return BaseCallbacks::AddObservers(
36 keys, std::make_shared<Observer>(env, callback),
37 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
38 Emit(localRegisterKeys, observer);
39 },
40 [&datashareHelper, &templateId, this](const std::vector<Key> &firstAddKeys,
41 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
42 std::vector<std::string> firstAddUris;
43 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
44 firstAddUris.emplace_back(result);
45 });
46 if (firstAddUris.empty()) {
47 return;
48 }
49 auto subResults =
50 datashareHelper->SubscribeRdbData(firstAddUris, templateId, [this](const RdbChangeNode &changeNode) {
51 Emit(changeNode);
52 });
53 std::vector<Key> failedKeys;
54 for (auto &subResult : subResults) {
55 opResult.emplace_back(subResult);
56 if (subResult.errCode_ != E_OK) {
57 failedKeys.emplace_back(subResult.key_, templateId);
58 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
59 }
60 }
61 if (failedKeys.size() > 0) {
62 BaseCallbacks::DelObservers(failedKeys, observer);
63 }
64 });
65 }
66
DelObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,const TemplateId & templateId)67 std::vector<OperationResult> NapiRdbSubscriberManager::DelObservers(napi_env env, napi_value callback,
68 const std::vector<std::string> &uris, const TemplateId &templateId)
69 {
70 auto dataShareHelper = dataShareHelper_.lock();
71 if (dataShareHelper == nullptr) {
72 LOG_ERROR("nativeManager is nullptr");
73 return std::vector<OperationResult>();
74 }
75 std::vector<Key> keys;
76 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
77 keys.emplace_back(uri, templateId);
78 });
79 return BaseCallbacks::DelObservers(keys, callback == nullptr ? nullptr : std::make_shared<Observer>(env, callback),
80 [&dataShareHelper, &templateId, this](const std::vector<Key> &lastDelKeys,
81 const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &opResult) {
82 std::vector<std::string> lastDelUris;
83 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
84 lastChangeNodeMap_.erase(result);
85 lastDelUris.emplace_back(result);
86 });
87 if (lastDelUris.empty()) {
88 return;
89 }
90 auto unsubResult = dataShareHelper->UnsubscribeRdbData(lastDelUris, templateId);
91 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
92 });
93 }
94
Emit(const RdbChangeNode & changeNode)95 void NapiRdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
96 {
97 Key key(changeNode.uri_, changeNode.templateId_);
98 lastChangeNodeMap_[key] = changeNode;
99 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
100 for (auto &obs : callbacks) {
101 if (obs != nullptr) {
102 obs->OnChange(changeNode);
103 }
104 }
105 }
106
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)107 void NapiRdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
108 {
109 for (auto const &key : keys) {
110 auto it = lastChangeNodeMap_.find(key);
111 if (it != lastChangeNodeMap_.end()) {
112 observer->OnChange(it->second);
113 }
114 }
115 }
116
AddObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,int64_t subscriberId)117 std::vector<OperationResult> NapiPublishedSubscriberManager::AddObservers(napi_env env, napi_value callback,
118 const std::vector<std::string> &uris, int64_t subscriberId)
119 {
120 auto dataShareHelper = dataShareHelper_.lock();
121 if (dataShareHelper == nullptr) {
122 LOG_ERROR("datashareHelper is nullptr");
123 return std::vector<OperationResult>();
124 }
125
126 std::vector<Key> keys;
127 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
128 keys.emplace_back(uri, subscriberId);
129 });
130 return BaseCallbacks::AddObservers(
131 keys, std::make_shared<Observer>(env, callback),
132 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
133 Emit(localRegisterKeys, observer);
134 },
135 [&dataShareHelper, &subscriberId, this](const std::vector<Key> &firstAddKeys,
136 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
137 std::vector<std::string> firstAddUris;
138 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
139 firstAddUris.emplace_back(result);
140 });
141 if (firstAddUris.empty()) {
142 return;
143 }
144 auto subResults = dataShareHelper->SubscribePublishedData(firstAddUris, subscriberId,
145 [this](const PublishedDataChangeNode &changeNode) {
146 Emit(changeNode);
147 });
148 std::vector<Key> failedKeys;
149 for (auto &subResult : subResults) {
150 opResult.emplace_back(subResult);
151 if (subResult.errCode_ != E_OK) {
152 failedKeys.emplace_back(subResult.key_, subscriberId);
153 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
154 }
155 }
156 if (failedKeys.size() > 0) {
157 BaseCallbacks::DelObservers(failedKeys, observer);
158 }
159 });
160 }
161
DelObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,int64_t subscriberId)162 std::vector<OperationResult> NapiPublishedSubscriberManager::DelObservers(napi_env env, napi_value callback,
163 const std::vector<std::string> &uris, int64_t subscriberId)
164 {
165 auto dataShareHelper = dataShareHelper_.lock();
166 if (dataShareHelper == nullptr) {
167 LOG_ERROR("nativeManager is nullptr");
168 return std::vector<OperationResult>();
169 }
170 std::vector<Key> keys;
171 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
172 keys.emplace_back(uri, subscriberId);
173 });
174 return BaseCallbacks::DelObservers(keys, callback == nullptr ? nullptr : std::make_shared<Observer>(env, callback),
175 [&dataShareHelper, &subscriberId, &callback, &uris, this](const std::vector<Key> &lastDelKeys,
176 const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &opResult) {
177 std::vector<std::string> lastDelUris;
178 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
179 lastChangeNodeMap_.erase(result);
180 lastDelUris.emplace_back(result);
181 });
182 if (lastDelUris.empty()) {
183 return;
184 }
185 auto unsubResult = dataShareHelper->UnsubscribePublishedData(lastDelUris, subscriberId);
186 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
187 });
188 }
189
Emit(const PublishedDataChangeNode & changeNode)190 void NapiPublishedSubscriberManager::Emit(const PublishedDataChangeNode &changeNode)
191 {
192 for (auto &data : changeNode.datas_) {
193 Key key(data.key_, data.subscriberId_);
194 lastChangeNodeMap_[key].datas_.clear();
195 }
196 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
197 for (auto &data : changeNode.datas_) {
198 Key key(data.key_, data.subscriberId_);
199 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
200 if (callbacks.empty()) {
201 LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
202 continue;
203 }
204 lastChangeNodeMap_[key].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
205 lastChangeNodeMap_[key].ownerBundleName_ = changeNode.ownerBundleName_;
206 for (auto const &obs : callbacks) {
207 results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
208 }
209 }
210 for (auto &[callback, node] : results) {
211 node.ownerBundleName_ = changeNode.ownerBundleName_;
212 callback->OnChange(node);
213 }
214 }
215
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)216 void NapiPublishedSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
217 {
218 PublishedDataChangeNode node;
219 for (auto &key : keys) {
220 auto it = lastChangeNodeMap_.find(key);
221 if (it == lastChangeNodeMap_.end()) {
222 continue;
223 }
224 for (auto &data : it->second.datas_) {
225 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
226 }
227 node.ownerBundleName_ = it->second.ownerBundleName_;
228 }
229 observer->OnChange(node);
230 }
231 } // namespace DataShare
232 } // namespace OHOS
233