1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef DATA_SHARE_CALLBACKS_MANAGER_H
17 #define DATA_SHARE_CALLBACKS_MANAGER_H
#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

#include "datashare_errno.h"
#include "datashare_log.h"
#include "datashare_template.h"
25
26 namespace OHOS::DataShare {
27 template<class Key, class Observer>
28 class CallbacksManager {
29 public:
30 struct ObserverNodeOnEnabled {
31 ObserverNodeOnEnabled(const std::shared_ptr<Observer> &observer, bool isNotifyOnEnabled = false)
32 : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {};
33 std::shared_ptr<Observer> observer_;
34 bool isNotifyOnEnabled_;
35 };
36
37 std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
38 const std::shared_ptr<Observer> observer,
39 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
40 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
41 std::vector<OperationResult> &)>);
42
43 std::vector<OperationResult> DelObservers(const std::vector<Key> &keys, void *subscriber,
44 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
45 CallbacksManager::DefaultProcess);
46
47 std::vector<OperationResult> DelObservers(void *subscriber,
48 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
49 CallbacksManager::DefaultProcess);
50
51 std::vector<OperationResult> EnableObservers(const std::vector<Key> &keys, void *subscriber,
52 std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,
53 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)>);
54
55 std::vector<OperationResult> DisableObservers(const std::vector<Key> &keys, void *subscriber,
56 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
57 CallbacksManager::DefaultProcess);
58
59 std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
60
61 int GetAllSubscriberSize();
62 int GetAllSubscriberSize(const Key &key);
63 std::vector<Key> GetKeys();
64 void SetObserversNotifiedOnEnabled(const Key &key);
65
66 private:
DefaultProcess(const std::vector<Key> &,std::vector<OperationResult> &)67 static void DefaultProcess(const std::vector<Key> &, std::vector<OperationResult> &){};
68 struct ObserverNode {
69 std::shared_ptr<Observer> observer_;
70 bool enabled_;
71 void *subscriber_;
72 bool isNotifyOnEnabled_;
ObserverNodeObserverNode73 ObserverNode(const std::shared_ptr<Observer> &observer, void *subscriber)
74 : observer_(observer), subscriber_(subscriber)
75 {
76 enabled_ = true;
77 isNotifyOnEnabled_ = false;
78 };
79 };
80 void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
81 std::vector<OperationResult> &result);
82 void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result);
83 std::recursive_mutex mutex_{};
84 std::map<Key, std::vector<ObserverNode>> callbacks_;
85 };
86
87 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)88 std::vector<OperationResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
89 void *subscriber, const std::shared_ptr<Observer> observer,
90 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
91 std::function<void(const std::vector<Key> &,
92 const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &)> processOnFirstAdd)
93 {
94 std::vector<OperationResult> result;
95 std::vector<Key> firstRegisterKey;
96 std::vector<Key> localRegisterKey;
97 {
98 std::lock_guard<decltype(mutex_)> lck(mutex_);
99 for (auto &key : keys) {
100 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
101 if (enabledObservers.empty()) {
102 callbacks_[key].emplace_back(observer, subscriber);
103 firstRegisterKey.emplace_back(key);
104 continue;
105 }
106 localRegisterKey.emplace_back(key);
107 callbacks_[key].emplace_back(observer, subscriber);
108 result.emplace_back(key, E_OK);
109 }
110 }
111 if (!localRegisterKey.empty()) {
112 processOnLocalAdd(localRegisterKey, observer);
113 }
114 processOnFirstAdd(firstRegisterKey, observer, result);
115 return result;
116 }
117
118 template<class Key, class Observer>
GetKeys()119 std::vector<Key> CallbacksManager<Key, Observer>::GetKeys()
120 {
121 std::vector<Key> keys;
122 {
123 std::lock_guard<decltype(mutex_)> lck(mutex_);
124 for (auto &it : callbacks_) {
125 keys.emplace_back(it.first);
126 }
127 }
128 return keys;
129 }
130
131 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)132 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
133 std::vector<OperationResult> &result)
134 {
135 for (auto &it : callbacks_) {
136 DelLocalObservers(it.first, subscriber, lastDelKeys, result);
137 }
138 }
139
140 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)141 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
142 std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result)
143 {
144 auto it = callbacks_.find(key);
145 if (it == callbacks_.end()) {
146 result.emplace_back(key, E_UNREGISTERED_EMPTY);
147 return;
148 }
149 std::vector<ObserverNode> &callbacks = it->second;
150 auto callbackIt = callbacks.begin();
151 while (callbackIt != callbacks.end()) {
152 if (callbackIt->subscriber_ != subscriber) {
153 callbackIt++;
154 continue;
155 }
156 callbackIt = callbacks.erase(callbackIt);
157 }
158 if (!it->second.empty()) {
159 result.emplace_back(key, E_OK);
160 return;
161 }
162 lastDelKeys.emplace_back(key);
163 }
164
165 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)166 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
167 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
168 {
169 std::vector<OperationResult> result;
170 std::vector<Key> lastDelKeys;
171 {
172 std::lock_guard<decltype(mutex_)> lck(mutex_);
173 DelLocalObservers(subscriber, lastDelKeys, result);
174 if (lastDelKeys.empty()) {
175 return result;
176 }
177 for (auto &key : lastDelKeys) {
178 callbacks_.erase(key);
179 }
180 }
181 processOnLastDel(lastDelKeys, result);
182 return result;
183 }
184
185 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)186 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(const std::vector<Key> &keys,
187 void *subscriber, std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
188 {
189 std::vector<OperationResult> result;
190 std::vector<Key> lastDelKeys;
191 {
192 std::lock_guard<decltype(mutex_)> lck(mutex_);
193 for (auto &key : keys) {
194 DelLocalObservers(key, subscriber, lastDelKeys, result);
195 }
196 if (lastDelKeys.empty()) {
197 return result;
198 }
199 for (auto &key : lastDelKeys) {
200 callbacks_.erase(key);
201 }
202 }
203 processOnLastDel(lastDelKeys, result);
204 return result;
205 }
206
207 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)208 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
209 {
210 std::lock_guard<decltype(mutex_)> lck(mutex_);
211 auto it = callbacks_.find(inputKey);
212 if (it == callbacks_.end()) {
213 return std::vector<std::shared_ptr<Observer>>();
214 }
215 std::vector<std::shared_ptr<Observer>> results;
216 for (const auto &value : it->second) {
217 if (value.enabled_ && value.observer_ != nullptr) {
218 results.emplace_back(value.observer_);
219 }
220 }
221 return results;
222 }
223
224 template<class Key, class Observer>
EnableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (std::map<Key,std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> enableServiceFunc)225 std::vector<OperationResult> CallbacksManager<Key, Observer>::EnableObservers(
226 const std::vector<Key> &keys, void *subscriber,
227 std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,
228 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> enableServiceFunc)
229 {
230 std::vector<OperationResult> result;
231 std::vector<Key> sendServiceKeys;
232 std::map<Key, std::vector<ObserverNodeOnEnabled>> refreshObservers;
233 {
234 std::lock_guard<decltype(mutex_)> lck(mutex_);
235 for (auto &key : keys) {
236 auto it = callbacks_.find(key);
237 if (it == callbacks_.end()) {
238 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
239 continue;
240 }
241
242 auto& allObservers = it->second;
243 auto iterator = std::find_if(allObservers.begin(), allObservers.end(), [&subscriber](ObserverNode node) {
244 if (node.subscriber_ == subscriber) {
245 return true;
246 }
247 return false;
248 });
249 if (iterator == allObservers.end()) {
250 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
251 continue;
252 }
253 if (iterator->enabled_) {
254 result.emplace_back(key, E_OK);
255 continue;
256 }
257
258 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
259 if (enabledObservers.empty()) {
260 sendServiceKeys.emplace_back(key);
261 }
262 refreshObservers[key].emplace_back(iterator->observer_, iterator->isNotifyOnEnabled_);
263 iterator->enabled_ = true;
264 }
265 }
266 enableServiceFunc(sendServiceKeys, result);
267 enableLocalFunc(refreshObservers);
268
269 return result;
270 }
271
272 template<class Key, class Observer>
DisableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDisable)273 std::vector<OperationResult> CallbacksManager<Key, Observer>::DisableObservers(const std::vector<Key> &keys,
274 void *subscriber,
275 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDisable)
276 {
277 std::vector<OperationResult> result;
278 std::vector<Key> lastDisabledKeys;
279 {
280 std::lock_guard<decltype(mutex_)> lck(mutex_);
281 for (auto &key : keys) {
282 auto it = callbacks_.find(key);
283 if (it == callbacks_.end()) {
284 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
285 continue;
286 }
287 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
288 if (enabledObservers.empty()) {
289 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
290 continue;
291 }
292
293 bool hasDisabled = false;
294 for (auto &item : callbacks_[key]) {
295 if (item.subscriber_ == subscriber) {
296 item.enabled_ = false;
297 item.isNotifyOnEnabled_ = false;
298 hasDisabled = true;
299 }
300 }
301 if (!hasDisabled) {
302 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
303 continue;
304 }
305 enabledObservers = GetEnabledObservers(key);
306 if (!enabledObservers.empty()) {
307 result.emplace_back(key, E_OK);
308 continue;
309 }
310 lastDisabledKeys.emplace_back(key);
311 }
312 }
313 processOnLastDisable(lastDisabledKeys, result);
314 return result;
315 }
316
317 template<class Key, class Observer>
GetAllSubscriberSize()318 int CallbacksManager<Key, Observer>::GetAllSubscriberSize()
319 {
320 int count = 0;
321 std::lock_guard<decltype(mutex_)> lck(mutex_);
322 for (auto &[key, value] : callbacks_) {
323 count += GetAllSubscriberSize(key);
324 }
325 return count;
326 }
327
328 template<class Key, class Observer>
GetAllSubscriberSize(const Key & key)329 int CallbacksManager<Key, Observer>::GetAllSubscriberSize(const Key &key)
330 {
331 std::lock_guard<decltype(mutex_)> lck(mutex_);
332 auto it = callbacks_.find(key);
333 if (it == callbacks_.end()) {
334 return 0;
335 }
336 return it->second.size();
337 }
338
339 template<class Key, class Observer>
SetObserversNotifiedOnEnabled(const Key & key)340 void CallbacksManager<Key, Observer>::SetObserversNotifiedOnEnabled(const Key &key)
341 {
342 std::lock_guard<decltype(mutex_)> lck(mutex_);
343 auto it = callbacks_.find(key);
344 if (it == callbacks_.end()) {
345 return;
346 }
347 std::vector<ObserverNode> &callbacks = it->second;
348 uint32_t num = 0;
349 for (auto &observerNode : callbacks) {
350 if (!observerNode.enabled_) {
351 num++;
352 observerNode.isNotifyOnEnabled_ = true;
353 }
354 }
355 if (num > 0) {
356 LOG_INFO("total %{public}zu, not refreshed %{public}u", callbacks.size(), num);
357 }
358 }
359 } // namespace OHOS::DataShare
360 #endif // DATA_SHARE_CALLBACKS_MANAGER_H
361