1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "platform/include/reactor.h"
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <pthread.h>
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include "platform/include/mutex.h"
23 #include "platform/include/list.h"
24 #include "platform/include/semaphore.h"
25 #include "platform/include/platform_def.h"
26 
27 typedef struct Reactor {
28     int epollFd;
29     int stopFd;
30     bool isRunning;
31     bool itemRemoved;
32     pthread_t threadId;
33     List *movedItems;
34     Mutex *apiMutex;
35 } ReactorInternal;
36 
37 typedef struct ReactorItem {
38     int fd;
39     Mutex *lock;
40     Reactor *reactor;
41     void *context;
42     void (*onReadReady)(void *context);
43     void (*onWriteReady)(void *context);
44 } ReactorItemInternal;
45 
46 const int MAXEPOLLEVENTS = 64;
47 
DataCmp(void * data1,void * data2)48 static inline bool DataCmp(void *data1, void *data2)
49 {
50     if (data1 != data2) {
51         return false;
52     }
53     return true;
54 }
55 
ReactorSetThreadId(Reactor * reactor,unsigned long threadId)56 void ReactorSetThreadId(Reactor *reactor, unsigned long threadId)
57 {
58     reactor->threadId = (pthread_t)threadId;
59 }
60 
ReactorCreate()61 Reactor *ReactorCreate()
62 {
63     Reactor *reactor = (Reactor *)calloc(1, sizeof(Reactor));
64     reactor->epollFd = -1;
65     reactor->stopFd = -1;
66 
67     int epollFd = epoll_create1(EPOLL_CLOEXEC);
68     if (epollFd == -1) {
69         LOG_ERROR("ReatorCreate: epoll create failed, error no: %{public}d.", errno);
70         goto ERROR;
71     }
72 
73     int stopFd = eventfd(0, 0);
74     if (stopFd == -1) {
75         LOG_ERROR("ReatorCreate: eventfd failed, error no: %{public}d.", errno);
76         goto ERROR;
77     }
78 
79     struct epoll_event event = {0};
80     event.data.ptr = NULL;
81     event.events = EPOLLIN;
82 
83     if (epoll_ctl(epollFd, EPOLL_CTL_ADD, stopFd, &event) == -1) {
84         LOG_ERROR("ReatorCreate: epoll_ctl ADD-Option failed, error no: %{public}d.", errno);
85         goto ERROR;
86     }
87 
88     reactor->movedItems = ListCreate(NULL);
89     if (reactor->movedItems == NULL) {
90         goto ERROR;
91     }
92     reactor->apiMutex = MutexCreate();
93     if (reactor->apiMutex == NULL) {
94         goto ERROR;
95     }
96     reactor->epollFd = epollFd;
97     reactor->stopFd = stopFd;
98 
99     return reactor;
100 
101 ERROR:
102     ReactorDelete(reactor);
103     return NULL;
104 }
105 
ReactorDelete(Reactor * reactor)106 void ReactorDelete(Reactor *reactor)
107 {
108     if (reactor == NULL) {
109         return;
110     }
111 
112     MutexDelete(reactor->apiMutex);
113     ListDelete(reactor->movedItems);
114     close(reactor->stopFd);
115     close(reactor->epollFd);
116     free(reactor);
117 }
118 
ReactorStart(Reactor * reactor)119 int32_t ReactorStart(Reactor *reactor)
120 {
121     ASSERT(reactor);
122 
123     reactor->isRunning = true;
124 
125     struct epoll_event events[MAXEPOLLEVENTS];
126     for (;;) {
127         MutexLock(reactor->apiMutex);
128         ListClear(reactor->movedItems);
129         MutexUnlock(reactor->apiMutex);
130 
131         int nfds;
132         CHECK_EXCEPT_INTR(nfds = epoll_wait(reactor->epollFd, events, MAXEPOLLEVENTS, -1));
133         if (nfds == -1) {
134             reactor->isRunning = false;
135             LOG_ERROR("ReactorStart: epoll_wait failed, error no: %{public}d.", errno);
136             return -1;
137         }
138 
139         for (int i = 0; i < nfds; ++i) {
140             if (events[i].data.ptr == NULL) {
141                 eventfd_t val;
142                 eventfd_read(reactor->stopFd, &val);
143                 reactor->isRunning = false;
144                 return 0;
145             }
146 
147             ReactorItem *item = (ReactorItem *)events[i].data.ptr;
148 
149             MutexLock(reactor->apiMutex);
150             if (ListForEachData(reactor->movedItems, DataCmp, item) != NULL) {
151                 MutexUnlock(reactor->apiMutex);
152                 continue;
153             }
154 
155             MutexLock(item->lock);
156             MutexUnlock(reactor->apiMutex);
157             reactor->itemRemoved = false;
158             if ((events[i].events & (EPOLLIN | EPOLLRDHUP)) && (item->onReadReady != NULL)) {
159                 item->onReadReady(item->context);
160             }
161             if ((events[i].events & EPOLLOUT) && (item->onWriteReady != NULL) && (!reactor->itemRemoved)) {
162                 item->onWriteReady(item->context);
163             }
164 
165             MutexUnlock(item->lock);
166 
167             if (reactor->itemRemoved) {
168                 free(item->lock);
169                 free(item);
170             }
171         }
172     }
173 }
174 
ReactorStop(const Reactor * reactor)175 void ReactorStop(const Reactor *reactor)
176 {
177     ASSERT(reactor);
178     eventfd_write(reactor->stopFd, 1);
179 }
180 
ReactorRegister(Reactor * reactor,int fd,void * context,void (* onReadReady)(void * context),void (* onWriteReady)(void * context))181 ReactorItem *ReactorRegister(
182     Reactor *reactor, int fd, void *context, void (*onReadReady)(void *context), void (*onWriteReady)(void *context))
183 {
184     ASSERT(reactor);
185 
186     ReactorItem *item = (ReactorItem *)calloc(1, (sizeof(ReactorItem)));
187 
188     item->lock = MutexCreate();
189     if (item->lock == NULL) {
190         goto ERROR;
191     }
192 
193     item->fd = fd;
194     item->context = context;
195     item->reactor = reactor;
196     item->onReadReady = onReadReady;
197     item->onWriteReady = onWriteReady;
198 
199     struct epoll_event event = {0};
200     event.data.ptr = item;
201     if (onReadReady != NULL) {
202         event.events |= (EPOLLIN | EPOLLRDHUP);
203     }
204     if (onWriteReady != NULL) {
205         event.events |= EPOLLOUT;
206     }
207 
208     if (epoll_ctl(reactor->epollFd, EPOLL_CTL_ADD, item->fd, &event) == -1) {
209         goto ERROR;
210     }
211 
212     return item;
213 
214 ERROR:
215     if (item != NULL) {
216         MutexDelete(item->lock);
217         free(item);
218     }
219 
220     return NULL;
221 }
222 
ReactorUnregister(ReactorItem * item)223 void ReactorUnregister(ReactorItem *item)
224 {
225     ASSERT(item);
226 
227     struct epoll_event event = {0};
228     if (epoll_ctl(item->reactor->epollFd, EPOLL_CTL_DEL, item->fd, &event) != 0) {
229         LOG_ERROR("ReactorUnregister: epoll_ctl delete-option failed, error no: %{public}d.", errno);
230     }
231 
232     if (pthread_equal(item->reactor->threadId, pthread_self())) {
233         if (item->reactor->isRunning) {
234             item->reactor->itemRemoved = true;
235             return;
236         }
237     }
238 
239     free(item->lock);
240     free(item);
241 }