1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef DATA_SHARE_CALLBACKS_MANAGER_H
17 #define DATA_SHARE_CALLBACKS_MANAGER_H
#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

#include "datashare_errno.h"
#include "datashare_log.h"
#include "datashare_template.h"
25 
26 namespace OHOS::DataShare {
27 template<class Key, class Observer>
28 class CallbacksManager {
29 public:
30     struct ObserverNodeOnEnabled {
31         ObserverNodeOnEnabled(const std::shared_ptr<Observer> &observer, bool isNotifyOnEnabled = false)
32             : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {};
33         std::shared_ptr<Observer> observer_;
34         bool isNotifyOnEnabled_;
35     };
36 
37     std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
38         const std::shared_ptr<Observer> observer,
39         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
40         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
41             std::vector<OperationResult> &)>);
42 
43     std::vector<OperationResult> DelObservers(const std::vector<Key> &keys, void *subscriber,
44         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
45             CallbacksManager::DefaultProcess);
46 
47     std::vector<OperationResult> DelObservers(void *subscriber,
48         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
49             CallbacksManager::DefaultProcess);
50 
51     std::vector<OperationResult> EnableObservers(const std::vector<Key> &keys, void *subscriber,
52         std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,
53         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)>);
54 
55     std::vector<OperationResult> DisableObservers(const std::vector<Key> &keys, void *subscriber,
56         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
57             CallbacksManager::DefaultProcess);
58 
59     std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
60 
61     int GetAllSubscriberSize();
62     int GetAllSubscriberSize(const Key &key);
63     std::vector<Key> GetKeys();
64     void SetObserversNotifiedOnEnabled(const Key &key);
65 
66 private:
DefaultProcess(const std::vector<Key> &,std::vector<OperationResult> &)67     static void DefaultProcess(const std::vector<Key> &, std::vector<OperationResult> &){};
68     struct ObserverNode {
69         std::shared_ptr<Observer> observer_;
70         bool enabled_;
71         void *subscriber_;
72         bool isNotifyOnEnabled_;
ObserverNodeObserverNode73         ObserverNode(const std::shared_ptr<Observer> &observer, void *subscriber)
74             : observer_(observer), subscriber_(subscriber)
75         {
76             enabled_ = true;
77             isNotifyOnEnabled_ = false;
78         };
79     };
80     void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
81         std::vector<OperationResult> &result);
82     void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result);
83     std::recursive_mutex mutex_{};
84     std::map<Key, std::vector<ObserverNode>> callbacks_;
85 };
86 
87 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)88 std::vector<OperationResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
89     void *subscriber, const std::shared_ptr<Observer> observer,
90     std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
91     std::function<void(const std::vector<Key> &,
92         const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &)> processOnFirstAdd)
93 {
94     std::vector<OperationResult> result;
95     std::vector<Key> firstRegisterKey;
96     std::vector<Key> localRegisterKey;
97     {
98         std::lock_guard<decltype(mutex_)> lck(mutex_);
99         for (auto &key : keys) {
100             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
101             if (enabledObservers.empty()) {
102                 callbacks_[key].emplace_back(observer, subscriber);
103                 firstRegisterKey.emplace_back(key);
104                 continue;
105             }
106             localRegisterKey.emplace_back(key);
107             callbacks_[key].emplace_back(observer, subscriber);
108             result.emplace_back(key, E_OK);
109         }
110     }
111     if (!localRegisterKey.empty()) {
112         processOnLocalAdd(localRegisterKey, observer);
113     }
114     processOnFirstAdd(firstRegisterKey, observer, result);
115     return result;
116 }
117 
118 template<class Key, class Observer>
GetKeys()119 std::vector<Key> CallbacksManager<Key, Observer>::GetKeys()
120 {
121     std::vector<Key> keys;
122     {
123         std::lock_guard<decltype(mutex_)> lck(mutex_);
124         for (auto &it : callbacks_) {
125             keys.emplace_back(it.first);
126         }
127     }
128     return keys;
129 }
130 
131 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)132 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
133     std::vector<OperationResult> &result)
134 {
135     for (auto &it : callbacks_) {
136         DelLocalObservers(it.first, subscriber, lastDelKeys, result);
137     }
138 }
139 
140 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)141 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
142     std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result)
143 {
144     auto it = callbacks_.find(key);
145     if (it == callbacks_.end()) {
146         result.emplace_back(key, E_UNREGISTERED_EMPTY);
147         return;
148     }
149     std::vector<ObserverNode> &callbacks = it->second;
150     auto callbackIt = callbacks.begin();
151     while (callbackIt != callbacks.end()) {
152         if (callbackIt->subscriber_ != subscriber) {
153             callbackIt++;
154             continue;
155         }
156         callbackIt = callbacks.erase(callbackIt);
157     }
158     if (!it->second.empty()) {
159         result.emplace_back(key, E_OK);
160         return;
161     }
162     lastDelKeys.emplace_back(key);
163 }
164 
165 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)166 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
167     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
168 {
169     std::vector<OperationResult> result;
170     std::vector<Key> lastDelKeys;
171     {
172         std::lock_guard<decltype(mutex_)> lck(mutex_);
173         DelLocalObservers(subscriber, lastDelKeys, result);
174         if (lastDelKeys.empty()) {
175             return result;
176         }
177         for (auto &key : lastDelKeys) {
178             callbacks_.erase(key);
179         }
180     }
181     processOnLastDel(lastDelKeys, result);
182     return result;
183 }
184 
185 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)186 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(const std::vector<Key> &keys,
187     void *subscriber, std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
188 {
189     std::vector<OperationResult> result;
190     std::vector<Key> lastDelKeys;
191     {
192         std::lock_guard<decltype(mutex_)> lck(mutex_);
193         for (auto &key : keys) {
194             DelLocalObservers(key, subscriber, lastDelKeys, result);
195         }
196         if (lastDelKeys.empty()) {
197             return result;
198         }
199         for (auto &key : lastDelKeys) {
200             callbacks_.erase(key);
201         }
202     }
203     processOnLastDel(lastDelKeys, result);
204     return result;
205 }
206 
207 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)208 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
209 {
210     std::lock_guard<decltype(mutex_)> lck(mutex_);
211     auto it = callbacks_.find(inputKey);
212     if (it == callbacks_.end()) {
213         return std::vector<std::shared_ptr<Observer>>();
214     }
215     std::vector<std::shared_ptr<Observer>> results;
216     for (const auto &value : it->second) {
217         if (value.enabled_ && value.observer_ != nullptr) {
218             results.emplace_back(value.observer_);
219         }
220     }
221     return results;
222 }
223 
224 template<class Key, class Observer>
EnableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (std::map<Key,std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> enableServiceFunc)225 std::vector<OperationResult> CallbacksManager<Key, Observer>::EnableObservers(
226     const std::vector<Key> &keys, void *subscriber,
227     std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,
228     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> enableServiceFunc)
229 {
230     std::vector<OperationResult> result;
231     std::vector<Key> sendServiceKeys;
232     std::map<Key, std::vector<ObserverNodeOnEnabled>> refreshObservers;
233     {
234         std::lock_guard<decltype(mutex_)> lck(mutex_);
235         for (auto &key : keys) {
236             auto it = callbacks_.find(key);
237             if (it == callbacks_.end()) {
238                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
239                 continue;
240             }
241 
242             auto& allObservers = it->second;
243             auto iterator = std::find_if(allObservers.begin(), allObservers.end(), [&subscriber](ObserverNode node) {
244                 if (node.subscriber_ == subscriber) {
245                     return true;
246                 }
247                 return false;
248             });
249             if (iterator == allObservers.end()) {
250                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
251                 continue;
252             }
253             if (iterator->enabled_) {
254                 result.emplace_back(key, E_OK);
255                 continue;
256             }
257 
258             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
259             if (enabledObservers.empty()) {
260                 sendServiceKeys.emplace_back(key);
261             }
262             refreshObservers[key].emplace_back(iterator->observer_, iterator->isNotifyOnEnabled_);
263             iterator->enabled_ = true;
264         }
265     }
266     enableServiceFunc(sendServiceKeys, result);
267     enableLocalFunc(refreshObservers);
268 
269     return result;
270 }
271 
272 template<class Key, class Observer>
DisableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDisable)273 std::vector<OperationResult> CallbacksManager<Key, Observer>::DisableObservers(const std::vector<Key> &keys,
274     void *subscriber,
275     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDisable)
276 {
277     std::vector<OperationResult> result;
278     std::vector<Key> lastDisabledKeys;
279     {
280         std::lock_guard<decltype(mutex_)> lck(mutex_);
281         for (auto &key : keys) {
282             auto it = callbacks_.find(key);
283             if (it == callbacks_.end()) {
284                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
285                 continue;
286             }
287             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
288             if (enabledObservers.empty()) {
289                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
290                 continue;
291             }
292 
293             bool hasDisabled = false;
294             for (auto &item : callbacks_[key]) {
295                 if (item.subscriber_ == subscriber) {
296                     item.enabled_ = false;
297                     item.isNotifyOnEnabled_ = false;
298                     hasDisabled = true;
299                 }
300             }
301             if (!hasDisabled) {
302                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
303                 continue;
304             }
305             enabledObservers = GetEnabledObservers(key);
306             if (!enabledObservers.empty()) {
307                 result.emplace_back(key, E_OK);
308                 continue;
309             }
310             lastDisabledKeys.emplace_back(key);
311         }
312     }
313     processOnLastDisable(lastDisabledKeys, result);
314     return result;
315 }
316 
317 template<class Key, class Observer>
GetAllSubscriberSize()318 int CallbacksManager<Key, Observer>::GetAllSubscriberSize()
319 {
320     int count = 0;
321     std::lock_guard<decltype(mutex_)> lck(mutex_);
322     for (auto &[key, value] : callbacks_) {
323         count += GetAllSubscriberSize(key);
324     }
325     return count;
326 }
327 
328 template<class Key, class Observer>
GetAllSubscriberSize(const Key & key)329 int CallbacksManager<Key, Observer>::GetAllSubscriberSize(const Key &key)
330 {
331     std::lock_guard<decltype(mutex_)> lck(mutex_);
332     auto it = callbacks_.find(key);
333     if (it == callbacks_.end()) {
334         return 0;
335     }
336     return it->second.size();
337 }
338 
339 template<class Key, class Observer>
SetObserversNotifiedOnEnabled(const Key & key)340 void CallbacksManager<Key, Observer>::SetObserversNotifiedOnEnabled(const Key &key)
341 {
342     std::lock_guard<decltype(mutex_)> lck(mutex_);
343     auto it = callbacks_.find(key);
344     if (it == callbacks_.end()) {
345         return;
346     }
347     std::vector<ObserverNode> &callbacks = it->second;
348     uint32_t num = 0;
349     for (auto &observerNode : callbacks) {
350         if (!observerNode.enabled_) {
351             num++;
352             observerNode.isNotifyOnEnabled_ = true;
353         }
354     }
355     if (num > 0) {
356         LOG_INFO("total %{public}zu, not refreshed %{public}u", callbacks.size(), num);
357     }
358 }
359 } // namespace OHOS::DataShare
360 #endif // DATA_SHARE_CALLBACKS_MANAGER_H
361