1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef LDBPROJ_NAPI_CALLBACKS_MANAGER_H
17 #define LDBPROJ_NAPI_CALLBACKS_MANAGER_H
18 #include <map>
19 #include <mutex>
20 #include <vector>
21 
22 #include "datashare_errno.h"
23 #include "datashare_template.h"
24 
25 namespace OHOS::DataShare {
26 template<class Key, class Observer>
27 class NapiCallbacksManager {
28 public:
29     std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, const std::shared_ptr<Observer> observer,
30         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
31         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
32             std::vector<OperationResult> &)>);
33 
34     std::vector<OperationResult> DelObservers(const std::vector<Key> &keys,
35         const std::shared_ptr<Observer> observer = nullptr,
36         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
37             std::vector<OperationResult> &)> processOnLastDel = NapiCallbacksManager::DefaultProcess);
38 
39     std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
40 
41     int GetEnabledSubscriberSize();
42 
43 private:
DefaultProcess(const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)44     static void DefaultProcess(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
45         std::vector<OperationResult> &){};
46     struct ObserverNode {
47         std::shared_ptr<Observer> observer_;
48         bool enabled_;
ObserverNodeObserverNode49         ObserverNode(const std::shared_ptr<Observer> &observer) : observer_(observer)
50         {
51             enabled_ = true;
52         };
ObserverNodeObserverNode53         ObserverNode(const std::shared_ptr<Observer> &observer, bool enabled)
54             : observer_(observer), enabled_(enabled){};
55     };
56     bool IsRegistered(const Observer &, const std::vector<ObserverNode> &);
57     std::recursive_mutex mutex_{};
58     std::map<Key, std::vector<ObserverNode>> callbacks_;
59 };
60 
61 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)62 std::vector<OperationResult> NapiCallbacksManager<Key, Observer>::AddObservers(
63     const std::vector<Key> &keys, const std::shared_ptr<Observer> observer,
64     std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
65     std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
66         std::vector<OperationResult> &)> processOnFirstAdd)
67 {
68     std::vector<OperationResult> result;
69     std::vector<Key> firstRegisterKey;
70     std::vector<Key> localRegisterKey;
71     {
72         std::lock_guard<decltype(mutex_)> lck(mutex_);
73         for (auto &key : keys) {
74             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
75             if (enabledObservers.empty()) {
76                 callbacks_[key].emplace_back(ObserverNode(observer));
77                 firstRegisterKey.emplace_back(key);
78                 continue;
79             }
80             if (IsRegistered(*observer, callbacks_[key])) {
81                 result.emplace_back(static_cast<std::string>(key), E_REGISTERED_REPEATED);
82                 continue;
83             }
84             localRegisterKey.emplace_back(key);
85             callbacks_[key].emplace_back(observer);
86             result.emplace_back(key, E_OK);
87         }
88     }
89     if (!localRegisterKey.empty()) {
90         processOnLocalAdd(localRegisterKey, observer);
91     }
92     processOnFirstAdd(firstRegisterKey, observer, result);
93     return result;
94 }
95 
96 template<class Key, class Observer>
IsRegistered(const Observer & observer,const std::vector<ObserverNode> & observers)97 bool NapiCallbacksManager<Key, Observer>::IsRegistered(const Observer &observer,
98     const std::vector<ObserverNode> &observers)
99 {
100     for (auto &item : observers) {
101         if (*(item.observer_) == observer) {
102             return true;
103         }
104     }
105     return false;
106 }
107 
108 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> &,std::vector<OperationResult> &)> processOnLastDel)109 std::vector<OperationResult> NapiCallbacksManager<Key, Observer>::DelObservers(
110     const std::vector<Key> &keys, const std::shared_ptr<Observer> observer,
111     std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &, std::vector<OperationResult> &)>
112         processOnLastDel)
113 {
114     std::vector<OperationResult> result;
115     std::vector<Key> lastDelKeys;
116     {
117         std::lock_guard<decltype(mutex_)> lck(mutex_);
118         if (keys.empty() && observer == nullptr) {
119             for (auto &it : callbacks_) {
120                 lastDelKeys.emplace_back(it.first);
121             }
122             callbacks_.clear();
123         }
124         for (auto &key : keys) {
125             auto it = callbacks_.find(key);
126             if (it == callbacks_.end()) {
127                 result.emplace_back(key, E_UNREGISTERED_EMPTY);
128                 continue;
129             }
130             if (observer == nullptr) {
131                 callbacks_.erase(key);
132                 lastDelKeys.emplace_back(key);
133                 continue;
134             }
135             if (!IsRegistered(*observer, it->second)) {
136                 result.emplace_back(key, E_UNREGISTERED_EMPTY);
137                 continue;
138             }
139             std::vector<ObserverNode> &callbacks = it->second;
140             auto callbackIt = callbacks.begin();
141             while (callbackIt != callbacks.end()) {
142                 if (!(*(callbackIt->observer_) == *observer)) {
143                     callbackIt++;
144                     continue;
145                 }
146                 callbackIt = callbacks.erase(callbackIt);
147             }
148             if (!it->second.empty()) {
149                 result.emplace_back(key, E_OK);
150                 continue;
151             }
152             callbacks_.erase(key);
153             lastDelKeys.emplace_back(key);
154         }
155         if (lastDelKeys.empty()) {
156             return result;
157         }
158     }
159     processOnLastDel(lastDelKeys, observer, result);
160     return result;
161 }
162 
163 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)164 std::vector<std::shared_ptr<Observer>> NapiCallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
165 {
166     std::lock_guard<decltype(mutex_)> lck(mutex_);
167     auto it = callbacks_.find(inputKey);
168     if (it == callbacks_.end()) {
169         return std::vector<std::shared_ptr<Observer>>();
170     }
171     std::vector<std::shared_ptr<Observer>> results;
172     for (const auto &value : it->second) {
173         if (value.enabled_ && value.observer_ != nullptr) {
174             results.emplace_back(value.observer_);
175         }
176     }
177     return results;
178 }
179 
180 template<class Key, class Observer>
GetEnabledSubscriberSize()181 int NapiCallbacksManager<Key, Observer>::GetEnabledSubscriberSize()
182 {
183     int count = 0;
184     std::lock_guard<decltype(mutex_)> lck(mutex_);
185     for (auto &[key, value] : callbacks_) {
186         for (const auto &callback : value) {
187             if (callback.enabled_) {
188                 count++;
189             }
190         }
191     }
192     return count;
193 }
194 } // namespace OHOS::DataShare
195 #endif //LDBPROJ_NAPI_CALLBACKS_MANAGER_H
196