1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "notification_chain.h"
17 
18 #include <algorithm>
19 #include <functional>
20 
21 #include "db_errno.h"
22 #include "log_print.h"
23 #include "macro_utils.h"
24 
25 namespace DistributedDB {
RegisterListener(EventType type,const Listener::OnEvent & onEvent,const Listener::OnFinalize & onFinalize,int & errCode)26 NotificationChain::Listener *NotificationChain::RegisterListener(
27     EventType type, const Listener::OnEvent &onEvent, const Listener::OnFinalize &onFinalize, int &errCode)
28 {
29     errCode = E_OK;
30     if (!onEvent) {
31         LOGE("[NotificationChain] Register listener failed, 'onEvent()' is null!");
32         errCode = -E_INVALID_ARGS;
33         return nullptr;
34     }
35 
36     NotificationChain::ListenerChain *listenerChain = FindAndGetListenerChainLocked(type);
37     if (listenerChain == nullptr) {
38         LOGE("[NotificationChain] Register listener failed, no event type %u found!", type);
39         errCode = -E_NOT_REGISTER;
40         return nullptr;
41     }
42 
43     NotificationChain::Listener *listener = new (std::nothrow)
44         NotificationChain::Listener(onEvent, onFinalize);
45     if (listener == nullptr) {
46         listenerChain->DecObjRef(listenerChain);
47         listenerChain = nullptr;
48         errCode = -E_OUT_OF_MEMORY;
49         return nullptr;
50     }
51 
52     errCode = listenerChain->RegisterListener(listener);
53     if (errCode != E_OK) {
54         LOGE("[NotificationChain] Register listener failed, event type %u has been unregistered!", type);
55         listener->DecObjRef(listener);
56         listener = nullptr;
57         listenerChain->DecObjRef(listenerChain);
58         listenerChain = nullptr;
59         return nullptr;
60     }
61 
62     listenerChain->DecObjRef(listenerChain);
63     listenerChain = nullptr;
64     return listener;
65 }
66 
RegisterEventType(EventType type)67 int NotificationChain::RegisterEventType(EventType type)
68 {
69     AutoLock lockGuard(this);
70     if (IsKilled()) {
71         LOGI("Register event failed, the notification chain has been killed!");
72         return -E_STALE;
73     }
74 
75     ListenerChain *listenerChain = FindListenerChain(type);
76     if (listenerChain != nullptr) {
77         LOGE("[NotificationChain] Register event failed, event type %u has been registered!", type);
78         return -E_ALREADY_REGISTER;
79     }
80 
81     listenerChain = new (std::nothrow) ListenerChain();
82     if (listenerChain == nullptr) {
83         LOGE("[NotificationChain] Register event failed, OOM!");
84         return -E_OUT_OF_MEMORY;
85     }
86 
87     listenerChain->OnKill([listenerChain] {
88         listenerChain->ClearListeners();
89     });
90     eventChains_.insert(std::pair<EventType, ListenerChain *>(type, listenerChain));
91     IncObjRef(this);
92     return E_OK;
93 }
94 
UnRegisterEventType(EventType type)95 int NotificationChain::UnRegisterEventType(EventType type)
96 {
97     NotificationChain::ListenerChain *listenerChain = nullptr;
98     {
99         AutoLock lockGuard(this);
100         listenerChain = FindListenerChain(type);
101         if (listenerChain == nullptr) {
102             LOGE("[NotificationChain] UnRegister event failed, event %u is not registered!", type);
103             return -E_NOT_FOUND;
104         }
105         eventChains_.erase(type);
106     }
107 
108     listenerChain->KillAndDecObjRef(listenerChain);
109     listenerChain = nullptr;
110     DecObjRef(this);
111     return E_OK;
112 }
113 
NotifyEvent(EventType type,void * arg)114 void NotificationChain::NotifyEvent(EventType type, void *arg)
115 {
116     NotificationChain::ListenerChain *listenerChain = FindAndGetListenerChainLocked(type);
117     if (listenerChain == nullptr) {
118         return;
119     }
120     listenerChain->NotifyListeners(arg);
121     listenerChain->DecObjRef(listenerChain);
122     listenerChain = nullptr;
123 }
124 
EmptyListener(EventType type) const125 bool NotificationChain::EmptyListener(EventType type) const
126 {
127     NotificationChain::ListenerChain *listenerChain = FindAndGetListenerChainLocked(type);
128     if (listenerChain == nullptr) {
129         return true;
130     }
131     bool empty = listenerChain->Empty();
132     RefObject::DecObjRef(listenerChain);
133     return empty;
134 }
135 
ListenerChain()136 NotificationChain::ListenerChain::ListenerChain() {}
137 
~ListenerChain()138 NotificationChain::ListenerChain::~ListenerChain() {}
139 
FindAndGetListenerChainLocked(EventType type) const140 NotificationChain::ListenerChain *NotificationChain::FindAndGetListenerChainLocked(EventType type) const
141 {
142     AutoLock lockGuard(this);
143     ListenerChain *listenerChain = FindListenerChain(type);
144     if (listenerChain == nullptr) {
145         return nullptr;
146     }
147     listenerChain->IncObjRef(listenerChain);
148     return listenerChain;
149 }
150 
FindListenerChain(EventType type) const151 NotificationChain::ListenerChain *NotificationChain::FindListenerChain(EventType type) const
152 {
153     auto iter = eventChains_.find(type);
154     if (iter != eventChains_.end()) {
155         return iter->second;
156     }
157     return nullptr;
158 }
159 
RegisterListener(Listener * listener)160 int NotificationChain::ListenerChain::RegisterListener(Listener *listener)
161 {
162     AutoLock lockGuard(this);
163     if (IsKilled()) {
164         return -E_STALE;
165     }
166     if (listenerSet_.find(listener) != listenerSet_.end()) {
167         return -E_ALREADY_REGISTER;
168     }
169     listenerSet_.insert(listener);
170     listener->SetOwner(this);
171     return E_OK;
172 }
173 
UnRegisterListener(Listener * listener,bool wait)174 int NotificationChain::ListenerChain::UnRegisterListener(Listener *listener, bool wait)
175 {
176     if (listener == nullptr) {
177         return -E_INVALID_ARGS;
178     }
179 
180     {
181         AutoLock lockGuard(this);
182         auto result = listenerSet_.find(listener);
183         if (result != listenerSet_.end()) {
184             if (wait) {
185                 listener->OnKill([listener]() {
186                     listener->KillWait();
187                 });
188             }
189             listenerSet_.erase(result);
190         }
191     }
192 
193     listener->KillAndDecObjRef(listener);
194     listener = nullptr;
195     return E_OK;
196 }
197 
BackupListenerSet(std::set<Listener * > & backupSet) const198 void NotificationChain::ListenerChain::BackupListenerSet(std::set<Listener *> &backupSet) const
199 {
200     for (auto listener : listenerSet_) {
201         RefObject::IncObjRef(listener);
202         backupSet.insert(listener);
203     }
204 }
205 
NotifyListeners(void * arg)206 void NotificationChain::ListenerChain::NotifyListeners(void *arg)
207 {
208     std::set<Listener *> tmpSet;
209     {
210         AutoLock lockGuard(this);
211         if (IsKilled()) {
212             return;
213         }
214         BackupListenerSet(tmpSet);
215     }
216 
217     for (auto listener : tmpSet) {
218         if (listener != nullptr) {
219             listener->NotifyListener(arg);
220             listener->DecObjRef(listener);
221             listener = nullptr;
222         }
223     }
224 }
225 
ClearListeners()226 void NotificationChain::ListenerChain::ClearListeners()
227 {
228     std::set<Listener *> tmpSet;
229     BackupListenerSet(tmpSet);
230     listenerSet_.clear();
231     // Enter this function with lock held(OnKill() is invoked with object lock held), so drop it.
232     UnlockObj();
233 
234     for (auto listener : tmpSet) {
235         // Drop the ref 1 which increased in 'BackupListenerSet()',
236         // the original 1 will be dropped when user call listener->Drop();
237         listener->KillAndDecObjRef(listener);
238         listener = nullptr;
239     }
240 
241     // Lock it again before leaving.
242     LockObj();
243 }
244 
Empty() const245 bool NotificationChain::ListenerChain::Empty() const
246 {
247     AutoLock lockGuard(this);
248     return listenerSet_.empty();
249 }
250 
NotifyListener(void * arg)251 void NotificationChain::Listener::NotifyListener(void *arg)
252 {
253     if (onEvent_ && !IsKilled()) {
254         if (EnterEventAction()) {
255             onEvent_(arg);
256             LeaveEventAction();
257         }
258     }
259 }
260 
Finalize() const261 void NotificationChain::Listener::Finalize() const
262 {
263     if (onFinalize_) {
264         onFinalize_();
265     }
266 }
267 
EnterEventAction()268 bool NotificationChain::Listener::EnterEventAction()
269 {
270     AutoLock lockGuard(this);
271     if (IsKilled()) {
272         return false;
273     }
274     // We never call onEvent() of the same listener in parallel with 2 or more threads.
275     eventRunningThread_ = std::this_thread::get_id();
276     return true;
277 }
278 
LeaveEventAction()279 void NotificationChain::Listener::LeaveEventAction()
280 {
281     AutoLock lockGuard(this);
282     eventRunningThread_ = std::thread::id();
283     safeKill_.notify_one();
284 }
285 
KillWait()286 void NotificationChain::Listener::KillWait()
287 {
288     // We entered with object lock held.
289     if ((eventRunningThread_ == std::thread::id()) ||
290         (eventRunningThread_ == std::this_thread::get_id())) {
291         return;
292     }
293 
294     LOGW("[NotificationChain] Try to kill an active event listener, now wait.");
295     bool noDeadLock = WaitLockedUntil(safeKill_, [this]() {
296             if (eventRunningThread_ == std::thread::id()) {
297                 return true;
298             }
299             return false;
300         }, KILL_WAIT_SECONDS);
301     if (!noDeadLock) {
302         LOGE("[NotificationChain] Dead lock maybe happen, we stop waiting the listener.");
303     } else {
304         LOGW("[NotificationChain] Wait the active event listener ok.");
305     }
306 }
307 
SetOwner(ListenerChain * listenerChain)308 void NotificationChain::Listener::SetOwner(ListenerChain *listenerChain)
309 {
310     if (listenerChain_ != nullptr) {
311         listenerChain_->DecObjRef(listenerChain_);
312     }
313     listenerChain_ = listenerChain;
314     if (listenerChain_ != nullptr) {
315         listenerChain_->IncObjRef(listenerChain_);
316     }
317 }
318 
Drop(bool wait)319 int NotificationChain::Listener::Drop(bool wait)
320 {
321     if (listenerChain_ == nullptr) {
322         LOGE("[NotificationChain] Drop listener failed, lost the chain!");
323         return -E_INTERNAL_ERROR;
324     }
325     return listenerChain_->UnRegisterListener(this, wait);
326 }
327 
Listener(const OnEvent & onEvent,const OnFinalize & onFinalize)328 NotificationChain::Listener::Listener(const OnEvent &onEvent, const OnFinalize &onFinalize)
329     : onEvent_(onEvent),
330       onFinalize_(onFinalize),
331       listenerChain_(nullptr)
332 {
333     OnLastRef([this]() {
334         this->Finalize();
335     });
336 }
337 
~Listener()338 NotificationChain::Listener::~Listener()
339 {
340     SetOwner(nullptr);
341 }
342 
~NotificationChain()343 NotificationChain::~NotificationChain()
344 {
345     for (auto &iter : eventChains_) {
346         iter.second->KillAndDecObjRef(iter.second);
347         iter.second = nullptr;
348     }
349     eventChains_.clear();
350 }
351 
352 DEFINE_OBJECT_TAG_FACILITIES(NotificationChain)
353 DEFINE_OBJECT_TAG_FACILITIES(NotificationChain::Listener)
354 DEFINE_OBJECT_TAG_FACILITIES(NotificationChain::ListenerChain)
355 } // namespace DistributedDB