1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::ffi::c_void;
15 use std::ops::{Deref, DerefMut};
16 use std::os::unix::io::RawFd;
17 use std::time::Duration;
18 use std::{cmp, io, mem, ptr};
19
20 use libc::{c_int, uintptr_t};
21
22 use crate::{EventTrait, Interest, Token};
23
/// A wrapper around the OS-specific polling system.
///
/// Linux: epoll
/// Windows: iocp
/// macos: kqueue
///
/// This implementation is the kqueue one: the selector owns a kqueue
/// descriptor and closes it when dropped.
#[derive(Debug)]
pub struct Selector {
    // Descriptor returned by `kqueue()` (or a duplicate of one, see `try_clone`).
    kq: RawFd,
}
32
33 impl Selector {
34 /// Creates a new Selector.
35 ///
36 /// # Error
37 /// If the underlying syscall fails, returns the corresponding error.
new() -> io::Result<Selector>38 pub fn new() -> io::Result<Selector> {
39 let kq = syscall!(kqueue())?;
40 // make sure the fd closed when child process executes
41 syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?;
42
43 Ok(Selector { kq })
44 }
45
46 /// Waits for io events to come within a time limit.
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>47 pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
48 events.clear();
49
50 let timeout = timeout.map(|time| libc::timespec {
51 tv_sec: cmp::min(time.as_secs(), libc::time_t::MAX as u64) as libc::time_t,
52 // the cast is safe cause c_long::max > nanoseconds per second
53 tv_nsec: libc::c_long::from(time.subsec_nanos() as i32),
54 });
55
56 let timeout_ptr = match timeout.as_ref() {
57 Some(t) => t as *const libc::timespec,
58 None => ptr::null_mut(),
59 };
60
61 let n_events = syscall!(kevent(
62 self.kq,
63 ptr::null(),
64 0,
65 events.as_mut_ptr(),
66 events.capacity() as c_int,
67 timeout_ptr,
68 ))?;
69 unsafe { events.set_len(n_events as usize) };
70
71 Ok(())
72 }
73
74 /// Registers the fd with specific interested events
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>75 pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
76 let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD;
77 let mut events = Vec::with_capacity(2);
78 if interests.is_readable() {
79 let kevent = kevent_new(fd, libc::EVFILT_READ, flags, token.0);
80 events.push(kevent);
81 }
82
83 if interests.is_writable() {
84 let kevent = kevent_new(fd, libc::EVFILT_WRITE, flags, token.0);
85 events.push(kevent);
86 }
87
88 kevent_register(self.kq, events.as_mut_slice())?;
89 kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64])
90 }
91
92 /// Re-registers the fd with specific interested events
reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()>93 pub fn reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> {
94 let flags = libc::EV_CLEAR | libc::EV_RECEIPT;
95 let mut events = Vec::with_capacity(2);
96
97 let r_flags = match interests.is_readable() {
98 true => flags | libc::EV_ADD,
99 false => flags | libc::EV_DELETE,
100 };
101
102 let w_flags = match interests.is_writable() {
103 true => flags | libc::EV_ADD,
104 false => flags | libc::EV_DELETE,
105 };
106
107 events.push(kevent_new(fd, libc::EVFILT_READ, r_flags, token.0));
108 events.push(kevent_new(fd, libc::EVFILT_WRITE, w_flags, token.0));
109 kevent_register(self.kq, events.as_mut_slice())?;
110 kevent_check_error(events.as_mut_slice(), &[libc::EPIPE as i64])
111 }
112
113 /// De-registers the fd.
deregister(&self, fd: i32) -> io::Result<()>114 pub fn deregister(&self, fd: i32) -> io::Result<()> {
115 let flags = libc::EV_DELETE | libc::EV_RECEIPT;
116 let mut events = vec![
117 kevent_new(fd, libc::EVFILT_READ, flags, 0),
118 kevent_new(fd, libc::EVFILT_WRITE, flags, 0),
119 ];
120 kevent_register(self.kq, events.as_mut_slice())?;
121 kevent_check_error(events.as_mut_slice(), &[libc::ENOENT as i64])
122 }
123
124 /// Try-clones the kqueue.
125 ///
126 /// If succeeds, returns a duplicate of the kqueue.
127 /// If fails, returns the last OS error.
try_clone(&self) -> io::Result<Selector>128 pub fn try_clone(&self) -> io::Result<Selector> {
129 const LOWEST_FD: c_int = 3;
130
131 let kq = syscall!(fcntl(self.kq, libc::F_DUPFD_CLOEXEC, LOWEST_FD))?;
132 Ok(Selector { kq })
133 }
134
135 /// Allows the kqueue to accept user-space notifications. Should be called
136 /// before `Selector::wake`
register_waker(&self, token: Token) -> io::Result<()>137 pub fn register_waker(&self, token: Token) -> io::Result<()> {
138 let event = kevent_new(
139 0,
140 libc::EVFILT_USER,
141 libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT,
142 token.0,
143 );
144
145 self.kevent_notify(event)
146 }
147
148 /// Sends a notification to wakeup the kqueue. Should be called after
149 /// `Selector::register_waker`.
wake(&self, token: Token) -> io::Result<()>150 pub fn wake(&self, token: Token) -> io::Result<()> {
151 let mut event = kevent_new(
152 0,
153 libc::EVFILT_USER,
154 libc::EV_ADD | libc::EV_RECEIPT,
155 token.0,
156 );
157 event.fflags = libc::NOTE_TRIGGER;
158 self.kevent_notify(event)
159 }
160
161 #[inline]
kevent_notify(&self, mut event: Event) -> io::Result<()>162 fn kevent_notify(&self, mut event: Event) -> io::Result<()> {
163 syscall!(kevent(self.kq, &event, 1, &mut event, 1, ptr::null())).map(|_| {
164 if (event.flags & libc::EV_ERROR != 0) && event.data != 0 {
165 Err(io::Error::from_raw_os_error(event.data as i32))
166 } else {
167 Ok(())
168 }
169 })?
170 }
171 }
172
173 #[inline]
kevent_register(kq: RawFd, events: &mut [Event]) -> io::Result<()>174 fn kevent_register(kq: RawFd, events: &mut [Event]) -> io::Result<()> {
175 match syscall!(kevent(
176 kq,
177 events.as_ptr(),
178 events.len() as c_int,
179 events.as_mut_ptr(),
180 events.len() as c_int,
181 ptr::null(),
182 )) {
183 Ok(_) => Ok(()),
184 Err(e) => {
185 if let Some(libc::EINTR) = e.raw_os_error() {
186 Ok(())
187 } else {
188 Err(e)
189 }
190 }
191 }
192 }
193
194 // this function should be called right after register
kevent_check_error(events: &mut [Event], ignored: &[i64]) -> io::Result<()>195 fn kevent_check_error(events: &mut [Event], ignored: &[i64]) -> io::Result<()> {
196 for event in events {
197 let data = event.data as _;
198 if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored.contains(&data) {
199 return Err(io::Error::from_raw_os_error(data as i32));
200 }
201 }
202 Ok(())
203 }
204
205 #[inline]
kevent_new(ident: RawFd, filter: i16, flags: u16, udata: usize) -> Event206 fn kevent_new(ident: RawFd, filter: i16, flags: u16, udata: usize) -> Event {
207 Event {
208 ident: ident as uintptr_t,
209 filter,
210 flags,
211 udata: udata as *mut c_void,
212 ..unsafe { mem::zeroed() }
213 }
214 }
215
/// An io event. On this backend it is the raw `libc::kevent` structure.
pub type Event = libc::kevent;
218
/// A vector of events.
///
/// Thin wrapper around `Vec<Event>`; the vector's capacity bounds how many
/// events a single `Selector::select` call can return.
pub struct Events(Vec<Event>);
221
222 impl Events {
223 /// Initializes a vector of events with an initial capacity
with_capacity(capacity: usize) -> Self224 pub fn with_capacity(capacity: usize) -> Self {
225 Events(Vec::with_capacity(capacity))
226 }
227 }
228
229 impl Deref for Events {
230 type Target = Vec<Event>;
231
deref(&self) -> &Self::Target232 fn deref(&self) -> &Self::Target {
233 &self.0
234 }
235 }
236
237 impl DerefMut for Events {
deref_mut(&mut self) -> &mut Self::Target238 fn deref_mut(&mut self) -> &mut Self::Target {
239 &mut self.0
240 }
241 }
242
// kevent has a member `udata` of type `*mut c_void`, so `Events` is not
// automatically `Send`/`Sync`.
//
// SAFETY: in this file `udata` only carries a token value cast to a pointer
// (see `kevent_new`) and is only ever cast back to `usize` (see
// `EventTrait::token`); it is never dereferenced here.
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
247
248 impl EventTrait for Event {
token(&self) -> Token249 fn token(&self) -> Token {
250 Token(self.udata as usize)
251 }
252
is_readable(&self) -> bool253 fn is_readable(&self) -> bool {
254 self.filter == libc::EVFILT_READ || self.filter == libc::EVFILT_USER
255 }
256
is_writable(&self) -> bool257 fn is_writable(&self) -> bool {
258 self.filter == libc::EVFILT_WRITE
259 }
260
is_read_closed(&self) -> bool261 fn is_read_closed(&self) -> bool {
262 self.filter == libc::EVFILT_READ && self.flags & libc::EV_EOF != 0
263 }
264
is_write_closed(&self) -> bool265 fn is_write_closed(&self) -> bool {
266 self.filter == libc::EVFILT_WRITE && self.flags & libc::EV_EOF != 0
267 }
268
is_error(&self) -> bool269 fn is_error(&self) -> bool {
270 (self.flags & libc::EV_ERROR) != 0 || ((self.flags & libc::EV_EOF) != 0 && self.fflags != 0)
271 }
272 }
273
274 impl Drop for Selector {
drop(&mut self)275 fn drop(&mut self) {
276 if let Err(e) = syscall!(close(self.kq)) {
277 panic!("kqueue release failed: {e}");
278 }
279 }
280 }
281