1 /*
2 * Copyright (C) 2021-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "coap_app.h"
17
18 #include <errno.h>
19 #include <securec.h>
20 #include <string.h>
21 #include <inttypes.h>
22 #ifndef _WIN32
23 #include <netdb.h>
24 #include <pthread.h>
25 #include <unistd.h>
26 #endif
27
28 #include "coap_client.h"
29 #include "coap_discover.h"
30 #include "nstackx_util.h"
31 #include "nstackx_device.h"
32 #include "nstackx_epoll.h"
33 #include "nstackx_error.h"
34 #include "nstackx_dfinder_log.h"
35 #include "nstackx_event.h"
36 #include "nstackx_statistics.h"
37 #include "nstackx_device_local.h"
38
39 #define TAG "nStackXCoAP"
40
41 #define DEFAULT_COAP_TIMEOUT (COAP_RESOURCE_CHECK_TIME * COAP_TICKS_PER_SECOND)
42
43 static uint32_t GetTimeout(CoapCtxType *ctx, EpollDesc epollfd);
44
45 static List g_ctxList = { &g_ctxList, &g_ctxList };
46
GetCoapContextList(void)47 List *GetCoapContextList(void)
48 {
49 return &g_ctxList;
50 }
51
CoapContextInsert(CoapCtxType * ctx)52 static void CoapContextInsert(CoapCtxType *ctx)
53 {
54 ListInsertTail(&g_ctxList, &ctx->node);
55 }
56
CoapContextRemove(CoapCtxType * ctx)57 static void CoapContextRemove(CoapCtxType *ctx)
58 {
59 ListRemoveNode(&ctx->node);
60 }
61
CoapGetCoapCtxType(const coap_context_t * ctx)62 CoapCtxType *CoapGetCoapCtxType(const coap_context_t *ctx)
63 {
64 List *pos = NULL;
65 LIST_FOR_EACH(pos, &g_ctxList) {
66 CoapCtxType *coapCtx = (CoapCtxType *)pos;
67 if (coapCtx->ctx == ctx) {
68 return coapCtx;
69 }
70 }
71 return NULL;
72 }
73
IsCoapContextReady(void)74 bool IsCoapContextReady(void)
75 {
76 return !ListIsEmpty(&g_ctxList);
77 }
78
79 #ifdef _WIN32
80 #define SEC_TO_MILISEC 1000
81 #define MILISEC_TO_MICROSEC 1000
82 static pthread_t g_coapTid;
83
84 typedef struct {
85 EpollTask taskList[3 * MAX_COAP_SOCKET_NUM];
86 uint32_t eventList[3 * MAX_COAP_SOCKET_NUM];
87 void *dataList[3 * MAX_COAP_SOCKET_NUM];
88 uint32_t count;
89 uint32_t timeout;
90 EpollDesc epollFd;
91 } TaskListInfo;
92
93 typedef struct {
94 uint8_t terminated;
95 sem_t waitCondition;
96 pthread_mutex_t waitLock;
97 TaskListInfo taskListInfo;
98 } CoapThreadParam;
99
100 TaskListInfo g_taskListInfo = {0};
101 CoapThreadParam g_coapThreadParam = {0};
102 uint8_t g_coapThreadState = NSTACKX_FALSE;
103 #endif // _WIN32
104
105 typedef enum {
106 SOCKET_READ_EVENT = 0,
107 SOCKET_WRITE_EVENT,
108 SOCKET_ERROR_EVENT,
109 SOCKET_END_EVENT
110 } SocketEventType;
111 static uint64_t g_socketEventNum[SOCKET_END_EVENT];
112
CoAPEpollReadHandle(void * data)113 static void CoAPEpollReadHandle(void *data)
114 {
115 if (data == NULL) {
116 return;
117 }
118 EpollTask *task = data;
119 if (task->taskfd < 0) {
120 return;
121 }
122 if (task->ptr == NULL) {
123 return;
124 }
125 coap_socket_t *socket = task->ptr;
126
127 if (socket->flags & COAP_SOCKET_WANT_READ) {
128 socket->flags |= COAP_SOCKET_CAN_READ;
129 }
130
131 if (socket->flags & COAP_SOCKET_WANT_ACCEPT) {
132 socket->flags |= COAP_SOCKET_CAN_ACCEPT;
133 }
134 g_socketEventNum[SOCKET_READ_EVENT]++;
135 }
136
CoAPEpollWriteHandle(void * data)137 static void CoAPEpollWriteHandle(void *data)
138 {
139 if (data == NULL) {
140 return;
141 }
142 EpollTask *task = data;
143 if (task->taskfd < 0) {
144 return;
145 }
146 if (task->ptr == NULL) {
147 return;
148 }
149 coap_socket_t *socket = task->ptr;
150
151 if (socket->flags & COAP_SOCKET_WANT_WRITE) {
152 socket->flags |= COAP_SOCKET_CAN_WRITE;
153 }
154
155 if (socket->flags & COAP_SOCKET_WANT_CONNECT) {
156 socket->flags |= COAP_SOCKET_CAN_CONNECT;
157 }
158 g_socketEventNum[SOCKET_WRITE_EVENT]++;
159 }
160
CoAPEpollErrorHandle(void * data)161 static void CoAPEpollErrorHandle(void *data)
162 {
163 EpollTask *task = data;
164 if (task == NULL || task->taskfd < 0) {
165 return;
166 }
167 coap_socket_t *socket = task->ptr;
168 if (socket == NULL) {
169 return;
170 }
171
172 IncStatistics(STATS_SOCKET_ERROR);
173 g_socketEventNum[SOCKET_ERROR_EVENT]++;
174
175 List *pos = NULL;
176 LIST_FOR_EACH(pos, &g_ctxList) {
177 CoapCtxType *ctx = (CoapCtxType *)pos;
178 if (IsCoapCtxEndpointSocket(ctx->ctx, socket->fd)) {
179 DFINDER_LOGE(TAG, "coap epoll error occurred");
180 ctx->socketErrFlag = NSTACKX_TRUE;
181 return;
182 }
183 }
184
185 DFINDER_LOGE(TAG, "coap session socket error occurred and close it");
186 DeRegisterEpollTask(task);
187 CloseDesc(socket->fd);
188 socket->fd = -1;
189 task->taskfd = -1;
190 }
191
RegisterCtxTask(EpollDesc epollfd)192 static uint32_t RegisterCtxTask(EpollDesc epollfd)
193 {
194 uint32_t minTimeout = DEFAULT_COAP_TIMEOUT;
195 List *pos = NULL;
196 LIST_FOR_EACH(pos, &g_ctxList) {
197 CoapCtxType *ctx = (CoapCtxType *)pos;
198 uint32_t currentTimeout = GetTimeout(ctx, epollfd);
199 if (currentTimeout < minTimeout) {
200 minTimeout = currentTimeout;
201 }
202 }
203
204 return minTimeout;
205 }
206
207 #ifdef _WIN32
RegisterCoAPEpollTask(EpollDesc epollfd)208 uint32_t RegisterCoAPEpollTask(EpollDesc epollfd)
209 {
210 (void)memset_s(&g_taskListInfo, sizeof(g_taskListInfo), 0, sizeof(g_taskListInfo));
211 uint32_t timeout = RegisterCtxTask(epollfd);
212 g_taskListInfo.epollFd = epollfd;
213 g_taskListInfo.timeout = timeout;
214 // Lock to protect g_coapThreadParam
215 if (PthreadMutexLock(&g_coapThreadParam.waitLock) != 0) {
216 DFINDER_LOGE(TAG, "Failed to lock");
217 return timeout;
218 }
219 (void)memcpy_s(&g_coapThreadParam.taskListInfo, sizeof(g_coapThreadParam.taskListInfo),
220 &g_taskListInfo, sizeof(g_taskListInfo));
221 SemPost(&g_coapThreadParam.waitCondition);
222 if (PthreadMutexUnlock(&g_coapThreadParam.waitLock) != 0) {
223 DFINDER_LOGE(TAG, "Failed to unlock");
224 }
225
226 return timeout;
227 }
228 #else
RegisterCoAPEpollTask(EpollDesc epollfd)229 uint32_t RegisterCoAPEpollTask(EpollDesc epollfd)
230 {
231 return RegisterCtxTask(epollfd);
232 }
233 #endif
234
GetTimeout(CoapCtxType * ctx,EpollDesc epollfd)235 static uint32_t GetTimeout(CoapCtxType *ctx, EpollDesc epollfd)
236 {
237 uint32_t events, timeout, i;
238 coap_tick_t now;
239 coap_socket_t *sockets[MAX_COAP_SOCKET_NUM] = {0};
240
241 coap_ticks(&now);
242 timeout = coap_io_prepare_io(ctx->ctx, sockets, (uint32_t)(sizeof(sockets) / sizeof(sockets[0])),
243 &ctx->socketNum, now);
244 if (timeout == 0 || timeout > DEFAULT_COAP_TIMEOUT) {
245 timeout = DEFAULT_COAP_TIMEOUT;
246 }
247 if (ctx->socketNum > MAX_COAP_SOCKET_NUM) {
248 ctx->socketNum = MAX_COAP_SOCKET_NUM;
249 DFINDER_LOGI(TAG, "socketNum exccedd MAX_COAP_SOCKET_NUM, and set it to MAX_COAP_SOCKET_NUM");
250 }
251 for (i = 0; i < ctx->socketNum; i++) {
252 if (sockets[i]->fd < 0) {
253 continue;
254 }
255 events = 0;
256 if ((sockets[i]->flags & COAP_SOCKET_WANT_READ) || (sockets[i]->flags & COAP_SOCKET_WANT_ACCEPT)) {
257 events = EPOLLIN;
258 }
259 if ((sockets[i]->flags & COAP_SOCKET_WANT_WRITE) || (sockets[i]->flags & COAP_SOCKET_WANT_CONNECT)) {
260 events = events | EPOLLOUT;
261 }
262 if (sockets[i]->flags & COAP_SOCKET_WANT_CONNECT) {
263 events = events | EPOLLHUP | EPOLLERR;
264 }
265 ctx->taskList[i].taskfd = sockets[i]->fd;
266 ctx->taskList[i].epollfd = epollfd;
267 ctx->taskList[i].readHandle = CoAPEpollReadHandle;
268 ctx->taskList[i].writeHandle = CoAPEpollWriteHandle;
269 ctx->taskList[i].errorHandle = CoAPEpollErrorHandle;
270 ctx->taskList[i].ptr = sockets[i];
271 #ifdef _WIN32
272 TaskListInfo *info = &g_taskListInfo;
273 (void)memcpy_s(&info->taskList[info->count], sizeof(info->taskList[info->count]),
274 &ctx->taskList[i], sizeof(ctx->taskList[i]));
275 info->eventList[info->count] = events;
276 info->dataList[info->count] = &ctx->taskList[i];
277 info->count++;
278 #else
279 (void)RegisterEpollTask(&ctx->taskList[i], events);
280 #endif /* #ifdef _WIN32 */
281 }
282 return timeout;
283 }
284
DeRegisteCoAPEpollTaskCtx(CoapCtxType * ctx)285 static void DeRegisteCoAPEpollTaskCtx(CoapCtxType *ctx)
286 {
287 coap_tick_t now;
288 uint32_t i;
289
290 if (ctx->socketNum > MAX_COAP_SOCKET_NUM) {
291 ctx->socketNum = MAX_COAP_SOCKET_NUM;
292 DFINDER_LOGI(TAG, "socketNum exccedd MAX_COAP_SOCKET_NUM, and set it to MAX_COAP_SOCKET_NUM");
293 }
294
295 for (i = 0; i < ctx->socketNum; i++) {
296 if (ctx->taskList[i].taskfd < 0) {
297 continue;
298 }
299 (void)DeRegisterEpollTask(&ctx->taskList[i]);
300 }
301 ctx->socketNum = 0;
302
303 coap_ticks(&now);
304 coap_io_do_io(ctx->ctx, now);
305 }
306
DeRegisterCoAPEpollTaskCb(CoapCtxType * ctx)307 static int DeRegisterCoAPEpollTaskCb(CoapCtxType *ctx)
308 {
309 if (ctx->socketErrFlag) {
310 DFINDER_LOGI(TAG, "error of ctx socket occurred and destroy g_ctx");
311 ctx->socketErrFlag = NSTACKX_FALSE;
312 NotifyDFinderMsgRecver(DFINDER_ON_INNER_ERROR);
313 DestroyLocalIface(ctx->iface, NSTACKX_FALSE);
314 } else {
315 DeRegisteCoAPEpollTaskCtx(ctx);
316 }
317 if (ctx->freeCtxLater == NSTACKX_TRUE) {
318 CoapContextRemove(ctx);
319 coap_free_context(ctx->ctx);
320 free(ctx);
321 }
322 return NSTACKX_EOK;
323 }
324
DeRegisterCoAPEpollTask(void)325 void DeRegisterCoAPEpollTask(void)
326 {
327 List *pos = NULL;
328 List *tmp = NULL;
329 LIST_FOR_EACH_SAFE(pos, tmp, &g_ctxList) {
330 (void)DeRegisterCoAPEpollTaskCb((CoapCtxType *)pos);
331 }
332 }
333
334 #ifdef _WIN32
CoapSelectWait(TaskListInfo * taskListInfo)335 int32_t CoapSelectWait(TaskListInfo *taskListInfo)
336 {
337 fd_set readSet, writeSet, errorSet;
338 struct timeval tv;
339 int maxFd = 0;
340
341 FD_ZERO(&readSet);
342 FD_ZERO(&writeSet);
343 FD_ZERO(&errorSet);
344 for (int i = 0; i < taskListInfo->count; i++) {
345 EpollTask *task = &taskListInfo->taskList[i];
346 if (maxFd < task->taskfd) {
347 maxFd = task->taskfd;
348 }
349 if (taskListInfo->eventList[i] & EPOLLIN) {
350 FD_SET(task->taskfd, &readSet);
351 }
352 if (taskListInfo->eventList[i] & EPOLLOUT) {
353 FD_SET(task->taskfd, &writeSet);
354 }
355 FD_SET(task->taskfd, &errorSet);
356 }
357 tv.tv_sec = taskListInfo->timeout / SEC_TO_MILISEC;
358 tv.tv_usec = (taskListInfo->timeout % MILISEC_TO_MICROSEC) * MILISEC_TO_MICROSEC;
359 int ret = select(maxFd + 1, &readSet, &writeSet, &errorSet, &tv);
360 if (ret < 0) {
361 int lastError = WSAGetLastError();
362 if (lastError != WSAEINVAL) {
363 IncStatistics(STATS_SOCKET_ERROR);
364 DFINDER_LOGE(TAG, "select error ret lastError: %d", lastError);
365 return NSTACKX_EFAILED;
366 }
367 return NSTACKX_EAGAIN;
368 } else if (ret == 0) {
369 return NSTACKX_EAGAIN;
370 }
371 for (uint32_t i = 0; i < taskListInfo->count; i++) {
372 if (FD_ISSET(taskListInfo->taskList[i].taskfd, &readSet)) {
373 PostEvent(GetMainLoopEvendChain(), taskListInfo->epollFd, CoAPEpollReadHandle,
374 taskListInfo->dataList[i]);
375 }
376 if (FD_ISSET(taskListInfo->taskList[i].taskfd, &writeSet)) {
377 PostEvent(GetMainLoopEvendChain(), taskListInfo->epollFd, CoAPEpollWriteHandle,
378 taskListInfo->dataList[i]);
379 }
380 if (FD_ISSET(taskListInfo->taskList[i].taskfd, &errorSet)) {
381 PostEvent(GetMainLoopEvendChain(), taskListInfo->epollFd, CoAPEpollErrorHandle,
382 taskListInfo->dataList[i]);
383 }
384 }
385 return NSTACKX_EOK;
386 }
387
CoapIoMonitorLoop(void * arg)388 static void *CoapIoMonitorLoop(void *arg)
389 {
390 DFINDER_LOGI(TAG, "Enter CoapIoMonitorLoop");
391 TaskListInfo taskListInfo;
392
393 while (!g_coapThreadParam.terminated) {
394 SemWait(&g_coapThreadParam.waitCondition);
395 if (PthreadMutexLock(&g_coapThreadParam.waitLock) != 0) {
396 DFINDER_LOGE(TAG, "Coap thread lock failed");
397 break;
398 }
399 if (g_coapThreadParam.terminated) {
400 DFINDER_LOGI(TAG, "Coap thread terminated");
401 PthreadMutexUnlock(&g_coapThreadParam.waitLock);
402 break;
403 }
404
405 if (g_coapThreadParam.taskListInfo.count == 0) {
406 if (PthreadMutexUnlock(&g_coapThreadParam.waitLock) != 0) {
407 break;
408 }
409 continue;
410 }
411 (void)memcpy_s(&taskListInfo, sizeof(taskListInfo), &g_coapThreadParam.taskListInfo,
412 sizeof(g_coapThreadParam.taskListInfo));
413 if (PthreadMutexUnlock(&g_coapThreadParam.waitLock) != 0) {
414 DFINDER_LOGE(TAG, "Coap thread unlock failed");
415 break;
416 }
417 if (CoapSelectWait(&taskListInfo) == NSTACKX_EFAILED) {
418 IncStatistics(STATS_SOCKET_ERROR);
419 DFINDER_LOGE(TAG, "Coap select failure");
420 break;
421 }
422 }
423 DFINDER_LOGI(TAG, "Exit CoapIoMonitorLoop");
424 return NULL;
425 }
426
CoapThreadInit(void)427 int32_t CoapThreadInit(void)
428 {
429 if (SemInit(&g_coapThreadParam.waitCondition, 0, 0) != NSTACKX_EOK) {
430 DFINDER_LOGE(TAG, "Failed to init sem condition");
431 return NSTACKX_EFAILED;
432 }
433
434 if (PthreadMutexInit(&g_coapThreadParam.waitLock, NULL) != NSTACKX_EOK) {
435 DFINDER_LOGE(TAG, "Faile to init lock");
436 SemDestroy(&g_coapThreadParam.waitCondition);
437 return NSTACKX_EFAILED;
438 }
439
440 int32_t ret = PthreadCreate(&g_coapTid, NULL, CoapIoMonitorLoop, NULL);
441 if (ret != 0) {
442 SemDestroy(&g_coapThreadParam.waitCondition);
443 PthreadMutexDestroy(&g_coapThreadParam.waitLock);
444 (void)memset_s(&g_coapThreadParam, sizeof(g_coapThreadParam), 0, sizeof(g_coapThreadParam));
445 DFINDER_LOGE(TAG, "thread create failed");
446 return NSTACKX_EFAILED;
447 }
448 DFINDER_LOGI(TAG, "Init CoAP thread done!");
449 return NSTACKX_EOK;
450 }
451
CoapThreadDestroy(void)452 void CoapThreadDestroy(void)
453 {
454 PthreadMutexLock(&g_coapThreadParam.waitLock);
455 g_coapThreadParam.terminated = NSTACKX_TRUE;
456 SemPost(&g_coapThreadParam.waitCondition);
457 PthreadMutexUnlock(&g_coapThreadParam.waitLock);
458 // May got block for 2 seconds.
459 PthreadJoin(g_coapTid, NULL);
460 SemDestroy(&g_coapThreadParam.waitCondition);
461 PthreadMutexDestroy(&g_coapThreadParam.waitLock);
462 (void)memset_s(&g_coapThreadParam, sizeof(g_coapThreadParam), 0, sizeof(g_coapThreadParam));
463 }
464 #endif
465
CoapServerDestroy(CoapCtxType * ctx,bool moduleDeinit)466 void CoapServerDestroy(CoapCtxType *ctx, bool moduleDeinit)
467 {
468 DFINDER_LOGD(TAG, "coap server destroy, module deinit: %d", moduleDeinit);
469
470 for (uint32_t i = 0; i < ctx->socketNum && i < MAX_COAP_SOCKET_NUM; ++i) {
471 if (ctx->taskList[i].taskfd < 0) {
472 continue;
473 }
474 (void)DeRegisterEpollTask(&ctx->taskList[i]);
475 }
476
477 if (moduleDeinit) {
478 CoapContextRemove(ctx);
479 coap_free_context(ctx->ctx);
480 free(ctx);
481 } else {
482 // release the context after EpollLoop has processed this round of tasks
483 ctx->freeCtxLater = NSTACKX_TRUE;
484 }
485 }
486
CoapServerInit(const struct in_addr * ip,void * iface)487 CoapCtxType *CoapServerInit(const struct in_addr *ip, void *iface)
488 {
489 DFINDER_LOGI(TAG, "CoapServerInit");
490 CoapCtxType *ctx = calloc(1, sizeof(CoapCtxType));
491 if (ctx == NULL) {
492 DFINDER_LOGE(TAG, "alloc failed");
493 return NULL;
494 }
495
496 coap_startup();
497
498 char addrStr[NI_MAXHOST] = COAP_SRV_DEFAULT_ADDR;
499 char portStr[NI_MAXSERV] = COAP_SRV_DEFAULT_PORT;
500
501 ctx->ctx = CoapGetContext(addrStr, portStr, NSTACKX_TRUE, ip);
502 if (ctx->ctx == NULL) {
503 DFINDER_LOGE(TAG, "coap init get context failed");
504 free(ctx);
505 return NULL;
506 }
507
508 if (CoapInitResources(ctx->ctx) != NSTACKX_EOK) {
509 DFINDER_LOGE(TAG, "init resource failed");
510 coap_free_context(ctx->ctx);
511 free(ctx);
512 return NULL;
513 }
514
515 coap_register_response_handler(ctx->ctx, CoapMessageHandler);
516 ctx->iface = iface;
517 CoapContextInsert(ctx);
518
519 return ctx;
520 }
521
ResetCoapSocketTaskCount(uint8_t isBusy)522 void ResetCoapSocketTaskCount(uint8_t isBusy)
523 {
524 List *pos = NULL;
525 LIST_FOR_EACH(pos, &g_ctxList) {
526 CoapCtxType *ctx = (CoapCtxType *)pos;
527 uint64_t totalTaskCount = 0;
528 uint32_t i;
529 for (i = 0; i < ctx->socketNum && i < MAX_COAP_SOCKET_NUM; i++) {
530 if (totalTaskCount < UINT64_MAX && ctx->taskList[i].count <= UINT64_MAX - totalTaskCount) {
531 totalTaskCount += ctx->taskList[i].count;
532 }
533 ctx->taskList[i].count = 0;
534 }
535
536 if (isBusy) {
537 DFINDER_LOGI(TAG, "in this busy interval, socket task count of iface %s is: %" PRIu64,
538 GetLocalIfaceName(ctx->iface), totalTaskCount);
539 }
540 }
541
542 if (isBusy) {
543 DFINDER_LOGI(TAG, "in this busy interval, socket event count: read %" PRIu64
544 ", write %" PRIu64 ", error %" PRIu64,
545 g_socketEventNum[SOCKET_READ_EVENT], g_socketEventNum[SOCKET_WRITE_EVENT],
546 g_socketEventNum[SOCKET_ERROR_EVENT]);
547 }
548
549 (void)memset_s(g_socketEventNum, sizeof(g_socketEventNum), 0, sizeof(g_socketEventNum));
550 }
551