1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 //! Implementation of epoll event loop.
17 
18 #![allow(dead_code)]
19 #![allow(unused_variables)]
20 
21 use std::collections::BTreeMap;
22 use std::ffi::{ c_void, c_char, c_int, CString };
23 use std::future::Future;
24 use std::io::Error;
25 use std::os::fd::RawFd;
26 use std::pin::Pin;
27 use std::sync::{ Arc, Mutex };
28 use std::sync::atomic::{ AtomicBool, Ordering };
29 use std::task::{ Context, Poll, Waker };
30 use fusion_utils_rust::{ call_debug_enter, FusionErrorCode, FusionResult };
31 use hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType };
32 
33 /// Indicating data other than high-priority data can be read.
34 pub const LIBC_EPOLLIN: u32 = libc::EPOLLIN as u32;
35 const LIBC_EPOLLONESHOT: u32 = libc::EPOLLONESHOT as u32;
36 /// Indicating an error has occurred.
37 pub const LIBC_EPOLLERR: u32 = libc::EPOLLERR as u32;
38 /// Indicating a hangup has occurred.
39 pub const LIBC_EPOLLHUP: u32 = libc::EPOLLHUP as u32;
40 const LIBC_EPOLLALL: u32 = LIBC_EPOLLIN | LIBC_EPOLLERR | LIBC_EPOLLHUP;
41 const LIBC_EPOLLNONE: u32 = 0;
42 const MAX_EPOLL_EVENTS: c_int = 128;
43 const EPOLL_SUCCESS: c_int = 0;
44 const EPOLL_FAILURE: c_int = -1;
45 const NO_TIMEOUT: c_int = -1;
46 const SYSTEM_IO_FAILURE: libc::ssize_t = -1;
47 const INVALID_FD: RawFd = -1;
48 const LOG_LABEL: HiLogLabel = HiLogLabel {
49     log_type: LogType::LogCore,
50     domain: 0xD002220,
51     tag: "Scheduler",
52 };
53 
54 /// Abstraction of epoll handler.
55 pub trait IEpollHandler: Send + Sync {
56     /// Return file descriptor of this epoll handler.
fd(&self) -> RawFd57     fn fd(&self) -> RawFd;
58     /// Dispatch epoll events to this epoll handler.
dispatch(&self, events: u32)59     fn dispatch(&self, events: u32);
60 }
61 
62 struct EpollEvent {
63     fd: RawFd,
64     events: u32,
65 }
66 
67 struct EpollHandler {
68     raw: Arc<dyn IEpollHandler>,
69     handle: ylong_runtime::task::JoinHandle<()>,
70     waker: Option<Waker>,
71     events: u32,
72 }
73 
74 impl EpollHandler {
new(raw: Arc<dyn IEpollHandler>, handle: ylong_runtime::task::JoinHandle<()>) -> Self75     fn new(raw: Arc<dyn IEpollHandler>, handle: ylong_runtime::task::JoinHandle<()>) -> Self
76     {
77         Self {
78             raw,
79             handle,
80             waker: Default::default(),
81             events: Default::default(),
82         }
83     }
84 
85     #[inline]
fd(&self) -> RawFd86     fn fd(&self) -> RawFd
87     {
88         self.raw.fd()
89     }
90 
91     #[inline]
raw_handler(&self) -> Arc<dyn IEpollHandler>92     fn raw_handler(&self) -> Arc<dyn IEpollHandler>
93     {
94         self.raw.clone()
95     }
96 
97     #[inline]
set_waker(&mut self, waker: &Waker)98     fn set_waker(&mut self, waker: &Waker)
99     {
100         self.waker.replace(waker.clone());
101     }
102 
103     #[inline]
take_events(&mut self) -> u32104     fn take_events(&mut self) -> u32
105     {
106         let events = self.events;
107         self.events = Default::default();
108         events
109     }
110 }
111 
112 impl Drop for EpollHandler {
drop(&mut self)113     fn drop(&mut self)
114     {
115         self.handle.cancel();
116     }
117 }
118 
119 /// `Driver` encapsulate event loop of epoll.
120 struct Driver {
121     epoll: Arc<Epoll>,
122     is_running: Arc<AtomicBool>,
123 }
124 
125 impl Driver {
new(epoll: Arc<Epoll>, is_running: Arc<AtomicBool>) -> Self126     fn new(epoll: Arc<Epoll>, is_running: Arc<AtomicBool>) -> Self
127     {
128         Self { epoll, is_running }
129     }
130 
131     #[inline]
is_running(&self) -> bool132     fn is_running(&self) -> bool
133     {
134         self.is_running.load(Ordering::Relaxed)
135     }
136 
run(&self)137     fn run(&self)
138     {
139         call_debug_enter!("Driver::run");
140         while self.is_running() {
141             if let Some(epoll_events) = self.epoll.epoll_wait() {
142                 if !self.is_running() {
143                     info!(LOG_LABEL, "Driver stopped running");
144                     break;
145                 }
146                 self.epoll.wake(&epoll_events);
147             }
148         }
149     }
150 }
151 
152 struct Epoll {
153     epoll_fd: RawFd,
154     handlers: Mutex<BTreeMap<RawFd, EpollHandler>>,
155 }
156 
157 impl Epoll {
new() -> Self158     fn new() -> Self
159     {
160         // SAFETY:
161         // The epoll API is multi-thread safe.
162         // This is a normal system call, no safety pitfall.
163         let epoll_fd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
164         assert_ne!(epoll_fd, INVALID_FD, "epoll_create1 fail: {:?}", Error::last_os_error());
165         Self {
166             epoll_fd,
167             handlers: Mutex::default(),
168         }
169     }
170 
171     #[inline]
fd(&self) -> RawFd172     fn fd(&self) -> RawFd
173     {
174         self.epoll_fd
175     }
176 
epoll_add(&self, fd: RawFd) -> FusionResult<()>177     fn epoll_add(&self, fd: RawFd) -> FusionResult<()>
178     {
179         call_debug_enter!("Epoll::epoll_add");
180         let mut ev = libc::epoll_event {
181             events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR,
182             u64: fd as u64,
183         };
184         // SAFETY:
185         // The epoll API is multi-thread safe.
186         // We have carefully ensure that parameters are as required by system interface.
187         let ret = unsafe {
188             libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev)
189         };
190         if ret != EPOLL_SUCCESS {
191             error!(LOG_LABEL, "epoll_ctl_add({},{}) fail: {:?}",
192                    @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
193             Err(FusionErrorCode::Fail)
194         } else {
195             Ok(())
196         }
197     }
198 
epoll_del(&self, fd: RawFd) -> FusionResult<()>199     fn epoll_del(&self, fd: RawFd) -> FusionResult<()>
200     {
201         call_debug_enter!("Epoll::epoll_del");
202         // SAFETY:
203         // The epoll API is multi-thread safe.
204         // We have carefully ensure that parameters are as required by system interface.
205         let ret = unsafe {
206             libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut())
207         };
208         if ret != EPOLL_SUCCESS {
209             error!(LOG_LABEL, "epoll_ctl_remove({},{}) fail: {:?}",
210                    @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
211             Err(FusionErrorCode::Fail)
212         } else {
213             Ok(())
214         }
215     }
216 
epoll_wait(&self) -> Option<Vec<EpollEvent>>217     fn epoll_wait(&self) -> Option<Vec<EpollEvent>>
218     {
219         call_debug_enter!("Epoll::epoll_wait");
220         let mut events: Vec<libc::epoll_event> = Vec::with_capacity(MAX_EPOLL_EVENTS as usize);
221         // SAFETY:
222         // The epoll API is multi-thread safe.
223         // We have carefully ensure that parameters are as required by system interface.
224         let ret = unsafe {
225             libc::epoll_wait(self.epoll_fd, events.as_mut_ptr(), MAX_EPOLL_EVENTS, NO_TIMEOUT)
226         };
227         if ret < 0 {
228             error!(LOG_LABEL, "epoll_wait({}) fail: {:?}",
229                    @public(self.epoll_fd),
230                    @public(Error::last_os_error()));
231             return None;
232         }
233         let num_of_events = ret as usize;
234         // SAFETY:
235         // `epoll_wait` returns the number of events and promise it is within the limit of
236         // `MAX_EPOLL_EVENTS`.
237         let epoll_events = unsafe {
238             std::slice::from_raw_parts(events.as_ptr(), num_of_events)
239         };
240         let epoll_events: Vec<EpollEvent> = epoll_events.iter().map(|e| {
241             EpollEvent {
242                 fd: e.u64 as RawFd,
243                 events: e.events,
244             }
245         }).collect();
246         Some(epoll_events)
247     }
248 
epoll_reset(&self, fd: RawFd) -> FusionResult<()>249     fn epoll_reset(&self, fd: RawFd) -> FusionResult<()>
250     {
251         call_debug_enter!("Epoll::epoll_reset");
252         let mut ev = libc::epoll_event {
253             events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR,
254             u64: fd as u64,
255         };
256         // SAFETY:
257         // The epoll API is multi-thread safe.
258         // We have carefully ensure that parameters are as required by system interface.
259         let ret = unsafe {
260             libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev)
261         };
262         if ret != EPOLL_SUCCESS {
263             error!(LOG_LABEL, "In reset_fd, epoll_ctl_mod({},{}) fail: {:?}",
264                    @public(self.epoll_fd), @public(fd), @public(Error::last_os_error()));
265             Err(FusionErrorCode::Fail)
266         } else {
267             Ok(())
268         }
269     }
270 
add_epoll_handler(&self, fd: RawFd, epoll_handler: EpollHandler) -> FusionResult<Arc<dyn IEpollHandler>>271     fn add_epoll_handler(&self, fd: RawFd, epoll_handler: EpollHandler)
272         -> FusionResult<Arc<dyn IEpollHandler>>
273     {
274         call_debug_enter!("Epoll::add_epoll_handler");
275         let mut guard = self.handlers.lock().unwrap();
276         if guard.contains_key(&fd) {
277             error!(LOG_LABEL, "Epoll handler ({}) has been added", @public(fd));
278             return Err(FusionErrorCode::Fail);
279         }
280         debug!(LOG_LABEL, "Add epoll handler ({})", @public(fd));
281         let raw = epoll_handler.raw_handler();
282         guard.insert(fd, epoll_handler);
283         let _ = self.epoll_add(fd);
284         Ok(raw)
285     }
286 
remove_epoll_handler(&self, fd: RawFd) -> FusionResult<Arc<dyn IEpollHandler>>287     fn remove_epoll_handler(&self, fd: RawFd) -> FusionResult<Arc<dyn IEpollHandler>>
288     {
289         call_debug_enter!("Epoll::remove_epoll_handler");
290         let mut guard = self.handlers.lock().unwrap();
291         let _ = self.epoll_del(fd);
292         if let Some(h) = guard.remove(&fd) {
293             debug!(LOG_LABEL, "Remove epoll handler ({})", @public(fd));
294             Ok(h.raw_handler())
295         } else {
296             error!(LOG_LABEL, "No epoll handler ({})", @public(fd));
297             Err(FusionErrorCode::Fail)
298         }
299     }
300 
wake(&self, events: &[EpollEvent])301     fn wake(&self, events: &[EpollEvent])
302     {
303         call_debug_enter!("Epoll::wake");
304         let mut guard = self.handlers.lock().unwrap();
305         for e in events {
306             if let Some(handler) = guard.get_mut(&e.fd) {
307                 debug!(LOG_LABEL, "Wake epoll handler ({})", @public(e.fd));
308                 handler.events = e.events;
309                 if let Some(waker) = &handler.waker {
310                     waker.wake_by_ref();
311                 }
312             } else {
313                 error!(LOG_LABEL, "No epoll handler ({})", @public(e.fd));
314             }
315         }
316     }
317 
dispatch_inner(&self, fd: RawFd, waker: &Waker) -> Option<(Arc<dyn IEpollHandler>, u32)>318     fn dispatch_inner(&self, fd: RawFd, waker: &Waker) -> Option<(Arc<dyn IEpollHandler>, u32)>
319     {
320         call_debug_enter!("Epoll::dispatch_inner");
321         let mut guard = self.handlers.lock().unwrap();
322         if let Some(handler) = guard.get_mut(&fd) {
323             handler.set_waker(waker);
324             let events = handler.take_events() & LIBC_EPOLLALL;
325             if events != LIBC_EPOLLNONE {
326                 Some((handler.raw_handler(), events))
327             } else {
328                 debug!(LOG_LABEL, "No epoll event");
329                 None
330             }
331         } else {
332             error!(LOG_LABEL, "No epoll handler with ({})", @public(fd));
333             None
334         }
335     }
336 
dispatch(&self, fd: RawFd, waker: &Waker)337     fn dispatch(&self, fd: RawFd, waker: &Waker)
338     {
339         call_debug_enter!("Epoll::dispatch");
340         if let Some((handler, events)) = self.dispatch_inner(fd, waker) {
341             handler.dispatch(events);
342             let _ = self.epoll_reset(fd);
343         }
344     }
345 }
346 
347 impl Default for Epoll {
default() -> Self348     fn default() -> Self
349     {
350         Self::new()
351     }
352 }
353 
354 impl Drop for Epoll {
drop(&mut self)355     fn drop(&mut self)
356     {
357         // SAFETY:
358         // Parameter is as required by system, so consider it safe here.
359         let ret = unsafe { libc::close(self.epoll_fd) };
360         if ret != 0 {
361             error!(LOG_LABEL, "close({}) fail: {:?}",
362                    @public(self.epoll_fd),
363                    @public(Error::last_os_error()));
364         }
365     }
366 }
367 
368 struct EpollHandlerFuture {
369     fd: RawFd,
370     epoll: Arc<Epoll>,
371 }
372 
373 impl EpollHandlerFuture {
new(fd: RawFd, epoll: Arc<Epoll>) -> Self374     fn new(fd: RawFd, epoll: Arc<Epoll>) -> Self
375     {
376         Self { fd, epoll }
377     }
378 }
379 
380 impl Future for EpollHandlerFuture {
381     type Output = ();
382 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>383     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
384     {
385         call_debug_enter!("EpollHandlerFuture::poll");
386         self.epoll.dispatch(self.fd, cx.waker());
387         Poll::Pending
388     }
389 }
390 
391 struct EpollWaker {
392     fds: [RawFd; 2],
393 }
394 
395 impl EpollWaker {
new() -> Self396     fn new() -> Self
397     {
398         let mut fds: [c_int; 2] = [-1; 2];
399         // SAFETY:
400         // The pipe API is multi-thread safe.
401         // We have carefully checked that parameters are as required by system interface.
402         let ret = unsafe {
403             libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK)
404         };
405         if ret != 0 {
406             error!(LOG_LABEL, "pipe2 fail: {:?}", @public(Error::last_os_error()));
407         }
408         Self { fds }
409     }
410 
wake(&self)411     fn wake(&self)
412     {
413         call_debug_enter!("EpollWaker::wake");
414         let data: i32 = 0;
415         // SAFETY:
416         // We have carefully checked that parameters are as required by system interface.
417         let ret = unsafe {
418             libc::write(self.fds[1],
419                         std::ptr::addr_of!(data) as *const c_void,
420                         std::mem::size_of_val(&data))
421         };
422         if ret == SYSTEM_IO_FAILURE {
423             error!(LOG_LABEL, "write fail: {:?}", @public(Error::last_os_error()));
424         }
425     }
426 }
427 
428 impl IEpollHandler for EpollWaker {
fd(&self) -> RawFd429     fn fd(&self) -> RawFd
430     {
431         self.fds[0]
432     }
433 
dispatch(&self, events: u32)434     fn dispatch(&self, events: u32)
435     {
436         if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN {
437             let data: i32 = 0;
438             // SAFETY:
439             // Parameters are as required by system and business logic, so it can be trusted.
440             let ret = unsafe {
441                 libc::read(self.fd(),
442                            std::ptr::addr_of!(data) as *mut c_void,
443                            std::mem::size_of_val(&data))
444             };
445             if ret == SYSTEM_IO_FAILURE {
446                 error!(LOG_LABEL, "read fail: {:?}", @public(Error::last_os_error()));
447             }
448         }
449     }
450 }
451 
452 impl Default for EpollWaker {
default() -> Self453     fn default() -> Self
454     {
455         Self::new()
456     }
457 }
458 
459 impl Drop for EpollWaker {
drop(&mut self)460     fn drop(&mut self)
461     {
462         for fd in &mut self.fds {
463             if *fd != INVALID_FD {
464                 // SAFETY:
465                 // Parameter is as required by system, so consider it safe here.
466                 let ret = unsafe { libc::close(*fd) };
467                 if ret != EPOLL_SUCCESS {
468                     error!(LOG_LABEL, "close({}) fail: {:?}",
469                            @public(*fd),
470                            @public(Error::last_os_error()));
471                 }
472             }
473         }
474     }
475 }
476 
477 /// Bookkeeping of epoll handling.
478 pub struct Scheduler {
479     epoll: Arc<Epoll>,
480     epoll_waker: Arc<EpollWaker>,
481     is_running: Arc<AtomicBool>,
482     join_handle: Option<std::thread::JoinHandle<()>>,
483 }
484 
485 impl Scheduler {
new() -> Self486     pub(crate) fn new() -> Self
487     {
488         call_debug_enter!("Scheduler::new");
489         let epoll: Arc<Epoll> = Arc::default();
490         let is_running = Arc::new(AtomicBool::new(true));
491         let driver = Driver::new(epoll.clone(), is_running.clone());
492         let join_handle = std::thread::spawn(move || {
493             driver.run();
494         });
495         let scheduler = Self {
496             epoll,
497             epoll_waker: Arc::default(),
498             is_running,
499             join_handle: Some(join_handle),
500         };
501         let _ = scheduler.add_epoll_handler(scheduler.epoll_waker.clone());
502         scheduler
503     }
504 
add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) -> FusionResult<Arc<dyn IEpollHandler>>505     pub(crate) fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
506         -> FusionResult<Arc<dyn IEpollHandler>>
507     {
508         call_debug_enter!("Scheduler::add_epoll_handler");
509         let fd: RawFd = handler.fd();
510         let join_handle = ylong_runtime::spawn(
511             EpollHandlerFuture::new(fd, self.epoll.clone())
512         );
513         self.epoll.add_epoll_handler(fd, EpollHandler::new(handler, join_handle))
514     }
515 
remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) -> FusionResult<Arc<dyn IEpollHandler>>516     pub(crate) fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
517         -> FusionResult<Arc<dyn IEpollHandler>>
518     {
519         call_debug_enter!("Scheduler::remove_epoll_handler");
520         self.epoll.remove_epoll_handler(handler.fd())
521     }
522 }
523 
524 impl Default for Scheduler {
default() -> Self525     fn default() -> Self
526     {
527         Self::new()
528     }
529 }
530 
531 impl Drop for Scheduler {
drop(&mut self)532     fn drop(&mut self)
533     {
534         call_debug_enter!("Scheduler::drop");
535         self.is_running.store(false, Ordering::Relaxed);
536         self.epoll_waker.wake();
537         if let Some(join_handle) = self.join_handle.take() {
538             let _ = join_handle.join();
539         }
540     }
541 }
542