1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "napi_subscriber_manager.h"
17 
18 #include "datashare_log.h"
19 
20 namespace OHOS {
21 namespace DataShare {
AddObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,const TemplateId & templateId)22 std::vector<OperationResult> NapiRdbSubscriberManager::AddObservers(napi_env env, napi_value callback,
23     const std::vector<std::string> &uris, const TemplateId &templateId)
24 {
25     auto datashareHelper = dataShareHelper_.lock();
26     if (datashareHelper == nullptr) {
27         LOG_ERROR("datashareHelper is nullptr");
28         return std::vector<OperationResult>();
29     }
30 
31     std::vector<Key> keys;
32     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
33         keys.emplace_back(uri, templateId);
34     });
35     return BaseCallbacks::AddObservers(
36         keys, std::make_shared<Observer>(env, callback),
37         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
38             Emit(localRegisterKeys, observer);
39         },
40         [&datashareHelper, &templateId, this](const std::vector<Key> &firstAddKeys,
41             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
42             std::vector<std::string> firstAddUris;
43             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
44                 firstAddUris.emplace_back(result);
45             });
46             if (firstAddUris.empty()) {
47                 return;
48             }
49             auto subResults =
50                 datashareHelper->SubscribeRdbData(firstAddUris, templateId, [this](const RdbChangeNode &changeNode) {
51                     Emit(changeNode);
52                 });
53             std::vector<Key> failedKeys;
54             for (auto &subResult : subResults) {
55                 opResult.emplace_back(subResult);
56                 if (subResult.errCode_ != E_OK) {
57                     failedKeys.emplace_back(subResult.key_, templateId);
58                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
59                 }
60             }
61             if (failedKeys.size() > 0) {
62                 BaseCallbacks::DelObservers(failedKeys, observer);
63             }
64         });
65 }
66 
DelObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,const TemplateId & templateId)67 std::vector<OperationResult> NapiRdbSubscriberManager::DelObservers(napi_env env, napi_value callback,
68     const std::vector<std::string> &uris, const TemplateId &templateId)
69 {
70     auto dataShareHelper = dataShareHelper_.lock();
71     if (dataShareHelper == nullptr) {
72         LOG_ERROR("nativeManager is nullptr");
73         return std::vector<OperationResult>();
74     }
75     std::vector<Key> keys;
76     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
77         keys.emplace_back(uri, templateId);
78     });
79     return BaseCallbacks::DelObservers(keys, callback == nullptr ? nullptr : std::make_shared<Observer>(env, callback),
80         [&dataShareHelper, &templateId, this](const std::vector<Key> &lastDelKeys,
81             const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &opResult) {
82             std::vector<std::string> lastDelUris;
83             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
84                 lastChangeNodeMap_.erase(result);
85                 lastDelUris.emplace_back(result);
86             });
87             if (lastDelUris.empty()) {
88                 return;
89             }
90             auto unsubResult = dataShareHelper->UnsubscribeRdbData(lastDelUris, templateId);
91             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
92         });
93 }
94 
Emit(const RdbChangeNode & changeNode)95 void NapiRdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
96 {
97     Key key(changeNode.uri_, changeNode.templateId_);
98     lastChangeNodeMap_[key] = changeNode;
99     auto callbacks = BaseCallbacks::GetEnabledObservers(key);
100     for (auto &obs : callbacks) {
101         if (obs != nullptr) {
102             obs->OnChange(changeNode);
103         }
104     }
105 }
106 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)107 void NapiRdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
108 {
109     for (auto const &key : keys) {
110         auto it = lastChangeNodeMap_.find(key);
111         if (it != lastChangeNodeMap_.end()) {
112             observer->OnChange(it->second);
113         }
114     }
115 }
116 
AddObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,int64_t subscriberId)117 std::vector<OperationResult> NapiPublishedSubscriberManager::AddObservers(napi_env env, napi_value callback,
118     const std::vector<std::string> &uris, int64_t subscriberId)
119 {
120     auto dataShareHelper = dataShareHelper_.lock();
121     if (dataShareHelper == nullptr) {
122         LOG_ERROR("datashareHelper is nullptr");
123         return std::vector<OperationResult>();
124     }
125 
126     std::vector<Key> keys;
127     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
128         keys.emplace_back(uri, subscriberId);
129     });
130     return BaseCallbacks::AddObservers(
131         keys, std::make_shared<Observer>(env, callback),
132         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
133             Emit(localRegisterKeys, observer);
134         },
135         [&dataShareHelper, &subscriberId, this](const std::vector<Key> &firstAddKeys,
136             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
137             std::vector<std::string> firstAddUris;
138             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
139                 firstAddUris.emplace_back(result);
140             });
141             if (firstAddUris.empty()) {
142                 return;
143             }
144             auto subResults = dataShareHelper->SubscribePublishedData(firstAddUris, subscriberId,
145                 [this](const PublishedDataChangeNode &changeNode) {
146                     Emit(changeNode);
147                 });
148             std::vector<Key> failedKeys;
149             for (auto &subResult : subResults) {
150                 opResult.emplace_back(subResult);
151                 if (subResult.errCode_ != E_OK) {
152                     failedKeys.emplace_back(subResult.key_, subscriberId);
153                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
154                 }
155             }
156             if (failedKeys.size() > 0) {
157                 BaseCallbacks::DelObservers(failedKeys, observer);
158             }
159         });
160 }
161 
DelObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,int64_t subscriberId)162 std::vector<OperationResult> NapiPublishedSubscriberManager::DelObservers(napi_env env, napi_value callback,
163     const std::vector<std::string> &uris, int64_t subscriberId)
164 {
165     auto dataShareHelper = dataShareHelper_.lock();
166     if (dataShareHelper == nullptr) {
167         LOG_ERROR("nativeManager is nullptr");
168         return std::vector<OperationResult>();
169     }
170     std::vector<Key> keys;
171     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
172         keys.emplace_back(uri, subscriberId);
173     });
174     return BaseCallbacks::DelObservers(keys, callback == nullptr ? nullptr : std::make_shared<Observer>(env, callback),
175         [&dataShareHelper, &subscriberId, &callback, &uris, this](const std::vector<Key> &lastDelKeys,
176             const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &opResult) {
177             std::vector<std::string> lastDelUris;
178             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
179                 lastChangeNodeMap_.erase(result);
180                 lastDelUris.emplace_back(result);
181             });
182             if (lastDelUris.empty()) {
183                 return;
184             }
185             auto unsubResult = dataShareHelper->UnsubscribePublishedData(lastDelUris, subscriberId);
186             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
187         });
188 }
189 
Emit(const PublishedDataChangeNode & changeNode)190 void NapiPublishedSubscriberManager::Emit(const PublishedDataChangeNode &changeNode)
191 {
192     for (auto &data : changeNode.datas_) {
193         Key key(data.key_, data.subscriberId_);
194         lastChangeNodeMap_[key].datas_.clear();
195     }
196     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
197     for (auto &data : changeNode.datas_) {
198         Key key(data.key_, data.subscriberId_);
199         auto callbacks = BaseCallbacks::GetEnabledObservers(key);
200         if (callbacks.empty()) {
201             LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
202             continue;
203         }
204         lastChangeNodeMap_[key].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
205         lastChangeNodeMap_[key].ownerBundleName_ = changeNode.ownerBundleName_;
206         for (auto const &obs : callbacks) {
207             results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
208         }
209     }
210     for (auto &[callback, node] : results) {
211         node.ownerBundleName_ = changeNode.ownerBundleName_;
212         callback->OnChange(node);
213     }
214 }
215 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)216 void NapiPublishedSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
217 {
218     PublishedDataChangeNode node;
219     for (auto &key : keys) {
220         auto it = lastChangeNodeMap_.find(key);
221         if (it == lastChangeNodeMap_.end()) {
222             continue;
223         }
224         for (auto &data : it->second.datas_) {
225             node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
226         }
227         node.ownerBundleName_ = it->second.ownerBundleName_;
228     }
229     observer->OnChange(node);
230 }
231 } // namespace DataShare
232 } // namespace OHOS
233