1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "softbus_base_listener.h"
17
18 #include <fcntl.h>
19 #include <securec.h>
20 #include <stdatomic.h>
21 #include <unistd.h>
22
23 #include "common_list.h"
24 #include "conn_event.h"
25 #include "conn_log.h"
26 #include "softbus_adapter_errcode.h"
27 #include "softbus_adapter_mem.h"
28 #include "softbus_adapter_socket.h"
29 #include "softbus_conn_common.h"
30 #include "softbus_conn_interface.h"
31 #include "softbus_def.h"
32 #include "softbus_errcode.h"
33 #include "softbus_feature_config.h"
34 #include "softbus_socket.h"
35 #include "softbus_utils.h"
36
/* Upper bound that AddTrigger compares a listener's waitEventFdsLen against. */
#define MAX_LISTEN_EVENTS 1024
/* Backlog passed to SoftBusSocketListen when a server socket is opened. */
#define DEFAULT_BACKLOG 4
/* NOTE(review): not referenced in the visible part of this file; presumably a growth factor for fd arrays. */
#define FDARR_EXPAND_BASE 2
/* 3000 ms; NOTE(review): presumably the wait before retrying after an unexpected select failure - confirm at use. */
#define SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS (3 * 1000)
#define SELECT_ABNORMAL_EVENT_RETRY_WAIT_MILLIS (3 * 10) /* retry wait time after an abnormal event from select */
/* 6 hours, expressed in seconds. */
#define SOFTBUS_LISTENER_SELECT_TIMEOUT_SEC (6 * 60 * 60)
43
/*
 * Run state of a listener node.
 * A freshly calloc'ed node reads as LISTENER_IDLE because that enumerator is 0;
 * the start functions switch it to LISTENER_RUNNING and ShutdownBaseListener switches it back.
 */
enum BaseListenerStatus {
    LISTENER_IDLE = 0,
    LISTENER_RUNNING = 1,
};
48
/* One watched file descriptor of a listener node; entries are linked into SoftbusBaseListenerInfo.waitEventFds. */
typedef struct {
    ListNode node;       /* list linkage */
    int32_t fd;          /* the watched descriptor */
    uint32_t triggerSet; /* bitwise OR of the TriggerType values registered for fd via AddTrigger */
} FdNode;
54
/* Mutable per-module listener state; embedded in SoftbusListenerNode and accessed under that node's lock. */
typedef struct {
    ListNode waitEventFds;    /* list of FdNode entries added by AddTrigger */
    uint32_t waitEventFdsLen; /* number of entries currently in waitEventFds */

    ModeType modeType;              /* UNSET_MODE until a server is started, then SERVER_MODE */
    LocalListenerInfo listenerInfo; /* copy of the info passed to StartBaseListener; zeroed on shutdown */
    int32_t listenFd;               /* server listen socket, -1 when none */
    int32_t listenPort;             /* port of listenFd, -1 when none */

    enum BaseListenerStatus status; /* LISTENER_IDLE or LISTENER_RUNNING */
} SoftbusBaseListenerInfo;
66
/* Reference-counted listener object for one ListenerModule; stored in g_listenerList[module]. */
typedef struct {
    ListenerModule module;           /* index of this node in g_listenerList */
    SoftBusMutex lock;               /* non-recursive mutex guarding the fields below */
    SoftbusBaseListener listener;    /* onConnectEvent / onDataEvent callbacks registered by the start functions */
    const SocketInterface *socketIf; /* protocol implementation chosen in StartServerListenUnsafe, NULL until then */
    SoftbusBaseListenerInfo info;    /* listen socket, watched fds and run state */
    /* reference count: starts at 1 for the list's own reference; the node is freed once it drops to <= 0 */
    int32_t objectRc;
} SoftbusListenerNode;
75
/* State object of the select thread; released by CleanupSelectThreadState. */
typedef struct {
    uint32_t traceId; /* identifier printed in log lines about this state object */
    // pipe fds, to wakeup select thread in time
    int32_t ctrlRfd;
    int32_t ctrlWfd;

    SoftBusMutex lock;      /* destroyed together with the object in CleanupSelectThreadState */
    int32_t referenceCount; /* NOTE(review): presumably counts running listeners sharing the thread - confirm */
} SelectThreadState;
85
/* Forward declarations of helpers that are used before their definitions. */
static int32_t ShutdownBaseListener(SoftbusListenerNode *node);
static int32_t StartSelectThread(void);
static int32_t StopSelectThread(void);
static void WakeupSelectThread(void);
static SoftbusListenerNode *CreateSpecifiedListenerModule(ListenerModule module);

/* Guards g_listenerList; initialised as a non-recursive mutex in InitBaseListener. */
static SoftBusMutex g_listenerListLock = { 0 };
/* One slot per ListenerModule; NULL means no listener node exists for that module. */
static SoftbusListenerNode *g_listenerList[UNUSE_BUTT] = { 0 };
/* Initialised as a recursive mutex in InitBaseListener. */
static SoftBusMutex g_selectThreadStateLock = { 0 };
/* NOTE(review): presumably the state of the single shared select thread, NULL while it is not running - confirm. */
static SelectThreadState *g_selectThreadState = NULL;
/* Set once InitBaseListener has completed and cleared again by DeinitBaseListener. */
static _Atomic bool g_initBaseListener = false;
97
GetListenerNodeCommon(ListenerModule module,bool create)98 static SoftbusListenerNode *GetListenerNodeCommon(ListenerModule module, bool create)
99 {
100 int32_t status = SoftBusMutexLock(&g_listenerListLock);
101 CONN_CHECK_AND_RETURN_RET_LOGE(
102 status == SOFTBUS_OK, NULL, CONN_COMMON, "lock failed, module=%{public}d, error=%{public}d",
103 module, status);
104 SoftbusListenerNode *node = g_listenerList[module];
105 do {
106 if (node == NULL) {
107 if (create) {
108 node = CreateSpecifiedListenerModule(module);
109 }
110 if (node == NULL) {
111 break;
112 }
113 g_listenerList[module] = node;
114 }
115 status = SoftBusMutexLock(&node->lock);
116 if (status != SOFTBUS_OK) {
117 CONN_LOGE(CONN_COMMON, "lock listener failed, module=%{public}d, error=%{public}d", module, status);
118 node = NULL;
119 break;
120 }
121 node->objectRc += 1;
122 SoftBusMutexUnlock(&node->lock);
123 } while (false);
124 (void)SoftBusMutexUnlock(&g_listenerListLock);
125 return node;
126 }
127
GetListenerNode(ListenerModule module)128 static SoftbusListenerNode *GetListenerNode(ListenerModule module)
129 {
130 return GetListenerNodeCommon(module, false);
131 }
132
GetOrCreateListenerNode(ListenerModule module)133 static SoftbusListenerNode *GetOrCreateListenerNode(ListenerModule module)
134 {
135 return GetListenerNodeCommon(module, true);
136 }
137
/*
 * Unregisters node from g_listenerList and drops the reference the list held on it.
 * Nothing is changed when the list lock or the node lock cannot be taken, or when the
 * slot of node->module does not point at node. The node itself is not freed here;
 * that happens in ReturnListenerNode once the last reference is returned.
 */
static void RemoveListenerNode(SoftbusListenerNode *node)
{
    int32_t ret = SoftBusMutexLock(&g_listenerListLock);
    CONN_CHECK_AND_RETURN_LOGE(
        ret == SOFTBUS_OK, CONN_COMMON, "lock listener lists failed, module=%{public}d, error=%{public}d",
        node->module, ret);

    if (g_listenerList[node->module] != node) {
        CONN_LOGW(CONN_COMMON, "listener node is not in listener list, just skip, module=%{public}d",
            node->module);
    } else if (SoftBusMutexLock(&node->lock) != SOFTBUS_OK) {
        CONN_LOGE(CONN_COMMON, "lock listener node failed, module=%{public}d", node->module);
    } else {
        // give up the root reference owned by the list, then empty the slot
        node->objectRc -= 1;
        g_listenerList[node->module] = NULL;
        (void)SoftBusMutexUnlock(&node->lock);
    }

    (void)SoftBusMutexUnlock(&g_listenerListLock);
}
162
/*
 * Gives back one reference obtained from GetListenerNode / GetOrCreateListenerNode.
 * When the count drops to zero or below, the listener is shut down, its lock destroyed
 * and the node freed. *nodePtr is always set to NULL before returning, including when
 * the node lock could not be taken (in which case the count is left untouched).
 */
static void ReturnListenerNode(SoftbusListenerNode **nodePtr)
{
    SoftbusListenerNode *held = *nodePtr;

    if (SoftBusMutexLock(&held->lock) != SOFTBUS_OK) {
        CONN_LOGE(CONN_COMMON, "lock listener node failed, module=%{public}d", held->module);
        *nodePtr = NULL;
        return;
    }
    held->objectRc -= 1;
    const int32_t remaining = held->objectRc;
    (void)SoftBusMutexUnlock(&held->lock);

    if (remaining <= 0) {
        CONN_LOGI(CONN_COMMON, "object reference count <= 0, free listener node, module=%{public}d, "
            "objectReference=%{public}d", held->module, remaining);
        (void)ShutdownBaseListener(held);
        SoftBusMutexDestroy(&held->lock);
        SoftBusFree(held);
    }

    *nodePtr = NULL;
}
188
CreateSpecifiedListenerModule(ListenerModule module)189 static SoftbusListenerNode *CreateSpecifiedListenerModule(ListenerModule module)
190 {
191 SoftbusListenerNode *node = (SoftbusListenerNode *)SoftBusCalloc(sizeof(SoftbusListenerNode));
192 CONN_CHECK_AND_RETURN_RET_LOGE(
193 node != NULL, NULL, CONN_COMMON, "calloc failed, module=%{public}d", module);
194
195 node->module = module;
196 // NOT apply recursive lock on purpose, problem will be exposes quickly if exist
197 int32_t status = SoftBusMutexInit(&node->lock, NULL);
198 if (status != SOFTBUS_OK) {
199 CONN_LOGE(CONN_COMMON, "init lock failed, module=%{public}d, error=%{public}d", module, status);
200 SoftBusFree(node);
201 return NULL;
202 }
203 node->listener.onConnectEvent = NULL;
204 node->listener.onDataEvent = NULL;
205
206 node->socketIf = NULL;
207
208 ListInit(&node->info.waitEventFds);
209 node->info.waitEventFdsLen = 0;
210 node->info.modeType = UNSET_MODE;
211 (void)memset_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), 0, sizeof(LocalListenerInfo));
212 node->info.listenFd = -1;
213 node->info.listenPort = -1;
214 // set root object reference count 1
215 node->objectRc = 1;
216 return node;
217 }
218
InitBaseListener(void)219 int32_t InitBaseListener(void)
220 {
221 if (atomic_load_explicit(&g_initBaseListener, memory_order_acquire)) {
222 return SOFTBUS_OK;
223 }
224 // flag : if the client and server are in the same process, this function can be executed only once.
225 static bool flag = false;
226 if (flag) {
227 return SOFTBUS_OK;
228 }
229 flag = true;
230
231 // stop select thread need re-enter lock
232 SoftBusMutexAttr attr = {
233 .type = SOFTBUS_MUTEX_RECURSIVE,
234 };
235 int32_t status = SoftBusMutexInit(&g_selectThreadStateLock, &attr);
236 if (status != SOFTBUS_OK) {
237 CONN_LOGE(CONN_INIT, "init select thread lock failed, error=%{public}d", status);
238 return SOFTBUS_LOCK_ERR;
239 }
240 // NOT apply recursive lock on purpose, problem will be exposes quickly if exist
241 status = SoftBusMutexInit(&g_listenerListLock, NULL);
242 if (status != SOFTBUS_OK) {
243 SoftBusMutexDestroy(&g_selectThreadStateLock);
244 CONN_LOGE(CONN_INIT, "init listener list lock failed, error=%{public}d", status);
245 return SOFTBUS_LOCK_ERR;
246 }
247
248 status = SoftBusMutexLock(&g_listenerListLock);
249 if (status != SOFTBUS_OK) {
250 CONN_LOGE(CONN_INIT, "lock listener list failed, error=%{public}d", status);
251 SoftBusMutexDestroy(&g_selectThreadStateLock);
252 SoftBusMutexDestroy(&g_listenerListLock);
253 return SOFTBUS_LOCK_ERR;
254 }
255 (void)memset_s(g_listenerList, sizeof(g_listenerList), 0, sizeof(g_listenerList));
256 (void)SoftBusMutexUnlock(&g_listenerListLock);
257 if (status != SOFTBUS_OK) {
258 CONN_LOGE(CONN_INIT, "create static module listener failed, error=%{public}d", status);
259 SoftBusMutexDestroy(&g_selectThreadStateLock);
260 SoftBusMutexDestroy(&g_listenerListLock);
261 return status;
262 }
263 atomic_store_explicit(&g_initBaseListener, true, memory_order_release);
264 return SOFTBUS_OK;
265 }
266
DeinitBaseListener(void)267 void DeinitBaseListener(void)
268 {
269 if (!atomic_load_explicit(&g_initBaseListener, memory_order_acquire)) {
270 return;
271 }
272
273 for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
274 SoftbusListenerNode *node = GetListenerNode(module);
275 if (node == NULL) {
276 continue;
277 }
278 RemoveListenerNode(node);
279 ReturnListenerNode(&node);
280 }
281 atomic_store_explicit(&g_initBaseListener, false, memory_order_release);
282 }
283
CreateListenerModule(void)284 uint32_t CreateListenerModule(void)
285 {
286 int32_t status = SoftBusMutexLock(&g_listenerListLock);
287 CONN_CHECK_AND_RETURN_RET_LOGE(
288 status == SOFTBUS_OK, UNUSE_BUTT, CONN_COMMON, "lock failed, error=%{public}d", status);
289
290 ListenerModule module = LISTENER_MODULE_DYNAMIC_START;
291 for (; module <= LISTENER_MODULE_DYNAMIC_END; module++) {
292 if (g_listenerList[module] != NULL) {
293 continue;
294 }
295 SoftbusListenerNode *node = CreateSpecifiedListenerModule(module);
296 if (node == NULL) {
297 CONN_LOGE(CONN_COMMON, "create specified listener module failed, module=%{public}d", module);
298 module = UNUSE_BUTT;
299 } else {
300 CONN_LOGI(CONN_COMMON, "create listener module success, module=%{public}d", module);
301 g_listenerList[module] = node;
302 }
303 break;
304 }
305 (void)SoftBusMutexUnlock(&g_listenerListLock);
306 return module;
307 }
308
/*
 * Destroys the listener node of a dynamic module.
 * Modules outside [LISTENER_MODULE_DYNAMIC_START, LISTENER_MODULE_DYNAMIC_END] are rejected
 * with a warning. The node is removed from the list and the reference taken here is
 * returned, which frees the node once no other holder remains.
 */
void DestroyBaseListener(ListenerModule module)
{
    CONN_CHECK_AND_RETURN_LOGW(module >= LISTENER_MODULE_DYNAMIC_START && module <= LISTENER_MODULE_DYNAMIC_END,
        CONN_COMMON, "only dynamic module support destroy, module=%{public}d", module);

    CONN_LOGI(CONN_COMMON, "receive request, module=%{public}d", module);
    SoftbusListenerNode *node = GetListenerNode(module);
    if (node != NULL) {
        RemoveListenerNode(node);
        ReturnListenerNode(&node);
        return;
    }
    CONN_LOGW(CONN_COMMON, "listener not exist, module=%{public}d", module);
}
323
/*
 * Starts a module in client mode: registers the callbacks and starts the select thread,
 * without opening a listen socket.
 *
 * module:   must be in [0, UNUSE_BUTT).
 * listener: must be non-NULL with both onConnectEvent and onDataEvent set.
 *
 * Returns SOFTBUS_OK on success, SOFTBUS_INVALID_PARAM for bad arguments, SOFTBUS_NOT_FIND
 * when the node cannot be obtained, SOFTBUS_LOCK_ERR when the node lock fails,
 * SOFTBUS_CONN_LISTENER_NOT_IDLE when the module is already running, or the error
 * returned by StartSelectThread.
 */
int32_t StartBaseClient(ListenerModule module, const SoftbusBaseListener *listener)
{
    CONN_CHECK_AND_RETURN_RET_LOGW(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "invalid module, module=%{public}d", module);
    CONN_CHECK_AND_RETURN_RET_LOGW(listener != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "listener is null, module=%{public}d", module);
    CONN_CHECK_AND_RETURN_RET_LOGW(listener->onConnectEvent != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "listener onConnectEvent is null, module=%{public}d", module);
    CONN_CHECK_AND_RETURN_RET_LOGW(listener->onDataEvent != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "listener onDataEvent is null, module=%{public}d", module);

    SoftbusListenerNode *node = GetOrCreateListenerNode(module);
    CONN_CHECK_AND_RETURN_RET_LOGW(
        node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON, "get listener node failed, module=%{public}d", module);

    int32_t result = SoftBusMutexLock(&node->lock);
    if (result != SOFTBUS_OK) {
        CONN_LOGE(CONN_COMMON, "lock listener node failed, module=%{public}d, error=%{public}d", module, result);
        ReturnListenerNode(&node);
        return SOFTBUS_LOCK_ERR;
    }

    if (node->info.status != LISTENER_IDLE) {
        CONN_LOGE(CONN_COMMON, "listener is not idle status, module=%{public}d, status=%{public}d",
            module, node->info.status);
        result = SOFTBUS_CONN_LISTENER_NOT_IDLE;
    } else {
        // callbacks are stored before the thread start is attempted and stay stored if it fails
        node->listener.onConnectEvent = listener->onConnectEvent;
        node->listener.onDataEvent = listener->onDataEvent;
        result = StartSelectThread();
        if (result == SOFTBUS_OK) {
            node->info.status = LISTENER_RUNNING;
            CONN_LOGI(CONN_COMMON, "start base client listener success, module=%{public}d", module);
        } else {
            CONN_LOGE(CONN_COMMON, "start select thread failed, module=%{public}d, "
                "status=%{public}d", module, result);
        }
    }

    (void)SoftBusMutexUnlock(&node->lock);
    ReturnListenerNode(&node);
    return result;
}
367
/*
 * Opens and starts listening on the server socket described by info and records it on node.
 * "Unsafe": the function takes no lock itself; the visible caller holds node->lock.
 *
 * On success node->info.listenerInfo, modeType (SERVER_MODE), listenFd and listenPort are
 * filled in and the listen port is returned. On failure a socket that was already opened
 * is shut down, those fields are left untouched and an error code is returned:
 * SOFTBUS_NOT_FIND (no implementation for the protocol), the value returned by
 * OpenServerSocket or SoftBusSocketListen, SOFTBUS_TCP_SOCKET_ERR (port lookup failed) or
 * SOFTBUS_MEM_ERR (copying info failed).
 * node->socketIf is assigned as soon as the protocol implementation is found, also when a
 * later step fails.
 */
static int32_t StartServerListenUnsafe(SoftbusListenerNode *node, const LocalListenerInfo *info)
{
    ListenerModule module = node->module;
    ProtocolType protocol = info->socketOption.protocol;
    const SocketInterface *socketIf = GetSocketInterface(protocol);
    if (socketIf == NULL) {
        CONN_LOGE(CONN_COMMON, "not find protocal implement, module=%{public}d, protocal=%{public}d", module, protocol);
        return SOFTBUS_NOT_FIND;
    }
    node->socketIf = socketIf;

    int32_t listenFd = -1;
    int32_t listenPort = -1;
    int32_t status = SOFTBUS_OK;
    do {
        listenFd = socketIf->OpenServerSocket(info);
        if (listenFd < 0) {
            CONN_LOGE(CONN_COMMON, "create server socket failed: module=%{public}d, listenFd=%{public}d",
                module, listenFd);
            status = listenFd;
            break;
        }
        status = SoftBusSocketListen(listenFd, DEFAULT_BACKLOG);
        if (status != SOFTBUS_OK) {
            CONN_LOGE(CONN_COMMON, "listen server socket failed: module=%{public}d, error=%{public}d", module, status);
            break;
        }
        listenPort = socketIf->GetSockPort(listenFd);
        if (listenPort < 0) {
            // report the failing return value; status still holds the successful listen result here
            CONN_LOGE(CONN_COMMON, "get listen server port failed: module=%{public}d, listenFd=%{public}d, "
                "error=%{public}d", module, listenFd, listenPort);
            status = SOFTBUS_TCP_SOCKET_ERR;
            break;
        }
        if (memcpy_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), info, sizeof(LocalListenerInfo)) != EOK) {
            CONN_LOGE(CONN_COMMON, "memcpy_s local listener info object failed: module=%{public}d", module);
            status = SOFTBUS_MEM_ERR;
            break;
        }
        node->info.modeType = SERVER_MODE;
        node->info.listenFd = listenFd;
        node->info.listenPort = listenPort;
    } while (false);
    if (status != SOFTBUS_OK && listenFd > 0) {
        ConnShutdownSocket(listenFd);
    }
    return status == SOFTBUS_OK ? listenPort : status;
}
416
/*
 * Releases the server listen socket recorded on node and resets the related fields.
 * "Unsafe": the function takes no lock itself; the visible caller holds node->lock.
 *
 * The socket is shut down BEFORE the fields are reset, and only the listenerInfo member
 * is zeroed. Clearing with the size of the whole SoftbusBaseListenerInfo would write past
 * listenerInfo into listenFd, listenPort, status and the memory following node->info, and
 * would zero listenFd before it could be shut down.
 */
static void CleanupServerListenInfoUnsafe(SoftbusListenerNode *node)
{
    if (node->info.listenFd > 0) {
        ConnShutdownSocket(node->info.listenFd);
    }
    node->info.listenFd = -1;
    node->info.listenPort = -1;
    (void)memset_s(&node->info.listenerInfo, sizeof(node->info.listenerInfo), 0, sizeof(node->info.listenerInfo));
}
426
/*
 * Copies the outcome of a start request into an event record.
 * errcode receives err, result is EVENT_STAGE_RESULT_OK only for SOFTBUS_OK, and the link
 * type, module id and protocol are taken from info. Nothing is written when either
 * pointer is NULL.
 */
static void FillConnEventExtra(const LocalListenerInfo *info, ConnEventExtra *extra, int32_t err)
{
    if (info != NULL && extra != NULL) {
        extra->errcode = err;
        if (err == SOFTBUS_OK) {
            extra->result = EVENT_STAGE_RESULT_OK;
        } else {
            extra->result = EVENT_STAGE_RESULT_FAILED;
        }
        extra->linkType = info->type;
        extra->moduleId = info->socketOption.moduleId;
        extra->proType = info->socketOption.protocol;
    }
}
438
StartBaseListener(const LocalListenerInfo * info,const SoftbusBaseListener * listener)439 int32_t StartBaseListener(const LocalListenerInfo *info, const SoftbusBaseListener *listener)
440 {
441 ConnEventExtra extra = {
442 .result = 0
443 };
444 CONN_EVENT(EVENT_SCENE_START_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
445 CONN_CHECK_AND_RETURN_RET_LOGW(info != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "info is null");
446 CONN_CHECK_AND_RETURN_RET_LOGW(info->type == CONNECT_TCP || info->type == CONNECT_P2P || info->type == CONNECT_HML,
447 SOFTBUS_INVALID_PARAM, CONN_COMMON, "only CONNECT_TCP, CONNECT_P2P and CONNECT_HML is permitted, "
448 "CONNECT_TCP=%{public}d, CONNECT_P2P=%{public}d, CONNECT_HML=%{public}d, type=%{public}d",
449 CONNECT_TCP, CONNECT_P2P, CONNECT_HML, info->type);
450 CONN_CHECK_AND_RETURN_RET_LOGW(info->socketOption.port >= 0, SOFTBUS_INVALID_PARAM, CONN_COMMON,
451 "port is invalid, port=%{public}d", info->socketOption.port);
452 CONN_CHECK_AND_RETURN_RET_LOGW(info->socketOption.moduleId >= 0 && info->socketOption.moduleId < UNUSE_BUTT,
453 SOFTBUS_INVALID_PARAM, CONN_COMMON, "invalid module, module=%{public}d", info->socketOption.moduleId);
454 CONN_CHECK_AND_RETURN_RET_LOGW(listener != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
455 "listener is null, module=%{public}d", info->socketOption.moduleId);
456 CONN_CHECK_AND_RETURN_RET_LOGW(listener->onConnectEvent != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
457 "listener onConnectEvent is null, module=%{public}d", info->socketOption.moduleId);
458 CONN_CHECK_AND_RETURN_RET_LOGW(listener->onDataEvent != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
459 "listener onDataEvent is null, module=%{public}d", info->socketOption.moduleId);
460
461 ListenerModule module = info->socketOption.moduleId;
462 SoftbusListenerNode *node = GetOrCreateListenerNode(module);
463 CONN_CHECK_AND_RETURN_RET_LOGW(
464 node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON, "get listener node failed, module=%{public}d", module);
465 int32_t status = SoftBusMutexLock(&node->lock);
466 if (status != SOFTBUS_OK) {
467 CONN_LOGE(CONN_COMMON, "lock failed, module=%{public}d, error=%{public}d", module, status);
468 ReturnListenerNode(&node);
469 FillConnEventExtra(info, &extra, SOFTBUS_LOCK_ERR);
470 CONN_EVENT(EVENT_SCENE_START_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
471 return SOFTBUS_LOCK_ERR;
472 }
473
474 int32_t listenPort = -1;
475 do {
476 if (node->info.status != LISTENER_IDLE) {
477 CONN_LOGE(CONN_COMMON, "listener is not idle status, module=%{public}d, status=%{public}d",
478 module, node->info.status);
479 status = SOFTBUS_CONN_LISTENER_NOT_IDLE;
480 break;
481 }
482
483 node->listener.onConnectEvent = listener->onConnectEvent;
484 node->listener.onDataEvent = listener->onDataEvent;
485 if (memcpy_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), info, sizeof(LocalListenerInfo)) != EOK) {
486 CONN_LOGE(CONN_COMMON, "memcpy_s listener info failed, module=%{public}d", node->module);
487 status = SOFTBUS_LOCK_ERR;
488 break;
489 }
490 listenPort = StartServerListenUnsafe(node, info);
491 if (listenPort <= 0) {
492 CONN_LOGE(CONN_COMMON, "start server failed, module=%{public}d, listenPort=%{public}d",
493 module, listenPort);
494 status = listenPort;
495 break;
496 }
497
498 status = StartSelectThread();
499 if (status != SOFTBUS_OK) {
500 CONN_LOGE(CONN_COMMON, "start listener thread failed, module=%{public}d, status=%{public}d",
501 module, status);
502 CleanupServerListenInfoUnsafe(node);
503 break;
504 }
505 node->info.status = LISTENER_RUNNING;
506 CONN_LOGI(CONN_COMMON, "start base listener success, module=%{public}d, listenFd=%{public}d, "
507 "listenPort=%{public}d", module, node->info.listenFd, listenPort);
508 } while (false);
509 (void)SoftBusMutexUnlock(&node->lock);
510 ReturnListenerNode(&node);
511 FillConnEventExtra(info, &extra, status);
512 CONN_EVENT(EVENT_SCENE_START_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
513 return status == SOFTBUS_OK ? listenPort : status;
514 }
515
/*
 * Stops the listener of a module.
 *
 * Returns SOFTBUS_INVALID_PARAM when module is outside [0, UNUSE_BUTT), SOFTBUS_NOT_FIND
 * when the module has no listener node, otherwise the result of ShutdownBaseListener.
 * A stop event is emitted on entry and a result event after the shutdown attempt.
 */
int32_t StopBaseListener(ListenerModule module)
{
    ConnEventExtra extra = {
        .moduleId = module,
        .result = 0
    };
    CONN_EVENT(EVENT_SCENE_STOP_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
    CONN_CHECK_AND_RETURN_RET_LOGW(
        module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "invalid module, module=%{public}d", module);

    CONN_LOGI(CONN_COMMON, "receive request, module=%{public}d", module);
    SoftbusListenerNode *node = GetListenerNode(module);
    CONN_CHECK_AND_RETURN_RET_LOGW(
        node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON, "listener node not exist, module=%{public}d", module);

    const int32_t result = ShutdownBaseListener(node);
    const bool failed = (result != SOFTBUS_OK);
    if (failed) {
        CONN_LOGE(CONN_COMMON, "stop listen thread failed, module=%{public}d, error=%{public}d", module, result);
    }
    ReturnListenerNode(&node);

    extra.errcode = result;
    if (failed) {
        extra.result = EVENT_STAGE_RESULT_FAILED;
    } else {
        extra.result = EVENT_STAGE_RESULT_OK;
    }
    CONN_EVENT(EVENT_SCENE_STOP_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
    return result;
}
542
/*
 * Brings a running listener node back to the idle state.
 *
 * Under node->lock: releases the node's use of the select thread, marks the node idle,
 * frees every remaining FdNode (the descriptors themselves are NOT closed), closes the
 * listen socket when the node is in server mode, and resets mode, listen fd/port and the
 * stored listener info.
 *
 * Returns SOFTBUS_LOCK_ERR when the node lock cannot be taken, SOFTBUS_OK when the node
 * was not running (nothing is changed), otherwise the result of StopSelectThread. The
 * cleanup is carried out even when StopSelectThread fails.
 */
static int32_t ShutdownBaseListener(SoftbusListenerNode *node)
{
    int32_t result = SoftBusMutexLock(&node->lock);
    CONN_CHECK_AND_RETURN_RET_LOGE(result == SOFTBUS_OK, SOFTBUS_LOCK_ERR, CONN_COMMON,
        "lock failed, module=%{public}d, error=%{public}d", node->module, result);

    if (node->info.status != LISTENER_RUNNING) {
        CONN_LOGW(CONN_COMMON, "listener is not running, just skip, module=%{public}d, error=%{public}d",
            node->module, node->info.status);
        SoftBusMutexUnlock(&node->lock);
        return result;
    }

    result = StopSelectThread();
    if (result != SOFTBUS_OK) {
        // logged only; the node is cleaned up regardless
        CONN_LOGE(CONN_COMMON, "stop select thread failed, module=%{public}d, error=%{public}d",
            node->module, result);
    }
    node->info.status = LISTENER_IDLE;

    FdNode *entry = NULL;
    FdNode *following = NULL;
    LIST_FOR_EACH_ENTRY_SAFE(entry, following, &node->info.waitEventFds, FdNode, node) {
        CONN_LOGE(CONN_COMMON, "listener node there is fd not close, module=%{public}d, fd=%{public}d, "
            "triggerSet=%{public}u", node->module, entry->fd, entry->triggerSet);
        // the fd is intentionally left open: closing it a second time would crash the process
        ListDelete(&entry->node);
        SoftBusFree(entry);
    }
    node->info.waitEventFdsLen = 0;

    const int32_t serverFd = node->info.listenFd;
    const int32_t serverPort = node->info.listenPort;
    if (node->info.modeType == SERVER_MODE && serverFd > 0) {
        CONN_LOGE(CONN_COMMON, "close server, module=%{public}d, listenFd=%{public}d, port=%{public}d",
            node->module, serverFd, serverPort);
        ConnCloseSocket(serverFd);
    }
    node->info.modeType = UNSET_MODE;
    node->info.listenFd = -1;
    node->info.listenPort = -1;
    (void)memset_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), 0, sizeof(LocalListenerInfo));

    SoftBusMutexUnlock(&node->lock);
    return result;
}
590
/* Returns true only for READ_TRIGGER, WRITE_TRIGGER, EXCEPT_TRIGGER and RW_TRIGGER. */
static bool IsValidTriggerType(TriggerType trigger)
{
    return trigger == READ_TRIGGER || trigger == WRITE_TRIGGER ||
        trigger == EXCEPT_TRIGGER || trigger == RW_TRIGGER;
}
603
/*
 * Reports whether a listener node is currently registered for module.
 * Returns false for a module outside [0, UNUSE_BUTT) instead of indexing the listener
 * list with it, and false when the lookup fails (for example because a lock cannot be
 * taken). The reference taken for the check is returned before the function exits.
 */
bool IsListenerNodeExist(ListenerModule module)
{
    CONN_CHECK_AND_RETURN_RET_LOGW(module >= 0 && module < UNUSE_BUTT, false, CONN_COMMON,
        "invalid module, module=%{public}d", module);

    SoftbusListenerNode *node = GetListenerNode(module);
    if (node == NULL) {
        return false;
    }
    ReturnListenerNode(&node);
    return true;
}
614
/*
 * Registers trigger for fd on a running listener module and wakes the select thread
 * when the watched set changed.
 *
 * module:  must be in [0, UNUSE_BUTT).
 * fd:      must be > 0.
 * trigger: must satisfy IsValidTriggerType.
 *
 * If fd is already watched, the trigger bits are merged into its set; repeating an
 * already registered trigger succeeds without a wakeup. Otherwise a new entry is added,
 * unless the module already watches MAX_LISTEN_EVENTS descriptors. The limit applies to
 * new descriptors only, so triggers of already watched descriptors can still be extended
 * when the list is full.
 *
 * Returns SOFTBUS_OK on success, SOFTBUS_INVALID_PARAM for bad arguments, SOFTBUS_NOT_FIND
 * when the module has no listener node, SOFTBUS_LOCK_ERR when the node lock fails,
 * SOFTBUS_CONN_FAIL when the module is not running or the limit is reached, and
 * SOFTBUS_MALLOC_ERR when the new entry cannot be allocated.
 */
int32_t AddTrigger(ListenerModule module, int32_t fd, TriggerType trigger)
{
    CONN_CHECK_AND_RETURN_RET_LOGW(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "invalid module, module=%{public}d", module);
    CONN_CHECK_AND_RETURN_RET_LOGW(
        fd > 0, SOFTBUS_INVALID_PARAM, CONN_COMMON, "invalid fd, module=%{public}d, fd=%{public}d", module, fd);
    CONN_CHECK_AND_RETURN_RET_LOGW(IsValidTriggerType(trigger), SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "invalid trigger, module=%{public}d, fd=%{public}d, trigger=%{public}d", module, fd, trigger);

    SoftbusListenerNode *node = GetListenerNode(module);
    CONN_CHECK_AND_RETURN_RET_LOGW(node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON,
        "listener node not exist, module=%{public}d, fd=%{public}d, trigger=%{public}d", module, fd, trigger);

    int32_t status = SoftBusMutexLock(&node->lock);
    if (status != SOFTBUS_OK) {
        CONN_LOGE(CONN_COMMON, "lock failed, module=%{public}d, fd=%{public}d, trigger=%{public}d, "
            "error=%{public}d", module, fd, trigger, status);
        ReturnListenerNode(&node);
        return SOFTBUS_LOCK_ERR;
    }

    bool wakeup = false;
    do {
        if (node->info.status != LISTENER_RUNNING) {
            CONN_LOGE(CONN_COMMON, "module is not running, module=%{public}d, fd=%{public}d, trigger=%{public}d",
                module, fd, trigger);
            status = SOFTBUS_CONN_FAIL;
            break;
        }

        FdNode *target = NULL;
        FdNode *it = NULL;
        LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
            if (fd == it->fd) {
                target = it;
                break;
            }
        }

        if (target != NULL) {
            // fd is already watched: only its trigger set may need to grow
            if ((target->triggerSet & trigger) == trigger) {
                CONN_LOGW(CONN_COMMON, "repeat add trigger, just skip, module=%{public}d, fd=%{public}d, "
                    "trigger=%{public}d, triggerSet=%{public}u",
                    module, fd, trigger, target->triggerSet);
                break;
            }
            target->triggerSet |= trigger;
            CONN_LOGI(CONN_COMMON, "add trigger success, module=%{public}d, fd=%{public}d, newAddTrigger=%{public}d, "
                "triggerSet=%{public}u", module, fd, trigger, target->triggerSet);
            wakeup = true;
            break;
        }

        // a new entry is needed: refuse it once MAX_LISTEN_EVENTS descriptors are watched
        if (node->info.waitEventFdsLen >= MAX_LISTEN_EVENTS) {
            CONN_LOGE(CONN_COMMON, "can not trigger more, waitEventFdsLen=%{public}u >= MAX_LISTEN_EVENTS=%{public}d, "
                "module=%{public}d, fd=%{public}d, trigger=%{public}d",
                node->info.waitEventFdsLen, MAX_LISTEN_EVENTS, module, fd, trigger);
            status = SOFTBUS_CONN_FAIL;
            break;
        }

        FdNode *fdNode = (FdNode *)SoftBusCalloc(sizeof(FdNode));
        if (fdNode == NULL) {
            CONN_LOGE(CONN_COMMON, "calloc failed, module=%{public}d, fd=%{public}d, trigger=%{public}d",
                module, fd, trigger);
            status = SOFTBUS_MALLOC_ERR;
            break;
        }
        ListInit(&fdNode->node);
        fdNode->fd = fd;
        fdNode->triggerSet = trigger;
        ListAdd(&node->info.waitEventFds, &fdNode->node);
        node->info.waitEventFdsLen += 1;
        wakeup = true;
        CONN_LOGI(CONN_COMMON, "add trigger success, module=%{public}d, fd=%{public}d, trigger=%{public}d",
            module, fd, trigger);
    } while (false);

    (void)SoftBusMutexUnlock(&node->lock);
    ReturnListenerNode(&node);

    // wake the select thread only after the node lock has been released
    if (status == SOFTBUS_OK && wakeup) {
        WakeupSelectThread();
    }
    return status;
}
701
/*
 * Removes trigger from the set registered for fd on a listener module and wakes the
 * select thread when the watched set changed.
 *
 * module:  must be in [0, UNUSE_BUTT).
 * fd:      must be > 0.
 * trigger: must satisfy IsValidTriggerType.
 *
 * Deleting from an fd that is not watched, or deleting a trigger that shares no bit with
 * the registered set, is treated as success. When the last trigger bit of an fd is
 * removed, its entry is dropped from the list.
 *
 * Returns SOFTBUS_OK on success, SOFTBUS_INVALID_PARAM for bad arguments, SOFTBUS_NOT_FIND
 * when the module has no listener node and SOFTBUS_LOCK_ERR when the node lock fails.
 */
int32_t DelTrigger(ListenerModule module, int32_t fd, TriggerType trigger)
{
    CONN_CHECK_AND_RETURN_RET_LOGW(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "invalid module, module=%{public}d", module);
    CONN_CHECK_AND_RETURN_RET_LOGW(fd > 0, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "invalid fd, module=%{public}d, fd=%{public}d", module, fd);
    CONN_CHECK_AND_RETURN_RET_LOGW(IsValidTriggerType(trigger), SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "invalid trigger, module=%{public}d, fd=%{public}d, trigger=%{public}d", module, fd, trigger);

    SoftbusListenerNode *node = GetListenerNode(module);
    CONN_CHECK_AND_RETURN_RET_LOGW(node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON,
        "listener node not exist, module=%{public}d, fd=%{public}d, trigger=%{public}d", module, fd, trigger);

    // from here on result stays SOFTBUS_OK: every outcome below counts as success
    int32_t result = SoftBusMutexLock(&node->lock);
    if (result != SOFTBUS_OK) {
        CONN_LOGE(CONN_COMMON, "lock failed, module=%{public}d, fd=%{public}d, trigger=%{public}d, "
            "error=%{public}d", module, fd, trigger, result);
        ReturnListenerNode(&node);
        return SOFTBUS_LOCK_ERR;
    }

    FdNode *target = NULL;
    FdNode *cursor = NULL;
    LIST_FOR_EACH_ENTRY(cursor, &node->info.waitEventFds, FdNode, node) {
        if (cursor->fd == fd) {
            target = cursor;
            break;
        }
    }

    bool wakeup = false;
    if (target == NULL) {
        CONN_LOGW(CONN_COMMON, "fd node not exist, module=%{public}d, fd=%{public}d, trigger=%{public}d",
            module, fd, trigger);
    } else if ((target->triggerSet & trigger) == 0) {
        CONN_LOGW(CONN_COMMON,
            "without add trigger before, repeat delete trigger or mismatch module. "
            "module=%{public}d, fd=%{public}d, wantDeleteTrigger=%{public}d, triggerSet=%{public}u",
            module, fd, trigger, target->triggerSet);
    } else {
        target->triggerSet &= ~trigger;
        wakeup = true;
        if (target->triggerSet != 0) {
            CONN_LOGI(CONN_COMMON, "delete trigger success, module=%{public}d, fd=%{public}d, trigger=%{public}d, "
                "triggerSet=%{public}u", module, fd, trigger, target->triggerSet);
        } else {
            // no trigger left for this fd: stop watching it altogether
            CONN_LOGI(
                CONN_COMMON,
                "delete trigger success, module=%{public}d, fd=%{public}d, trigger=%{public}d",
                module, fd, trigger);
            ListDelete(&target->node);
            SoftBusFree(target);
            node->info.waitEventFdsLen -= 1;
        }
    }

    SoftBusMutexUnlock(&node->lock);
    ReturnListenerNode(&node);

    if (result == SOFTBUS_OK && wakeup) {
        WakeupSelectThread();
    }
    return result;
}
777
/*
 * Releases a select-thread state object: closes each control fd that is not 0, destroys
 * the object's lock, frees the object and sets *statePtr to NULL.
 */
static void CleanupSelectThreadState(SelectThreadState **statePtr)
{
    SelectThreadState *doomed = *statePtr;
    const int32_t readFd = doomed->ctrlRfd;
    const int32_t writeFd = doomed->ctrlWfd;

    if (readFd != 0) {
        ConnCloseSocket(readFd);
    }
    if (writeFd != 0) {
        ConnCloseSocket(writeFd);
    }

    CONN_LOGI(CONN_COMMON, "cleanup select thread state, traceId=%{public}d, ctrlRfd=%{public}d, ctrlWfd=%{public}d",
        doomed->traceId, readFd, writeFd);
    (void)SoftBusMutexDestroy(&doomed->lock);
    SoftBusFree(doomed);
    *statePtr = NULL;
}
794
ProcessCtrlFdEvent(int32_t fd,int32_t wakeupTrace)795 static void ProcessCtrlFdEvent(int32_t fd, int32_t wakeupTrace)
796 {
797 #ifndef __LITEOS__
798 while (true) {
799 int32_t ctrlTraceId = 0;
800 ssize_t len = read(fd, &ctrlTraceId, sizeof(ctrlTraceId));
801 if (len < 0) {
802 int32_t status = errno;
803 if (status == EINTR) {
804 continue;
805 } else if (status == EAGAIN) {
806 break;
807 } else {
808 CONN_LOGE(
809 CONN_COMMON, "wakeupTrace=%{public}d, fd=%{public}d, readLen=%{public}zd, error=%{public}d",
810 wakeupTrace, fd, len, status);
811 break;
812 }
813 }
814 CONN_LOGI(CONN_COMMON, "wakeup ctrl message received, wakeupTrace=%{public}d, fd=%{public}d, "
815 "ctrlTraceId=%{public}d, readLength=%{public}zd", wakeupTrace, fd, ctrlTraceId, len);
816 }
817 #endif
818 }
819
/*
 * Delivers one socket event to the module's data callback.
 * When no onDataEvent callback is registered, the fd is closed instead so that it does
 * not wake the select thread again.
 */
static void DispatchFdEvent(
    int32_t fd, ListenerModule module, enum SocketEvent event, const SoftbusBaseListener *listener, int32_t wakeupTrace)
{
    if (listener->onDataEvent == NULL) {
        CONN_LOGE(CONN_COMMON,
            "listener not registered, to avoid repeat wakeup, close it,"
            "wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, event=%{public}d",
            wakeupTrace, module, fd, event);
        ConnCloseSocket(fd);
        return;
    }

    listener->onDataEvent(module, event, fd);
    CONN_LOGI(CONN_COMMON,
        "wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, event=%{public}d",
        wakeupTrace, module, fd, event);
}
836
/*
 * Accept clients on listenFd until AcceptClient stops returning SOFTBUS_OK.
 *
 * Every accepted client is handed to listener->onConnectEvent; when that callback
 * is not set the client fd is closed immediately.
 *
 * Returns SOFTBUS_INVALID_PARAM when socketIf or its AcceptClient method is NULL,
 * otherwise the first non-SOFTBUS_OK status returned by AcceptClient (the value
 * that ended the accept loop).
 */
static int32_t ProcessSpecifiedServerAcceptEvent(ListenerModule module, int32_t listenFd, ConnectType connectType,
    const SocketInterface *socketIf, const SoftbusBaseListener *listener, int32_t wakeupTrace)
{
    CONN_CHECK_AND_RETURN_RET_LOGW(socketIf != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "socket interface implement is null, wakeupTrace=%{public}d, module=%{public}d", wakeupTrace, module);
    CONN_CHECK_AND_RETURN_RET_LOGW(socketIf->AcceptClient != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
        "socket interface implement not support AcceptClient method, wakeupTrace=%{public}d, module=%{public}d",
        wakeupTrace, module);

    int32_t acceptRet = SOFTBUS_OK;
    for (;;) {
        int32_t acceptedFd = -1;
        ConnectOption peer = {
            .type = connectType,
            .socketOption = {
                .addr = { 0 },
                .port = 0,
                .moduleId = module,
                .protocol = 0,
                .keepAlive = 0,
            },
        };
        acceptRet = SOFTBUS_TEMP_FAILURE_RETRY(socketIf->AcceptClient(listenFd, &peer, &acceptedFd));
        if (acceptRet != SOFTBUS_OK) {
            break;
        }

        // only the anonymized form of the peer address is written to the log
        char peerIpForLog[IP_LEN] = { 0 };
        ConvertAnonymizeIpAddress(peerIpForLog, IP_LEN, peer.socketOption.addr, IP_LEN);
        if (listener->onConnectEvent == NULL) {
            CONN_LOGE(CONN_COMMON,
                "event listener not registered, wakeupTrace=%{public}d, module=%{public}d, "
                "listenFd=%{public}d, clientIp=%{public}s, clientFd=%{public}d",
                wakeupTrace, module, listenFd, peerIpForLog, acceptedFd);
            ConnCloseSocket(acceptedFd);
            continue;
        }
        CONN_LOGI(CONN_COMMON,
            "trigger ACCEPT event, wakeupTrace=%{public}d, module=%{public}d, listenFd=%{public}d, "
            "clientIp=%{public}s, clientFd=%{public}d", wakeupTrace, module, listenFd, peerIpForLog, acceptedFd);
        listener->onConnectEvent(module, acceptedFd, &peer);
    }
    return acceptRet;
}
881
/*
 * Snapshot the (fd, triggerSet) pairs of node->info.waitEventFds into a newly
 * allocated array.
 *
 * node:        listener node whose wait list is copied. This function takes no lock;
 *              the caller in this file holds node->lock around the call.
 * outArray:    receives the array, or NULL when there is nothing to copy. A non-NULL
 *              array is owned by the caller and must be released with SoftBusFree().
 * outArrayLen: receives the number of valid entries in *outArray.
 *
 * Only the fd and triggerSet fields of each array element are filled in; the list
 * linkage of the array elements is left zeroed.
 *
 * Returns SOFTBUS_OK on success (including the empty case) or SOFTBUS_MALLOC_ERR
 * when an allocation fails; on failure the out parameters are not written.
 */
static int32_t CopyWaitEventFdsUnsafe(const SoftbusListenerNode *node, FdNode **outArray, uint32_t *outArrayLen)
{
    if (node->info.waitEventFdsLen == 0) {
        *outArray = NULL;
        *outArrayLen = 0;
        return SOFTBUS_OK;
    }

    uint32_t fdArrayLen = node->info.waitEventFdsLen;
    FdNode *fdArray = (FdNode *)SoftBusCalloc(fdArrayLen * sizeof(FdNode));
    CONN_CHECK_AND_RETURN_RET_LOGE(fdArray != NULL, SOFTBUS_MALLOC_ERR, CONN_COMMON,
        "calloc failed, module=%{public}d, eventLen=%{public}u", node->module, fdArrayLen);

    uint32_t i = 0;
    FdNode *item = NULL;
    bool expand = false;
    LIST_FOR_EACH_ENTRY(item, &node->info.waitEventFds, FdNode, node) {
        if (i >= fdArrayLen) {
            // the list holds more entries than waitEventFdsLen claimed: grow the array
            uint32_t tmpLen = fdArrayLen * FDARR_EXPAND_BASE;
            FdNode *tmp = (FdNode *)SoftBusCalloc(tmpLen * sizeof(FdNode));
            if (tmp == NULL) {
                CONN_LOGE(CONN_COMMON, "expand calloc fd node array object failed, module=%{public}d, "
                    "eventLen=%{public}u", node->module, tmpLen);
                SoftBusFree(fdArray);
                return SOFTBUS_MALLOC_ERR;
            }
            for (uint32_t j = 0; j < fdArrayLen; j++) {
                tmp[j].fd = fdArray[j].fd;
                tmp[j].triggerSet = fdArray[j].triggerSet;
            }
            SoftBusFree(fdArray);
            fdArray = tmp;
            fdArrayLen = tmpLen;
            expand = true;
        }
        fdArray[i].fd = item->fd;
        fdArray[i].triggerSet = item->triggerSet;
        i++;
    }

    // diagnose by the way
    if (expand) {
        CONN_LOGE(CONN_COMMON, "listener node 'waitEventFdsLen' field is unexpected, "
            "actualWaitEventFdsLen=%{public}u > waitEventFdsLen=%{public}u, module=%{public}d",
            i, node->info.waitEventFdsLen, node->module);
    } else if (i != fdArrayLen) {
        CONN_LOGE(CONN_COMMON, "listener node 'waitEventFdsLen' field is unexpected, "
            "actualWaitEventFdsLen=%{public}u < waitEventFdsLen=%{public}u, module=%{public}d",
            i, node->info.waitEventFdsLen, node->module);
    }

    if (i == 0) {
        // the list turned out to be empty: never hand back an allocation paired with
        // a zero length, a caller that bails out on length 0 would leak it
        SoftBusFree(fdArray);
        fdArray = NULL;
    }

    *outArrayLen = i;
    *outArray = fdArray;
    return SOFTBUS_OK;
}
937
/*
 * Close the node's listening socket after accept reported it as unusable.
 *
 * The socket is closed, and listenFd / listenPort reset to -1, only if the node is
 * still a running server whose current listen fd equals the listenFd passed in;
 * otherwise the node is left untouched.
 *
 * anomizedIp and reason are used for logging only.
 */
static void CloseInvalidListenForcely(SoftbusListenerNode *node, int32_t listenFd, const char *anomizedIp,
    int32_t reason)
{
    CONN_CHECK_AND_RETURN_LOGE(
        SoftBusMutexLock(&node->lock) == SOFTBUS_OK, CONN_COMMON, "lock failed, module=%{public}d",
        node->module);

    bool sameActiveListen = node->info.status == LISTENER_RUNNING && node->info.modeType == SERVER_MODE &&
        node->info.listenFd == listenFd;
    if (sameActiveListen) {
        CONN_LOGW(CONN_COMMON, "forcely close to prevent repeat wakeup select, module=%{public}d, "
            "listenFd=%{public}d, port=%{public}d, ip=%{public}s, error=%{public}d",
            node->module, node->info.listenFd, node->info.listenPort, anomizedIp, reason);
        ConnCloseSocket(node->info.listenFd);
        node->info.listenFd = -1;
        node->info.listenPort = -1;
    }

    SoftBusMutexUnlock(&node->lock);
}
958
/*
 * Handle the select results that belong to one listener node.
 *
 * Under node->lock the function snapshots what it needs (listen fd/port, socket
 * interface, a by-value copy of the listener callbacks, connect type and the list
 * of waited fds). The lock is then released and all accept handling and event
 * dispatching runs on the snapshot, so callbacks are invoked without node->lock held.
 *
 * readSet / writeSet / exceptSet: fd sets filled in by select.
 * wakeupTrace: id of the current select wakeup, used for log correlation only.
 *
 * Nothing is done when the node is not in LISTENER_RUNNING state.
 */
static void ProcessSpecifiedListenerNodeEvent(SoftbusListenerNode *node, SoftBusFdSet *readSet, SoftBusFdSet *writeSet,
    SoftBusFdSet *exceptSet, int32_t wakeupTrace)
{
    CONN_CHECK_AND_RETURN_LOGE(SoftBusMutexLock(&node->lock) == SOFTBUS_OK, CONN_COMMON,
        "lock failed, wakeupTrace=%{public}d, module=%{public}d", wakeupTrace, node->module);
    if (node->info.status != LISTENER_RUNNING) {
        SoftBusMutexUnlock(&node->lock);
        return;
    }

    int32_t listenFd = -1;
    int32_t listenPort = -1;
    char animizedIp[IP_LEN] = { 0 };
    if (node->info.modeType == SERVER_MODE && node->info.listenFd > 0) {
        listenFd = node->info.listenFd;
        listenPort = node->info.listenPort;
        ConvertAnonymizeIpAddress(animizedIp, IP_LEN, node->info.listenerInfo.socketOption.addr, IP_LEN);
    }
    const SocketInterface *socketIf = node->socketIf;
    SoftbusBaseListener listener = node->listener;
    ConnectType connectType = node->info.listenerInfo.type;

    FdNode *fdArray = NULL;
    uint32_t fdArrayLen = 0;
    int32_t status = CopyWaitEventFdsUnsafe(node, &fdArray, &fdArrayLen);
    SoftBusMutexUnlock(&node->lock);
    if (status != SOFTBUS_OK) {
        CONN_LOGE(CONN_COMMON,
            "copy wait event fds failed, wakeupTrace=%{public}d, module=%{public}d, error=%{public}d",
            wakeupTrace, node->module, status);
        return;
    }

    if (listenFd > 0 && SoftBusSocketFdIsset(listenFd, readSet)) {
        status =
            ProcessSpecifiedServerAcceptEvent(node->module, listenFd, connectType, socketIf, &listener, wakeupTrace);
        switch (status) {
            case SOFTBUS_OK:
            case SOFTBUS_ADAPTER_SOCKET_EAGAIN:
                break;
            case SOFTBUS_ADAPTER_SOCKET_EINVAL:
            case SOFTBUS_ADAPTER_SOCKET_EBADF:
                // the listen fd itself is unusable; close it so it stops waking up select
                CloseInvalidListenForcely(node, listenFd, animizedIp, status);
                break;
            default:
                CONN_LOGD(CONN_COMMON,
                    "accept client failed, wakeupTrace=%{public}d, module=%{public}d, listenFd=%{public}d, "
                    "port=%{public}d, ip=%{public}s, error=%{public}d",
                    wakeupTrace, node->module, listenFd, listenPort, animizedIp, status);
                break;
        }
    }

    // the loop is a no-op when fdArrayLen is 0; falling through (instead of returning
    // early) guarantees the snapshot is released on every path
    for (uint32_t i = 0; i < fdArrayLen; i++) {
        if ((fdArray[i].triggerSet & READ_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, readSet)) {
            CONN_LOGD(CONN_COMMON, "trigger IN event, wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, "
                "triggerSet=%{public}u",
                wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
            DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_IN, &listener, wakeupTrace);
        }
        if ((fdArray[i].triggerSet & WRITE_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, writeSet)) {
            CONN_LOGD(CONN_COMMON, "trigger OUT event, wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, "
                "triggerSet=%{public}u",
                wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
            DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_OUT, &listener, wakeupTrace);
        }
        if ((fdArray[i].triggerSet & EXCEPT_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, exceptSet)) {
            CONN_LOGW(CONN_COMMON,
                "trigger EXCEPTION(out-of-band data) event, wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, "
                "triggerSet=%{public}u", wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
            DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_EXCEPTION, &listener, wakeupTrace);
        }
        // RW_TRIGGER is already triggered in READ_TRIGGER and WRITE_TRIGGER, just skip it
    }

    if (fdArray != NULL) {
        SoftBusFree(fdArray);
    }
}
1039
/*
 * Dispatch the result of one select wakeup.
 *
 * Every listener module that currently has a node gets its events processed first;
 * afterwards, if the control pipe's read end is marked readable in readSet, its
 * pending wakeup messages are consumed.
 *
 * wakeupTrace is forwarded for log correlation.
 */
static void ProcessEvent(SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet,
    const SelectThreadState *selectState, int32_t wakeupTrace)
{
    ListenerModule module = 0;
    while (module < UNUSE_BUTT) {
        SoftbusListenerNode *current = GetListenerNode(module);
        if (current != NULL) {
            ProcessSpecifiedListenerNodeEvent(current, readSet, writeSet, exceptSet, wakeupTrace);
            // hand the node obtained from GetListenerNode back
            ReturnListenerNode(&current);
        }
        module++;
    }

    if (!SoftBusSocketFdIsset(selectState->ctrlRfd, readSet)) {
        return;
    }
    ProcessCtrlFdEvent(selectState->ctrlRfd, wakeupTrace);
}
1056
/*
 * Add the fds one listener node is waiting on to the select fd sets.
 *
 * With node->lock held: for a running server node the listen fd (when > 0) is added
 * to readSet, and every entry of the wait list is added to the read / write / except
 * set selected by its triggerSet bits.
 *
 * Returns SOFTBUS_LOCK_ERR when the node lock cannot be taken; otherwise the largest
 * fd seen for this node, or 0 when the node is not running or has no fd.
 */
static int32_t CollectSpecifiedModuleListenerEvents(
    SoftbusListenerNode *node, SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet)
{
    CONN_CHECK_AND_RETURN_RET_LOGE(SoftBusMutexLock(&node->lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR, CONN_COMMON,
        "lock failed, module=%{public}d", node->module);

    int32_t highestFd = 0;
    if (node->info.status == LISTENER_RUNNING) {
        if (node->info.modeType == SERVER_MODE && node->info.listenFd > 0) {
            SoftBusSocketFdSet(node->info.listenFd, readSet);
            highestFd = node->info.listenFd;
        }

        FdNode *entry = NULL;
        LIST_FOR_EACH_ENTRY(entry, &node->info.waitEventFds, FdNode, node) {
            // RW_TRIGGER is already collected in READ_TRIGGER and WRITE_TRIGGER, just skip it
            if ((entry->triggerSet & READ_TRIGGER) != 0) {
                SoftBusSocketFdSet(entry->fd, readSet);
            }
            if ((entry->triggerSet & WRITE_TRIGGER) != 0) {
                SoftBusSocketFdSet(entry->fd, writeSet);
            }
            if ((entry->triggerSet & EXCEPT_TRIGGER) != 0) {
                SoftBusSocketFdSet(entry->fd, exceptSet);
            }
            if (entry->fd > highestFd) {
                highestFd = entry->fd;
            }
        }
    }

    (void)SoftBusMutexUnlock(&node->lock);
    return highestFd;
}
1091
/*
 * Build the select fd sets from all listener modules that currently have a node.
 *
 * Returns the largest fd placed in any set (0 if there is none). If collecting from
 * any module fails, all three sets are cleared and that module's negative status is
 * returned instead.
 */
static int32_t CollectWaitEventFdSet(SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet)
{
    int32_t highestFd = 0;
    for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
        SoftbusListenerNode *current = GetListenerNode(module);
        if (current == NULL) {
            continue;
        }
        // the callee returns either a negative status or this module's largest fd
        int32_t result = CollectSpecifiedModuleListenerEvents(current, readSet, writeSet, exceptSet);
        ReturnListenerNode(&current);
        if (result < 0) {
            CONN_LOGE(CONN_COMMON, "collect wait event fd set failed: module=%{public}d, error=%{public}d",
                module, result);
            // drop whatever was collected so far, a partial set must not be used
            SoftBusSocketFdZero(readSet);
            SoftBusSocketFdZero(writeSet);
            SoftBusSocketFdZero(exceptSet);
            return result;
        }
        if (result > highestFd) {
            highestFd = result;
        }
    }
    return highestFd;
}
1124
SelectTask(void * arg)1125 static void *SelectTask(void *arg)
1126 {
1127 static int32_t wakeupTraceIdGenerator = 0;
1128
1129 CONN_CHECK_AND_RETURN_RET_LOGW(arg != NULL, NULL, CONN_COMMON, "invalid param");
1130 SelectThreadState *selectState = (SelectThreadState *)arg;
1131 while (true) {
1132 int status = SoftBusMutexLock(&selectState->lock);
1133 if (status != SOFTBUS_OK) {
1134 CONN_LOGE(CONN_COMMON, "lock failed, retry after some times. "
1135 "waitDelay=%{public}dms, selectTrace=%{public}d, error=%{public}d",
1136 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, selectState->traceId, status);
1137 SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1138 continue;
1139 }
1140 int32_t referenceCount = selectState->referenceCount;
1141 (void)SoftBusMutexUnlock(&selectState->lock);
1142
1143 if (referenceCount <= 0) {
1144 CONN_LOGW(CONN_COMMON, "select task, select task is not reference by others any more, exit... "
1145 "selectTrace=%{public}d", selectState->traceId);
1146 break;
1147 }
1148
1149 SoftBusFdSet readSet;
1150 SoftBusFdSet writeSet;
1151 SoftBusFdSet exceptSet;
1152 SoftBusSocketFdZero(&readSet);
1153 SoftBusSocketFdZero(&writeSet);
1154 SoftBusSocketFdZero(&exceptSet);
1155 int32_t maxFdOrStatus = CollectWaitEventFdSet(&readSet, &writeSet, &exceptSet);
1156 if (maxFdOrStatus < 0) {
1157 CONN_LOGE(CONN_COMMON, "collect wait event fd set failed, retry after some times. "
1158 "waitDelay=%{public}dms, selectTrace=%{public}d, error=%{public}d",
1159 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, selectState->traceId, maxFdOrStatus);
1160 SoftBusSocketFdZero(&readSet);
1161 SoftBusSocketFdZero(&writeSet);
1162 SoftBusSocketFdZero(&exceptSet);
1163 SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1164 continue;
1165 }
1166 SoftBusSocketFdSet(selectState->ctrlRfd, &readSet);
1167 SoftBusSockTimeOut timeout = {0};
1168 timeout.sec = SOFTBUS_LISTENER_SELECT_TIMEOUT_SEC;
1169 int32_t maxFd = maxFdOrStatus > selectState->ctrlRfd ? maxFdOrStatus : selectState->ctrlRfd;
1170 CONN_LOGI(CONN_COMMON, "select is waking up, maxFd=%{public}d, ctrlRfd=%{public}d",
1171 maxFd, selectState->ctrlRfd);
1172 int32_t nEvents = SoftBusSocketSelect(maxFd + 1, &readSet, &writeSet, &exceptSet, &timeout);
1173 int32_t wakeupTraceId = ++wakeupTraceIdGenerator;
1174 if (nEvents == 0) {
1175 continue;
1176 }
1177 if (nEvents < 0) {
1178 CONN_LOGE(CONN_COMMON, "unexpect wakeup, retry after some times. "
1179 "waitDelay=%{public}dms, wakeupTraceId=%{public}d, events=%{public}d",
1180 SELECT_ABNORMAL_EVENT_RETRY_WAIT_MILLIS, wakeupTraceId, nEvents);
1181 SoftBusSleepMs(SELECT_ABNORMAL_EVENT_RETRY_WAIT_MILLIS);
1182 continue;
1183 }
1184 CONN_LOGI(CONN_COMMON, "select task, wakeup from select, selectTrace=%{public}d, wakeupTraceId=%{public}d, "
1185 "events=%{public}d", selectState->traceId, wakeupTraceId, nEvents);
1186 ProcessEvent(&readSet, &writeSet, &exceptSet, selectState, wakeupTraceId);
1187 }
1188 CleanupSelectThreadState(&selectState);
1189 return NULL;
1190 }
1191
StartSelectThread(void)1192 static int32_t StartSelectThread(void)
1193 {
1194 static int32_t selectThreadTraceIdGenerator = 1;
1195
1196 int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1197 CONN_CHECK_AND_RETURN_RET_LOGE(
1198 status == SOFTBUS_OK, SOFTBUS_LOCK_ERR, CONN_COMMON, "lock global select thread state failed");
1199
1200 do {
1201 if (g_selectThreadState != NULL) {
1202 status = SoftBusMutexLock(&g_selectThreadState->lock);
1203 if (status != SOFTBUS_OK) {
1204 CONN_LOGE(CONN_COMMON, "lock select thread state self failed, error=%{public}d", status);
1205 status = SOFTBUS_LOCK_ERR;
1206 break;
1207 }
1208 int32_t referenceCount = ++g_selectThreadState->referenceCount;
1209 (void)SoftBusMutexUnlock(&g_selectThreadState->lock);
1210 WakeupSelectThread();
1211
1212 CONN_LOGD(CONN_COMMON,
1213 "select thread is already start, selectTrace=%{public}d, ctrlRfd=%{public}d, ctrlWfd=%{public}d, "
1214 "referenceCount=%{public}d",
1215 g_selectThreadState->traceId, g_selectThreadState->ctrlRfd, g_selectThreadState->ctrlWfd,
1216 referenceCount);
1217 break;
1218 }
1219
1220 SelectThreadState *state = SoftBusCalloc(sizeof(SelectThreadState));
1221 if (state == NULL) {
1222 status = SOFTBUS_MALLOC_ERR;
1223 break;
1224 }
1225 state->traceId = ++selectThreadTraceIdGenerator;
1226 int32_t fds[2] = { 0 };
1227 int32_t rc = 0;
1228 #ifndef __LITEOS__
1229 rc = pipe2(fds, O_CLOEXEC | O_NONBLOCK);
1230 #endif
1231 if (rc != 0) {
1232 CONN_LOGE(CONN_COMMON, "create ctrl pipe failed, rc=%{public}d, error=%{public}d(%{public}s)",
1233 rc, errno, strerror(errno));
1234 SoftBusFree(state);
1235 status = SOFTBUS_INVALID_NUM;
1236 break;
1237 }
1238 state->ctrlRfd = fds[0];
1239 state->ctrlWfd = fds[1];
1240
1241 status = SoftBusMutexInit(&state->lock, NULL);
1242 if (status != SOFTBUS_OK) {
1243 CONN_LOGE(CONN_COMMON, "start select task async failed, error=%{public}d", status);
1244 CleanupSelectThreadState(&state);
1245 break;
1246 }
1247 state->referenceCount = 1;
1248 status = ConnStartActionAsync(state, SelectTask, "Select_Tsk");
1249 if (status != SOFTBUS_OK) {
1250 CONN_LOGE(CONN_COMMON, "init lock failed, error=%{public}d", status);
1251 CleanupSelectThreadState(&state);
1252 break;
1253 }
1254 CONN_LOGI(CONN_COMMON,
1255 "start select thread success, traceId=%{public}d, ctrlRfd=%{public}d, ctrlWfd=%{public}d",
1256 state->traceId, state->ctrlRfd, state->ctrlWfd);
1257 g_selectThreadState = state;
1258 } while (false);
1259 (void)SoftBusMutexUnlock(&g_selectThreadStateLock);
1260 return status;
1261 }
1262
StopSelectThread(void)1263 static int32_t StopSelectThread(void)
1264 {
1265 int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1266 CONN_CHECK_AND_RETURN_RET_LOGE(
1267 status == SOFTBUS_OK, SOFTBUS_LOCK_ERR, CONN_COMMON, "lock global select thread state failed");
1268 do {
1269 if (g_selectThreadState == NULL) {
1270 CONN_LOGW(CONN_COMMON, "select thread is already stop or never start");
1271 break;
1272 }
1273
1274 status = SoftBusMutexLock(&g_selectThreadState->lock);
1275 if (status != SOFTBUS_OK) {
1276 CONN_LOGE(CONN_COMMON, "lock select thread state self");
1277 break;
1278 }
1279 g_selectThreadState->referenceCount -= 1;
1280 int32_t referenceCount = g_selectThreadState->referenceCount;
1281 (void)SoftBusMutexUnlock(&g_selectThreadState->lock);
1282 if (referenceCount <= 0) {
1283 CONN_LOGW(CONN_COMMON, "select thread is not used by other module any more, notify "
1284 "exit, thread reference count=%{public}d", referenceCount);
1285 WakeupSelectThread();
1286 g_selectThreadState = NULL;
1287 }
1288 } while (false);
1289 (void)SoftBusMutexUnlock(&g_selectThreadStateLock);
1290 return status;
1291 }
1292
WakeupSelectThread(void)1293 static void WakeupSelectThread(void)
1294 {
1295 #ifndef __LITEOS__
1296 static int32_t selectWakeupTraceIdGenerator = 0;
1297
1298 int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1299 CONN_CHECK_AND_RETURN_LOGE(
1300 status == SOFTBUS_OK, CONN_COMMON, "lock global select thread state failed, error=%{public}d", status);
1301 do {
1302 if (g_selectThreadState == NULL) {
1303 CONN_LOGW(CONN_COMMON, "select thread is not running, just skip");
1304 break;
1305 }
1306 int32_t ctrlTraceId = selectWakeupTraceIdGenerator++;
1307 ssize_t len = write(g_selectThreadState->ctrlWfd, &ctrlTraceId, sizeof(ctrlTraceId));
1308 if (len == -1) {
1309 COMM_LOGE(COMM_ADAPTER, "write message fail, len=%{public}zd, ctrlTraceId=%{public}d, "
1310 "errno=%{public}d(%{public}s)", len, ctrlTraceId, errno, strerror(errno));
1311 break;
1312 }
1313 CONN_LOGI(CONN_COMMON, "wakeup ctrl message sent, writeLength=%{public}zd, ctrlTraceId=%{public}d",
1314 len, ctrlTraceId);
1315 } while (false);
1316 SoftBusMutexUnlock(&g_selectThreadStateLock);
1317 #endif
1318 }