1 /*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <algorithm>
17 #include <map>
18 #if !defined(_WIN32) && !defined(__APPLE__)
19 #include <sys/syscall.h>
20 #include <unistd.h>
21 #endif
22
23 #include "event_manager.h"
24
25 #include "napi_utils.h"
26 #include "netstack_log.h"
27
28 namespace OHOS::NetStack {
29 static constexpr const int CALLBACK_PARAM_NUM = 1;
30 static constexpr const int ASYNC_CALLBACK_PARAM_NUM = 2;
31 static constexpr const char *ON_HEADER_RECEIVE = "headerReceive";
32 static constexpr const char *ON_HEADERS_RECEIVE = "headersReceive";
33
EventManager()34 EventManager::EventManager() : data_(nullptr), eventRef_(nullptr), isDestroy_(false) {}
35
~EventManager()36 EventManager::~EventManager()
37 {
38 NETSTACK_LOGD("EventManager is destructed by the destructor");
39 }
40
AddListener(napi_env env,const std::string & type,napi_value callback,bool once,bool asyncCallback)41 void EventManager::AddListener(napi_env env, const std::string &type, napi_value callback, bool once,
42 bool asyncCallback)
43 {
44 std::lock_guard lock(mutexForListenersAndEmitByUv_);
45 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
46 [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
47 if (it != listeners_.end()) {
48 listeners_.erase(it, listeners_.end());
49 }
50
51 #if !defined(_WIN32) && !defined(__APPLE__)
52 listeners_.emplace_back(syscall(SYS_gettid), env, type, callback, once, asyncCallback);
53 #else
54 listeners_.emplace_back(0, env, type, callback, once, asyncCallback);
55 #endif
56 }
57
DeleteListener(const std::string & type,napi_value callback)58 void EventManager::DeleteListener(const std::string &type, napi_value callback)
59 {
60 std::lock_guard lock(mutexForListenersAndEmitByUv_);
61 auto it =
62 std::remove_if(listeners_.begin(), listeners_.end(), [type, callback](const EventListener &listener) -> bool {
63 return listener.Match(type, callback);
64 });
65 listeners_.erase(it, listeners_.end());
66 }
67
Emit(const std::string & type,const std::pair<napi_value,napi_value> & argv)68 void EventManager::Emit(const std::string &type, const std::pair<napi_value, napi_value> &argv)
69 {
70 std::lock_guard lock(mutexForEmitAndEmitByUv_);
71 auto listeners = listeners_;
72 std::for_each(listeners.begin(), listeners.end(), [type, argv](const EventListener &listener) {
73 if (listener.IsAsyncCallback()) {
74 /* AsyncCallback(BusinessError error, T data) */
75 napi_value arg[ASYNC_CALLBACK_PARAM_NUM] = {argv.first, argv.second};
76 listener.Emit(type, ASYNC_CALLBACK_PARAM_NUM, arg);
77 } else {
78 /* Callback(T data) */
79 napi_value arg[CALLBACK_PARAM_NUM] = {argv.second};
80 listener.Emit(type, CALLBACK_PARAM_NUM, arg);
81 }
82 });
83
84 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
85 [type](const EventListener &listener) -> bool { return listener.MatchOnce(type); });
86 listeners_.erase(it, listeners_.end());
87 }
88
SetData(void * data)89 void EventManager::SetData(void *data)
90 {
91 std::lock_guard<std::mutex> lock(dataMutex_);
92 data_ = data;
93 }
94
GetData()95 void *EventManager::GetData()
96 {
97 std::lock_guard<std::mutex> lock(dataMutex_);
98 return data_;
99 }
100
EmitByUvWithoutCheckShared(const std::string & type,void * data,void (* Handler)(uv_work_t *,int))101 void EventManager::EmitByUvWithoutCheckShared(const std::string &type, void *data, void (*Handler)(uv_work_t *, int))
102 {
103 std::lock_guard lock1(mutexForEmitAndEmitByUv_);
104 std::lock_guard lock2(mutexForListenersAndEmitByUv_);
105 bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
106 return listener.MatchType(ON_HEADER_RECEIVE);
107 }) != listeners_.end();
108
109 bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
110 return listener.MatchType(ON_HEADERS_RECEIVE);
111 }) != listeners_.end();
112 if (!foundHeader && !foundHeaders) {
113 if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
114 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
115 delete tempMap;
116 }
117 } else if (foundHeader && !foundHeaders) {
118 if (type == ON_HEADERS_RECEIVE) {
119 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
120 delete tempMap;
121 }
122 } else if (!foundHeader) {
123 if (type == ON_HEADER_RECEIVE) {
124 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
125 delete tempMap;
126 }
127 }
128
129 std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
130 if (listener.MatchType(type)) {
131 auto workWrapper = new UvWorkWrapperShared(data, listener.GetEnv(), type, shared_from_this());
132 listener.EmitByUv(type, workWrapper, Handler);
133 }
134 });
135 }
136
SetQueueData(void * data)137 void EventManager::SetQueueData(void *data)
138 {
139 std::lock_guard<std::mutex> lock(dataQueueMutex_);
140 dataQueue_.push(data);
141 }
142
GetQueueData()143 void *EventManager::GetQueueData()
144 {
145 std::lock_guard<std::mutex> lock(dataQueueMutex_);
146 if (!dataQueue_.empty()) {
147 auto data = dataQueue_.front();
148 dataQueue_.pop();
149 return data;
150 }
151 NETSTACK_LOGE("eventManager data queue is empty");
152 return nullptr;
153 }
154
EmitByUvWithoutCheck(const std::string & type,void * data,void (Handler)(uv_work_t *,int status))155 void EventManager::EmitByUvWithoutCheck(const std::string &type, void *data, void(Handler)(uv_work_t *, int status))
156 {
157 std::lock_guard lock1(mutexForEmitAndEmitByUv_);
158 std::lock_guard lock2(mutexForListenersAndEmitByUv_);
159 bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
160 return listener.MatchType(ON_HEADER_RECEIVE);
161 }) != listeners_.end();
162 bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
163 return listener.MatchType(ON_HEADERS_RECEIVE);
164 }) != listeners_.end();
165 if (!foundHeader && !foundHeaders) {
166 if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
167 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
168 delete tempMap;
169 }
170 } else if (foundHeader && !foundHeaders) {
171 if (type == ON_HEADERS_RECEIVE) {
172 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
173 delete tempMap;
174 }
175 } else if (!foundHeader) {
176 if (type == ON_HEADER_RECEIVE) {
177 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
178 delete tempMap;
179 }
180 }
181
182 std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
183 if (listener.MatchType(type)) {
184 auto workWrapper = new UvWorkWrapper(data, listener.GetEnv(), type, this);
185 listener.EmitByUv(type, workWrapper, Handler);
186 }
187 });
188 }
189
EmitByUv(const std::string & type,void * data,void (Handler)(uv_work_t *,int status))190 void EventManager::EmitByUv(const std::string &type, void *data, void(Handler)(uv_work_t *, int status))
191 {
192 std::lock_guard lock1(mutexForEmitAndEmitByUv_);
193 std::lock_guard lock2(mutexForListenersAndEmitByUv_);
194 if (!EventManager::IsManagerValid(this)) {
195 return;
196 }
197 bool foundHeader = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
198 return listener.MatchType(ON_HEADER_RECEIVE);
199 }) != listeners_.end();
200 bool foundHeaders = std::find_if(listeners_.begin(), listeners_.end(), [](const EventListener &listener) {
201 return listener.MatchType(ON_HEADERS_RECEIVE);
202 }) != listeners_.end();
203 if (!foundHeader && !foundHeaders) {
204 if (type == ON_HEADER_RECEIVE || type == ON_HEADERS_RECEIVE) {
205 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
206 delete tempMap;
207 }
208 } else if (foundHeader && !foundHeaders) {
209 if (type == ON_HEADERS_RECEIVE) {
210 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
211 delete tempMap;
212 }
213 } else if (!foundHeader) {
214 if (type == ON_HEADER_RECEIVE) {
215 auto tempMap = static_cast<std::map<std::string, std::string> *>(data);
216 delete tempMap;
217 }
218 }
219
220 std::for_each(listeners_.begin(), listeners_.end(), [type, data, Handler, this](const EventListener &listener) {
221 if (listener.MatchType(type)) {
222 auto workWrapper = new UvWorkWrapper(data, listener.GetEnv(), type, this);
223 listener.EmitByUv(type, workWrapper, Handler);
224 }
225 });
226 }
227
HasEventListener(const std::string & type)228 bool EventManager::HasEventListener(const std::string &type)
229 {
230 std::lock_guard lock(mutexForListenersAndEmitByUv_);
231 return std::any_of(listeners_.begin(), listeners_.end(),
232 [&type](const EventListener &listener) -> bool { return listener.MatchType(type); });
233 }
234
DeleteListener(const std::string & type)235 void EventManager::DeleteListener(const std::string &type)
236 {
237 std::lock_guard lock(mutexForListenersAndEmitByUv_);
238 auto it = std::remove_if(listeners_.begin(), listeners_.end(),
239 [type](const EventListener &listener) -> bool { return listener.MatchType(type); });
240 listeners_.erase(it, listeners_.end());
241 }
242
243 std::unordered_set<EventManager *> EventManager::validManager_;
244 std::mutex EventManager::mutexForManager_;
245
SetInvalid(EventManager * manager)246 void EventManager::SetInvalid(EventManager *manager)
247 {
248 std::lock_guard lock(mutexForManager_);
249 auto pos = validManager_.find(manager);
250 if (pos == validManager_.end()) {
251 NETSTACK_LOGE("The manager is not in the unordered_set");
252 return;
253 }
254 validManager_.erase(pos);
255 delete manager;
256 manager = nullptr;
257 }
258
IsManagerValid(EventManager * manager)259 bool EventManager::IsManagerValid(EventManager *manager)
260 {
261 std::lock_guard lock(mutexForManager_);
262 return validManager_.find(manager) != validManager_.end();
263 }
264
SetValid(EventManager * manager)265 void EventManager::SetValid(EventManager *manager)
266 {
267 std::lock_guard lock(mutexForManager_);
268 validManager_.emplace(manager);
269 }
270
CreateEventReference(napi_env env,napi_value value)271 void EventManager::CreateEventReference(napi_env env, napi_value value)
272 {
273 if (env != nullptr && value != nullptr) {
274 eventRef_ = NapiUtils::CreateReference(env, value);
275 }
276 }
277
DeleteEventReference(napi_env env)278 void EventManager::DeleteEventReference(napi_env env)
279 {
280 if (env != nullptr && eventRef_ != nullptr) {
281 NapiUtils::DeleteReference(env, eventRef_);
282 }
283 eventRef_ = nullptr;
284 }
285
SetEventDestroy(bool flag)286 void EventManager::SetEventDestroy(bool flag)
287 {
288 isDestroy_.store(flag);
289 }
290
IsEventDestroy()291 bool EventManager::IsEventDestroy()
292 {
293 return isDestroy_.load();
294 }
295
GetWebSocketTextData()296 const std::string &EventManager::GetWebSocketTextData()
297 {
298 return webSocketTextData_;
299 }
300
AppendWebSocketTextData(void * data,size_t length)301 void EventManager::AppendWebSocketTextData(void *data, size_t length)
302 {
303 webSocketTextData_.append(reinterpret_cast<char *>(data), length);
304 }
305
GetWebSocketBinaryData()306 const std::string &EventManager::GetWebSocketBinaryData()
307 {
308 return webSocketBinaryData_;
309 }
310
AppendWebSocketBinaryData(void * data,size_t length)311 void EventManager::AppendWebSocketBinaryData(void *data, size_t length)
312 {
313 webSocketBinaryData_.append(reinterpret_cast<char *>(data), length);
314 }
315
ClearWebSocketTextData()316 void EventManager::ClearWebSocketTextData()
317 {
318 webSocketTextData_.clear();
319 }
320
ClearWebSocketBinaryData()321 void EventManager::ClearWebSocketBinaryData()
322 {
323 webSocketBinaryData_.clear();
324 }
325
NotifyRcvThdExit()326 void EventManager::NotifyRcvThdExit()
327 {
328 std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
329 sockRcvExit_ = true;
330 sockRcvThdCon_.notify_one();
331 }
332
WaitForRcvThdExit()333 void EventManager::WaitForRcvThdExit()
334 {
335 std::unique_lock<std::mutex> lock(sockRcvThdMtx_);
336 sockRcvThdCon_.wait(lock, [this]() { return sockRcvExit_; });
337 }
338
SetReuseAddr(bool reuse)339 void EventManager::SetReuseAddr(bool reuse)
340 {
341 isReuseAddr_.store(reuse);
342 }
343
GetReuseAddr()344 bool EventManager::GetReuseAddr()
345 {
346 return isReuseAddr_.load();
347 }
348
SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> & userData)349 void EventManager::SetWebSocketUserData(const std::shared_ptr<Websocket::UserData> &userData)
350 {
351 std::lock_guard<std::mutex> lock(dataMutex_);
352 webSocketUserData_ = userData;
353 }
354
GetWebSocketUserData()355 std::shared_ptr<Websocket::UserData> EventManager::GetWebSocketUserData()
356 {
357 std::lock_guard<std::mutex> lock(dataMutex_);
358 return webSocketUserData_;
359 }
360
UvWorkWrapper(void * theData,napi_env theEnv,std::string eventType,EventManager * eventManager)361 UvWorkWrapper::UvWorkWrapper(void *theData, napi_env theEnv, std::string eventType, EventManager *eventManager)
362 : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
363 {
364 }
365
UvWorkWrapperShared(void * theData,napi_env theEnv,std::string eventType,const std::shared_ptr<EventManager> & eventManager)366 UvWorkWrapperShared::UvWorkWrapperShared(void *theData, napi_env theEnv, std::string eventType,
367 const std::shared_ptr<EventManager> &eventManager)
368 : data(theData), env(theEnv), type(std::move(eventType)), manager(eventManager)
369 {
370 }
371 } // namespace OHOS::NetStack
372