1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io; 15 use std::os::raw::c_int; 16 use std::sync::atomic::{AtomicUsize, Ordering}; 17 use std::time::Duration; 18 19 use crate::{EventTrait, Interest, Token}; 20 21 static NEXT_ID: AtomicUsize = AtomicUsize::new(1); 22 23 /// An wrapper for different OS polling system. 24 /// Linux: epoll 25 /// Windows: iocp 26 /// macos: kqueue 27 pub struct Selector { 28 // selector id 29 id: usize, 30 // epoll fd 31 ep: i32, 32 } 33 34 impl Selector { 35 /// Creates a new Selector. 36 /// 37 /// # Error 38 /// If the underlying syscall fails, returns the corresponding error. new() -> io::Result<Selector>39 pub fn new() -> io::Result<Selector> { 40 let ep = match syscall!(epoll_create1(libc::EPOLL_CLOEXEC)) { 41 Ok(ep_sys) => ep_sys, 42 Err(err) => { 43 return Err(err); 44 } 45 }; 46 47 Ok(Selector { 48 id: NEXT_ID.fetch_add(1, Ordering::Relaxed), 49 ep, 50 }) 51 } 52 53 /// Waits for io events to come within a time limit. select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>54 pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> { 55 // Convert to milliseconds, if input time is none, it means the timeout is -1 56 // and wait permanently. 57 let timeout = timeout.map(|time| time.as_millis() as c_int).unwrap_or(-1); 58 59 events.clear(); 60 61 match syscall!(epoll_wait( 62 self.ep, 63 events.as_mut_ptr(), 64 events.capacity() as i32, 65 timeout 66 )) { 67 Ok(n_events) => unsafe { events.set_len(n_events as usize) }, 68 Err(err) => { 69 return Err(err); 70 } 71 } 72 Ok(()) 73 } 74 75 /// Registers the fd with specific interested events register(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()>76 pub fn register(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> { 77 let mut sys_event = libc::epoll_event { 78 events: interests.into_io_event(), 79 u64: usize::from(token) as u64, 80 }; 81 82 match syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut sys_event)) { 83 Ok(_) => Ok(()), 84 Err(err) => Err(err), 85 } 86 } 87 88 /// Re-registers the fd with specific interested events reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()>89 pub fn reregister(&self, fd: i32, token: Token, interests: Interest) -> io::Result<()> { 90 let mut sys_event = libc::epoll_event { 91 events: interests.into_io_event(), 92 u64: usize::from(token) as u64, 93 }; 94 95 match syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut sys_event)) { 96 Ok(_) => Ok(()), 97 Err(err) => Err(err), 98 } 99 } 100 101 /// De-registers the fd. deregister(&self, fd: i32) -> io::Result<()>102 pub fn deregister(&self, fd: i32) -> io::Result<()> { 103 match syscall!(epoll_ctl( 104 self.ep, 105 libc::EPOLL_CTL_DEL, 106 fd, 107 std::ptr::null_mut() 108 )) { 109 Ok(_) => Ok(()), 110 Err(err) => Err(err), 111 } 112 } 113 } 114 115 impl Drop for Selector { drop(&mut self)116 fn drop(&mut self) { 117 if let Err(_err) = syscall!(close(self.ep)) { 118 // todo: log the error 119 } 120 } 121 } 122 123 impl std::fmt::Debug for Selector { fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result124 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 125 write!(fmt, "epoll fd: {}, Select id: {}", self.ep, self.id) 126 } 127 } 128 129 /// A vector of events 130 pub type Events = Vec<Event>; 131 132 /// An io event 133 pub type Event = libc::epoll_event; 134 135 impl EventTrait for Event { token(&self) -> Token136 fn token(&self) -> Token { 137 Token(self.u64 as usize) 138 } 139 is_readable(&self) -> bool140 fn is_readable(&self) -> bool { 141 (self.events as libc::c_int & libc::EPOLLIN) != 0 142 || (self.events as libc::c_int & libc::EPOLLPRI) != 0 143 } 144 is_writable(&self) -> bool145 fn is_writable(&self) -> bool { 146 (self.events as libc::c_int & libc::EPOLLOUT) != 0 147 } 148 is_read_closed(&self) -> bool149 fn is_read_closed(&self) -> bool { 150 self.events as libc::c_int & libc::EPOLLHUP != 0 151 || (self.events as libc::c_int & libc::EPOLLIN != 0 152 && self.events as libc::c_int & libc::EPOLLRDHUP != 0) 153 } 154 is_write_closed(&self) -> bool155 fn is_write_closed(&self) -> bool { 156 self.events as libc::c_int & libc::EPOLLHUP != 0 157 || (self.events as libc::c_int & libc::EPOLLOUT != 0 158 && self.events as libc::c_int & libc::EPOLLERR != 0) 159 || self.events as libc::c_int == libc::EPOLLERR 160 } 161 is_error(&self) -> bool162 fn is_error(&self) -> bool { 163 (self.events as libc::c_int & libc::EPOLLERR) != 0 164 } 165 } 166 167 #[cfg(test)] 168 mod test { 169 use crate::sys::socket; 170 use crate::{Event, EventTrait, Interest, Selector, Token}; 171 172 /// UT cases for `Selector::reregister`. 173 /// 174 /// # Brief 175 /// 1. Create a Selector 176 /// 2. Reregister the selector 177 #[test] ut_epoll_reregister()178 fn ut_epoll_reregister() { 179 let selector = Selector::new().unwrap(); 180 let sock = socket::socket_new(libc::AF_UNIX, libc::SOCK_STREAM).unwrap(); 181 let ret = selector.register(sock, Token::from_usize(0), Interest::READABLE); 182 assert!(ret.is_ok()); 183 let ret = selector.reregister(sock, Token::from_usize(0), Interest::WRITABLE); 184 assert!(ret.is_ok()); 185 } 186 187 /// UT case for `Event::is_error` 188 /// 189 /// # Brief 190 /// 1. Create an event from libc::EPOLLERR 191 /// 2. Check if it's an error 192 #[test] ut_event_is_err()193 fn ut_event_is_err() { 194 let event = Event { 195 events: libc::EPOLLERR as u32, 196 u64: 0, 197 }; 198 assert!(event.is_error()); 199 } 200 } 201