1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef LDBPROJ_NAPI_CALLBACKS_MANAGER_H
17 #define LDBPROJ_NAPI_CALLBACKS_MANAGER_H
18 #include <map>
19 #include <mutex>
20 #include <vector>
21
22 #include "datashare_errno.h"
23 #include "datashare_template.h"
24
25 namespace OHOS::DataShare {
26 template<class Key, class Observer>
27 class NapiCallbacksManager {
28 public:
29 std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, const std::shared_ptr<Observer> observer,
30 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
31 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
32 std::vector<OperationResult> &)>);
33
34 std::vector<OperationResult> DelObservers(const std::vector<Key> &keys,
35 const std::shared_ptr<Observer> observer = nullptr,
36 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
37 std::vector<OperationResult> &)> processOnLastDel = NapiCallbacksManager::DefaultProcess);
38
39 std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
40
41 int GetEnabledSubscriberSize();
42
43 private:
DefaultProcess(const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)44 static void DefaultProcess(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
45 std::vector<OperationResult> &){};
46 struct ObserverNode {
47 std::shared_ptr<Observer> observer_;
48 bool enabled_;
ObserverNodeObserverNode49 ObserverNode(const std::shared_ptr<Observer> &observer) : observer_(observer)
50 {
51 enabled_ = true;
52 };
ObserverNodeObserverNode53 ObserverNode(const std::shared_ptr<Observer> &observer, bool enabled)
54 : observer_(observer), enabled_(enabled){};
55 };
56 bool IsRegistered(const Observer &, const std::vector<ObserverNode> &);
57 std::recursive_mutex mutex_{};
58 std::map<Key, std::vector<ObserverNode>> callbacks_;
59 };
60
61 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)62 std::vector<OperationResult> NapiCallbacksManager<Key, Observer>::AddObservers(
63 const std::vector<Key> &keys, const std::shared_ptr<Observer> observer,
64 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
65 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
66 std::vector<OperationResult> &)> processOnFirstAdd)
67 {
68 std::vector<OperationResult> result;
69 std::vector<Key> firstRegisterKey;
70 std::vector<Key> localRegisterKey;
71 {
72 std::lock_guard<decltype(mutex_)> lck(mutex_);
73 for (auto &key : keys) {
74 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
75 if (enabledObservers.empty()) {
76 callbacks_[key].emplace_back(ObserverNode(observer));
77 firstRegisterKey.emplace_back(key);
78 continue;
79 }
80 if (IsRegistered(*observer, callbacks_[key])) {
81 result.emplace_back(static_cast<std::string>(key), E_REGISTERED_REPEATED);
82 continue;
83 }
84 localRegisterKey.emplace_back(key);
85 callbacks_[key].emplace_back(observer);
86 result.emplace_back(key, E_OK);
87 }
88 }
89 if (!localRegisterKey.empty()) {
90 processOnLocalAdd(localRegisterKey, observer);
91 }
92 processOnFirstAdd(firstRegisterKey, observer, result);
93 return result;
94 }
95
96 template<class Key, class Observer>
IsRegistered(const Observer & observer,const std::vector<ObserverNode> & observers)97 bool NapiCallbacksManager<Key, Observer>::IsRegistered(const Observer &observer,
98 const std::vector<ObserverNode> &observers)
99 {
100 for (auto &item : observers) {
101 if (*(item.observer_) == observer) {
102 return true;
103 }
104 }
105 return false;
106 }
107
108 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> &,std::vector<OperationResult> &)> processOnLastDel)109 std::vector<OperationResult> NapiCallbacksManager<Key, Observer>::DelObservers(
110 const std::vector<Key> &keys, const std::shared_ptr<Observer> observer,
111 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &, std::vector<OperationResult> &)>
112 processOnLastDel)
113 {
114 std::vector<OperationResult> result;
115 std::vector<Key> lastDelKeys;
116 {
117 std::lock_guard<decltype(mutex_)> lck(mutex_);
118 if (keys.empty() && observer == nullptr) {
119 for (auto &it : callbacks_) {
120 lastDelKeys.emplace_back(it.first);
121 }
122 callbacks_.clear();
123 }
124 for (auto &key : keys) {
125 auto it = callbacks_.find(key);
126 if (it == callbacks_.end()) {
127 result.emplace_back(key, E_UNREGISTERED_EMPTY);
128 continue;
129 }
130 if (observer == nullptr) {
131 callbacks_.erase(key);
132 lastDelKeys.emplace_back(key);
133 continue;
134 }
135 if (!IsRegistered(*observer, it->second)) {
136 result.emplace_back(key, E_UNREGISTERED_EMPTY);
137 continue;
138 }
139 std::vector<ObserverNode> &callbacks = it->second;
140 auto callbackIt = callbacks.begin();
141 while (callbackIt != callbacks.end()) {
142 if (!(*(callbackIt->observer_) == *observer)) {
143 callbackIt++;
144 continue;
145 }
146 callbackIt = callbacks.erase(callbackIt);
147 }
148 if (!it->second.empty()) {
149 result.emplace_back(key, E_OK);
150 continue;
151 }
152 callbacks_.erase(key);
153 lastDelKeys.emplace_back(key);
154 }
155 if (lastDelKeys.empty()) {
156 return result;
157 }
158 }
159 processOnLastDel(lastDelKeys, observer, result);
160 return result;
161 }
162
163 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)164 std::vector<std::shared_ptr<Observer>> NapiCallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
165 {
166 std::lock_guard<decltype(mutex_)> lck(mutex_);
167 auto it = callbacks_.find(inputKey);
168 if (it == callbacks_.end()) {
169 return std::vector<std::shared_ptr<Observer>>();
170 }
171 std::vector<std::shared_ptr<Observer>> results;
172 for (const auto &value : it->second) {
173 if (value.enabled_ && value.observer_ != nullptr) {
174 results.emplace_back(value.observer_);
175 }
176 }
177 return results;
178 }
179
180 template<class Key, class Observer>
GetEnabledSubscriberSize()181 int NapiCallbacksManager<Key, Observer>::GetEnabledSubscriberSize()
182 {
183 int count = 0;
184 std::lock_guard<decltype(mutex_)> lck(mutex_);
185 for (auto &[key, value] : callbacks_) {
186 for (const auto &callback : value) {
187 if (callback.enabled_) {
188 count++;
189 }
190 }
191 }
192 return count;
193 }
194 } // namespace OHOS::DataShare
195 #endif //LDBPROJ_NAPI_CALLBACKS_MANAGER_H
196