1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <algorithm>
17 #include <map>
18 #if !defined(_WIN32) && !defined(__APPLE__)
19 #include <sys/syscall.h>
20 #include <unistd.h>
21 #endif
22 
23 #include "event_manager.h"
24 
25 #include "napi_utils.h"
26 #include "netstack_log.h"
27 
28 namespace OHOS::NetStack {
29 static constexpr const int CALLBACK_PARAM_NUM = 1;
30 static constexpr const int ASYNC_CALLBACK_PARAM_NUM = 2;
31 static constexpr const char *ON_HEADER_RECEIVE = "headerReceive";
32 static constexpr const char *ON_HEADERS_RECEIVE = "headersReceive";
33 
EventManager()34 EventManager::EventManager() : data_(nullptr), eventRef_(nullptr), isDestroy_(false) {}
35 
~EventManager()36 EventManager::~EventManager()
37 {
38     NETSTACK_LOGD("EventManager is destructed by the destructor");
39 }
40 
AddListener(napi_env env,const std::string & type,napi_value callback,bool once,bool asyncCallback)41 void EventManager::AddListener(napi_env env, const std::string &type, napi_value callback, bool once,
42                                bool asyncCallback)
43 {
44     std::lock_guard lock(mutexForListenersAndEmitByUv_);
45     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
46                              [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
47     if (it != listeners_.end()) {
48         listeners_.erase(it, listeners_.end());
49     }
50 
51 #if !defined(_WIN32) && !defined(__APPLE__)
52     listeners_.emplace_back(syscall(SYS_gettid), env, type, callback, once, asyncCallback);
53 #else
54     listeners_.emplace_back(0, env, type, callback, once, asyncCallback);
55 #endif
56 }
57 
DeleteListener(const std::string & type,napi_value callback)58 void EventManager::DeleteListener(const std::string &type, napi_value callback)
59 {
60     std::lock_guard lock(mutexForListenersAndEmitByUv_);
61     auto it =
62         std::remove_if(listeners_.begin(), listeners_.end(), [type, callback](const EventListener &listener) -> bool {
63             return listener.Match(type, callback);
64         });
65     listeners_.erase(it, listeners_.end());
66 }
67 
Emit(const std::string & type,const std::pair<napi_value,napi_value> & argv)68 void EventManager::Emit(const std::string &type, const std::pair<napi_value, napi_value> &argv)
69 {
70     std::lock_guard lock(mutexForEmitAndEmitByUv_);
71     auto listeners = listeners_;
72     std::for_each(listeners.begin(), listeners.end(), [type, argv](const EventListener &listener) {
73         if (listener.IsAsyncCallback()) {
74             /* AsyncCallback(BusinessError error, T data) */
75             napi_value arg[ASYNC_CALLBACK_PARAM_NUM] = {argv.first, argv.second};
76             listener.Emit(type, ASYNC_CALLBACK_PARAM_NUM, arg);
77         } else {
78             /* Callback(T data) */
79             napi_value arg[CALLBACK_PARAM_NUM] = {argv.second};
80             listener.Emit(type, CALLBACK_PARAM_NUM, arg);
81         }
82     });
83 
84     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
85                              [type](const EventListener &listener) -> bool { return listener.MatchOnce(type); });
86     listeners_.erase(it, listeners_.end());
87 }
88 
SetData(void * data)89 void EventManager::SetData(void *data)
90 {
91     std::lock_guard<std::mutex> lock(dataMutex_);
92     data_ = data;
93 }
94 
GetData()95 void *EventManager::GetData()
96 {
97     std::lock_guard<std::mutex> lock(dataMutex_);
98     return data_;
99 }
100 
EmitByUvWithoutCheckShared(const std::string & type,void * data,void (* Handler)(uv_work_t *,int))101 void EventManager::EmitByUvWithoutCheckShared(const std::string &type, void *data, void (*Handler)(uv_work_t *, int))
102 {
103     std::lock_guard lock1(mutexForEmitAndEmitByUv_);
104     std::lock_guard lock2(mutexForListenersAndEmitByUv_);
105     bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
106         return listener.MatchType(ON_HEADER_RECEIVE);
107     }) != listeners_.end();
108 
109     bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
110         return listener.MatchType(ON_HEADERS_RECEIVE);
111     }) != listeners_.end();
112     if (!foundHeader && !foundHeaders) {
113         if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
114             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
115             delete tempMap;
116         }
117     } else if (foundHeader && !foundHeaders) {
118         if (type == ON_HEADERS_RECEIVE) {
119             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
120             delete tempMap;
121         }
122     } else if (!foundHeader) {
123         if (type == ON_HEADER_RECEIVE) {
124             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
125             delete tempMap;
126         }
127     }
128 
129     std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
130         if (listener.MatchType(type)) {
131             auto workWrapper = new UvWorkWrapperShared(data, listener.GetEnv(), type, shared_from_this());
132             listener.EmitByUv(type, workWrapper, Handler);
133         }
134     });
135 }
136 
SetQueueData(void * data)137 void EventManager::SetQueueData(void *data)
138 {
139     std::lock_guard<std::mutex> lock(dataQueueMutex_);
140     dataQueue_.push(data);
141 }
142 
GetQueueData()143 void *EventManager::GetQueueData()
144 {
145     std::lock_guard<std::mutex> lock(dataQueueMutex_);
146     if (!dataQueue_.empty()) {
147         auto data = dataQueue_.front();
148         dataQueue_.pop();
149         return data;
150     }
151     NETSTACK_LOGE("eventManager data queue is empty");
152     return nullptr;
153 }
154 
EmitByUvWithoutCheck(const std::string & type,void * data,void (Handler)(uv_work_t *,int status))155 void EventManager::EmitByUvWithoutCheck(const std::string &type, void *data, void(Handler)(uv_work_t *, int status))
156 {
157     std::lock_guard lock1(mutexForEmitAndEmitByUv_);
158     std::lock_guard lock2(mutexForListenersAndEmitByUv_);
159     bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
160                            return listener.MatchType(ON_HEADER_RECEIVE);
161                        }) != listeners_.end();
162     bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
163                             return listener.MatchType(ON_HEADERS_RECEIVE);
164                         }) != listeners_.end();
165     if (!foundHeader && !foundHeaders) {
166         if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
167             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
168             delete tempMap;
169         }
170     } else if (foundHeader && !foundHeaders) {
171         if (type == ON_HEADERS_RECEIVE) {
172             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
173             delete tempMap;
174         }
175     } else if (!foundHeader) {
176         if (type == ON_HEADER_RECEIVE) {
177             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
178             delete tempMap;
179         }
180     }
181 
182     std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
183         if (listener.MatchType(type)) {
184             auto workWrapper = new UvWorkWrapper(data, listener.GetEnv(), type, this);
185             listener.EmitByUv(type, workWrapper, Handler);
186         }
187     });
188 }
189 
EmitByUv(const std::string & type,void * data,void (Handler)(uv_work_t *,int status))190 void EventManager::EmitByUv(const std::string &type, void *data, void(Handler)(uv_work_t *, int status))
191 {
192     std::lock_guard lock1(mutexForEmitAndEmitByUv_);
193     std::lock_guard lock2(mutexForListenersAndEmitByUv_);
194     if (!EventManager::IsManagerValid(this)) {
195         return;
196     }
197     bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
198                            return listener.MatchType(ON_HEADER_RECEIVE);
199                        }) != listeners_.end();
200     bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
201                             return listener.MatchType(ON_HEADERS_RECEIVE);
202                         }) != listeners_.end();
203     if (!foundHeader && !foundHeaders) {
204         if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
205             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
206             delete tempMap;
207         }
208     } else if (foundHeader && !foundHeaders) {
209         if (type == ON_HEADERS_RECEIVE) {
210             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
211             delete tempMap;
212         }
213     } else if (!foundHeader) {
214         if (type == ON_HEADER_RECEIVE) {
215             auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
216             delete tempMap;
217         }
218     }
219 
220     std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
221         if (listener.MatchType(type)) {
222             auto workWrapper = new UvWorkWrapper(data, listener.GetEnv(), type, this);
223             listener.EmitByUv(type, workWrapper, Handler);
224         }
225     });
226 }
227 
HasEventListener(const std::string & type)228 bool EventManager::HasEventListener(const std::string &type)
229 {
230     std::lock_guard lock(mutexForListenersAndEmitByUv_);
231     return std::any_of(listeners_.begin(), listeners_.end(),
232                        [&type](const EventListener &listener) -> bool { return listener.MatchType(type); });
233 }
234 
DeleteListener(const std::string & type)235 void EventManager::DeleteListener(const std::string &type)
236 {
237     std::lock_guard lock(mutexForListenersAndEmitByUv_);
238     auto it = std::remove_if(listeners_.begin(), listeners_.end(),
239                              [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
240     listeners_.erase(it, listeners_.end());
241 }
242 
243 std::unordered_set<EventManager *> EventManager::validManager_;
244 std::mutex EventManager::mutexForManager_;
245 
SetInvalid(EventManager * manager)246 void EventManager::SetInvalid(EventManager *manager)
247 {
248     std::lock_guard lock(mutexForManager_);
249     auto pos = validManager_.find(manager);
250     if (pos == validManager_.end()) {
251         NETSTACK_LOGE("The manager is not in the unordered_set");
252         return;
253     }
254     validManager_.erase(pos);
255     delete manager;
256     manager = nullptr;
257 }
258 
IsManagerValid(EventManager * manager)259 bool EventManager::IsManagerValid(EventManager *manager)
260 {
261     std::lock_guard lock(mutexForManager_);
262     return validManager_.find(manager) != validManager_.end();
263 }
264 
SetValid(EventManager * manager)265 void EventManager::SetValid(EventManager *manager)
266 {
267     std::lock_guard lock(mutexForManager_);
268     validManager_.emplace(manager);
269 }
270 
CreateEventReference(napi_env env,napi_value value)271 void EventManager::CreateEventReference(napi_env env, napi_value value)
272 {
273     if (env != nullptr && value != nullptr) {
274         eventRef_ = NapiUtils::CreateReference(env, value);
275     }
276 }
277 
DeleteEventReference(napi_env env)278 void EventManager::DeleteEventReference(napi_env env)
279 {
280     if (env != nullptr && eventRef_ != nullptr) {
281         NapiUtils::DeleteReference(env, eventRef_);
282     }
283     eventRef_ = nullptr;
284 }
285 
SetEventDestroy(bool flag)286 void EventManager::SetEventDestroy(bool flag)
287 {
288     isDestroy_.store(flag);
289 }
290 
IsEventDestroy()291 bool EventManager::IsEventDestroy()
292 {
293     return isDestroy_.load();
294 }
295 
GetWebSocketTextData()296 const std::string &EventManager::GetWebSocketTextData()
297 {
298     return webSocketTextData_;
299 }
300 
AppendWebSocketTextData(void * data,size_t length)301 void EventManager::AppendWebSocketTextData(void *data, size_t length)
302 {
303     webSocketTextData_.append(reinterpret_cast<char *>(data), length);
304 }
305 
GetWebSocketBinaryData()306 const std::string &EventManager::GetWebSocketBinaryData()
307 {
308     return webSocketBinaryData_;
309 }
310 
AppendWebSocketBinaryData(void * data,size_t length)311 void EventManager::AppendWebSocketBinaryData(void *data, size_t length)
312 {
313     webSocketBinaryData_.append(reinterpret_cast<char *>(data), length);
314 }
315 
ClearWebSocketTextData()316 void EventManager::ClearWebSocketTextData()
317 {
318     webSocketTextData_.clear();
319 }
320 
ClearWebSocketBinaryData()321 void EventManager::ClearWebSocketBinaryData()
322 {
323     webSocketBinaryData_.clear();
324 }
325 
NotifyRcvThdExit()326 void EventManager::NotifyRcvThdExit()
327 {
328     std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
329     sockRcvExit_ = true;
330     sockRcvThdCon_.notify_one();
331 }
332 
WaitForRcvThdExit()333 void EventManager::WaitForRcvThdExit()
334 {
335     std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
336     sockRcvThdCon_.wait(lock, [this]() { return sockRcvExit_; });
337 }
338 
SetReuseAddr(bool reuse)339 void EventManager::SetReuseAddr(bool reuse)
340 {
341     isReuseAddr_.store(reuse);
342 }
343 
GetReuseAddr()344 bool EventManager::GetReuseAddr()
345 {
346     return isReuseAddr_.load();
347 }
348 
SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> & userData)349 void EventManager::SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> &userData)
350 {
351     std::lock_guard<std::mutex> lock(dataMutex_);
352     webSocketUserData_ = userData;
353 }
354 
GetWebSocketUserData()355 std::shared_ptr<Websocket::UserData> EventManager::GetWebSocketUserData()
356 {
357     std::lock_guard<std::mutex> lock(dataMutex_);
358     return webSocketUserData_;
359 }
360 
UvWorkWrapper(void * theData,napi_env theEnv,std::string eventType,EventManager * eventManager)361 UvWorkWrapper::UvWorkWrapper(void *theData, napi_env theEnv, std::string eventType, EventManager *eventManager)
362     : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
363 {
364 }
365 
UvWorkWrapperShared(void * theData,napi_env theEnv,std::string eventType,const std::shared_ptr<EventManager> & eventManager)366 UvWorkWrapperShared::UvWorkWrapperShared(void *theData, napi_env theEnv, std::string eventType,
367                                          const std::shared_ptr<EventManager> &eventManager)
368     : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
369 {
370 }
371 } // namespace OHOS::NetStack
372