1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "le_task.h"
16 #include <time.h>
17 #include <sys/eventfd.h>
18 
19 #include "le_loop.h"
20 
// Microseconds per second: multiplies a tv_sec difference to express it in microseconds.
#define MILLION_MICROSECOND 1000000
// Divisor applied to a tv_nsec (nanosecond) difference to express it in microseconds.
#define THOUSAND_MILLISECOND 1000
23 
/*
 * Dispatch the oldest queued async event of a task, if there is one.
 *
 * The first buffer linked on the task's stream buffer list is taken, the
 * 64-bit event id stored at the start of its data is read, and the task's
 * processAsyncEvent callback (when set) is invoked with the bytes that follow
 * the id and with buffer->dataSize as the length. The buffer is then unlinked
 * and freed. At most one buffer is handled per call; nothing happens when the
 * list is empty.
 *
 * loopHandle: event loop handle; only checked for NULL here.
 * asyncTask:  async event task whose buffer list is consumed.
 *
 * With LOOP_DEBUG defined, the elapsed time of the dispatch is logged in
 * microseconds.
 */
static void DoAsyncEvent_(const LoopHandle loopHandle, AsyncEventTask *asyncTask)
{
    LE_CHECK(loopHandle != NULL && asyncTask != NULL, return, "Invalid parameters");
#ifdef LOOP_DEBUG
    struct timespec startTime = {0};
    struct timespec endTime = {0};
    clock_gettime(CLOCK_MONOTONIC, &startTime);
#endif
    StreamTask *task = &asyncTask->stream;
    ListNode *node = task->buffHead.next;
    if (node == &task->buffHead) {
        return; // the list head points to itself: nothing is queued
    }

    LE_Buffer *buffer = ListEntry(node, LE_Buffer, node);
    // The buffer data starts with the event id, followed by the payload.
    uint64_t eventId = *(uint64_t *)(buffer->data);
    if (asyncTask->processAsyncEvent) {
        asyncTask->processAsyncEvent((TaskHandle)asyncTask, eventId,
            (uint8_t *)(buffer->data + sizeof(uint64_t)), buffer->dataSize);
    }
    OH_ListRemove(&buffer->node);
    free(buffer);
#ifdef LOOP_DEBUG
    clock_gettime(CLOCK_MONOTONIC, &endTime);
    long long diff = (long long)(endTime.tv_sec - startTime.tv_sec) * MILLION_MICROSECOND;
    // The nanosecond difference is signed: when it is negative it must reduce
    // the total (borrow from the seconds part), so it is always added as-is.
    diff += (long long)(endTime.tv_nsec - startTime.tv_nsec) / THOUSAND_MILLISECOND;
    LE_LOGI("DoAsyncEvent_ diff %lld",  diff);
#endif
}
56 
57 #ifdef STARTUP_INIT_TEST
LE_DoAsyncEvent(const LoopHandle loopHandle,const TaskHandle taskHandle)58 void LE_DoAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle)
59 {
60     AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
61     while (!IsBufferEmpty(&asyncTask->stream)) {
62         DoAsyncEvent_(loopHandle, (AsyncEventTask *)taskHandle);
63     }
64 }
65 #endif
66 
HandleAsyncEvent_(const LoopHandle loopHandle,const TaskHandle taskHandle,uint32_t oper)67 static LE_STATUS HandleAsyncEvent_(const LoopHandle loopHandle, const TaskHandle taskHandle, uint32_t oper)
68 {
69     LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
70     LE_LOGV("HandleAsyncEvent_ fd: %d oper 0x%x", GetSocketFd(taskHandle), oper);
71     EventLoop *loop = (EventLoop *)loopHandle;
72     AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle;
73     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
74         uint64_t eventId = 0;
75         int ret = read(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
76         LE_LOGV("HandleAsyncEvent_ read fd:%d ret: %d eventId %llu", GetSocketFd(taskHandle), ret, eventId);
77         DoAsyncEvent_(loopHandle, asyncTask);
78         if (!IsBufferEmpty(&asyncTask->stream)) {
79             loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_WRITE);
80             return LE_SUCCESS;
81         }
82     } else {
83         static uint64_t eventId = 0;
84         (void)write(GetSocketFd(taskHandle), &eventId, sizeof(eventId));
85         loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
86         eventId++;
87     }
88     return LE_SUCCESS;
89 }
90 
HandleAsyncTaskClose_(const LoopHandle loopHandle,const TaskHandle taskHandle)91 static void HandleAsyncTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
92 {
93     BaseTask *task = (BaseTask *)taskHandle;
94     DelTask((EventLoop *)loopHandle, task);
95     CloseTask(loopHandle, task);
96     close(task->taskId.fd);
97 }
98 
DumpEventTaskInfo_(const TaskHandle task)99 static void DumpEventTaskInfo_(const TaskHandle task)
100 {
101     INIT_CHECK(task != NULL, return);
102     BaseTask *baseTask = (BaseTask *)task;
103     AsyncEventTask *eventTask = (AsyncEventTask *)baseTask;
104     printf("\tfd: %d \n", eventTask->stream.base.taskId.fd);
105     printf("\t  TaskType: %s\n", "EventTask");
106 }
107 
LE_CreateAsyncTask(const LoopHandle loopHandle,TaskHandle * taskHandle,LE_ProcessAsyncEvent processAsyncEvent)108 LE_STATUS LE_CreateAsyncTask(const LoopHandle loopHandle,
109     TaskHandle *taskHandle, LE_ProcessAsyncEvent processAsyncEvent)
110 {
111     LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
112     LE_CHECK(processAsyncEvent != NULL, return LE_INVALID_PARAM, "Invalid parameters processAsyncEvent ");
113 
114     int fd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC);
115     LE_CHECK(fd > 0, return LE_FAILURE, "Failed to event fd ");
116     LE_BaseInfo baseInfo = {TASK_EVENT | TASK_ASYNC_EVENT, NULL};
117     AsyncEventTask *task = (AsyncEventTask *)CreateTask(loopHandle, fd, &baseInfo, sizeof(AsyncEventTask));
118     LE_CHECK(task != NULL, close(fd);
119         return LE_NO_MEMORY, "Failed to create task");
120     task->stream.base.handleEvent = HandleAsyncEvent_;
121     task->stream.base.innerClose = HandleAsyncTaskClose_;
122     task->stream.base.dumpTaskInfo = DumpEventTaskInfo_;
123     OH_ListInit(&task->stream.buffHead);
124     LoopMutexInit(&task->stream.mutex);
125     task->processAsyncEvent = processAsyncEvent;
126     EventLoop *loop = (EventLoop *)loopHandle;
127     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
128     *taskHandle = (TaskHandle)task;
129     return LE_SUCCESS;
130 }
131 
LE_StartAsyncEvent(const LoopHandle loopHandle,const TaskHandle taskHandle,uint64_t eventId,const uint8_t * data,uint32_t buffLen)132 LE_STATUS LE_StartAsyncEvent(const LoopHandle loopHandle,
133     const TaskHandle taskHandle, uint64_t eventId, const uint8_t *data, uint32_t buffLen)
134 {
135     LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
136     BufferHandle handle = LE_CreateBuffer(loopHandle, buffLen + 1 + sizeof(eventId));
137     char *buff = (char *)LE_GetBufferInfo(handle, NULL, NULL);
138     LE_CHECK(buff != NULL, return LE_FAILURE, "Failed to get buff");
139     int ret = memcpy_s(buff, sizeof(eventId), &eventId, sizeof(eventId));
140     LE_CHECK(ret == 0, return -1, "Failed to copy data");
141     if (data != NULL && buffLen > 0) {
142         ret = memcpy_s(buff + sizeof(eventId), buffLen, data, buffLen);
143         LE_CHECK(ret == 0, return -1, "Failed to copy data");
144         buff[sizeof(eventId) + buffLen] = '\0';
145     }
146     return LE_Send(loopHandle, taskHandle, handle, buffLen);
147 }
148 
/*
 * Stop an async event task by closing it through LE_CloseTask.
 *
 * loopHandle: event loop handle; the call is ignored when NULL.
 * taskHandle: async event task handle; the call is ignored when NULL.
 */
void LE_StopAsyncTask(LoopHandle loopHandle, TaskHandle taskHandle)
{
    LE_CHECK(!(loopHandle == NULL || taskHandle == NULL), return, "Invalid parameters");
    LE_CloseTask(loopHandle, taskHandle);
}