1 /*
2  * Copyright (C) 2021-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "coap_app.h"
17 
18 #include <errno.h>
19 #include <securec.h>
20 #include <string.h>
21 #include <inttypes.h>
22 #ifndef _WIN32
23 #include <netdb.h>
24 #include <pthread.h>
25 #include <unistd.h>
26 #endif
27 
28 #include "coap_client.h"
29 #include "coap_discover.h"
30 #include "nstackx_util.h"
31 #include "nstackx_device.h"
32 #include "nstackx_epoll.h"
33 #include "nstackx_error.h"
34 #include "nstackx_dfinder_log.h"
35 #include "nstackx_event.h"
36 #include "nstackx_statistics.h"
37 #include "nstackx_device_local.h"
38 
39 #define TAG "nStackXCoAP"
40 
41 #define DEFAULT_COAP_TIMEOUT (COAP_RESOURCE_CHECK_TIME * COAP_TICKS_PER_SECOND)
42 
43 static uint32_t GetTimeout(CoapCtxType *ctx, EpollDesc epollfd);
44 
45 static List g_ctxList = { &g_ctxList, &g_ctxList };
46 
GetCoapContextList(void)47 List *GetCoapContextList(void)
48 {
49     return &g_ctxList;
50 }
51 
/* Appends the context's list node to the tail of the file-scope context list. */
static void CoapContextInsert(CoapCtxType *ctx)
{
    List *entry = &ctx->node;
    ListInsertTail(&g_ctxList, entry);
}
56 
/* Unlinks the context's list node from whatever list it is currently on. */
static void CoapContextRemove(CoapCtxType *ctx)
{
    List *entry = &ctx->node;
    ListRemoveNode(entry);
}
61 
CoapGetCoapCtxType(const coap_context_t * ctx)62 CoapCtxType *CoapGetCoapCtxType(const coap_context_t *ctx)
63 {
64     List *pos = NULL;
65     LIST_FOR_EACH(pos, &g_ctxList) {
66         CoapCtxType *coapCtx = (CoapCtxType *)pos;
67         if (coapCtx->ctx == ctx) {
68             return coapCtx;
69         }
70     }
71     return NULL;
72 }
73 
IsCoapContextReady(void)74 bool IsCoapContextReady(void)
75 {
76     return !ListIsEmpty(&g_ctxList);
77 }
78 
79 #ifdef _WIN32
80 #define SEC_TO_MILISEC 1000
81 #define MILISEC_TO_MICROSEC 1000
82 static pthread_t g_coapTid;
83 
84 typedef struct {
85     EpollTask taskList[3 * MAX_COAP_SOCKET_NUM];
86     uint32_t eventList[3 * MAX_COAP_SOCKET_NUM];
87     void *dataList[3 * MAX_COAP_SOCKET_NUM];
88     uint32_t count;
89     uint32_t timeout;
90     EpollDesc epollFd;
91 } TaskListInfo;
92 
93 typedef struct {
94     uint8_t terminated;
95     sem_t waitCondition;
96     pthread_mutex_t waitLock;
97     TaskListInfo taskListInfo;
98 } CoapThreadParam;
99 
100 TaskListInfo g_taskListInfo = {0};
101 CoapThreadParam g_coapThreadParam = {0};
102 uint8_t g_coapThreadState = NSTACKX_FALSE;
103 #endif // _WIN32
104 
/* Kinds of socket events counted by the epoll handlers; values index g_socketEventNum. */
typedef enum {
    SOCKET_READ_EVENT = 0,
    SOCKET_WRITE_EVENT = 1,
    SOCKET_ERROR_EVENT = 2,
    SOCKET_END_EVENT = 3 /* number of event kinds, not an event itself */
} SocketEventType;

/* Per-kind event counters; cleared by ResetCoapSocketTaskCount. */
static uint64_t g_socketEventNum[SOCKET_END_EVENT];
112 
CoAPEpollReadHandle(void * data)113 static void CoAPEpollReadHandle(void *data)
114 {
115     if (data == NULL) {
116         return;
117     }
118     EpollTask *task = data;
119     if (task->taskfd < 0) {
120         return;
121     }
122     if (task->ptr == NULL) {
123         return;
124     }
125     coap_socket_t *socket = task->ptr;
126 
127     if (socket->flags & COAP_SOCKET_WANT_READ) {
128         socket->flags |= COAP_SOCKET_CAN_READ;
129     }
130 
131     if (socket->flags & COAP_SOCKET_WANT_ACCEPT) {
132         socket->flags |= COAP_SOCKET_CAN_ACCEPT;
133     }
134     g_socketEventNum[SOCKET_READ_EVENT]++;
135 }
136 
CoAPEpollWriteHandle(void * data)137 static void CoAPEpollWriteHandle(void *data)
138 {
139     if (data == NULL) {
140         return;
141     }
142     EpollTask *task = data;
143     if (task->taskfd < 0) {
144         return;
145     }
146     if (task->ptr == NULL) {
147         return;
148     }
149     coap_socket_t *socket = task->ptr;
150 
151     if (socket->flags & COAP_SOCKET_WANT_WRITE) {
152         socket->flags |= COAP_SOCKET_CAN_WRITE;
153     }
154 
155     if (socket->flags & COAP_SOCKET_WANT_CONNECT) {
156         socket->flags |= COAP_SOCKET_CAN_CONNECT;
157     }
158     g_socketEventNum[SOCKET_WRITE_EVENT]++;
159 }
160 
CoAPEpollErrorHandle(void * data)161 static void CoAPEpollErrorHandle(void *data)
162 {
163     EpollTask *task = data;
164     if (task == NULL || task->taskfd < 0) {
165         return;
166     }
167     coap_socket_t *socket = task->ptr;
168     if (socket == NULL) {
169         return;
170     }
171 
172     IncStatistics(STATS_SOCKET_ERROR);
173     g_socketEventNum[SOCKET_ERROR_EVENT]++;
174 
175     List *pos = NULL;
176     LIST_FOR_EACH(pos, &g_ctxList) {
177         CoapCtxType *ctx = (CoapCtxType *)pos;
178         if (IsCoapCtxEndpointSocket(ctx->ctx, socket->fd)) {
179             DFINDER_LOGE(TAG, "coap epoll error occurred");
180             ctx->socketErrFlag = NSTACKX_TRUE;
181             return;
182         }
183     }
184 
185     DFINDER_LOGE(TAG, "coap session socket error occurred and close it");
186     DeRegisterEpollTask(task);
187     CloseDesc(socket->fd);
188     socket->fd = -1;
189     task->taskfd = -1;
190 }
191 
/*
 * Prepares the socket tasks of every listed context for the next round.
 *
 * epollfd: descriptor forwarded to GetTimeout for each context.
 *
 * Returns the smallest timeout reported by any context, never more than
 * DEFAULT_COAP_TIMEOUT (which is also the result when the list is empty).
 */
static uint32_t RegisterCtxTask(EpollDesc epollfd)
{
    uint32_t shortest = DEFAULT_COAP_TIMEOUT;
    List *node = NULL;

    LIST_FOR_EACH(node, &g_ctxList) {
        uint32_t candidate = GetTimeout((CoapCtxType *)node, epollfd);
        shortest = (candidate < shortest) ? candidate : shortest;
    }

    return shortest;
}
206 
207 #ifdef _WIN32
/*
 * Windows variant: rebuilds the task snapshot for this round and publishes it
 * to the monitor thread.
 *
 * epollfd: descriptor stored in the snapshot and forwarded to the contexts.
 *
 * Returns the timeout computed by RegisterCtxTask. The same value is returned
 * when the lock cannot be taken; in that case nothing is published and the
 * semaphore is not posted.
 */
uint32_t RegisterCoAPEpollTask(EpollDesc epollfd)
{
    /* Start from an empty snapshot; GetTimeout appends one entry per socket. */
    (void)memset_s(&g_taskListInfo, sizeof(g_taskListInfo), 0, sizeof(g_taskListInfo));

    const uint32_t waitTime = RegisterCtxTask(epollfd);
    g_taskListInfo.timeout = waitTime;
    g_taskListInfo.epollFd = epollfd;

    // g_coapThreadParam is shared with the monitor thread, so copy under the lock
    if (PthreadMutexLock(&g_coapThreadParam.waitLock) != 0) {
        DFINDER_LOGE(TAG, "Failed to lock");
        return waitTime;
    }

    (void)memcpy_s(&g_coapThreadParam.taskListInfo, sizeof(g_coapThreadParam.taskListInfo),
        &g_taskListInfo, sizeof(g_taskListInfo));
    SemPost(&g_coapThreadParam.waitCondition);

    if (PthreadMutexUnlock(&g_coapThreadParam.waitLock) != 0) {
        DFINDER_LOGE(TAG, "Failed to unlock");
    }
    return waitTime;
}
228 #else
/*
 * Non-Windows variant: registers the socket tasks of every context and returns
 * the timeout computed by RegisterCtxTask.
 */
uint32_t RegisterCoAPEpollTask(EpollDesc epollfd)
{
    const uint32_t waitTime = RegisterCtxTask(epollfd);
    return waitTime;
}
233 #endif
234 
/*
 * Asks libcoap which sockets of one context need monitoring and sets up an
 * EpollTask for each of them.
 *
 * ctx:     context wrapper; its socketNum and taskList are rewritten.
 * epollfd: descriptor stored in every prepared task.
 *
 * Slots whose socket is missing or has a negative descriptor are marked with
 * taskfd = -1, so the deregistration loops skip them instead of acting on a
 * descriptor left over from an earlier round.
 *
 * On Windows each prepared task is appended to g_taskListInfo; entries that
 * would not fit into its fixed-size arrays are dropped (and marked unused).
 * Elsewhere each task is registered with RegisterEpollTask.
 *
 * Returns the timeout reported by coap_io_prepare_io, replaced by
 * DEFAULT_COAP_TIMEOUT when it is 0 or larger than that default.
 */
static uint32_t GetTimeout(CoapCtxType *ctx, EpollDesc epollfd)
{
    uint32_t events, timeout, i;
    coap_tick_t now;
    coap_socket_t *sockets[MAX_COAP_SOCKET_NUM] = {0};

    coap_ticks(&now);
    timeout = coap_io_prepare_io(ctx->ctx, sockets, (uint32_t)(sizeof(sockets) / sizeof(sockets[0])),
        &ctx->socketNum, now);
    if (timeout == 0 || timeout > DEFAULT_COAP_TIMEOUT) {
        timeout = DEFAULT_COAP_TIMEOUT;
    }
    if (ctx->socketNum > MAX_COAP_SOCKET_NUM) {
        ctx->socketNum = MAX_COAP_SOCKET_NUM;
        DFINDER_LOGI(TAG, "socketNum exccedd MAX_COAP_SOCKET_NUM, and set it to MAX_COAP_SOCKET_NUM");
    }
#ifdef _WIN32
    TaskListInfo *info = &g_taskListInfo;
    const uint32_t capacity = (uint32_t)(sizeof(info->taskList) / sizeof(info->taskList[0]));
#endif
    for (i = 0; i < ctx->socketNum; i++) {
        EpollTask *task = &ctx->taskList[i];
        if (sockets[i] == NULL || sockets[i]->fd < 0) {
            /* Nothing to monitor in this slot: do not keep a descriptor from a previous round. */
            task->taskfd = -1;
            continue;
        }
#ifdef _WIN32
        if (info->count >= capacity) {
            /* The shared snapshot is full; writing further would run past its arrays. */
            DFINDER_LOGE(TAG, "task list is full, socket task is dropped");
            task->taskfd = -1;
            continue;
        }
#endif
        events = 0;
        if ((sockets[i]->flags & COAP_SOCKET_WANT_READ) || (sockets[i]->flags & COAP_SOCKET_WANT_ACCEPT)) {
            events = EPOLLIN;
        }
        if ((sockets[i]->flags & COAP_SOCKET_WANT_WRITE) || (sockets[i]->flags & COAP_SOCKET_WANT_CONNECT)) {
            events = events | EPOLLOUT;
        }
        if (sockets[i]->flags & COAP_SOCKET_WANT_CONNECT) {
            events = events | EPOLLHUP | EPOLLERR;
        }
        task->taskfd = sockets[i]->fd;
        task->epollfd = epollfd;
        task->readHandle = CoAPEpollReadHandle;
        task->writeHandle = CoAPEpollWriteHandle;
        task->errorHandle = CoAPEpollErrorHandle;
        task->ptr = sockets[i];
#ifdef _WIN32
        (void)memcpy_s(&info->taskList[info->count], sizeof(info->taskList[info->count]),
            task, sizeof(*task));
        info->eventList[info->count] = events;
        /* Handlers receive the context-owned task, not the copy in the snapshot. */
        info->dataList[info->count] = task;
        info->count++;
#else
        (void)RegisterEpollTask(task, events);
#endif /* #ifdef _WIN32 */
    }
    return timeout;
}
284 
/*
 * Ends one monitoring round for a context: deregisters every task with a
 * valid descriptor, resets socketNum to 0, then lets libcoap process the
 * pending I/O through coap_io_do_io.
 */
static void DeRegisteCoAPEpollTaskCtx(CoapCtxType *ctx)
{
    if (ctx->socketNum > MAX_COAP_SOCKET_NUM) {
        /* Never walk past the end of taskList. */
        ctx->socketNum = MAX_COAP_SOCKET_NUM;
        DFINDER_LOGI(TAG, "socketNum exccedd MAX_COAP_SOCKET_NUM, and set it to MAX_COAP_SOCKET_NUM");
    }

    for (uint32_t idx = 0; idx < ctx->socketNum; idx++) {
        if (ctx->taskList[idx].taskfd >= 0) {
            (void)DeRegisterEpollTask(&ctx->taskList[idx]);
        }
    }
    ctx->socketNum = 0;

    coap_tick_t tick;
    coap_ticks(&tick);
    coap_io_do_io(ctx->ctx, tick);
}
306 
/*
 * Per-context step of DeRegisterCoAPEpollTask.
 *
 * When the context's socketErrFlag is set, the flag is cleared, the error is
 * reported through NotifyDFinderMsgRecver and the owning interface is torn
 * down with DestroyLocalIface; otherwise the regular end-of-round processing
 * runs. Afterwards a context marked freeCtxLater is removed from the list and
 * released.
 *
 * Always returns NSTACKX_EOK.
 */
static int DeRegisterCoAPEpollTaskCb(CoapCtxType *ctx)
{
    if (!ctx->socketErrFlag) {
        DeRegisteCoAPEpollTaskCtx(ctx);
    } else {
        DFINDER_LOGI(TAG, "error of ctx socket occurred and destroy g_ctx");
        ctx->socketErrFlag = NSTACKX_FALSE;
        NotifyDFinderMsgRecver(DFINDER_ON_INNER_ERROR);
        DestroyLocalIface(ctx->iface, NSTACKX_FALSE);
    }

    if (ctx->freeCtxLater != NSTACKX_TRUE) {
        return NSTACKX_EOK;
    }

    /* Release that was postponed until this round's tasks were processed. */
    CoapContextRemove(ctx);
    coap_free_context(ctx->ctx);
    free(ctx);
    return NSTACKX_EOK;
}
324 
DeRegisterCoAPEpollTask(void)325 void DeRegisterCoAPEpollTask(void)
326 {
327     List *pos = NULL;
328     List *tmp = NULL;
329     LIST_FOR_EACH_SAFE(pos, tmp, &g_ctxList) {
330         (void)DeRegisterCoAPEpollTaskCb((CoapCtxType *)pos);
331     }
332 }
333 
334 #ifdef _WIN32
/*
 * Waits with select() on every socket of a snapshot and posts the matching
 * handlers to the main loop.
 *
 * taskListInfo: snapshot to wait on; its timeout is interpreted as
 *               milliseconds.
 *
 * Returns NSTACKX_EOK when at least one descriptor was reported and the
 * events were posted, NSTACKX_EAGAIN when select() timed out or failed with
 * WSAEINVAL, and NSTACKX_EFAILED for any other select() failure.
 */
int32_t CoapSelectWait(TaskListInfo *taskListInfo)
{
    fd_set readSet, writeSet, errorSet;
    struct timeval tv;
    int maxFd = 0;

    FD_ZERO(&readSet);
    FD_ZERO(&writeSet);
    FD_ZERO(&errorSet);
    /* Unsigned index: count is uint32_t, so avoid a signed/unsigned comparison. */
    for (uint32_t i = 0; i < taskListInfo->count; i++) {
        EpollTask *task = &taskListInfo->taskList[i];
        if (maxFd < task->taskfd) {
            maxFd = task->taskfd;
        }
        if (taskListInfo->eventList[i] & EPOLLIN) {
            FD_SET(task->taskfd, &readSet);
        }
        if (taskListInfo->eventList[i] & EPOLLOUT) {
            FD_SET(task->taskfd, &writeSet);
        }
        /* Errors are always of interest, whatever events were requested. */
        FD_SET(task->taskfd, &errorSet);
    }
    /* Split the millisecond timeout into whole seconds plus the remainder in microseconds. */
    tv.tv_sec = taskListInfo->timeout / SEC_TO_MILISEC;
    tv.tv_usec = (taskListInfo->timeout % SEC_TO_MILISEC) * MILISEC_TO_MICROSEC;
    int ret = select(maxFd + 1, &readSet, &writeSet, &errorSet, &tv);
    if (ret < 0) {
        int lastError = WSAGetLastError();
        if (lastError != WSAEINVAL) {
            IncStatistics(STATS_SOCKET_ERROR);
            DFINDER_LOGE(TAG, "select error ret lastError: %d", lastError);
            return NSTACKX_EFAILED;
        }
        return NSTACKX_EAGAIN;
    } else if (ret == 0) {
        return NSTACKX_EAGAIN;
    }
    for (uint32_t i = 0; i < taskListInfo->count; i++) {
        if (FD_ISSET(taskListInfo->taskList[i].taskfd, &readSet)) {
            PostEvent(GetMainLoopEvendChain(), taskListInfo->epollFd, CoAPEpollReadHandle,
                taskListInfo->dataList[i]);
        }
        if (FD_ISSET(taskListInfo->taskList[i].taskfd, &writeSet)) {
            PostEvent(GetMainLoopEvendChain(), taskListInfo->epollFd, CoAPEpollWriteHandle,
                taskListInfo->dataList[i]);
        }
        if (FD_ISSET(taskListInfo->taskList[i].taskfd, &errorSet)) {
            PostEvent(GetMainLoopEvendChain(), taskListInfo->epollFd, CoAPEpollErrorHandle,
                taskListInfo->dataList[i]);
        }
    }
    return NSTACKX_EOK;
}
387 
CoapIoMonitorLoop(void * arg)388 static void *CoapIoMonitorLoop(void *arg)
389 {
390     DFINDER_LOGI(TAG, "Enter CoapIoMonitorLoop");
391     TaskListInfo taskListInfo;
392 
393     while (!g_coapThreadParam.terminated) {
394         SemWait(&g_coapThreadParam.waitCondition);
395         if (PthreadMutexLock(&g_coapThreadParam.waitLock) != 0) {
396             DFINDER_LOGE(TAG, "Coap thread lock failed");
397             break;
398         }
399         if (g_coapThreadParam.terminated) {
400             DFINDER_LOGI(TAG, "Coap thread terminated");
401             PthreadMutexUnlock(&g_coapThreadParam.waitLock);
402             break;
403         }
404 
405         if (g_coapThreadParam.taskListInfo.count == 0) {
406             if (PthreadMutexUnlock(&g_coapThreadParam.waitLock) != 0) {
407                 break;
408             }
409             continue;
410         }
411         (void)memcpy_s(&taskListInfo, sizeof(taskListInfo), &g_coapThreadParam.taskListInfo,
412             sizeof(g_coapThreadParam.taskListInfo));
413         if (PthreadMutexUnlock(&g_coapThreadParam.waitLock) != 0) {
414             DFINDER_LOGE(TAG, "Coap thread unlock failed");
415             break;
416         }
417         if (CoapSelectWait(&taskListInfo) == NSTACKX_EFAILED) {
418             IncStatistics(STATS_SOCKET_ERROR);
419             DFINDER_LOGE(TAG, "Coap select failure");
420             break;
421         }
422     }
423     DFINDER_LOGI(TAG, "Exit CoapIoMonitorLoop");
424     return NULL;
425 }
426 
CoapThreadInit(void)427 int32_t CoapThreadInit(void)
428 {
429     if (SemInit(&g_coapThreadParam.waitCondition, 0, 0) != NSTACKX_EOK) {
430         DFINDER_LOGE(TAG, "Failed to init sem condition");
431         return NSTACKX_EFAILED;
432     }
433 
434     if (PthreadMutexInit(&g_coapThreadParam.waitLock, NULL) != NSTACKX_EOK) {
435         DFINDER_LOGE(TAG, "Faile to init lock");
436         SemDestroy(&g_coapThreadParam.waitCondition);
437         return NSTACKX_EFAILED;
438     }
439 
440     int32_t ret = PthreadCreate(&g_coapTid, NULL, CoapIoMonitorLoop, NULL);
441     if (ret != 0) {
442         SemDestroy(&g_coapThreadParam.waitCondition);
443         PthreadMutexDestroy(&g_coapThreadParam.waitLock);
444         (void)memset_s(&g_coapThreadParam, sizeof(g_coapThreadParam), 0, sizeof(g_coapThreadParam));
445         DFINDER_LOGE(TAG, "thread create failed");
446         return NSTACKX_EFAILED;
447     }
448     DFINDER_LOGI(TAG, "Init CoAP thread done!");
449     return NSTACKX_EOK;
450 }
451 
CoapThreadDestroy(void)452 void CoapThreadDestroy(void)
453 {
454     PthreadMutexLock(&g_coapThreadParam.waitLock);
455     g_coapThreadParam.terminated = NSTACKX_TRUE;
456     SemPost(&g_coapThreadParam.waitCondition);
457     PthreadMutexUnlock(&g_coapThreadParam.waitLock);
458     // May got block for 2 seconds.
459     PthreadJoin(g_coapTid, NULL);
460     SemDestroy(&g_coapThreadParam.waitCondition);
461     PthreadMutexDestroy(&g_coapThreadParam.waitLock);
462     (void)memset_s(&g_coapThreadParam, sizeof(g_coapThreadParam), 0, sizeof(g_coapThreadParam));
463 }
464 #endif
465 
/*
 * Shuts down a CoAP server context created by CoapServerInit.
 *
 * ctx:          context to destroy; a NULL pointer is ignored.
 * moduleDeinit: true  - the context is removed from the list and freed now;
 *               false - only the tasks are deregistered and the context is
 *                       marked freeCtxLater, so DeRegisterCoAPEpollTaskCb
 *                       releases it after the current round.
 *
 * After a call with moduleDeinit == true the pointer must not be used again.
 */
void CoapServerDestroy(CoapCtxType *ctx, bool moduleDeinit)
{
    if (ctx == NULL) {
        DFINDER_LOGE(TAG, "coap server destroy with null ctx");
        return;
    }

    DFINDER_LOGD(TAG, "coap server destroy, module deinit: %d", moduleDeinit);

    for (uint32_t i = 0; i < ctx->socketNum && i < MAX_COAP_SOCKET_NUM; ++i) {
        if (ctx->taskList[i].taskfd < 0) {
            continue;
        }
        (void)DeRegisterEpollTask(&ctx->taskList[i]);
    }

    if (moduleDeinit) {
        CoapContextRemove(ctx);
        coap_free_context(ctx->ctx);
        free(ctx);
    } else {
        // release the context after EpollLoop has processed this round of tasks
        ctx->freeCtxLater = NSTACKX_TRUE;
    }
}
486 
CoapServerInit(const struct in_addr * ip,void * iface)487 CoapCtxType *CoapServerInit(const struct in_addr *ip, void *iface)
488 {
489     DFINDER_LOGI(TAG, "CoapServerInit");
490     CoapCtxType *ctx = calloc(1, sizeof(CoapCtxType));
491     if (ctx == NULL) {
492         DFINDER_LOGE(TAG, "alloc failed");
493         return NULL;
494     }
495 
496     coap_startup();
497 
498     char addrStr[NI_MAXHOST] = COAP_SRV_DEFAULT_ADDR;
499     char portStr[NI_MAXSERV] = COAP_SRV_DEFAULT_PORT;
500 
501     ctx->ctx = CoapGetContext(addrStr, portStr, NSTACKX_TRUE, ip);
502     if (ctx->ctx == NULL) {
503         DFINDER_LOGE(TAG, "coap init get context failed");
504         free(ctx);
505         return NULL;
506     }
507 
508     if (CoapInitResources(ctx->ctx) != NSTACKX_EOK) {
509         DFINDER_LOGE(TAG, "init resource failed");
510         coap_free_context(ctx->ctx);
511         free(ctx);
512         return NULL;
513     }
514 
515     coap_register_response_handler(ctx->ctx, CoapMessageHandler);
516     ctx->iface = iface;
517     CoapContextInsert(ctx);
518 
519     return ctx;
520 }
521 
ResetCoapSocketTaskCount(uint8_t isBusy)522 void ResetCoapSocketTaskCount(uint8_t isBusy)
523 {
524     List *pos = NULL;
525     LIST_FOR_EACH(pos, &g_ctxList) {
526         CoapCtxType *ctx = (CoapCtxType *)pos;
527         uint64_t totalTaskCount = 0;
528         uint32_t i;
529         for (i = 0; i < ctx->socketNum && i < MAX_COAP_SOCKET_NUM; i++) {
530             if (totalTaskCount < UINT64_MAX && ctx->taskList[i].count <= UINT64_MAX - totalTaskCount) {
531                 totalTaskCount += ctx->taskList[i].count;
532             }
533             ctx->taskList[i].count = 0;
534         }
535 
536         if (isBusy) {
537             DFINDER_LOGI(TAG, "in this busy interval, socket task count of iface %s is: %" PRIu64,
538                 GetLocalIfaceName(ctx->iface), totalTaskCount);
539         }
540     }
541 
542     if (isBusy) {
543         DFINDER_LOGI(TAG, "in this busy interval, socket event count: read %" PRIu64
544             ", write %" PRIu64 ", error %" PRIu64,
545             g_socketEventNum[SOCKET_READ_EVENT], g_socketEventNum[SOCKET_WRITE_EVENT],
546             g_socketEventNum[SOCKET_ERROR_EVENT]);
547     }
548 
549     (void)memset_s(g_socketEventNum, sizeof(g_socketEventNum), 0, sizeof(g_socketEventNum));
550 }
551