1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "notification_chain.h"
17
18 #include <algorithm>
19 #include <functional>
20
21 #include "db_errno.h"
22 #include "log_print.h"
23 #include "macro_utils.h"
24
25 namespace DistributedDB {
RegisterListener(EventType type,const Listener::OnEvent & onEvent,const Listener::OnFinalize & onFinalize,int & errCode)26 NotificationChain::Listener *NotificationChain::RegisterListener(
27 EventType type, const Listener::OnEvent &onEvent, const Listener::OnFinalize &onFinalize, int &errCode)
28 {
29 errCode = E_OK;
30 if (!onEvent) {
31 LOGE("[NotificationChain] Register listener failed, 'onEvent()' is null!");
32 errCode = -E_INVALID_ARGS;
33 return nullptr;
34 }
35
36 NotificationChain::ListenerChain *listenerChain = FindAndGetListenerChainLocked(type);
37 if (listenerChain == nullptr) {
38 LOGE("[NotificationChain] Register listener failed, no event type %u found!", type);
39 errCode = -E_NOT_REGISTER;
40 return nullptr;
41 }
42
43 NotificationChain::Listener *listener = new (std::nothrow)
44 NotificationChain::Listener(onEvent, onFinalize);
45 if (listener == nullptr) {
46 listenerChain->DecObjRef(listenerChain);
47 listenerChain = nullptr;
48 errCode = -E_OUT_OF_MEMORY;
49 return nullptr;
50 }
51
52 errCode = listenerChain->RegisterListener(listener);
53 if (errCode != E_OK) {
54 LOGE("[NotificationChain] Register listener failed, event type %u has been unregistered!", type);
55 listener->DecObjRef(listener);
56 listener = nullptr;
57 listenerChain->DecObjRef(listenerChain);
58 listenerChain = nullptr;
59 return nullptr;
60 }
61
62 listenerChain->DecObjRef(listenerChain);
63 listenerChain = nullptr;
64 return listener;
65 }
66
RegisterEventType(EventType type)67 int NotificationChain::RegisterEventType(EventType type)
68 {
69 AutoLock lockGuard(this);
70 if (IsKilled()) {
71 LOGI("Register event failed, the notification chain has been killed!");
72 return -E_STALE;
73 }
74
75 ListenerChain *listenerChain = FindListenerChain(type);
76 if (listenerChain != nullptr) {
77 LOGE("[NotificationChain] Register event failed, event type %u has been registered!", type);
78 return -E_ALREADY_REGISTER;
79 }
80
81 listenerChain = new (std::nothrow) ListenerChain();
82 if (listenerChain == nullptr) {
83 LOGE("[NotificationChain] Register event failed, OOM!");
84 return -E_OUT_OF_MEMORY;
85 }
86
87 listenerChain->OnKill([listenerChain] {
88 listenerChain->ClearListeners();
89 });
90 eventChains_.insert(std::pair<EventType, ListenerChain *>(type, listenerChain));
91 IncObjRef(this);
92 return E_OK;
93 }
94
UnRegisterEventType(EventType type)95 int NotificationChain::UnRegisterEventType(EventType type)
96 {
97 NotificationChain::ListenerChain *listenerChain = nullptr;
98 {
99 AutoLock lockGuard(this);
100 listenerChain = FindListenerChain(type);
101 if (listenerChain == nullptr) {
102 LOGE("[NotificationChain] UnRegister event failed, event %u is not registered!", type);
103 return -E_NOT_FOUND;
104 }
105 eventChains_.erase(type);
106 }
107
108 listenerChain->KillAndDecObjRef(listenerChain);
109 listenerChain = nullptr;
110 DecObjRef(this);
111 return E_OK;
112 }
113
NotifyEvent(EventType type,void * arg)114 void NotificationChain::NotifyEvent(EventType type, void *arg)
115 {
116 NotificationChain::ListenerChain *listenerChain = FindAndGetListenerChainLocked(type);
117 if (listenerChain == nullptr) {
118 return;
119 }
120 listenerChain->NotifyListeners(arg);
121 listenerChain->DecObjRef(listenerChain);
122 listenerChain = nullptr;
123 }
124
EmptyListener(EventType type) const125 bool NotificationChain::EmptyListener(EventType type) const
126 {
127 NotificationChain::ListenerChain *listenerChain = FindAndGetListenerChainLocked(type);
128 if (listenerChain == nullptr) {
129 return true;
130 }
131 bool empty = listenerChain->Empty();
132 RefObject::DecObjRef(listenerChain);
133 return empty;
134 }
135
ListenerChain()136 NotificationChain::ListenerChain::ListenerChain() {}
137
~ListenerChain()138 NotificationChain::ListenerChain::~ListenerChain() {}
139
FindAndGetListenerChainLocked(EventType type) const140 NotificationChain::ListenerChain *NotificationChain::FindAndGetListenerChainLocked(EventType type) const
141 {
142 AutoLock lockGuard(this);
143 ListenerChain *listenerChain = FindListenerChain(type);
144 if (listenerChain == nullptr) {
145 return nullptr;
146 }
147 listenerChain->IncObjRef(listenerChain);
148 return listenerChain;
149 }
150
FindListenerChain(EventType type) const151 NotificationChain::ListenerChain *NotificationChain::FindListenerChain(EventType type) const
152 {
153 auto iter = eventChains_.find(type);
154 if (iter != eventChains_.end()) {
155 return iter->second;
156 }
157 return nullptr;
158 }
159
RegisterListener(Listener * listener)160 int NotificationChain::ListenerChain::RegisterListener(Listener *listener)
161 {
162 AutoLock lockGuard(this);
163 if (IsKilled()) {
164 return -E_STALE;
165 }
166 if (listenerSet_.find(listener) != listenerSet_.end()) {
167 return -E_ALREADY_REGISTER;
168 }
169 listenerSet_.insert(listener);
170 listener->SetOwner(this);
171 return E_OK;
172 }
173
UnRegisterListener(Listener * listener,bool wait)174 int NotificationChain::ListenerChain::UnRegisterListener(Listener *listener, bool wait)
175 {
176 if (listener == nullptr) {
177 return -E_INVALID_ARGS;
178 }
179
180 {
181 AutoLock lockGuard(this);
182 auto result = listenerSet_.find(listener);
183 if (result != listenerSet_.end()) {
184 if (wait) {
185 listener->OnKill([listener]() {
186 listener->KillWait();
187 });
188 }
189 listenerSet_.erase(result);
190 }
191 }
192
193 listener->KillAndDecObjRef(listener);
194 listener = nullptr;
195 return E_OK;
196 }
197
BackupListenerSet(std::set<Listener * > & backupSet) const198 void NotificationChain::ListenerChain::BackupListenerSet(std::set<Listener *> &backupSet) const
199 {
200 for (auto listener : listenerSet_) {
201 RefObject::IncObjRef(listener);
202 backupSet.insert(listener);
203 }
204 }
205
NotifyListeners(void * arg)206 void NotificationChain::ListenerChain::NotifyListeners(void *arg)
207 {
208 std::set<Listener *> tmpSet;
209 {
210 AutoLock lockGuard(this);
211 if (IsKilled()) {
212 return;
213 }
214 BackupListenerSet(tmpSet);
215 }
216
217 for (auto listener : tmpSet) {
218 if (listener != nullptr) {
219 listener->NotifyListener(arg);
220 listener->DecObjRef(listener);
221 listener = nullptr;
222 }
223 }
224 }
225
ClearListeners()226 void NotificationChain::ListenerChain::ClearListeners()
227 {
228 std::set<Listener *> tmpSet;
229 BackupListenerSet(tmpSet);
230 listenerSet_.clear();
231 // Enter this function with lock held(OnKill() is invoked with object lock held), so drop it.
232 UnlockObj();
233
234 for (auto listener : tmpSet) {
235 // Drop the ref 1 which increased in 'BackupListenerSet()',
236 // the original 1 will be dropped when user call listener->Drop();
237 listener->KillAndDecObjRef(listener);
238 listener = nullptr;
239 }
240
241 // Lock it again before leaving.
242 LockObj();
243 }
244
Empty() const245 bool NotificationChain::ListenerChain::Empty() const
246 {
247 AutoLock lockGuard(this);
248 return listenerSet_.empty();
249 }
250
NotifyListener(void * arg)251 void NotificationChain::Listener::NotifyListener(void *arg)
252 {
253 if (onEvent_ && !IsKilled()) {
254 if (EnterEventAction()) {
255 onEvent_(arg);
256 LeaveEventAction();
257 }
258 }
259 }
260
Finalize() const261 void NotificationChain::Listener::Finalize() const
262 {
263 if (onFinalize_) {
264 onFinalize_();
265 }
266 }
267
EnterEventAction()268 bool NotificationChain::Listener::EnterEventAction()
269 {
270 AutoLock lockGuard(this);
271 if (IsKilled()) {
272 return false;
273 }
274 // We never call onEvent() of the same listener in parallel with 2 or more threads.
275 eventRunningThread_ = std::this_thread::get_id();
276 return true;
277 }
278
LeaveEventAction()279 void NotificationChain::Listener::LeaveEventAction()
280 {
281 AutoLock lockGuard(this);
282 eventRunningThread_ = std::thread::id();
283 safeKill_.notify_one();
284 }
285
KillWait()286 void NotificationChain::Listener::KillWait()
287 {
288 // We entered with object lock held.
289 if ((eventRunningThread_ == std::thread::id()) ||
290 (eventRunningThread_ == std::this_thread::get_id())) {
291 return;
292 }
293
294 LOGW("[NotificationChain] Try to kill an active event listener, now wait.");
295 bool noDeadLock = WaitLockedUntil(safeKill_, [this]() {
296 if (eventRunningThread_ == std::thread::id()) {
297 return true;
298 }
299 return false;
300 }, KILL_WAIT_SECONDS);
301 if (!noDeadLock) {
302 LOGE("[NotificationChain] Dead lock maybe happen, we stop waiting the listener.");
303 } else {
304 LOGW("[NotificationChain] Wait the active event listener ok.");
305 }
306 }
307
SetOwner(ListenerChain * listenerChain)308 void NotificationChain::Listener::SetOwner(ListenerChain *listenerChain)
309 {
310 if (listenerChain_ != nullptr) {
311 listenerChain_->DecObjRef(listenerChain_);
312 }
313 listenerChain_ = listenerChain;
314 if (listenerChain_ != nullptr) {
315 listenerChain_->IncObjRef(listenerChain_);
316 }
317 }
318
Drop(bool wait)319 int NotificationChain::Listener::Drop(bool wait)
320 {
321 if (listenerChain_ == nullptr) {
322 LOGE("[NotificationChain] Drop listener failed, lost the chain!");
323 return -E_INTERNAL_ERROR;
324 }
325 return listenerChain_->UnRegisterListener(this, wait);
326 }
327
Listener(const OnEvent & onEvent,const OnFinalize & onFinalize)328 NotificationChain::Listener::Listener(const OnEvent &onEvent, const OnFinalize &onFinalize)
329 : onEvent_(onEvent),
330 onFinalize_(onFinalize),
331 listenerChain_(nullptr)
332 {
333 OnLastRef([this]() {
334 this->Finalize();
335 });
336 }
337
~Listener()338 NotificationChain::Listener::~Listener()
339 {
340 SetOwner(nullptr);
341 }
342
~NotificationChain()343 NotificationChain::~NotificationChain()
344 {
345 for (auto &iter : eventChains_) {
346 iter.second->KillAndDecObjRef(iter.second);
347 iter.second = nullptr;
348 }
349 eventChains_.clear();
350 }
351
352 DEFINE_OBJECT_TAG_FACILITIES(NotificationChain)
353 DEFINE_OBJECT_TAG_FACILITIES(NotificationChain::Listener)
354 DEFINE_OBJECT_TAG_FACILITIES(NotificationChain::ListenerChain)
355 } // namespace DistributedDB