1 /* 2 * Copyright (C) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 //! Implementation of epoll event loop. 17 18 #![allow(dead_code)] 19 #![allow(unused_variables)] 20 21 use std::collections::BTreeMap; 22 use std::ffi::{ c_void, c_char, c_int, CString }; 23 use std::future::Future; 24 use std::io::Error; 25 use std::os::fd::RawFd; 26 use std::pin::Pin; 27 use std::sync::{ Arc, Mutex }; 28 use std::sync::atomic::{ AtomicBool, Ordering }; 29 use std::task::{ Context, Poll, Waker }; 30 use fusion_utils_rust::{ call_debug_enter, FusionErrorCode, FusionResult }; 31 use hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType }; 32 33 /// Indicating data other than high-priority data can be read. 34 pub const LIBC_EPOLLIN: u32 = libc::EPOLLIN as u32; 35 const LIBC_EPOLLONESHOT: u32 = libc::EPOLLONESHOT as u32; 36 /// Indicating an error has occurred. 37 pub const LIBC_EPOLLERR: u32 = libc::EPOLLERR as u32; 38 /// Indicating a hangup has occurred. 39 pub const LIBC_EPOLLHUP: u32 = libc::EPOLLHUP as u32; 40 const LIBC_EPOLLALL: u32 = LIBC_EPOLLIN | LIBC_EPOLLERR | LIBC_EPOLLHUP; 41 const LIBC_EPOLLNONE: u32 = 0; 42 const MAX_EPOLL_EVENTS: c_int = 128; 43 const EPOLL_SUCCESS: c_int = 0; 44 const EPOLL_FAILURE: c_int = -1; 45 const NO_TIMEOUT: c_int = -1; 46 const SYSTEM_IO_FAILURE: libc::ssize_t = -1; 47 const INVALID_FD: RawFd = -1; 48 const LOG_LABEL: HiLogLabel = HiLogLabel { 49 log_type: LogType::LogCore, 50 domain: 0xD002220, 51 tag: "Scheduler", 52 }; 53 54 /// Abstraction of epoll handler. 55 pub trait IEpollHandler: Send + Sync { 56 /// Return file descriptor of this epoll handler. fd(&self) -> RawFd57 fn fd(&self) -> RawFd; 58 /// Dispatch epoll events to this epoll handler. dispatch(&self, events: u32)59 fn dispatch(&self, events: u32); 60 } 61 62 struct EpollEvent { 63 fd: RawFd, 64 events: u32, 65 } 66 67 struct EpollHandler { 68 raw: Arc<dyn IEpollHandler>, 69 handle: ylong_runtime::task::JoinHandle<()>, 70 waker: Option<Waker>, 71 events: u32, 72 } 73 74 impl EpollHandler { new(raw: Arc<dyn IEpollHandler>, handle: ylong_runtime::task::JoinHandle<()>) -> Self75 fn new(raw: Arc<dyn IEpollHandler>, handle: ylong_runtime::task::JoinHandle<()>) -> Self 76 { 77 Self { 78 raw, 79 handle, 80 waker: Default::default(), 81 events: Default::default(), 82 } 83 } 84 85 #[inline] fd(&self) -> RawFd86 fn fd(&self) -> RawFd 87 { 88 self.raw.fd() 89 } 90 91 #[inline] raw_handler(&self) -> Arc<dyn IEpollHandler>92 fn raw_handler(&self) -> Arc<dyn IEpollHandler> 93 { 94 self.raw.clone() 95 } 96 97 #[inline] set_waker(&mut self, waker: &Waker)98 fn set_waker(&mut self, waker: &Waker) 99 { 100 self.waker.replace(waker.clone()); 101 } 102 103 #[inline] take_events(&mut self) -> u32104 fn take_events(&mut self) -> u32 105 { 106 let events = self.events; 107 self.events = Default::default(); 108 events 109 } 110 } 111 112 impl Drop for EpollHandler { drop(&mut self)113 fn drop(&mut self) 114 { 115 self.handle.cancel(); 116 } 117 } 118 119 /// `Driver` encapsulate event loop of epoll. 120 struct Driver { 121 epoll: Arc<Epoll>, 122 is_running: Arc<AtomicBool>, 123 } 124 125 impl Driver { new(epoll: Arc<Epoll>, is_running: Arc<AtomicBool>) -> Self126 fn new(epoll: Arc<Epoll>, is_running: Arc<AtomicBool>) -> Self 127 { 128 Self { epoll, is_running } 129 } 130 131 #[inline] is_running(&self) -> bool132 fn is_running(&self) -> bool 133 { 134 self.is_running.load(Ordering::Relaxed) 135 } 136 run(&self)137 fn run(&self) 138 { 139 call_debug_enter!("Driver::run"); 140 while self.is_running() { 141 if let Some(epoll_events) = self.epoll.epoll_wait() { 142 if !self.is_running() { 143 info!(LOG_LABEL, "Driver stopped running"); 144 break; 145 } 146 self.epoll.wake(&epoll_events); 147 } 148 } 149 } 150 } 151 152 struct Epoll { 153 epoll_fd: RawFd, 154 handlers: Mutex<BTreeMap<RawFd, EpollHandler>>, 155 } 156 157 impl Epoll { new() -> Self158 fn new() -> Self 159 { 160 // SAFETY: 161 // The epoll API is multi-thread safe. 162 // This is a normal system call, no safety pitfall. 163 let epoll_fd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) }; 164 assert_ne!(epoll_fd, INVALID_FD, "epoll_create1 fail: {:?}", Error::last_os_error()); 165 Self { 166 epoll_fd, 167 handlers: Mutex::default(), 168 } 169 } 170 171 #[inline] fd(&self) -> RawFd172 fn fd(&self) -> RawFd 173 { 174 self.epoll_fd 175 } 176 epoll_add(&self, fd: RawFd) -> FusionResult<()>177 fn epoll_add(&self, fd: RawFd) -> FusionResult<()> 178 { 179 call_debug_enter!("Epoll::epoll_add"); 180 let mut ev = libc::epoll_event { 181 events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR, 182 u64: fd as u64, 183 }; 184 // SAFETY: 185 // The epoll API is multi-thread safe. 186 // We have carefully ensure that parameters are as required by system interface. 187 let ret = unsafe { 188 libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev) 189 }; 190 if ret != EPOLL_SUCCESS { 191 error!(LOG_LABEL, "epoll_ctl_add({},{}) fail: {:?}", 192 @public(self.epoll_fd), @public(fd), @public(Error::last_os_error())); 193 Err(FusionErrorCode::Fail) 194 } else { 195 Ok(()) 196 } 197 } 198 epoll_del(&self, fd: RawFd) -> FusionResult<()>199 fn epoll_del(&self, fd: RawFd) -> FusionResult<()> 200 { 201 call_debug_enter!("Epoll::epoll_del"); 202 // SAFETY: 203 // The epoll API is multi-thread safe. 204 // We have carefully ensure that parameters are as required by system interface. 205 let ret = unsafe { 206 libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut()) 207 }; 208 if ret != EPOLL_SUCCESS { 209 error!(LOG_LABEL, "epoll_ctl_remove({},{}) fail: {:?}", 210 @public(self.epoll_fd), @public(fd), @public(Error::last_os_error())); 211 Err(FusionErrorCode::Fail) 212 } else { 213 Ok(()) 214 } 215 } 216 epoll_wait(&self) -> Option<Vec<EpollEvent>>217 fn epoll_wait(&self) -> Option<Vec<EpollEvent>> 218 { 219 call_debug_enter!("Epoll::epoll_wait"); 220 let mut events: Vec<libc::epoll_event> = Vec::with_capacity(MAX_EPOLL_EVENTS as usize); 221 // SAFETY: 222 // The epoll API is multi-thread safe. 223 // We have carefully ensure that parameters are as required by system interface. 224 let ret = unsafe { 225 libc::epoll_wait(self.epoll_fd, events.as_mut_ptr(), MAX_EPOLL_EVENTS, NO_TIMEOUT) 226 }; 227 if ret < 0 { 228 error!(LOG_LABEL, "epoll_wait({}) fail: {:?}", 229 @public(self.epoll_fd), 230 @public(Error::last_os_error())); 231 return None; 232 } 233 let num_of_events = ret as usize; 234 // SAFETY: 235 // `epoll_wait` returns the number of events and promise it is within the limit of 236 // `MAX_EPOLL_EVENTS`. 237 let epoll_events = unsafe { 238 std::slice::from_raw_parts(events.as_ptr(), num_of_events) 239 }; 240 let epoll_events: Vec<EpollEvent> = epoll_events.iter().map(|e| { 241 EpollEvent { 242 fd: e.u64 as RawFd, 243 events: e.events, 244 } 245 }).collect(); 246 Some(epoll_events) 247 } 248 epoll_reset(&self, fd: RawFd) -> FusionResult<()>249 fn epoll_reset(&self, fd: RawFd) -> FusionResult<()> 250 { 251 call_debug_enter!("Epoll::epoll_reset"); 252 let mut ev = libc::epoll_event { 253 events: LIBC_EPOLLIN | LIBC_EPOLLONESHOT | LIBC_EPOLLHUP | LIBC_EPOLLERR, 254 u64: fd as u64, 255 }; 256 // SAFETY: 257 // The epoll API is multi-thread safe. 258 // We have carefully ensure that parameters are as required by system interface. 259 let ret = unsafe { 260 libc::epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev) 261 }; 262 if ret != EPOLL_SUCCESS { 263 error!(LOG_LABEL, "In reset_fd, epoll_ctl_mod({},{}) fail: {:?}", 264 @public(self.epoll_fd), @public(fd), @public(Error::last_os_error())); 265 Err(FusionErrorCode::Fail) 266 } else { 267 Ok(()) 268 } 269 } 270 add_epoll_handler(&self, fd: RawFd, epoll_handler: EpollHandler) -> FusionResult<Arc<dyn IEpollHandler>>271 fn add_epoll_handler(&self, fd: RawFd, epoll_handler: EpollHandler) 272 -> FusionResult<Arc<dyn IEpollHandler>> 273 { 274 call_debug_enter!("Epoll::add_epoll_handler"); 275 let mut guard = self.handlers.lock().unwrap(); 276 if guard.contains_key(&fd) { 277 error!(LOG_LABEL, "Epoll handler ({}) has been added", @public(fd)); 278 return Err(FusionErrorCode::Fail); 279 } 280 debug!(LOG_LABEL, "Add epoll handler ({})", @public(fd)); 281 let raw = epoll_handler.raw_handler(); 282 guard.insert(fd, epoll_handler); 283 let _ = self.epoll_add(fd); 284 Ok(raw) 285 } 286 remove_epoll_handler(&self, fd: RawFd) -> FusionResult<Arc<dyn IEpollHandler>>287 fn remove_epoll_handler(&self, fd: RawFd) -> FusionResult<Arc<dyn IEpollHandler>> 288 { 289 call_debug_enter!("Epoll::remove_epoll_handler"); 290 let mut guard = self.handlers.lock().unwrap(); 291 let _ = self.epoll_del(fd); 292 if let Some(h) = guard.remove(&fd) { 293 debug!(LOG_LABEL, "Remove epoll handler ({})", @public(fd)); 294 Ok(h.raw_handler()) 295 } else { 296 error!(LOG_LABEL, "No epoll handler ({})", @public(fd)); 297 Err(FusionErrorCode::Fail) 298 } 299 } 300 wake(&self, events: &[EpollEvent])301 fn wake(&self, events: &[EpollEvent]) 302 { 303 call_debug_enter!("Epoll::wake"); 304 let mut guard = self.handlers.lock().unwrap(); 305 for e in events { 306 if let Some(handler) = guard.get_mut(&e.fd) { 307 debug!(LOG_LABEL, "Wake epoll handler ({})", @public(e.fd)); 308 handler.events = e.events; 309 if let Some(waker) = &handler.waker { 310 waker.wake_by_ref(); 311 } 312 } else { 313 error!(LOG_LABEL, "No epoll handler ({})", @public(e.fd)); 314 } 315 } 316 } 317 dispatch_inner(&self, fd: RawFd, waker: &Waker) -> Option<(Arc<dyn IEpollHandler>, u32)>318 fn dispatch_inner(&self, fd: RawFd, waker: &Waker) -> Option<(Arc<dyn IEpollHandler>, u32)> 319 { 320 call_debug_enter!("Epoll::dispatch_inner"); 321 let mut guard = self.handlers.lock().unwrap(); 322 if let Some(handler) = guard.get_mut(&fd) { 323 handler.set_waker(waker); 324 let events = handler.take_events() & LIBC_EPOLLALL; 325 if events != LIBC_EPOLLNONE { 326 Some((handler.raw_handler(), events)) 327 } else { 328 debug!(LOG_LABEL, "No epoll event"); 329 None 330 } 331 } else { 332 error!(LOG_LABEL, "No epoll handler with ({})", @public(fd)); 333 None 334 } 335 } 336 dispatch(&self, fd: RawFd, waker: &Waker)337 fn dispatch(&self, fd: RawFd, waker: &Waker) 338 { 339 call_debug_enter!("Epoll::dispatch"); 340 if let Some((handler, events)) = self.dispatch_inner(fd, waker) { 341 handler.dispatch(events); 342 let _ = self.epoll_reset(fd); 343 } 344 } 345 } 346 347 impl Default for Epoll { default() -> Self348 fn default() -> Self 349 { 350 Self::new() 351 } 352 } 353 354 impl Drop for Epoll { drop(&mut self)355 fn drop(&mut self) 356 { 357 // SAFETY: 358 // Parameter is as required by system, so consider it safe here. 359 let ret = unsafe { libc::close(self.epoll_fd) }; 360 if ret != 0 { 361 error!(LOG_LABEL, "close({}) fail: {:?}", 362 @public(self.epoll_fd), 363 @public(Error::last_os_error())); 364 } 365 } 366 } 367 368 struct EpollHandlerFuture { 369 fd: RawFd, 370 epoll: Arc<Epoll>, 371 } 372 373 impl EpollHandlerFuture { new(fd: RawFd, epoll: Arc<Epoll>) -> Self374 fn new(fd: RawFd, epoll: Arc<Epoll>) -> Self 375 { 376 Self { fd, epoll } 377 } 378 } 379 380 impl Future for EpollHandlerFuture { 381 type Output = (); 382 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>383 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> 384 { 385 call_debug_enter!("EpollHandlerFuture::poll"); 386 self.epoll.dispatch(self.fd, cx.waker()); 387 Poll::Pending 388 } 389 } 390 391 struct EpollWaker { 392 fds: [RawFd; 2], 393 } 394 395 impl EpollWaker { new() -> Self396 fn new() -> Self 397 { 398 let mut fds: [c_int; 2] = [-1; 2]; 399 // SAFETY: 400 // The pipe API is multi-thread safe. 401 // We have carefully checked that parameters are as required by system interface. 402 let ret = unsafe { 403 libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) 404 }; 405 if ret != 0 { 406 error!(LOG_LABEL, "pipe2 fail: {:?}", @public(Error::last_os_error())); 407 } 408 Self { fds } 409 } 410 wake(&self)411 fn wake(&self) 412 { 413 call_debug_enter!("EpollWaker::wake"); 414 let data: i32 = 0; 415 // SAFETY: 416 // We have carefully checked that parameters are as required by system interface. 417 let ret = unsafe { 418 libc::write(self.fds[1], 419 std::ptr::addr_of!(data) as *const c_void, 420 std::mem::size_of_val(&data)) 421 }; 422 if ret == SYSTEM_IO_FAILURE { 423 error!(LOG_LABEL, "write fail: {:?}", @public(Error::last_os_error())); 424 } 425 } 426 } 427 428 impl IEpollHandler for EpollWaker { fd(&self) -> RawFd429 fn fd(&self) -> RawFd 430 { 431 self.fds[0] 432 } 433 dispatch(&self, events: u32)434 fn dispatch(&self, events: u32) 435 { 436 if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN { 437 let data: i32 = 0; 438 // SAFETY: 439 // Parameters are as required by system and business logic, so it can be trusted. 440 let ret = unsafe { 441 libc::read(self.fd(), 442 std::ptr::addr_of!(data) as *mut c_void, 443 std::mem::size_of_val(&data)) 444 }; 445 if ret == SYSTEM_IO_FAILURE { 446 error!(LOG_LABEL, "read fail: {:?}", @public(Error::last_os_error())); 447 } 448 } 449 } 450 } 451 452 impl Default for EpollWaker { default() -> Self453 fn default() -> Self 454 { 455 Self::new() 456 } 457 } 458 459 impl Drop for EpollWaker { drop(&mut self)460 fn drop(&mut self) 461 { 462 for fd in &mut self.fds { 463 if *fd != INVALID_FD { 464 // SAFETY: 465 // Parameter is as required by system, so consider it safe here. 466 let ret = unsafe { libc::close(*fd) }; 467 if ret != EPOLL_SUCCESS { 468 error!(LOG_LABEL, "close({}) fail: {:?}", 469 @public(*fd), 470 @public(Error::last_os_error())); 471 } 472 } 473 } 474 } 475 } 476 477 /// Bookkeeping of epoll handling. 478 pub struct Scheduler { 479 epoll: Arc<Epoll>, 480 epoll_waker: Arc<EpollWaker>, 481 is_running: Arc<AtomicBool>, 482 join_handle: Option<std::thread::JoinHandle<()>>, 483 } 484 485 impl Scheduler { new() -> Self486 pub(crate) fn new() -> Self 487 { 488 call_debug_enter!("Scheduler::new"); 489 let epoll: Arc<Epoll> = Arc::default(); 490 let is_running = Arc::new(AtomicBool::new(true)); 491 let driver = Driver::new(epoll.clone(), is_running.clone()); 492 let join_handle = std::thread::spawn(move || { 493 driver.run(); 494 }); 495 let scheduler = Self { 496 epoll, 497 epoll_waker: Arc::default(), 498 is_running, 499 join_handle: Some(join_handle), 500 }; 501 let _ = scheduler.add_epoll_handler(scheduler.epoll_waker.clone()); 502 scheduler 503 } 504 add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) -> FusionResult<Arc<dyn IEpollHandler>>505 pub(crate) fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) 506 -> FusionResult<Arc<dyn IEpollHandler>> 507 { 508 call_debug_enter!("Scheduler::add_epoll_handler"); 509 let fd: RawFd = handler.fd(); 510 let join_handle = ylong_runtime::spawn( 511 EpollHandlerFuture::new(fd, self.epoll.clone()) 512 ); 513 self.epoll.add_epoll_handler(fd, EpollHandler::new(handler, join_handle)) 514 } 515 remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) -> FusionResult<Arc<dyn IEpollHandler>>516 pub(crate) fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) 517 -> FusionResult<Arc<dyn IEpollHandler>> 518 { 519 call_debug_enter!("Scheduler::remove_epoll_handler"); 520 self.epoll.remove_epoll_handler(handler.fd()) 521 } 522 } 523 524 impl Default for Scheduler { default() -> Self525 fn default() -> Self 526 { 527 Self::new() 528 } 529 } 530 531 impl Drop for Scheduler { drop(&mut self)532 fn drop(&mut self) 533 { 534 call_debug_enter!("Scheduler::drop"); 535 self.is_running.store(false, Ordering::Relaxed); 536 self.epoll_waker.wake(); 537 if let Some(join_handle) = self.join_handle.take() { 538 let _ = join_handle.join(); 539 } 540 } 541 } 542