/*
 * Copyright (C) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//! Provides asynchronous task scheduling and an epoll handling mechanism.
//!
//! The current implementation of the task scheduler allows posted tasks to run
//! concurrently, so synchronization is necessary if they share data.
//!
//! On creation of the scheduler, an epoll instance is also created and the event
//! loop is started. We call a (fd, events processing logic) pair an event handler.
//! When an epoll handler is added, its fd is added to the interest list of the
//! epoll instance and is waited on for events. When events occur on its fd, the
//! scheduler dispatches them to it.

#![allow(dead_code)]
#![allow(unused_variables)]

/// Module declarations.
mod scheduler;
mod task;

/// Public exports.
35 pub use scheduler::IEpollHandler; 36 pub use scheduler::{ 37 LIBC_EPOLLIN, 38 LIBC_EPOLLERR, 39 LIBC_EPOLLHUP, 40 }; 41 pub use task::TaskHandle; 42 43 use std::ffi::{ c_char, CString }; 44 use std::sync::Arc; 45 use std::sync::atomic::{ AtomicUsize, Ordering }; 46 use std::time::Duration; 47 use fusion_utils_rust::{call_debug_enter, FusionResult }; 48 use hilog_rust::{ hilog, HiLogLabel, LogType }; 49 use scheduler::Scheduler; 50 51 const LOG_LABEL: HiLogLabel = HiLogLabel { 52 log_type: LogType::LogCore, 53 domain: 0xD002220, 54 tag: "Handler", 55 }; 56 57 /// Front-end of scheduler, providing interface for posting asynchronous task 58 /// and epoll handling. 59 pub struct Handler { 60 id: usize, 61 scheduler: Arc<Scheduler>, 62 } 63 64 impl Handler { 65 /// Construct a new instance of `Handler`. new() -> Self66 pub fn new() -> Self 67 { 68 static ID_RADIX: AtomicUsize = AtomicUsize::new(1); 69 let scheduler = Arc::new(Scheduler::new()); 70 71 Self { 72 id: ID_RADIX.fetch_add(1, Ordering::Relaxed), 73 scheduler, 74 } 75 } 76 77 /// Return the unique identifier of this `Handler`. id(&self) -> usize78 pub fn id(&self) -> usize 79 { 80 self.id 81 } 82 83 /// Schedudle a `synchronous` executing task, and return the result. post_sync_task<F, R>(&self, task: F) -> R where F: Fn() -> R + Send + 'static, R: Send + 'static,84 pub fn post_sync_task<F, R>(&self, task: F) -> R 85 where 86 F: Fn() -> R + Send + 'static, 87 R: Send + 'static, 88 { 89 call_debug_enter!("Handler::post_sync_task"); 90 ylong_runtime::block_on(async move { 91 task() 92 }) 93 } 94 95 /// Scheduling an asynchronous task. 96 /// 97 /// Calling `TaskHandle::result` to get the result of the task. Calling 98 /// `TaskHandle::result` will block current thread until the task finish. 99 /// 100 /// Calling `TaskHandle::cancel` to cancel the posted task before it finish. 
101 /// 102 /// # Examples 103 /// 104 /// ``` 105 /// let handler = Handler::new(); 106 /// let param: usize = 0xAB1807; 107 /// 108 /// let mut task_handle = handler.post_async_task(move || { 109 /// hash(param) 110 /// } 111 /// let ret = task_handle.result().unwrap(); 112 /// let expected = hash(param); 113 /// assert_eq!(ret, expected); 114 /// ``` 115 /// post_async_task<F, R>(&self, task: F) -> TaskHandle<R> where F: Fn() -> R + Send + 'static, R: Send + 'static,116 pub fn post_async_task<F, R>(&self, task: F) -> TaskHandle<R> 117 where 118 F: Fn() -> R + Send + 'static, 119 R: Send + 'static, 120 { 121 call_debug_enter!("Handler::post_async_task"); 122 let handle = ylong_runtime::spawn(async move { 123 task() 124 }); 125 TaskHandle::from(handle) 126 } 127 128 /// Schedule an asynchronous task that will run after a period of `delay`. 129 /// 130 /// Calling `TaskHandle::cancel` to cancel the posted task before it finish. 131 /// post_delayed_task<F, R>(&self, task: F, delay: Duration) -> TaskHandle<R> where F: Fn() -> R + Send + 'static, R: Send + 'static,132 pub fn post_delayed_task<F, R>(&self, task: F, delay: Duration) -> TaskHandle<R> 133 where 134 F: Fn() -> R + Send + 'static, 135 R: Send + 'static, 136 { 137 call_debug_enter!("Handler::post_delayed_task"); 138 let handle = ylong_runtime::spawn(async move { 139 ylong_runtime::time::sleep(delay).await; 140 task() 141 }); 142 TaskHandle::from(handle) 143 } 144 145 /// Schedule an asynchronous task that will run repeatedly with set interval 146 /// after a period of time. 147 /// 148 /// The posted task will start to run after a period of `delay` if `delay` is not None. 149 /// It will repeat for `repeat` times with `interval` between each running. If `repeat` 150 /// is None, the posted task will repeat forever. 151 /// 152 /// Calling `TaskHandle::cancel` to cancel the posted task before it finish. 
153 /// post_perioric_task<F>(&self, task: F, delay: Option<Duration>, interval: Duration, repeat: Option<usize>) -> TaskHandle<()> where F: Fn() + Send + 'static154 pub fn post_perioric_task<F>(&self, task: F, delay: Option<Duration>, interval: Duration, 155 repeat: Option<usize>) -> TaskHandle<()> 156 where 157 F: Fn() + Send + 'static 158 { 159 call_debug_enter!("Handler::post_perioric_task"); 160 let handle = ylong_runtime::spawn(async move { 161 if let Some(d) = delay { 162 ylong_runtime::time::sleep(d).await; 163 } 164 ylong_runtime::time::periodic_schedule(task, repeat, interval).await; 165 }); 166 TaskHandle::from(handle) 167 } 168 169 /// Schedule an asynchronous task that may block. That is, it may take a huge time to 170 /// finish, or may block for resources. 171 /// 172 /// Calling `TaskHandle::cancel` to cancel the posted task before it finish. 173 /// post_blocking_task<F, R>(&self, task: F) -> TaskHandle<R> where F: Fn() -> R + Send + 'static, R: Send + 'static,174 pub fn post_blocking_task<F, R>(&self, task: F) -> TaskHandle<R> 175 where 176 F: Fn() -> R + Send + 'static, 177 R: Send + 'static, 178 { 179 call_debug_enter!("Handler::post_delayed_task"); 180 let handle = ylong_runtime::spawn_blocking(task); 181 TaskHandle::from(handle) 182 } 183 184 /// Add an epoll handler to epoll event loop. 185 /// 186 /// Note that we call a (fd, events processing logic) pair an event handler. 187 /// 188 /// # Examples 189 /// 190 /// ``` 191 /// struct EpollHandler { 192 /// // data members. 193 /// } 194 /// 195 /// impl IEpollHandler for EpollHandler { 196 /// fn fd(&self) -> RawFd { 197 /// // Return fd of this epoll handler. 198 /// } 199 /// 200 /// fn dispatch(&self, events: u32) { 201 /// // Process events. 
202 /// } 203 /// } 204 /// 205 /// let handler: Arc<Handler> = Arc::default(); 206 /// let epoll_handler = Arc::new(EpollHandler::new()); 207 /// handler.add_epoll_handler(epoll_handler) 208 /// ``` add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) -> FusionResult<Arc<dyn IEpollHandler>>209 pub fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) 210 -> FusionResult<Arc<dyn IEpollHandler>> 211 { 212 call_debug_enter!("Handler::add_epoll_handler"); 213 self.scheduler.add_epoll_handler(handler) 214 } 215 216 /// Remove an epoll handler from epoll event loop. remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) -> FusionResult<Arc<dyn IEpollHandler>>217 pub fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) 218 -> FusionResult<Arc<dyn IEpollHandler>> 219 { 220 call_debug_enter!("Handler::remove_epoll_handler"); 221 self.scheduler.remove_epoll_handler(handler) 222 } 223 } 224 225 impl Default for Handler { default() -> Self226 fn default() -> Self 227 { 228 Self::new() 229 } 230 } 231