1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io;
15 use std::os::raw::c_int;
16 use std::sync::atomic::{AtomicUsize, Ordering};
17 use std::time::Duration;
18 
19 use crate::{EventTrait, Interest, Token};
20 
21 static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
22 
23 /// An wrapper for different OS polling system.
24 /// Linux: epoll
25 /// Windows: iocp
26 /// macos: kqueue
27 pub struct Selector {
28     // selector id
29     id: usize,
30     // epoll fd
31     ep: i32,
32 }
33 
34 impl Selector {
35     /// Creates a new Selector.
36     ///
37     /// # Error
38     /// If the underlying syscall fails, returns the corresponding error.
new() -> io::Result<Selector>39     pub fn new() -> io::Result<Selector> {
40         let ep = match syscall!(epoll_create1(libc::EPOLL_CLOEXEC)) {
41             Ok(ep_sys) => ep_sys,
42             Err(err) => {
43                 return Err(err);
44             }
45         };
46 
47         Ok(Selector {
48             id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
49             ep,
50         })
51     }
52 
53     /// Waits for io events to come within a time limit.
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>54     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
55         // Convert to milliseconds, if input time is none, it means the timeout is -1
56         // and wait permanently.
57         let timeout = timeout.map(|time| time.as_millis() as c_int).unwrap_or(-1);
58 
59         events.clear();
60 
61         match syscall!(epoll_wait(
62             self.ep,
63             events.as_mut_ptr(),
64             events.capacity() as i32,
65             timeout
66         )) {
67             Ok(n_events) => unsafe { events.set_len(n_events as usize) },
68             Err(err) => {
69                 return Err(err);
70             }
71         }
72         Ok(())
73     }
74 
75     /// Registers the fd with specific interested events
register(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()>76     pub fn register(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> {
77         let mut sys_event = libc::epoll_event {
78             events: interests.into_io_event(),
79             u64: usize::from(token) as u64,
80         };
81 
82         match syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut sys_event)) {
83             Ok(_) => Ok(()),
84             Err(err) => Err(err),
85         }
86     }
87 
88     /// Re-registers the fd with specific interested events
reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()>89     pub fn reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> {
90         let mut sys_event = libc::epoll_event {
91             events: interests.into_io_event(),
92             u64: usize::from(token) as u64,
93         };
94 
95         match syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut sys_event)) {
96             Ok(_) => Ok(()),
97             Err(err) => Err(err),
98         }
99     }
100 
101     /// De-registers the fd.
deregister(&self, fd: i32) -> io::Result<()>102     pub fn deregister(&self, fd: i32) -> io::Result<()> {
103         match syscall!(epoll_ctl(
104             self.ep,
105             libc::EPOLL_CTL_DEL,
106             fd,
107             std::ptr::null_mut()
108         )) {
109             Ok(_) => Ok(()),
110             Err(err) => Err(err),
111         }
112     }
113 }
114 
115 impl Drop for Selector {
drop(&mut self)116     fn drop(&mut self) {
117         if let Err(_err) = syscall!(close(self.ep)) {
118             // todo: log the error
119         }
120     }
121 }
122 
123 impl std::fmt::Debug for Selector {
fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result124     fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125         write!(fmt, "epoll fd: {}, Select id: {}", self.ep, self.id)
126     }
127 }
128 
129 /// A vector of events
130 pub type Events = Vec<Event>;
131 
132 /// An io event
133 pub type Event = libc::epoll_event;
134 
135 impl EventTrait for Event {
token(&self) -> Token136     fn token(&self) -> Token {
137         Token(self.u64 as usize)
138     }
139 
is_readable(&self) -> bool140     fn is_readable(&self) -> bool {
141         (self.events as libc::c_int & libc::EPOLLIN) != 0
142             || (self.events as libc::c_int & libc::EPOLLPRI) != 0
143     }
144 
is_writable(&self) -> bool145     fn is_writable(&self) -> bool {
146         (self.events as libc::c_int & libc::EPOLLOUT) != 0
147     }
148 
is_read_closed(&self) -> bool149     fn is_read_closed(&self) -> bool {
150         self.events as libc::c_int & libc::EPOLLHUP != 0
151             || (self.events as libc::c_int & libc::EPOLLIN != 0
152                 && self.events as libc::c_int & libc::EPOLLRDHUP != 0)
153     }
154 
is_write_closed(&self) -> bool155     fn is_write_closed(&self) -> bool {
156         self.events as libc::c_int & libc::EPOLLHUP != 0
157             || (self.events as libc::c_int & libc::EPOLLOUT != 0
158                 && self.events as libc::c_int & libc::EPOLLERR != 0)
159             || self.events as libc::c_int == libc::EPOLLERR
160     }
161 
is_error(&self) -> bool162     fn is_error(&self) -> bool {
163         (self.events as libc::c_int & libc::EPOLLERR) != 0
164     }
165 }
166 
167 #[cfg(test)]
168 mod test {
169     use crate::sys::socket;
170     use crate::{Event, EventTrait, Interest, Selector, Token};
171 
172     /// UT cases for `Selector::reregister`.
173     ///
174     /// # Brief
175     /// 1. Create a Selector
176     /// 2. Reregister the selector
177     #[test]
ut_epoll_reregister()178     fn ut_epoll_reregister() {
179         let selector = Selector::new().unwrap();
180         let sock = socket::socket_new(libc::AF_UNIX, libc::SOCK_STREAM).unwrap();
181         let ret = selector.register(sock, Token::from_usize(0), Interest::READABLE);
182         assert!(ret.is_ok());
183         let ret = selector.reregister(sock, Token::from_usize(0), Interest::WRITABLE);
184         assert!(ret.is_ok());
185     }
186 
187     /// UT case for `Event::is_error`
188     ///
189     /// # Brief
190     /// 1. Create an event from libc::EPOLLERR
191     /// 2. Check if it's an error
192     #[test]
ut_event_is_err()193     fn ut_event_is_err() {
194         let event = Event {
195             events: libc::EPOLLERR as u32,
196             u64: 0,
197         };
198         assert!(event.is_error());
199     }
200 }
201