1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 //! Providing asynchronous task scheduling and epoll handling mechanism.
17 //!
18 //! Current implementation of task scheduler allow posted tasks to run concurrently,
19 //! so synchronization are necessary if they share some data.
20 //!
21 //! On creation of scheduler, an epoll instance is also created and event loop is
22 //! started. We see a (fd, events processing logic) pair as an event handler. When
23 //! an epoll handler is added, its fd is added to the interest list of the epoll
24 //! and is waited on for events. When events occured on its fd, scheduler will
25 //! dispatch events to it.
26 
27 #![allow(dead_code)]
28 #![allow(unused_variables)]
29 
30 /// Module declarations.
31 mod scheduler;
32 mod task;
33 
34 /// Public exports.
35 pub use scheduler::IEpollHandler;
36 pub use scheduler::{
37     LIBC_EPOLLIN,
38     LIBC_EPOLLERR,
39     LIBC_EPOLLHUP,
40 };
41 pub use task::TaskHandle;
42 
43 use std::ffi::{ c_char, CString };
44 use std::sync::Arc;
45 use std::sync::atomic::{ AtomicUsize, Ordering };
46 use std::time::Duration;
47 use fusion_utils_rust::{call_debug_enter, FusionResult };
48 use hilog_rust::{ hilog, HiLogLabel, LogType };
49 use scheduler::Scheduler;
50 
51 const LOG_LABEL: HiLogLabel = HiLogLabel {
52     log_type: LogType::LogCore,
53     domain: 0xD002220,
54     tag: "Handler",
55 };
56 
57 /// Front-end of scheduler, providing interface for posting asynchronous task
58 /// and epoll handling.
59 pub struct Handler {
60     id: usize,
61     scheduler: Arc<Scheduler>,
62 }
63 
64 impl Handler {
65     /// Construct a new instance of `Handler`.
new() -> Self66     pub fn new() -> Self
67     {
68         static ID_RADIX: AtomicUsize = AtomicUsize::new(1);
69         let scheduler = Arc::new(Scheduler::new());
70 
71         Self {
72             id: ID_RADIX.fetch_add(1, Ordering::Relaxed),
73             scheduler,
74         }
75     }
76 
77     /// Return the unique identifier of this `Handler`.
id(&self) -> usize78     pub fn id(&self) -> usize
79     {
80         self.id
81     }
82 
83     /// Schedudle a `synchronous` executing task, and return the result.
post_sync_task<F, R>(&self, task: F) -> R where F: Fn() -> R + Send + 'static, R: Send + 'static,84     pub fn post_sync_task<F, R>(&self, task: F) -> R
85     where
86         F: Fn() -> R + Send + 'static,
87         R: Send + 'static,
88     {
89         call_debug_enter!("Handler::post_sync_task");
90         ylong_runtime::block_on(async move {
91             task()
92         })
93     }
94 
95     /// Scheduling an asynchronous task.
96     ///
97     /// Calling `TaskHandle::result` to get the result of the task. Calling
98     /// `TaskHandle::result` will block current thread until the task finish.
99     ///
100     /// Calling `TaskHandle::cancel` to cancel the posted task before it finish.
101     ///
102     /// # Examples
103     ///
104     /// ```
105     /// let handler = Handler::new();
106     /// let param: usize = 0xAB1807;
107     ///
108     /// let mut task_handle = handler.post_async_task(move || {
109     ///     hash(param)
110     /// }
111     /// let ret = task_handle.result().unwrap();
112     /// let expected = hash(param);
113     /// assert_eq!(ret, expected);
114     /// ```
115     ///
post_async_task<F, R>(&self, task: F) -> TaskHandle<R> where F: Fn() -> R + Send + 'static, R: Send + 'static,116     pub fn post_async_task<F, R>(&self, task: F) -> TaskHandle<R>
117     where
118         F: Fn() -> R + Send + 'static,
119         R: Send + 'static,
120     {
121         call_debug_enter!("Handler::post_async_task");
122         let handle = ylong_runtime::spawn(async move {
123             task()
124         });
125         TaskHandle::from(handle)
126     }
127 
128     /// Schedule an asynchronous task that will run after a period of `delay`.
129     ///
130     /// Calling `TaskHandle::cancel` to cancel the posted task before it finish.
131     ///
post_delayed_task<F, R>(&self, task: F, delay: Duration) -> TaskHandle<R> where F: Fn() -> R + Send + 'static, R: Send + 'static,132     pub fn post_delayed_task<F, R>(&self, task: F, delay: Duration) -> TaskHandle<R>
133     where
134         F: Fn() -> R + Send + 'static,
135         R: Send + 'static,
136     {
137         call_debug_enter!("Handler::post_delayed_task");
138         let handle = ylong_runtime::spawn(async move {
139             ylong_runtime::time::sleep(delay).await;
140             task()
141         });
142         TaskHandle::from(handle)
143     }
144 
145     /// Schedule an asynchronous task that will run repeatedly with set interval
146     /// after a period of time.
147     ///
148     /// The posted task will start to run after a period of `delay` if `delay` is not None.
149     /// It will repeat for `repeat` times with `interval` between each running. If `repeat`
150     /// is None, the posted task will repeat forever.
151     ///
152     /// Calling `TaskHandle::cancel` to cancel the posted task before it finish.
153     ///
post_perioric_task<F>(&self, task: F, delay: Option<Duration>, interval: Duration, repeat: Option<usize>) -> TaskHandle<()> where F: Fn() + Send + 'static154     pub fn post_perioric_task<F>(&self, task: F, delay: Option<Duration>, interval: Duration,
155                                  repeat: Option<usize>) -> TaskHandle<()>
156     where
157         F: Fn() + Send + 'static
158     {
159         call_debug_enter!("Handler::post_perioric_task");
160         let handle = ylong_runtime::spawn(async move {
161             if let Some(d) = delay {
162                 ylong_runtime::time::sleep(d).await;
163             }
164             ylong_runtime::time::periodic_schedule(task, repeat, interval).await;
165         });
166         TaskHandle::from(handle)
167     }
168 
169     /// Schedule an asynchronous task that may block. That is, it may take a huge time to
170     /// finish, or may block for resources.
171     ///
172     /// Calling `TaskHandle::cancel` to cancel the posted task before it finish.
173     ///
post_blocking_task<F, R>(&self, task: F) -> TaskHandle<R> where F: Fn() -> R + Send + 'static, R: Send + 'static,174     pub fn post_blocking_task<F, R>(&self, task: F) -> TaskHandle<R>
175     where
176         F: Fn() -> R + Send + 'static,
177         R: Send + 'static,
178     {
179         call_debug_enter!("Handler::post_delayed_task");
180         let handle = ylong_runtime::spawn_blocking(task);
181         TaskHandle::from(handle)
182     }
183 
184     /// Add an epoll handler to epoll event loop.
185     ///
186     /// Note that we call a (fd, events processing logic) pair an event handler.
187     ///
188     /// # Examples
189     ///
190     /// ```
191     /// struct EpollHandler {
192     /// //  data members.
193     /// }
194     ///
195     /// impl IEpollHandler for EpollHandler {
196     ///     fn fd(&self) -> RawFd {
197     ///         // Return fd of this epoll handler.
198     ///     }
199     ///
200     ///     fn dispatch(&self, events: u32) {
201     ///         // Process events.
202     ///     }
203     /// }
204     ///
205     /// let handler: Arc<Handler> = Arc::default();
206     /// let epoll_handler = Arc::new(EpollHandler::new());
207     /// handler.add_epoll_handler(epoll_handler)
208     /// ```
add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) -> FusionResult<Arc<dyn IEpollHandler>>209     pub fn add_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
210         -> FusionResult<Arc<dyn IEpollHandler>>
211     {
212         call_debug_enter!("Handler::add_epoll_handler");
213         self.scheduler.add_epoll_handler(handler)
214     }
215 
216     /// Remove an epoll handler from epoll event loop.
remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>) -> FusionResult<Arc<dyn IEpollHandler>>217     pub fn remove_epoll_handler(&self, handler: Arc<dyn IEpollHandler>)
218         -> FusionResult<Arc<dyn IEpollHandler>>
219     {
220         call_debug_enter!("Handler::remove_epoll_handler");
221         self.scheduler.remove_epoll_handler(handler)
222     }
223 }
224 
225 impl Default for Handler {
default() -> Self226     fn default() -> Self
227     {
228         Self::new()
229     }
230 }
231