1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::ffi::c_void;
15 use std::ops::{Deref, DerefMut};
16 use std::os::unix::io::RawFd;
17 use std::time::Duration;
18 use std::{cmp, io, mem, ptr};
19 
20 use libc::{c_int, uintptr_t};
21 
22 use crate::{EventTrait, Interest, Token};
23 
/// A wrapper around the OS-specific polling mechanism.
///
/// Each platform is backed by a different facility:
/// - Linux: epoll
/// - Windows: iocp
/// - macOS: kqueue
///
/// This implementation owns a kqueue descriptor obtained from `kqueue()`.
#[derive(Debug)]
pub struct Selector {
    // Raw kqueue descriptor; closed when the `Selector` is dropped.
    kq: RawFd,
}
32 
33 impl Selector {
34     /// Creates a new Selector.
35     ///
36     /// # Error
37     /// If the underlying syscall fails, returns the corresponding error.
new() -> io::Result<Selector>38     pub fn new() -> io::Result<Selector> {
39         let kq = syscall!(kqueue())?;
40         // make sure the fd closed when child process executes
41         syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?;
42 
43         Ok(Selector { kq })
44     }
45 
46     /// Waits for io events to come within a time limit.
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>47     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
48         events.clear();
49 
50         let timeout = timeout.map(|time| libc::timespec {
51             tv_sec: cmp::min(time.as_secs(), libc::time_t::MAX as u64) as libc::time_t,
52             // the cast is safe cause c_long::max > nanoseconds per second
53             tv_nsec: libc::c_long::from(time.subsec_nanos() as i32),
54         });
55 
56         let timeout_ptr = match timeout.as_ref() {
57             Some(t) => t as *const libc::timespec,
58             None => ptr::null_mut(),
59         };
60 
61         let n_events = syscall!(kevent(
62             self.kq,
63             ptr::null(),
64             0,
65             events.as_mut_ptr(),
66             events.capacity() as c_int,
67             timeout_ptr,
68         ))?;
69         unsafe { events.set_len(n_events as usize) };
70 
71         Ok(())
72     }
73 
74     /// Registers the fd with specific interested events
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>75     pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
76         let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD;
77         let mut events = Vec::with_capacity(2);
78         if interests.is_readable() {
79             let kevent = kevent_new(fd, libc::EVFILT_READ, flags, token.0);
80             events.push(kevent);
81         }
82 
83         if interests.is_writable() {
84             let kevent = kevent_new(fd, libc::EVFILT_WRITE, flags, token.0);
85             events.push(kevent);
86         }
87 
88         kevent_register(self.kq, events.as_mut_slice())?;
89         kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64])
90     }
91 
92     /// Re-registers the fd with specific interested events
reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()>93     pub fn reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> {
94         let flags = libc::EV_CLEAR | libc::EV_RECEIPT;
95         let mut events = Vec::with_capacity(2);
96 
97         let r_flags = match interests.is_readable() {
98             true => flags | libc::EV_ADD,
99             false => flags | libc::EV_DELETE,
100         };
101 
102         let w_flags = match interests.is_writable() {
103             true => flags | libc::EV_ADD,
104             false => flags | libc::EV_DELETE,
105         };
106 
107         events.push(kevent_new(fd, libc::EVFILT_READ, r_flags, token.0));
108         events.push(kevent_new(fd, libc::EVFILT_WRITE, w_flags, token.0));
109         kevent_register(self.kq, events.as_mut_slice())?;
110         kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64])
111     }
112 
113     /// De-registers the fd.
deregister(&self, fd: i32) -> io::Result<()>114     pub fn deregister(&self, fd: i32) -> io::Result<()> {
115         let flags = libc::EV_DELETE | libc::EV_RECEIPT;
116         let mut events = vec![
117             kevent_new(fd, libc::EVFILT_READ, flags, 0),
118             kevent_new(fd, libc::EVFILT_WRITE, flags, 0),
119         ];
120         kevent_register(self.kq, events.as_mut_slice())?;
121         kevent_check_error(events.as_mut_slice(), &[libc::ENOENT as i64])
122     }
123 
124     /// Try-clones the kqueue.
125     ///
126     /// If succeeds, returns a duplicate of the kqueue.
127     /// If fails, returns the last OS error.
try_clone(&self) -> io::Result<Selector>128     pub fn try_clone(&self) -> io::Result<Selector> {
129         const LOWEST_FD: c_int = 3;
130 
131         let kq = syscall!(fcntl(self.kq, libc::F_DUPFD_CLOEXEC, LOWEST_FD))?;
132         Ok(Selector { kq })
133     }
134 
135     /// Allows the kqueue to accept user-space notifications. Should be called
136     /// before `Selector::wake`
register_waker(&self, token: Token) -> io::Result<()>137     pub fn register_waker(&self, token: Token) -> io::Result<()> {
138         let event = kevent_new(
139             0,
140             libc::EVFILT_USER,
141             libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT,
142             token.0,
143         );
144 
145         self.kevent_notify(event)
146     }
147 
148     /// Sends a notification to wakeup the kqueue. Should be called after
149     /// `Selector::register_waker`.
wake(&self, token: Token) -> io::Result<()>150     pub fn wake(&self, token: Token) -> io::Result<()> {
151         let mut event = kevent_new(
152             0,
153             libc::EVFILT_USER,
154             libc::EV_ADD | libc::EV_RECEIPT,
155             token.0,
156         );
157         event.fflags = libc::NOTE_TRIGGER;
158         self.kevent_notify(event)
159     }
160 
161     #[inline]
kevent_notify(&self, mut event: Event) -> io::Result<()>162     fn kevent_notify(&self, mut event: Event) -> io::Result<()> {
163         syscall!(kevent(self.kq, &event, 1, &mut event, 1, ptr::null())).map(|_| {
164             if (event.flags & libc::EV_ERROR != 0) && event.data != 0 {
165                 Err(io::Error::from_raw_os_error(event.data as i32))
166             } else {
167                 Ok(())
168             }
169         })?
170     }
171 }
172 
173 #[inline]
kevent_register(kq: RawFd, events: &mut [Event]) -> io::Result<()>174 fn kevent_register(kq: RawFd, events: &mut [Event]) -> io::Result<()> {
175     match syscall!(kevent(
176         kq,
177         events.as_ptr(),
178         events.len() as c_int,
179         events.as_mut_ptr(),
180         events.len() as c_int,
181         ptr::null(),
182     )) {
183         Ok(_) => Ok(()),
184         Err(e) => {
185             if let Some(libc::EINTR) = e.raw_os_error() {
186                 Ok(())
187             } else {
188                 Err(e)
189             }
190         }
191     }
192 }
193 
194 // this function should be called right after register
kevent_check_error(events: &mut [Event], ignored: &[i64]) -> io::Result<()>195 fn kevent_check_error(events: &mut [Event], ignored: &[i64]) -> io::Result<()> {
196     for event in events {
197         let data = event.data as _;
198         if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored.contains(&data) {
199             return Err(io::Error::from_raw_os_error(data as i32));
200         }
201     }
202     Ok(())
203 }
204 
205 #[inline]
kevent_new(ident: RawFd, filter: i16, flags: u16, udata: usize) -> Event206 fn kevent_new(ident: RawFd, filter: i16, flags: u16, udata: usize) -> Event {
207     Event {
208         ident: ident as uintptr_t,
209         filter,
210         flags,
211         udata: udata as *mut c_void,
212         ..unsafe { mem::zeroed() }
213     }
214 }
215 
/// An io event, represented directly by the kqueue `kevent` structure.
pub type Event = libc::kevent;
218 
/// A vector of events, used as the output buffer of `Selector::select`.
pub struct Events(Vec<Event>);
221 
222 impl Events {
223     /// Initializes a vector of events with an initial capacity
with_capacity(capacity: usize) -> Self224     pub fn with_capacity(capacity: usize) -> Self {
225         Events(Vec::with_capacity(capacity))
226     }
227 }
228 
/// Gives read access to the wrapped `Vec<Event>`.
impl Deref for Events {
    type Target = Vec<Event>;

    /// Returns a shared reference to the inner vector.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
236 
/// Gives mutable access to the wrapped `Vec<Event>`.
impl DerefMut for Events {
    /// Returns a mutable reference to the inner vector.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
242 
// kevent has a member `udata` which has type of `*mut c_void`, therefore it
// does not automatically derive Sync/Send.
//
// In this file `udata` is only written from a `usize` token (see
// `kevent_new`) and read back as a `usize` (see `EventTrait::token`); it is
// never dereferenced here.
// NOTE(review): soundness relies on no other code dereferencing `udata` —
// confirm.
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
247 
248 impl EventTrait for Event {
token(&self) -> Token249     fn token(&self) -> Token {
250         Token(self.udata as usize)
251     }
252 
is_readable(&self) -> bool253     fn is_readable(&self) -> bool {
254         self.filter == libc::EVFILT_READ || self.filter == libc::EVFILT_USER
255     }
256 
is_writable(&self) -> bool257     fn is_writable(&self) -> bool {
258         self.filter == libc::EVFILT_WRITE
259     }
260 
is_read_closed(&self) -> bool261     fn is_read_closed(&self) -> bool {
262         self.filter == libc::EVFILT_READ && self.flags & libc::EV_EOF != 0
263     }
264 
is_write_closed(&self) -> bool265     fn is_write_closed(&self) -> bool {
266         self.filter == libc::EVFILT_WRITE && self.flags & libc::EV_EOF != 0
267     }
268 
is_error(&self) -> bool269     fn is_error(&self) -> bool {
270         (self.flags & libc::EV_ERROR) != 0 || ((self.flags & libc::EV_EOF) != 0 && self.fflags != 0)
271     }
272 }
273 
274 impl Drop for Selector {
drop(&mut self)275     fn drop(&mut self) {
276         if let Err(e) = syscall!(close(self.kq)) {
277             panic!("kqueue release failed: {e}");
278         }
279     }
280 }
281