1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "softbus_base_listener.h"
17 
18 #include <fcntl.h>
19 #include <securec.h>
20 #include <stdatomic.h>
21 #include <unistd.h>
22 
23 #include "common_list.h"
24 #include "conn_event.h"
25 #include "conn_log.h"
26 #include "softbus_adapter_errcode.h"
27 #include "softbus_adapter_mem.h"
28 #include "softbus_adapter_socket.h"
29 #include "softbus_conn_common.h"
30 #include "softbus_conn_interface.h"
31 #include "softbus_def.h"
32 #include "softbus_errcode.h"
33 #include "softbus_feature_config.h"
34 #include "softbus_socket.h"
35 #include "softbus_utils.h"
36 
37 #define MAX_LISTEN_EVENTS 1024
38 #define DEFAULT_BACKLOG   4
39 #define FDARR_EXPAND_BASE 2
40 #define SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS (3 * 1000)
41 #define SELECT_ABNORMAL_EVENT_RETRY_WAIT_MILLIS (3 * 10) /* wait retry time for an abnotmal event by select*/
42 #define SOFTBUS_LISTENER_SELECT_TIMEOUT_SEC (6 * 60 * 60)
43 
44 enum BaseListenerStatus {
45     LISTENER_IDLE = 0,
46     LISTENER_RUNNING,
47 };
48 
49 typedef struct {
50     ListNode node;
51     int32_t fd;
52     uint32_t triggerSet;
53 } FdNode;
54 
55 typedef struct {
56     ListNode waitEventFds;
57     uint32_t waitEventFdsLen;
58 
59     ModeType modeType;
60     LocalListenerInfo listenerInfo;
61     int32_t listenFd;
62     int32_t listenPort;
63 
64     enum BaseListenerStatus status;
65 } SoftbusBaseListenerInfo;
66 
67 typedef struct {
68     ListenerModule module;
69     SoftBusMutex lock;
70     SoftbusBaseListener listener;
71     const SocketInterface *socketIf;
72     SoftbusBaseListenerInfo info;
73     int32_t objectRc;
74 } SoftbusListenerNode;
75 
76 typedef struct {
77     uint32_t traceId;
78     // pipe fds, to wakeup select thread in time
79     int32_t ctrlRfd;
80     int32_t ctrlWfd;
81 
82     SoftBusMutex lock;
83     int32_t referenceCount;
84 } SelectThreadState;
85 
86 static int32_t ShutdownBaseListener(SoftbusListenerNode *node);
87 static int32_t StartSelectThread(void);
88 static int32_t StopSelectThread(void);
89 static void WakeupSelectThread(void);
90 static SoftbusListenerNode *CreateSpecifiedListenerModule(ListenerModule module);
91 
92 static SoftBusMutex g_listenerListLock = { 0 };
93 static SoftbusListenerNode *g_listenerList[UNUSE_BUTT] = { 0 };
94 static SoftBusMutex g_selectThreadStateLock = { 0 };
95 static SelectThreadState *g_selectThreadState = NULL;
96 static _Atomic bool g_initBaseListener = false;
97 
GetListenerNodeCommon(ListenerModule module,bool create)98 static SoftbusListenerNode *GetListenerNodeCommon(ListenerModule module, bool create)
99 {
100     int32_t status = SoftBusMutexLock(&g_listenerListLock);
101     CONN_CHECK_AND_RETURN_RET_LOGE(
102         status == SOFTBUS_OK, NULL, CONN_COMMON, "lock failed, module=%{public}d, error=%{public}d",
103         module, status);
104     SoftbusListenerNode *node = g_listenerList[module];
105     do {
106         if (node == NULL) {
107             if (create) {
108                 node = CreateSpecifiedListenerModule(module);
109             }
110             if (node == NULL) {
111                 break;
112             }
113             g_listenerList[module] = node;
114         }
115         status = SoftBusMutexLock(&node->lock);
116         if (status != SOFTBUS_OK) {
117             CONN_LOGE(CONN_COMMON, "lock listener failed, module=%{public}d, error=%{public}d", module, status);
118             node = NULL;
119             break;
120         }
121         node->objectRc += 1;
122         SoftBusMutexUnlock(&node->lock);
123     } while (false);
124     (void)SoftBusMutexUnlock(&g_listenerListLock);
125     return node;
126 }
127 
GetListenerNode(ListenerModule module)128 static SoftbusListenerNode *GetListenerNode(ListenerModule module)
129 {
130     return GetListenerNodeCommon(module, false);
131 }
132 
GetOrCreateListenerNode(ListenerModule module)133 static SoftbusListenerNode *GetOrCreateListenerNode(ListenerModule module)
134 {
135     return GetListenerNodeCommon(module, true);
136 }
137 
RemoveListenerNode(SoftbusListenerNode * node)138 static void RemoveListenerNode(SoftbusListenerNode *node)
139 {
140     int32_t status = SoftBusMutexLock(&g_listenerListLock);
141     CONN_CHECK_AND_RETURN_LOGE(
142         status == SOFTBUS_OK, CONN_COMMON, "lock listener lists failed, module=%{public}d, error=%{public}d",
143         node->module, status);
144     do {
145         if (g_listenerList[node->module] != node) {
146             CONN_LOGW(CONN_COMMON, "listener node is not in listener list, just skip, module=%{public}d",
147                 node->module);
148             break;
149         }
150         status = SoftBusMutexLock(&node->lock);
151         if (status != SOFTBUS_OK) {
152             CONN_LOGE(CONN_COMMON, "lock listener node failed, module=%{public}d", node->module);
153             break;
154         }
155         // decrease root object reference
156         node->objectRc -= 1;
157         g_listenerList[node->module] = NULL;
158         (void)SoftBusMutexUnlock(&node->lock);
159     } while (false);
160     (void)SoftBusMutexUnlock(&g_listenerListLock);
161 }
162 
ReturnListenerNode(SoftbusListenerNode ** nodePtr)163 static void ReturnListenerNode(SoftbusListenerNode **nodePtr)
164 {
165     SoftbusListenerNode *node = *nodePtr;
166     do {
167         int32_t status = SoftBusMutexLock(&node->lock);
168         if (status != SOFTBUS_OK) {
169             CONN_LOGE(CONN_COMMON, "lock listener node failed, module=%{public}d", node->module);
170             break;
171         }
172         node->objectRc -= 1;
173         int32_t objectRc = node->objectRc;
174         (void)SoftBusMutexUnlock(&node->lock);
175 
176         if (objectRc > 0) {
177             break;
178         }
179         CONN_LOGI(CONN_COMMON, "object reference count <= 0, free listener node, module=%{public}d, "
180                                "objectReference=%{public}d", node->module, objectRc);
181         (void)ShutdownBaseListener(node);
182         SoftBusMutexDestroy(&node->lock);
183         SoftBusFree(node);
184     } while (false);
185 
186     *nodePtr = NULL;
187 }
188 
CreateSpecifiedListenerModule(ListenerModule module)189 static SoftbusListenerNode *CreateSpecifiedListenerModule(ListenerModule module)
190 {
191     SoftbusListenerNode *node = (SoftbusListenerNode *)SoftBusCalloc(sizeof(SoftbusListenerNode));
192     CONN_CHECK_AND_RETURN_RET_LOGE(
193         node != NULL, NULL, CONN_COMMON, "calloc failed, module=%{public}d", module);
194 
195     node->module = module;
196     // NOT apply recursive lock on purpose, problem will be exposes quickly if exist
197     int32_t status = SoftBusMutexInit(&node->lock, NULL);
198     if (status != SOFTBUS_OK) {
199         CONN_LOGE(CONN_COMMON, "init lock failed, module=%{public}d, error=%{public}d", module, status);
200         SoftBusFree(node);
201         return NULL;
202     }
203     node->listener.onConnectEvent = NULL;
204     node->listener.onDataEvent = NULL;
205 
206     node->socketIf = NULL;
207 
208     ListInit(&node->info.waitEventFds);
209     node->info.waitEventFdsLen = 0;
210     node->info.modeType = UNSET_MODE;
211     (void)memset_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), 0, sizeof(LocalListenerInfo));
212     node->info.listenFd = -1;
213     node->info.listenPort = -1;
214     // set root object reference count 1
215     node->objectRc = 1;
216     return node;
217 }
218 
InitBaseListener(void)219 int32_t InitBaseListener(void)
220 {
221     if (atomic_load_explicit(&g_initBaseListener, memory_order_acquire)) {
222         return SOFTBUS_OK;
223     }
224     // flag : if the client and server are in the same process, this function can be executed only once.
225     static bool flag = false;
226     if (flag) {
227         return SOFTBUS_OK;
228     }
229     flag = true;
230 
231     // stop select thread need re-enter lock
232     SoftBusMutexAttr attr = {
233         .type = SOFTBUS_MUTEX_RECURSIVE,
234     };
235     int32_t status = SoftBusMutexInit(&g_selectThreadStateLock, &attr);
236     if (status != SOFTBUS_OK) {
237         CONN_LOGE(CONN_INIT, "init select thread lock failed, error=%{public}d", status);
238         return SOFTBUS_LOCK_ERR;
239     }
240     // NOT apply recursive lock on purpose, problem will be exposes quickly if exist
241     status = SoftBusMutexInit(&g_listenerListLock, NULL);
242     if (status != SOFTBUS_OK) {
243         SoftBusMutexDestroy(&g_selectThreadStateLock);
244         CONN_LOGE(CONN_INIT, "init listener list lock failed, error=%{public}d", status);
245         return SOFTBUS_LOCK_ERR;
246     }
247 
248     status = SoftBusMutexLock(&g_listenerListLock);
249     if (status != SOFTBUS_OK) {
250         CONN_LOGE(CONN_INIT, "lock listener list failed, error=%{public}d", status);
251         SoftBusMutexDestroy(&g_selectThreadStateLock);
252         SoftBusMutexDestroy(&g_listenerListLock);
253         return SOFTBUS_LOCK_ERR;
254     }
255     (void)memset_s(g_listenerList, sizeof(g_listenerList), 0, sizeof(g_listenerList));
256     (void)SoftBusMutexUnlock(&g_listenerListLock);
257     if (status != SOFTBUS_OK) {
258         CONN_LOGE(CONN_INIT, "create static module listener failed, error=%{public}d", status);
259         SoftBusMutexDestroy(&g_selectThreadStateLock);
260         SoftBusMutexDestroy(&g_listenerListLock);
261         return status;
262     }
263     atomic_store_explicit(&g_initBaseListener, true, memory_order_release);
264     return SOFTBUS_OK;
265 }
266 
DeinitBaseListener(void)267 void DeinitBaseListener(void)
268 {
269     if (!atomic_load_explicit(&g_initBaseListener, memory_order_acquire)) {
270         return;
271     }
272 
273     for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
274         SoftbusListenerNode *node = GetListenerNode(module);
275         if (node == NULL) {
276             continue;
277         }
278         RemoveListenerNode(node);
279         ReturnListenerNode(&node);
280     }
281     atomic_store_explicit(&g_initBaseListener, false, memory_order_release);
282 }
283 
CreateListenerModule(void)284 uint32_t CreateListenerModule(void)
285 {
286     int32_t status = SoftBusMutexLock(&g_listenerListLock);
287     CONN_CHECK_AND_RETURN_RET_LOGE(
288         status == SOFTBUS_OK, UNUSE_BUTT, CONN_COMMON, "lock failed, error=%{public}d", status);
289 
290     ListenerModule module = LISTENER_MODULE_DYNAMIC_START;
291     for (; module <= LISTENER_MODULE_DYNAMIC_END; module++) {
292         if (g_listenerList[module] != NULL) {
293             continue;
294         }
295         SoftbusListenerNode *node = CreateSpecifiedListenerModule(module);
296         if (node == NULL) {
297             CONN_LOGE(CONN_COMMON, "create specified listener module failed, module=%{public}d", module);
298             module = UNUSE_BUTT;
299         } else {
300             CONN_LOGI(CONN_COMMON, "create listener module success, module=%{public}d", module);
301             g_listenerList[module] = node;
302         }
303         break;
304     }
305     (void)SoftBusMutexUnlock(&g_listenerListLock);
306     return module;
307 }
308 
DestroyBaseListener(ListenerModule module)309 void DestroyBaseListener(ListenerModule module)
310 {
311     CONN_CHECK_AND_RETURN_LOGW(module >= LISTENER_MODULE_DYNAMIC_START && module <= LISTENER_MODULE_DYNAMIC_END,
312         CONN_COMMON, "only dynamic module support destroy, module=%{public}d", module);
313 
314     CONN_LOGI(CONN_COMMON, "receive request, module=%{public}d", module);
315     SoftbusListenerNode *node = GetListenerNode(module);
316     if (node == NULL) {
317         CONN_LOGW(CONN_COMMON, "listener not exist, module=%{public}d", module);
318         return;
319     }
320     RemoveListenerNode(node);
321     ReturnListenerNode(&node);
322 }
323 
StartBaseClient(ListenerModule module,const SoftbusBaseListener * listener)324 int32_t StartBaseClient(ListenerModule module, const SoftbusBaseListener *listener)
325 {
326     CONN_CHECK_AND_RETURN_RET_LOGW(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM, CONN_COMMON,
327         "invalid module, module=%{public}d", module);
328     CONN_CHECK_AND_RETURN_RET_LOGW(listener != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
329         "listener is null, module=%{public}d", module);
330     CONN_CHECK_AND_RETURN_RET_LOGW(listener->onConnectEvent != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
331         "listener onConnectEvent is null, module=%{public}d", module);
332     CONN_CHECK_AND_RETURN_RET_LOGW(listener->onDataEvent != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
333         "listener onDataEvent is null, module=%{public}d", module);
334 
335     SoftbusListenerNode *node = GetOrCreateListenerNode(module);
336     CONN_CHECK_AND_RETURN_RET_LOGW(
337         node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON, "get listener node failed, module=%{public}d", module);
338 
339     int32_t status = SoftBusMutexLock(&node->lock);
340     if (status != SOFTBUS_OK) {
341         CONN_LOGE(CONN_COMMON, "lock listener node failed, module=%{public}d, error=%{public}d", module, status);
342         ReturnListenerNode(&node);
343         return SOFTBUS_LOCK_ERR;
344     }
345     do {
346         if (node->info.status != LISTENER_IDLE) {
347             CONN_LOGE(CONN_COMMON, "listener is not idle status, module=%{public}d, status=%{public}d",
348                 module, node->info.status);
349             status = SOFTBUS_CONN_LISTENER_NOT_IDLE;
350             break;
351         }
352         node->listener.onConnectEvent = listener->onConnectEvent;
353         node->listener.onDataEvent = listener->onDataEvent;
354         status = StartSelectThread();
355         if (status != SOFTBUS_OK) {
356             CONN_LOGE(CONN_COMMON, "start select thread failed, module=%{public}d, "
357                 "status=%{public}d", module, status);
358             break;
359         }
360         node->info.status = LISTENER_RUNNING;
361         CONN_LOGI(CONN_COMMON, "start base client listener success, module=%{public}d", module);
362     } while (false);
363     (void)SoftBusMutexUnlock(&node->lock);
364     ReturnListenerNode(&node);
365     return status;
366 }
367 
StartServerListenUnsafe(SoftbusListenerNode * node,const LocalListenerInfo * info)368 static int32_t StartServerListenUnsafe(SoftbusListenerNode *node, const LocalListenerInfo *info)
369 {
370     ListenerModule module = node->module;
371     ProtocolType protocol = info->socketOption.protocol;
372     const SocketInterface *socketIf = GetSocketInterface(protocol);
373     if (socketIf == NULL) {
374         CONN_LOGE(CONN_COMMON, "not find protocal implement, module=%{public}d, protocal=%{public}d", module, protocol);
375         return SOFTBUS_NOT_FIND;
376     }
377     node->socketIf = socketIf;
378 
379     int32_t listenFd = -1;
380     int32_t listenPort = -1;
381     int32_t status = SOFTBUS_OK;
382     do {
383         listenFd = socketIf->OpenServerSocket(info);
384         if (listenFd < 0) {
385             CONN_LOGE(CONN_COMMON, "create server socket failed: module=%{public}d, listenFd=%{public}d",
386                 module, listenFd);
387             status = listenFd;
388             break;
389         }
390         status = SoftBusSocketListen(listenFd, DEFAULT_BACKLOG);
391         if (status != SOFTBUS_OK) {
392             CONN_LOGE(CONN_COMMON, "listen server socket failed: module=%{public}d, error=%{public}d", module, status);
393             break;
394         }
395         listenPort = socketIf->GetSockPort(listenFd);
396         if (listenPort < 0) {
397             CONN_LOGE(CONN_COMMON, "get listen server port failed: module=%{public}d, listenFd=%{public}d, "
398                                    "error=%{public}d", module, listenFd, status);
399             status = SOFTBUS_TCP_SOCKET_ERR;
400             break;
401         }
402         if (memcpy_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), info, sizeof(LocalListenerInfo)) != EOK) {
403             CONN_LOGE(CONN_COMMON, "memcpy_s local listener info object failed: module=%{public}d", module);
404             status = SOFTBUS_MEM_ERR;
405             break;
406         }
407         node->info.modeType = SERVER_MODE;
408         node->info.listenFd = listenFd;
409         node->info.listenPort = listenPort;
410     } while (false);
411     if (status != SOFTBUS_OK && listenFd > 0) {
412         ConnShutdownSocket(listenFd);
413     }
414     return status == SOFTBUS_OK ? listenPort : status;
415 }
416 
CleanupServerListenInfoUnsafe(SoftbusListenerNode * node)417 static void CleanupServerListenInfoUnsafe(SoftbusListenerNode *node)
418 {
419     memset_s(&node->info.listenerInfo, sizeof(SoftbusBaseListenerInfo), 0, sizeof(SoftbusBaseListenerInfo));
420     if (node->info.listenFd > 0) {
421         ConnShutdownSocket(node->info.listenFd);
422     }
423     node->info.listenFd = -1;
424     node->info.listenPort = -1;
425 }
426 
FillConnEventExtra(const LocalListenerInfo * info,ConnEventExtra * extra,int32_t err)427 static void FillConnEventExtra(const LocalListenerInfo *info, ConnEventExtra *extra, int32_t err)
428 {
429     if (info == NULL || extra == NULL) {
430         return;
431     }
432     extra->errcode = err;
433     extra->result = err == SOFTBUS_OK ? EVENT_STAGE_RESULT_OK : EVENT_STAGE_RESULT_FAILED;
434     extra->linkType = info->type;
435     extra->moduleId = info->socketOption.moduleId;
436     extra->proType = info->socketOption.protocol;
437 }
438 
StartBaseListener(const LocalListenerInfo * info,const SoftbusBaseListener * listener)439 int32_t StartBaseListener(const LocalListenerInfo *info, const SoftbusBaseListener *listener)
440 {
441     ConnEventExtra extra = {
442         .result = 0
443     };
444     CONN_EVENT(EVENT_SCENE_START_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
445     CONN_CHECK_AND_RETURN_RET_LOGW(info != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON, "info is null");
446     CONN_CHECK_AND_RETURN_RET_LOGW(info->type == CONNECT_TCP || info->type == CONNECT_P2P || info->type == CONNECT_HML,
447         SOFTBUS_INVALID_PARAM, CONN_COMMON, "only CONNECT_TCP, CONNECT_P2P and CONNECT_HML is permitted, "
448         "CONNECT_TCP=%{public}d, CONNECT_P2P=%{public}d, CONNECT_HML=%{public}d, type=%{public}d",
449         CONNECT_TCP, CONNECT_P2P, CONNECT_HML, info->type);
450     CONN_CHECK_AND_RETURN_RET_LOGW(info->socketOption.port >= 0, SOFTBUS_INVALID_PARAM, CONN_COMMON,
451         "port is invalid, port=%{public}d", info->socketOption.port);
452     CONN_CHECK_AND_RETURN_RET_LOGW(info->socketOption.moduleId >= 0 && info->socketOption.moduleId < UNUSE_BUTT,
453         SOFTBUS_INVALID_PARAM, CONN_COMMON, "invalid module, module=%{public}d", info->socketOption.moduleId);
454     CONN_CHECK_AND_RETURN_RET_LOGW(listener != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
455         "listener is null, module=%{public}d", info->socketOption.moduleId);
456     CONN_CHECK_AND_RETURN_RET_LOGW(listener->onConnectEvent != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
457         "listener onConnectEvent is null, module=%{public}d", info->socketOption.moduleId);
458     CONN_CHECK_AND_RETURN_RET_LOGW(listener->onDataEvent != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
459         "listener onDataEvent is null, module=%{public}d", info->socketOption.moduleId);
460 
461     ListenerModule module = info->socketOption.moduleId;
462     SoftbusListenerNode *node = GetOrCreateListenerNode(module);
463     CONN_CHECK_AND_RETURN_RET_LOGW(
464         node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON, "get listener node failed, module=%{public}d", module);
465     int32_t status = SoftBusMutexLock(&node->lock);
466     if (status != SOFTBUS_OK) {
467         CONN_LOGE(CONN_COMMON, "lock failed, module=%{public}d, error=%{public}d", module, status);
468         ReturnListenerNode(&node);
469         FillConnEventExtra(info, &extra, SOFTBUS_LOCK_ERR);
470         CONN_EVENT(EVENT_SCENE_START_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
471         return SOFTBUS_LOCK_ERR;
472     }
473 
474     int32_t listenPort = -1;
475     do {
476         if (node->info.status != LISTENER_IDLE) {
477             CONN_LOGE(CONN_COMMON, "listener is not idle status, module=%{public}d, status=%{public}d",
478                 module, node->info.status);
479             status = SOFTBUS_CONN_LISTENER_NOT_IDLE;
480             break;
481         }
482 
483         node->listener.onConnectEvent = listener->onConnectEvent;
484         node->listener.onDataEvent = listener->onDataEvent;
485         if (memcpy_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), info, sizeof(LocalListenerInfo)) != EOK) {
486             CONN_LOGE(CONN_COMMON, "memcpy_s listener info failed, module=%{public}d", node->module);
487             status = SOFTBUS_LOCK_ERR;
488             break;
489         }
490         listenPort = StartServerListenUnsafe(node, info);
491         if (listenPort <= 0) {
492             CONN_LOGE(CONN_COMMON, "start server failed, module=%{public}d, listenPort=%{public}d",
493                 module, listenPort);
494             status = listenPort;
495             break;
496         }
497 
498         status = StartSelectThread();
499         if (status != SOFTBUS_OK) {
500             CONN_LOGE(CONN_COMMON, "start listener thread failed, module=%{public}d, status=%{public}d",
501                 module, status);
502             CleanupServerListenInfoUnsafe(node);
503             break;
504         }
505         node->info.status = LISTENER_RUNNING;
506         CONN_LOGI(CONN_COMMON, "start base listener success, module=%{public}d, listenFd=%{public}d, "
507                                "listenPort=%{public}d", module, node->info.listenFd, listenPort);
508     } while (false);
509     (void)SoftBusMutexUnlock(&node->lock);
510     ReturnListenerNode(&node);
511     FillConnEventExtra(info, &extra, status);
512     CONN_EVENT(EVENT_SCENE_START_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
513     return status == SOFTBUS_OK ? listenPort : status;
514 }
515 
StopBaseListener(ListenerModule module)516 int32_t StopBaseListener(ListenerModule module)
517 {
518     ConnEventExtra extra = {
519         .moduleId = module,
520         .result = 0
521     };
522     CONN_EVENT(EVENT_SCENE_STOP_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
523     CONN_CHECK_AND_RETURN_RET_LOGW(
524         module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM, CONN_COMMON,
525         "invalid module, module=%{public}d", module);
526 
527     CONN_LOGI(CONN_COMMON, "receive request, module=%{public}d", module);
528     SoftbusListenerNode *node = GetListenerNode(module);
529     CONN_CHECK_AND_RETURN_RET_LOGW(
530         node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON, "listener node not exist, module=%{public}d", module);
531 
532     int32_t status = ShutdownBaseListener(node);
533     if (status != SOFTBUS_OK) {
534         CONN_LOGE(CONN_COMMON, "stop listen thread failed, module=%{public}d, error=%{public}d", module, status);
535     }
536     ReturnListenerNode(&node);
537     extra.errcode = status;
538     extra.result = status == SOFTBUS_OK ? EVENT_STAGE_RESULT_OK : EVENT_STAGE_RESULT_FAILED;
539     CONN_EVENT(EVENT_SCENE_STOP_BASE_LISTENER, EVENT_STAGE_TCP_COMMON_ONE, extra);
540     return status;
541 }
542 
ShutdownBaseListener(SoftbusListenerNode * node)543 static int32_t ShutdownBaseListener(SoftbusListenerNode *node)
544 {
545     int32_t status = SoftBusMutexLock(&node->lock);
546     CONN_CHECK_AND_RETURN_RET_LOGE(status == SOFTBUS_OK, SOFTBUS_LOCK_ERR, CONN_COMMON,
547         "lock failed, module=%{public}d, error=%{public}d", node->module, status);
548 
549     do {
550         if (node->info.status != LISTENER_RUNNING) {
551             CONN_LOGW(CONN_COMMON, "listener is not running, just skip, module=%{public}d, error=%{public}d",
552                 node->module, node->info.status);
553             break;
554         }
555         status = StopSelectThread();
556         if (status != SOFTBUS_OK) {
557             CONN_LOGE(CONN_COMMON, "stop select thread failed, module=%{public}d, error=%{public}d",
558                 node->module, status);
559             // fall-through
560         }
561         node->info.status = LISTENER_IDLE;
562 
563         FdNode *it = NULL;
564         FdNode *next = NULL;
565         LIST_FOR_EACH_ENTRY_SAFE(it, next, &node->info.waitEventFds, FdNode, node) {
566             CONN_LOGE(CONN_COMMON, "listener node there is fd not close, module=%{public}d, fd=%{public}d, "
567                                    "triggerSet=%{public}u", node->module, it->fd, it->triggerSet);
568             // not close fd, repeat close will crash process
569             ListDelete(&it->node);
570             SoftBusFree(it);
571         }
572         node->info.waitEventFdsLen = 0;
573 
574         int32_t listenFd = node->info.listenFd;
575         int32_t listenPort = node->info.listenPort;
576         if (node->info.modeType == SERVER_MODE && listenFd > 0) {
577             CONN_LOGE(CONN_COMMON, "close server, module=%{public}d, listenFd=%{public}d, port=%{public}d",
578                 node->module, listenFd, listenPort);
579             ConnCloseSocket(listenFd);
580         }
581         node->info.modeType = UNSET_MODE;
582         node->info.listenFd = -1;
583         node->info.listenPort = -1;
584         (void)memset_s(&node->info.listenerInfo, sizeof(LocalListenerInfo), 0, sizeof(LocalListenerInfo));
585     } while (false);
586 
587     SoftBusMutexUnlock(&node->lock);
588     return status;
589 }
590 
IsValidTriggerType(TriggerType trigger)591 static bool IsValidTriggerType(TriggerType trigger)
592 {
593     switch (trigger) {
594         case READ_TRIGGER:
595         case WRITE_TRIGGER:
596         case EXCEPT_TRIGGER:
597         case RW_TRIGGER:
598             return true;
599         default:
600             return false;
601     }
602 }
603 
IsListenerNodeExist(ListenerModule module)604 bool IsListenerNodeExist(ListenerModule module)
605 {
606     SoftbusListenerNode *node = GetListenerNode(module);
607     bool exist = false;
608     if (node != NULL) {
609         exist = true;
610         ReturnListenerNode(&node);
611     }
612     return exist;
613 }
614 
AddTrigger(ListenerModule module,int32_t fd,TriggerType trigger)615 int32_t AddTrigger(ListenerModule module, int32_t fd, TriggerType trigger)
616 {
617     CONN_CHECK_AND_RETURN_RET_LOGW(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM, CONN_COMMON,
618         "invalid module, module=%{public}d", module);
619     CONN_CHECK_AND_RETURN_RET_LOGW(
620         fd > 0, SOFTBUS_INVALID_PARAM, CONN_COMMON, "invalid fd, module=%{public}d, fd=%{public}d", module, fd);
621     CONN_CHECK_AND_RETURN_RET_LOGW(IsValidTriggerType(trigger), SOFTBUS_INVALID_PARAM, CONN_COMMON,
622         "invalid trigger, module=%{public}d, fd=%{public}d, trigger=%{public}d", module, fd, trigger);
623 
624     SoftbusListenerNode *node = GetListenerNode(module);
625     CONN_CHECK_AND_RETURN_RET_LOGW(node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON,
626         "listener node not exist, module=%{public}d, fd=%{public}d, trigger=%{public}d", module, fd, trigger);
627 
628     int32_t status = SoftBusMutexLock(&node->lock);
629     if (status != SOFTBUS_OK) {
630         CONN_LOGE(CONN_COMMON, "lock failed, module=%{public}d, fd=%{public}d, trigger=%{public}d, "
631                                "error=%{public}d", module, fd, trigger, status);
632         ReturnListenerNode(&node);
633         return SOFTBUS_LOCK_ERR;
634     }
635 
636     bool wakeup = false;
637     do {
638         if (node->info.status != LISTENER_RUNNING) {
639             CONN_LOGE(CONN_COMMON, "module is not running, module=%{public}d, fd=%{public}d, trigger=%{public}d",
640                 module, fd, trigger);
641             status = SOFTBUS_CONN_FAIL;
642             break;
643         }
644 
645         if (node->info.waitEventFdsLen > MAX_LISTEN_EVENTS) {
646             CONN_LOGE(CONN_COMMON, "can not trigger more, fd=%{public}d > MAX_LISTEN_EVENTS=%{public}d, "
647                 "module=%{public}d, trigger=%{public}d, waitEventFdsLen=%{public}d",
648                 fd, MAX_LISTEN_EVENTS, module, trigger, node->info.waitEventFdsLen);
649             status = SOFTBUS_CONN_FAIL;
650             break;
651         }
652 
653         FdNode *target = NULL;
654         FdNode *it = NULL;
655         LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
656             if (fd == it->fd) {
657                 target = it;
658                 break;
659             }
660         }
661 
662         if (target != NULL) {
663             if ((it->triggerSet & trigger) == trigger) {
664                 CONN_LOGW(CONN_COMMON, "repeat add trigger, just skip, module=%{public}d, fd=%{public}d, "
665                                        "trigger=%{public}d, triggerSet=%{public}u",
666                     module, fd, trigger, it->triggerSet);
667                 break;
668             }
669             it->triggerSet |= trigger;
670             CONN_LOGI(CONN_COMMON, "add trigger success, module=%{public}d, fd=%{public}d, newAddTrigger=%{public}d, "
671                                    "triggerSet=%{public}u", module, fd, trigger, it->triggerSet);
672             wakeup = true;
673             break;
674         }
675 
676         FdNode *fdNode = (FdNode *)SoftBusCalloc(sizeof(FdNode));
677         if (fdNode == NULL) {
678             CONN_LOGE(CONN_COMMON, "calloc failed, module=%{public}d, fd=%{public}d, trigger=%{public}d",
679                 module, fd, trigger);
680             status = SOFTBUS_MALLOC_ERR;
681             break;
682         }
683         ListInit(&fdNode->node);
684         fdNode->fd = fd;
685         fdNode->triggerSet = trigger;
686         ListAdd(&node->info.waitEventFds, &fdNode->node);
687         node->info.waitEventFdsLen += 1;
688         wakeup = true;
689         CONN_LOGI(CONN_COMMON, "add trigger success, module=%{public}d, fd=%{public}d, trigger=%{public}d",
690             module, fd, trigger);
691     } while (false);
692 
693     (void)SoftBusMutexUnlock(&node->lock);
694     ReturnListenerNode(&node);
695 
696     if (status == SOFTBUS_OK && wakeup) {
697         WakeupSelectThread();
698     }
699     return status;
700 }
701 
DelTrigger(ListenerModule module,int32_t fd,TriggerType trigger)702 int32_t DelTrigger(ListenerModule module, int32_t fd, TriggerType trigger)
703 {
704     CONN_CHECK_AND_RETURN_RET_LOGW(module >= 0 && module < UNUSE_BUTT, SOFTBUS_INVALID_PARAM, CONN_COMMON,
705         "invalid module, module=%{public}d", module);
706     CONN_CHECK_AND_RETURN_RET_LOGW(fd > 0, SOFTBUS_INVALID_PARAM, CONN_COMMON,
707         "invalid fd, module=%{public}d, fd=%{public}d", module, fd);
708     CONN_CHECK_AND_RETURN_RET_LOGW(IsValidTriggerType(trigger), SOFTBUS_INVALID_PARAM, CONN_COMMON,
709         "invalid trigger, module=%{public}d, fd=%{public}d, trigger=%{public}d", module, fd, trigger);
710 
711     SoftbusListenerNode *node = GetListenerNode(module);
712     CONN_CHECK_AND_RETURN_RET_LOGW(node != NULL, SOFTBUS_NOT_FIND, CONN_COMMON,
713         "listener node not exist, module=%{public}d, fd=%{public}d, trigger=%{public}d", module, fd, trigger);
714 
715     int32_t status = SoftBusMutexLock(&node->lock);
716     if (status != SOFTBUS_OK) {
717         CONN_LOGE(CONN_COMMON, "lock failed, module=%{public}d, fd=%{public}d, trigger=%{public}d, "
718                                "error=%{public}d", module, fd, trigger, status);
719         ReturnListenerNode(&node);
720         return SOFTBUS_LOCK_ERR;
721     }
722 
723     bool wakeup = false;
724     do {
725         FdNode *target = NULL;
726         FdNode *it = NULL;
727         LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
728             if (fd == it->fd) {
729                 target = it;
730                 break;
731             }
732         }
733 
734         if (target == NULL) {
735             CONN_LOGW(CONN_COMMON, "fd node not exist, module=%{public}d, fd=%{public}d, trigger=%{public}d",
736                 module, fd, trigger);
737             // consider delete trigger success,
738             status = SOFTBUS_OK;
739             break;
740         }
741 
742         if ((target->triggerSet & trigger) == 0) {
743             CONN_LOGW(CONN_COMMON,
744                 "without add trigger before, repeat delete trigger or mismatch module. "
745                 "module=%{public}d, fd=%{public}d, wantDeleteTrigger=%{public}d, triggerSet=%{public}u",
746                 module, fd, trigger, it->triggerSet);
747             // consider delete trigger success,
748             status = SOFTBUS_OK;
749             break;
750         }
751 
752         target->triggerSet &= ~trigger;
753         wakeup = true;
754         if (target->triggerSet != 0) {
755             CONN_LOGI(CONN_COMMON, "delete trigger success, module=%{public}d, fd=%{public}d, trigger=%{public}d, "
756                                    "triggerSet=%{public}u", module, fd, trigger, target->triggerSet);
757             status = SOFTBUS_OK;
758             break;
759         }
760         CONN_LOGI(
761             CONN_COMMON,
762             "delete trigger success, module=%{public}d, fd=%{public}d, trigger=%{public}d",
763             module, fd, trigger);
764         ListDelete(&target->node);
765         SoftBusFree(target);
766         node->info.waitEventFdsLen -= 1;
767     } while (false);
768 
769     SoftBusMutexUnlock(&node->lock);
770     ReturnListenerNode(&node);
771 
772     if (status == SOFTBUS_OK && wakeup) {
773         WakeupSelectThread();
774     }
775     return status;
776 }
777 
CleanupSelectThreadState(SelectThreadState ** statePtr)778 static void CleanupSelectThreadState(SelectThreadState **statePtr)
779 {
780     SelectThreadState *state = *statePtr;
781     if (state->ctrlRfd != 0) {
782         ConnCloseSocket(state->ctrlRfd);
783     }
784     if (state->ctrlWfd != 0) {
785         ConnCloseSocket(state->ctrlWfd);
786     }
787 
788     CONN_LOGI(CONN_COMMON, "cleanup select thread state, traceId=%{public}d, ctrlRfd=%{public}d, ctrlWfd=%{public}d",
789         state->traceId, state->ctrlRfd, state->ctrlWfd);
790     (void)SoftBusMutexDestroy(&state->lock);
791     SoftBusFree(state);
792     *statePtr = NULL;
793 }
794 
ProcessCtrlFdEvent(int32_t fd,int32_t wakeupTrace)795 static void ProcessCtrlFdEvent(int32_t fd, int32_t wakeupTrace)
796 {
797 #ifndef __LITEOS__
798     while (true) {
799         int32_t ctrlTraceId = 0;
800         ssize_t len = read(fd, &ctrlTraceId, sizeof(ctrlTraceId));
801         if (len < 0) {
802             int32_t status = errno;
803             if (status == EINTR) {
804                 continue;
805             } else if (status == EAGAIN) {
806                 break;
807             } else {
808                 CONN_LOGE(
809                     CONN_COMMON, "wakeupTrace=%{public}d, fd=%{public}d, readLen=%{public}zd, error=%{public}d",
810                     wakeupTrace, fd, len, status);
811                 break;
812             }
813         }
814         CONN_LOGI(CONN_COMMON, "wakeup ctrl message received, wakeupTrace=%{public}d, fd=%{public}d, "
815                                "ctrlTraceId=%{public}d, readLength=%{public}zd", wakeupTrace, fd, ctrlTraceId, len);
816     }
817 #endif
818 }
819 
DispatchFdEvent(int32_t fd,ListenerModule module,enum SocketEvent event,const SoftbusBaseListener * listener,int32_t wakeupTrace)820 static void DispatchFdEvent(
821     int32_t fd, ListenerModule module, enum SocketEvent event, const SoftbusBaseListener *listener, int32_t wakeupTrace)
822 {
823     if (listener->onDataEvent != NULL) {
824         listener->onDataEvent(module, event, fd);
825         CONN_LOGI(CONN_COMMON,
826             "wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, event=%{public}d",
827             wakeupTrace, module, fd, event);
828     } else {
829         CONN_LOGE(CONN_COMMON,
830             "listener not registered, to avoid repeat wakeup, close it,"
831             "wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, event=%{public}d",
832             wakeupTrace, module, fd, event);
833         ConnCloseSocket(fd);
834     }
835 }
836 
ProcessSpecifiedServerAcceptEvent(ListenerModule module,int32_t listenFd,ConnectType connectType,const SocketInterface * socketIf,const SoftbusBaseListener * listener,int32_t wakeupTrace)837 static int32_t ProcessSpecifiedServerAcceptEvent(ListenerModule module, int32_t listenFd, ConnectType connectType,
838     const SocketInterface *socketIf, const SoftbusBaseListener *listener, int32_t wakeupTrace)
839 {
840     CONN_CHECK_AND_RETURN_RET_LOGW(socketIf != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
841         "socket interface implement is null, wakeupTrace=%{public}d, module=%{public}d", wakeupTrace, module);
842     CONN_CHECK_AND_RETURN_RET_LOGW(socketIf->AcceptClient != NULL, SOFTBUS_INVALID_PARAM, CONN_COMMON,
843         "socket interface implement not support AcceptClient method, wakeupTrace=%{public}d, module=%{public}d",
844         wakeupTrace, module);
845 
846     int32_t status = SOFTBUS_OK;
847     while (true) {
848         int32_t clientFd = -1;
849         ConnectOption clientAddr = {
850             .type = connectType,
851             .socketOption = {
852                 .addr = { 0 },
853                 .port = 0,
854                 .moduleId = module,
855                 .protocol = 0,
856                 .keepAlive = 0,
857             },
858         };
859         status = SOFTBUS_TEMP_FAILURE_RETRY(socketIf->AcceptClient(listenFd, &clientAddr, &clientFd));
860         if (status != SOFTBUS_OK) {
861             break;
862         }
863 
864         char animizedIp[IP_LEN] = { 0 };
865         ConvertAnonymizeIpAddress(animizedIp, IP_LEN, clientAddr.socketOption.addr, IP_LEN);
866         if (listener->onConnectEvent != NULL) {
867             CONN_LOGI(CONN_COMMON,
868                 "trigger ACCEPT event, wakeupTrace=%{public}d, module=%{public}d, listenFd=%{public}d, "
869                 "clientIp=%{public}s, clientFd=%{public}d", wakeupTrace, module, listenFd, animizedIp, clientFd);
870             listener->onConnectEvent(module, clientFd, &clientAddr);
871         } else {
872             CONN_LOGE(CONN_COMMON,
873                 "event listener not registered, wakeupTrace=%{public}d, module=%{public}d, "
874                 "listenFd=%{public}d, clientIp=%{public}s, clientFd=%{public}d",
875                 wakeupTrace, module, listenFd, animizedIp, clientFd);
876             ConnCloseSocket(clientFd);
877         }
878     }
879     return status;
880 }
881 
CopyWaitEventFdsUnsafe(const SoftbusListenerNode * node,FdNode ** outArray,uint32_t * outArrayLen)882 static int32_t CopyWaitEventFdsUnsafe(const SoftbusListenerNode *node, FdNode **outArray, uint32_t *outArrayLen)
883 {
884     if (node->info.waitEventFdsLen == 0) {
885         *outArray = NULL;
886         outArrayLen = 0;
887         return SOFTBUS_OK;
888     }
889 
890     uint32_t fdArrayLen = node->info.waitEventFdsLen;
891     FdNode *fdArray = (FdNode *)SoftBusCalloc(fdArrayLen * sizeof(FdNode));
892     CONN_CHECK_AND_RETURN_RET_LOGE(fdArray != NULL, SOFTBUS_MALLOC_ERR, CONN_COMMON,
893         "calloc failed, module=%{public}d, eventLen=%{public}u", node->module, fdArrayLen);
894 
895     uint32_t i = 0;
896     FdNode *item = NULL;
897     bool expand = false;
898     LIST_FOR_EACH_ENTRY(item, &node->info.waitEventFds, FdNode, node) {
899         if (i >= fdArrayLen) {
900             uint32_t tmpLen = fdArrayLen * FDARR_EXPAND_BASE;
901             FdNode *tmp = (FdNode *)SoftBusCalloc(tmpLen * sizeof(FdNode));
902             if (tmp == NULL) {
903                 CONN_LOGE(CONN_COMMON, "expand calloc fd node array object failed, module=%{public}d, "
904                                        "eventLen=%{public}u", node->module, tmpLen);
905                 SoftBusFree(fdArray);
906                 return SOFTBUS_MALLOC_ERR;
907             }
908             for (uint32_t j = 0; j < fdArrayLen; j++) {
909                 tmp[j].fd = fdArray[j].fd;
910                 tmp[j].triggerSet = fdArray[j].triggerSet;
911             }
912             SoftBusFree(fdArray);
913             fdArray = tmp;
914             fdArrayLen = tmpLen;
915             expand = true;
916         }
917         fdArray[i].fd = item->fd;
918         fdArray[i].triggerSet = item->triggerSet;
919         i++;
920     }
921 
922     // diagnose by the way
923     if (expand) {
924         CONN_LOGE(CONN_COMMON, "listener node 'waitEventFdsLen' field is unexpected, "
925             "actualWaitEventFdsLen=%{public}u > waitEventFdsLen=%{public}u, module=%{public}d",
926             i, node->info.waitEventFdsLen, node->module);
927     } else if (i != fdArrayLen) {
928         CONN_LOGE(CONN_COMMON, "listener node 'waitEventFdsLen' field is unexpected, "
929             "actualWaitEventFdsLen=%{public}u < waitEventFdsLen=%{public}u, module=%{public}d",
930             i, node->info.waitEventFdsLen, node->module);
931     }
932 
933     *outArrayLen = i;
934     *outArray = fdArray;
935     return SOFTBUS_OK;
936 }
937 
CloseInvalidListenForcely(SoftbusListenerNode * node,int32_t listenFd,const char * anomizedIp,int32_t reason)938 static void CloseInvalidListenForcely(SoftbusListenerNode *node, int32_t listenFd, const char *anomizedIp,
939     int32_t reason)
940 {
941     CONN_CHECK_AND_RETURN_LOGE(
942         SoftBusMutexLock(&node->lock) == SOFTBUS_OK, CONN_COMMON, "lock failed, module=%{public}d",
943         node->module);
944     do {
945         if (node->info.status != LISTENER_RUNNING || node->info.modeType != SERVER_MODE ||
946             node->info.listenFd != listenFd) {
947             break;
948         }
949         CONN_LOGW(CONN_COMMON, "forcely close to prevent repeat wakeup select, module=%{public}d, "
950             "listenFd=%{public}d, port=%{public}d, ip=%{public}s, error=%{public}d",
951             node->module, node->info.listenFd, node->info.listenPort, anomizedIp, reason);
952         ConnCloseSocket(node->info.listenFd);
953         node->info.listenFd = -1;
954         node->info.listenPort = -1;
955     } while (false);
956     SoftBusMutexUnlock(&node->lock);
957 }
958 
ProcessSpecifiedListenerNodeEvent(SoftbusListenerNode * node,SoftBusFdSet * readSet,SoftBusFdSet * writeSet,SoftBusFdSet * exceptSet,int32_t wakeupTrace)959 static void ProcessSpecifiedListenerNodeEvent(SoftbusListenerNode *node, SoftBusFdSet *readSet, SoftBusFdSet *writeSet,
960     SoftBusFdSet *exceptSet, int32_t wakeupTrace)
961 {
962     CONN_CHECK_AND_RETURN_LOGE(SoftBusMutexLock(&node->lock) == SOFTBUS_OK, CONN_COMMON,
963         "lock failed, wakeupTrace=%{public}d, module=%{public}d", wakeupTrace, node->module);
964     if (node->info.status != LISTENER_RUNNING) {
965         SoftBusMutexUnlock(&node->lock);
966         return;
967     }
968 
969     int32_t listenFd = -1;
970     int32_t listenPort = -1;
971     char animizedIp[IP_LEN] = { 0 };
972     if (node->info.modeType == SERVER_MODE && node->info.listenFd > 0) {
973         listenFd = node->info.listenFd;
974         listenPort = node->info.listenPort;
975         ConvertAnonymizeIpAddress(animizedIp, IP_LEN, node->info.listenerInfo.socketOption.addr, IP_LEN);
976     }
977     const SocketInterface *socketIf = node->socketIf;
978     SoftbusBaseListener listener = node->listener;
979     ConnectType connectType = node->info.listenerInfo.type;
980 
981     FdNode *fdArray = NULL;
982     uint32_t fdArrayLen = 0;
983     int32_t status = CopyWaitEventFdsUnsafe(node, &fdArray, &fdArrayLen);
984     SoftBusMutexUnlock(&node->lock);
985     if (status != SOFTBUS_OK) {
986         CONN_LOGE(CONN_COMMON,
987             "copy wait event fds failed, wakeupTrace=%{public}d, module=%{public}d, error=%{public}d",
988             wakeupTrace, node->module, status);
989         return;
990     }
991 
992     if (listenFd > 0 && SoftBusSocketFdIsset(listenFd, readSet)) {
993         status =
994             ProcessSpecifiedServerAcceptEvent(node->module, listenFd, connectType, socketIf, &listener, wakeupTrace);
995         switch (status) {
996             case SOFTBUS_OK:
997             case SOFTBUS_ADAPTER_SOCKET_EAGAIN:
998                 break;
999             case SOFTBUS_ADAPTER_SOCKET_EINVAL:
1000             case SOFTBUS_ADAPTER_SOCKET_EBADF:
1001                 CloseInvalidListenForcely(node, listenFd, animizedIp, status);
1002                 break;
1003             default:
1004                 CONN_LOGD(CONN_COMMON,
1005                     "accept client failed, wakeupTrace=%{public}d, module=%{public}d, listenFd=%{public}d, "
1006                     "port=%{public}d, ip=%{public}s, error=%{public}d",
1007                     wakeupTrace, node->module, listenFd, listenPort, animizedIp, status);
1008                 break;
1009         }
1010     }
1011 
1012     if (fdArrayLen == 0) {
1013         return;
1014     }
1015 
1016     for (uint32_t i = 0; i < fdArrayLen; i++) {
1017         if ((fdArray[i].triggerSet & READ_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, readSet)) {
1018             CONN_LOGD(CONN_COMMON, "trigger IN event, wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, "
1019                                    "triggerSet=%{public}u",
1020                 wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
1021             DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_IN, &listener, wakeupTrace);
1022         }
1023         if ((fdArray[i].triggerSet & WRITE_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, writeSet)) {
1024             CONN_LOGD(CONN_COMMON, "trigger OUT event, wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, "
1025                                    "triggerSet=%{public}u",
1026                 wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
1027             DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_OUT, &listener, wakeupTrace);
1028         }
1029         if ((fdArray[i].triggerSet & EXCEPT_TRIGGER) != 0 && SoftBusSocketFdIsset(fdArray[i].fd, exceptSet)) {
1030             CONN_LOGW(CONN_COMMON,
1031                 "trigger EXCEPTION(out-of-band data) event, wakeupTrace=%{public}d, module=%{public}d, fd=%{public}d, "
1032                 "triggerSet=%{public}u", wakeupTrace, node->module, fdArray[i].fd, fdArray[i].triggerSet);
1033             DispatchFdEvent(fdArray[i].fd, node->module, SOFTBUS_SOCKET_EXCEPTION, &listener, wakeupTrace);
1034         }
1035         // RW_TRIGGER is already triggered in READ_TRIGGER and WRITE_TRIGGER, just skip it
1036     }
1037     SoftBusFree(fdArray);
1038 }
1039 
ProcessEvent(SoftBusFdSet * readSet,SoftBusFdSet * writeSet,SoftBusFdSet * exceptSet,const SelectThreadState * selectState,int32_t wakeupTrace)1040 static void ProcessEvent(SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet,
1041     const SelectThreadState *selectState, int32_t wakeupTrace)
1042 {
1043     for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
1044         SoftbusListenerNode *node = GetListenerNode(module);
1045         if (node == NULL) {
1046             continue;
1047         }
1048         ProcessSpecifiedListenerNodeEvent(node, readSet, writeSet, exceptSet, wakeupTrace);
1049         ReturnListenerNode(&node);
1050     }
1051 
1052     if (SoftBusSocketFdIsset(selectState->ctrlRfd, readSet)) {
1053         ProcessCtrlFdEvent(selectState->ctrlRfd, wakeupTrace);
1054     }
1055 }
1056 
CollectSpecifiedModuleListenerEvents(SoftbusListenerNode * node,SoftBusFdSet * readSet,SoftBusFdSet * writeSet,SoftBusFdSet * exceptSet)1057 static int32_t CollectSpecifiedModuleListenerEvents(
1058     SoftbusListenerNode *node, SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet)
1059 {
1060     CONN_CHECK_AND_RETURN_RET_LOGE(SoftBusMutexLock(&node->lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR, CONN_COMMON,
1061         "lock failed, module=%{public}d", node->module);
1062 
1063     if (node->info.status != LISTENER_RUNNING) {
1064         (void)SoftBusMutexUnlock(&node->lock);
1065         return 0;
1066     }
1067 
1068     int32_t maxFd = 0;
1069     if (node->info.modeType == SERVER_MODE && node->info.listenFd > 0) {
1070         SoftBusSocketFdSet(node->info.listenFd, readSet);
1071         maxFd = node->info.listenFd;
1072     }
1073 
1074     FdNode *it = NULL;
1075     LIST_FOR_EACH_ENTRY(it, &node->info.waitEventFds, FdNode, node) {
1076         if ((it->triggerSet & READ_TRIGGER) != 0) {
1077             SoftBusSocketFdSet(it->fd, readSet);
1078         }
1079         if ((it->triggerSet & WRITE_TRIGGER) != 0) {
1080             SoftBusSocketFdSet(it->fd, writeSet);
1081         }
1082         if ((it->triggerSet & EXCEPT_TRIGGER) != 0) {
1083             SoftBusSocketFdSet(it->fd, exceptSet);
1084         }
1085         // RW_TRIGGER is already collected in READ_TRIGGER and WRITE_TRIGGER, just skip it
1086         maxFd = it->fd > maxFd ? it->fd : maxFd;
1087     }
1088     (void)SoftBusMutexUnlock(&node->lock);
1089     return maxFd;
1090 }
1091 
CollectWaitEventFdSet(SoftBusFdSet * readSet,SoftBusFdSet * writeSet,SoftBusFdSet * exceptSet)1092 static int32_t CollectWaitEventFdSet(SoftBusFdSet *readSet, SoftBusFdSet *writeSet, SoftBusFdSet *exceptSet)
1093 {
1094     int32_t maxFd = 0;
1095     int32_t statusOrFd = 0;
1096     int32_t status = SOFTBUS_OK;
1097     do {
1098         for (ListenerModule module = 0; module < UNUSE_BUTT; module++) {
1099             SoftbusListenerNode *node = GetListenerNode(module);
1100             if (node == NULL) {
1101                 continue;
1102             }
1103             statusOrFd = CollectSpecifiedModuleListenerEvents(node, readSet, writeSet, exceptSet);
1104             ReturnListenerNode(&node);
1105             if (statusOrFd < 0) {
1106                 status = statusOrFd;
1107                 CONN_LOGE(CONN_COMMON, "collect wait event fd set failed: module=%{public}d, error=%{public}d",
1108                     module, status);
1109                 break;
1110             }
1111             maxFd = statusOrFd > maxFd ? statusOrFd : maxFd;
1112         }
1113     } while (false);
1114 
1115     if (status != SOFTBUS_OK) {
1116         SoftBusSocketFdZero(readSet);
1117         SoftBusSocketFdZero(writeSet);
1118         SoftBusSocketFdZero(exceptSet);
1119         return status;
1120     }
1121 
1122     return maxFd;
1123 }
1124 
SelectTask(void * arg)1125 static void *SelectTask(void *arg)
1126 {
1127     static int32_t wakeupTraceIdGenerator = 0;
1128 
1129     CONN_CHECK_AND_RETURN_RET_LOGW(arg != NULL, NULL, CONN_COMMON, "invalid param");
1130     SelectThreadState *selectState = (SelectThreadState *)arg;
1131     while (true) {
1132         int status = SoftBusMutexLock(&selectState->lock);
1133         if (status != SOFTBUS_OK) {
1134             CONN_LOGE(CONN_COMMON, "lock failed, retry after some times. "
1135                                    "waitDelay=%{public}dms, selectTrace=%{public}d, error=%{public}d",
1136                 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, selectState->traceId, status);
1137             SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1138             continue;
1139         }
1140         int32_t referenceCount = selectState->referenceCount;
1141         (void)SoftBusMutexUnlock(&selectState->lock);
1142 
1143         if (referenceCount <= 0) {
1144             CONN_LOGW(CONN_COMMON, "select task, select task is not reference by others any more, exit... "
1145                                    "selectTrace=%{public}d", selectState->traceId);
1146             break;
1147         }
1148 
1149         SoftBusFdSet readSet;
1150         SoftBusFdSet writeSet;
1151         SoftBusFdSet exceptSet;
1152         SoftBusSocketFdZero(&readSet);
1153         SoftBusSocketFdZero(&writeSet);
1154         SoftBusSocketFdZero(&exceptSet);
1155         int32_t maxFdOrStatus = CollectWaitEventFdSet(&readSet, &writeSet, &exceptSet);
1156         if (maxFdOrStatus < 0) {
1157             CONN_LOGE(CONN_COMMON, "collect wait event fd set failed, retry after some times. "
1158                                    "waitDelay=%{public}dms, selectTrace=%{public}d, error=%{public}d",
1159                 SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS, selectState->traceId, maxFdOrStatus);
1160             SoftBusSocketFdZero(&readSet);
1161             SoftBusSocketFdZero(&writeSet);
1162             SoftBusSocketFdZero(&exceptSet);
1163             SoftBusSleepMs(SELECT_UNEXPECT_FAIL_RETRY_WAIT_MILLIS);
1164             continue;
1165         }
1166         SoftBusSocketFdSet(selectState->ctrlRfd, &readSet);
1167         SoftBusSockTimeOut timeout = {0};
1168         timeout.sec = SOFTBUS_LISTENER_SELECT_TIMEOUT_SEC;
1169         int32_t maxFd = maxFdOrStatus > selectState->ctrlRfd ? maxFdOrStatus : selectState->ctrlRfd;
1170         CONN_LOGI(CONN_COMMON, "select is waking up, maxFd=%{public}d, ctrlRfd=%{public}d",
1171             maxFd, selectState->ctrlRfd);
1172         int32_t nEvents = SoftBusSocketSelect(maxFd + 1, &readSet, &writeSet, &exceptSet, &timeout);
1173         int32_t wakeupTraceId = ++wakeupTraceIdGenerator;
1174         if (nEvents == 0) {
1175             continue;
1176         }
1177         if (nEvents < 0) {
1178             CONN_LOGE(CONN_COMMON, "unexpect wakeup, retry after some times. "
1179                                    "waitDelay=%{public}dms, wakeupTraceId=%{public}d, events=%{public}d",
1180                 SELECT_ABNORMAL_EVENT_RETRY_WAIT_MILLIS, wakeupTraceId, nEvents);
1181             SoftBusSleepMs(SELECT_ABNORMAL_EVENT_RETRY_WAIT_MILLIS);
1182             continue;
1183         }
1184         CONN_LOGI(CONN_COMMON, "select task, wakeup from select, selectTrace=%{public}d, wakeupTraceId=%{public}d, "
1185                                "events=%{public}d", selectState->traceId, wakeupTraceId, nEvents);
1186         ProcessEvent(&readSet, &writeSet, &exceptSet, selectState, wakeupTraceId);
1187     }
1188     CleanupSelectThreadState(&selectState);
1189     return NULL;
1190 }
1191 
StartSelectThread(void)1192 static int32_t StartSelectThread(void)
1193 {
1194     static int32_t selectThreadTraceIdGenerator = 1;
1195 
1196     int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1197     CONN_CHECK_AND_RETURN_RET_LOGE(
1198         status == SOFTBUS_OK, SOFTBUS_LOCK_ERR, CONN_COMMON, "lock global select thread state failed");
1199 
1200     do {
1201         if (g_selectThreadState != NULL) {
1202             status = SoftBusMutexLock(&g_selectThreadState->lock);
1203             if (status != SOFTBUS_OK) {
1204                 CONN_LOGE(CONN_COMMON, "lock select thread state self failed, error=%{public}d", status);
1205                 status = SOFTBUS_LOCK_ERR;
1206                 break;
1207             }
1208             int32_t referenceCount = ++g_selectThreadState->referenceCount;
1209             (void)SoftBusMutexUnlock(&g_selectThreadState->lock);
1210             WakeupSelectThread();
1211 
1212             CONN_LOGD(CONN_COMMON,
1213                 "select thread is already start, selectTrace=%{public}d, ctrlRfd=%{public}d, ctrlWfd=%{public}d, "
1214                 "referenceCount=%{public}d",
1215                 g_selectThreadState->traceId, g_selectThreadState->ctrlRfd, g_selectThreadState->ctrlWfd,
1216                 referenceCount);
1217             break;
1218         }
1219 
1220         SelectThreadState *state = SoftBusCalloc(sizeof(SelectThreadState));
1221         if (state == NULL) {
1222             status = SOFTBUS_MALLOC_ERR;
1223             break;
1224         }
1225         state->traceId = ++selectThreadTraceIdGenerator;
1226         int32_t fds[2] = { 0 };
1227         int32_t rc = 0;
1228 #ifndef __LITEOS__
1229         rc = pipe2(fds, O_CLOEXEC | O_NONBLOCK);
1230 #endif
1231         if (rc != 0) {
1232             CONN_LOGE(CONN_COMMON, "create ctrl pipe failed, rc=%{public}d, error=%{public}d(%{public}s)",
1233                 rc, errno, strerror(errno));
1234             SoftBusFree(state);
1235             status = SOFTBUS_INVALID_NUM;
1236             break;
1237         }
1238         state->ctrlRfd = fds[0];
1239         state->ctrlWfd = fds[1];
1240 
1241         status = SoftBusMutexInit(&state->lock, NULL);
1242         if (status != SOFTBUS_OK) {
1243             CONN_LOGE(CONN_COMMON, "start select task async failed, error=%{public}d", status);
1244             CleanupSelectThreadState(&state);
1245             break;
1246         }
1247         state->referenceCount = 1;
1248         status = ConnStartActionAsync(state, SelectTask, "Select_Tsk");
1249         if (status != SOFTBUS_OK) {
1250             CONN_LOGE(CONN_COMMON, "init lock failed, error=%{public}d", status);
1251             CleanupSelectThreadState(&state);
1252             break;
1253         }
1254         CONN_LOGI(CONN_COMMON,
1255             "start select thread success, traceId=%{public}d, ctrlRfd=%{public}d, ctrlWfd=%{public}d",
1256             state->traceId, state->ctrlRfd, state->ctrlWfd);
1257         g_selectThreadState = state;
1258     } while (false);
1259     (void)SoftBusMutexUnlock(&g_selectThreadStateLock);
1260     return status;
1261 }
1262 
StopSelectThread(void)1263 static int32_t StopSelectThread(void)
1264 {
1265     int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1266     CONN_CHECK_AND_RETURN_RET_LOGE(
1267         status == SOFTBUS_OK, SOFTBUS_LOCK_ERR, CONN_COMMON, "lock global select thread state failed");
1268     do {
1269         if (g_selectThreadState == NULL) {
1270             CONN_LOGW(CONN_COMMON, "select thread is already stop or never start");
1271             break;
1272         }
1273 
1274         status = SoftBusMutexLock(&g_selectThreadState->lock);
1275         if (status != SOFTBUS_OK) {
1276             CONN_LOGE(CONN_COMMON, "lock select thread state self");
1277             break;
1278         }
1279         g_selectThreadState->referenceCount -= 1;
1280         int32_t referenceCount = g_selectThreadState->referenceCount;
1281         (void)SoftBusMutexUnlock(&g_selectThreadState->lock);
1282         if (referenceCount <= 0) {
1283             CONN_LOGW(CONN_COMMON, "select thread is not used by other module any more, notify "
1284                 "exit, thread reference count=%{public}d", referenceCount);
1285             WakeupSelectThread();
1286             g_selectThreadState = NULL;
1287         }
1288     } while (false);
1289     (void)SoftBusMutexUnlock(&g_selectThreadStateLock);
1290     return status;
1291 }
1292 
WakeupSelectThread(void)1293 static void WakeupSelectThread(void)
1294 {
1295 #ifndef __LITEOS__
1296     static int32_t selectWakeupTraceIdGenerator = 0;
1297 
1298     int32_t status = SoftBusMutexLock(&g_selectThreadStateLock);
1299     CONN_CHECK_AND_RETURN_LOGE(
1300         status == SOFTBUS_OK, CONN_COMMON, "lock global select thread state failed, error=%{public}d", status);
1301     do {
1302         if (g_selectThreadState == NULL) {
1303             CONN_LOGW(CONN_COMMON, "select thread is not running, just skip");
1304             break;
1305         }
1306         int32_t ctrlTraceId = selectWakeupTraceIdGenerator++;
1307         ssize_t len = write(g_selectThreadState->ctrlWfd, &ctrlTraceId, sizeof(ctrlTraceId));
1308         if (len == -1) {
1309             COMM_LOGE(COMM_ADAPTER, "write message fail, len=%{public}zd, ctrlTraceId=%{public}d, "
1310                 "errno=%{public}d(%{public}s)", len, ctrlTraceId, errno, strerror(errno));
1311                 break;
1312         }
1313         CONN_LOGI(CONN_COMMON, "wakeup ctrl message sent, writeLength=%{public}zd, ctrlTraceId=%{public}d",
1314             len, ctrlTraceId);
1315     } while (false);
1316     SoftBusMutexUnlock(&g_selectThreadStateLock);
1317 #endif
1318 }