1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "platform/include/reactor.h"
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <pthread.h>
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include "platform/include/mutex.h"
23 #include "platform/include/list.h"
24 #include "platform/include/semaphore.h"
25 #include "platform/include/platform_def.h"
26
27 typedef struct Reactor {
28 int epollFd;
29 int stopFd;
30 bool isRunning;
31 bool itemRemoved;
32 pthread_t threadId;
33 List *movedItems;
34 Mutex *apiMutex;
35 } ReactorInternal;
36
37 typedef struct ReactorItem {
38 int fd;
39 Mutex *lock;
40 Reactor *reactor;
41 void *context;
42 void (*onReadReady)(void *context);
43 void (*onWriteReady)(void *context);
44 } ReactorItemInternal;
45
/*
 * Maximum number of events fetched by a single epoll_wait() call.
 * Declared as an enum constant rather than a `const int` object: in C a const
 * object is not an integer constant expression, so using it as an array bound
 * would create a variable-length array, and the object would also be exported
 * from this translation unit.
 */
enum { MAXEPOLLEVENTS = 64 };
47
/*
 * List comparison callback: two entries match only when they are the very
 * same pointer (identity, not content, comparison).
 */
static inline bool DataCmp(void *data1, void *data2)
{
    return data1 == data2;
}
55
/*
 * Records which thread is considered the reactor's own thread.
 * ReactorUnregister() compares this value with pthread_self() to decide
 * whether it is being called from that thread.
 *
 * reactor:  reactor to update; dereferenced without a NULL check.
 * threadId: thread identifier, stored after conversion to pthread_t.
 */
void ReactorSetThreadId(Reactor *reactor, unsigned long threadId)
{
    pthread_t ownerThread = (pthread_t)threadId;

    reactor->threadId = ownerThread;
}
60
ReactorCreate()61 Reactor *ReactorCreate()
62 {
63 Reactor *reactor = (Reactor *)calloc(1, sizeof(Reactor));
64 reactor->epollFd = -1;
65 reactor->stopFd = -1;
66
67 int epollFd = epoll_create1(EPOLL_CLOEXEC);
68 if (epollFd == -1) {
69 LOG_ERROR("ReatorCreate: epoll create failed, error no: %{public}d.", errno);
70 goto ERROR;
71 }
72
73 int stopFd = eventfd(0, 0);
74 if (stopFd == -1) {
75 LOG_ERROR("ReatorCreate: eventfd failed, error no: %{public}d.", errno);
76 goto ERROR;
77 }
78
79 struct epoll_event event = {0};
80 event.data.ptr = NULL;
81 event.events = EPOLLIN;
82
83 if (epoll_ctl(epollFd, EPOLL_CTL_ADD, stopFd, &event) == -1) {
84 LOG_ERROR("ReatorCreate: epoll_ctl ADD-Option failed, error no: %{public}d.", errno);
85 goto ERROR;
86 }
87
88 reactor->movedItems = ListCreate(NULL);
89 if (reactor->movedItems == NULL) {
90 goto ERROR;
91 }
92 reactor->apiMutex = MutexCreate();
93 if (reactor->apiMutex == NULL) {
94 goto ERROR;
95 }
96 reactor->epollFd = epollFd;
97 reactor->stopFd = stopFd;
98
99 return reactor;
100
101 ERROR:
102 ReactorDelete(reactor);
103 return NULL;
104 }
105
/**
 * Releases a reactor: its API mutex, its skip list, both descriptors and the
 * structure itself. Passing NULL is a no-op.
 *
 * Also used by ReactorCreate() to tear down a partially built reactor, so
 * members may still hold their initial values here.
 *
 * @param reactor Reactor to destroy, or NULL.
 */
void ReactorDelete(Reactor *reactor)
{
    if (reactor == NULL) {
        return;
    }

    /*
     * NOTE(review): apiMutex / movedItems can be NULL on the partial
     * construction path; presumably MutexDelete() and ListDelete() accept
     * NULL -- confirm against their implementations.
     */
    MutexDelete(reactor->apiMutex);
    ListDelete(reactor->movedItems);

    /* -1 means the descriptor was never opened; do not hand it to close(). */
    if (reactor->stopFd != -1) {
        close(reactor->stopFd);
    }
    if (reactor->epollFd != -1) {
        close(reactor->epollFd);
    }
    free(reactor);
}
118
/**
 * Runs the reactor's event loop on the calling thread.
 *
 * Each pass empties the skip list, waits in epoll_wait() and then dispatches
 * every returned event: the stop event (NULL data pointer) ends the loop,
 * any other event invokes the read and/or write callback of its item while
 * holding that item's lock.
 *
 * @param reactor Reactor to run; must not be NULL.
 * @return 0 when the loop was ended through the stop descriptor,
 *         -1 when epoll_wait() failed.
 */
int32_t ReactorStart(Reactor *reactor)
{
    ASSERT(reactor);

    reactor->isRunning = true;

    struct epoll_event events[MAXEPOLLEVENTS];
    for (;;) {
        /* Start every pass with an empty skip list. */
        MutexLock(reactor->apiMutex);
        ListClear(reactor->movedItems);
        MutexUnlock(reactor->apiMutex);

        int nfds;
        /* NOTE(review): CHECK_EXCEPT_INTR presumably retries on EINTR -- confirm in platform_def.h. */
        CHECK_EXCEPT_INTR(nfds = epoll_wait(reactor->epollFd, events, MAXEPOLLEVENTS, -1));
        if (nfds == -1) {
            reactor->isRunning = false;
            LOG_ERROR("ReactorStart: epoll_wait failed, error no: %{public}d.", errno);
            return -1;
        }

        for (int i = 0; i < nfds; ++i) {
            /* Only the stop eventfd is registered with a NULL data pointer. */
            if (events[i].data.ptr == NULL) {
                eventfd_t val;
                eventfd_read(reactor->stopFd, &val);
                reactor->isRunning = false;
                return 0;
            }

            ReactorItem *item = (ReactorItem *)events[i].data.ptr;

            /* Items present in the skip list must not be touched. */
            MutexLock(reactor->apiMutex);
            if (ListForEachData(reactor->movedItems, DataCmp, item) != NULL) {
                MutexUnlock(reactor->apiMutex);
                continue;
            }

            /*
             * Take the item lock before dropping apiMutex so there is no
             * window between the skip-list check and the callbacks.
             */
            MutexLock(item->lock);
            MutexUnlock(reactor->apiMutex);
            reactor->itemRemoved = false;
            if ((events[i].events & (EPOLLIN | EPOLLRDHUP)) && (item->onReadReady != NULL)) {
                item->onReadReady(item->context);
            }
            /* Skip the write callback if the read callback unregistered the item. */
            if ((events[i].events & EPOLLOUT) && (item->onWriteReady != NULL) && (!reactor->itemRemoved)) {
                item->onWriteReady(item->context);
            }

            MutexUnlock(item->lock);

            /*
             * ReactorUnregister() called on this thread defers destruction
             * to here. The lock was obtained from MutexCreate(), so it has
             * to be released with MutexDelete(), not a bare free().
             */
            if (reactor->itemRemoved) {
                MutexDelete(item->lock);
                free(item);
            }
        }
    }
}
174
ReactorStop(const Reactor * reactor)175 void ReactorStop(const Reactor *reactor)
176 {
177 ASSERT(reactor);
178 eventfd_write(reactor->stopFd, 1);
179 }
180
ReactorRegister(Reactor * reactor,int fd,void * context,void (* onReadReady)(void * context),void (* onWriteReady)(void * context))181 ReactorItem *ReactorRegister(
182 Reactor *reactor, int fd, void *context, void (*onReadReady)(void *context), void (*onWriteReady)(void *context))
183 {
184 ASSERT(reactor);
185
186 ReactorItem *item = (ReactorItem *)calloc(1, (sizeof(ReactorItem)));
187
188 item->lock = MutexCreate();
189 if (item->lock == NULL) {
190 goto ERROR;
191 }
192
193 item->fd = fd;
194 item->context = context;
195 item->reactor = reactor;
196 item->onReadReady = onReadReady;
197 item->onWriteReady = onWriteReady;
198
199 struct epoll_event event = {0};
200 event.data.ptr = item;
201 if (onReadReady != NULL) {
202 event.events |= (EPOLLIN | EPOLLRDHUP);
203 }
204 if (onWriteReady != NULL) {
205 event.events |= EPOLLOUT;
206 }
207
208 if (epoll_ctl(reactor->epollFd, EPOLL_CTL_ADD, item->fd, &event) == -1) {
209 goto ERROR;
210 }
211
212 return item;
213
214 ERROR:
215 if (item != NULL) {
216 MutexDelete(item->lock);
217 free(item);
218 }
219
220 return NULL;
221 }
222
/**
 * Removes an item from its reactor's epoll set and destroys it.
 *
 * Called on the reactor thread while the loop is running, destruction is
 * deferred: only the reactor's itemRemoved flag is set, and ReactorStart()
 * releases the item after the current callbacks have returned.
 *
 * Called from anywhere else, the item is first published in the reactor's
 * skip list so the loop ignores any event it has already fetched for it,
 * then the item lock is taken once to wait for callbacks in flight, and
 * only then is the item released.
 *
 * @param item Item returned by ReactorRegister(); must not be NULL and must
 *             not be used after this call.
 */
void ReactorUnregister(ReactorItem *item)
{
    ASSERT(item);

    struct epoll_event event = {0};
    if (epoll_ctl(item->reactor->epollFd, EPOLL_CTL_DEL, item->fd, &event) != 0) {
        LOG_ERROR("ReactorUnregister: epoll_ctl delete-option failed, error no: %{public}d.", errno);
    }

    if (pthread_equal(item->reactor->threadId, pthread_self())) {
        if (item->reactor->isRunning) {
            item->reactor->itemRemoved = true;
            return;
        }
    }

    /*
     * Tell the event loop to skip this item. ReactorStart() checks the list
     * under apiMutex before it touches an item.
     * NOTE(review): relies on ListAddLast() from platform/include/list.h.
     */
    MutexLock(item->reactor->apiMutex);
    ListAddLast(item->reactor->movedItems, item);
    MutexUnlock(item->reactor->apiMutex);

    /* Lock/unlock once: blocks until a callback already running has finished. */
    MutexLock(item->lock);
    MutexUnlock(item->lock);

    /* The lock came from MutexCreate(); release it with MutexDelete(). */
    MutexDelete(item->lock);
    free(item);
}