1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Executor contains two parts:
15 //! - thread pool: how threads are started and how they run the tasks.
16 //! - schedule policy: how tasks are scheduled in the task queues.
17 pub(crate) mod block_on;
18 #[cfg(feature = "current_thread_runtime")]
19 pub(crate) mod current_thread;
20 
21 use std::future::Future;
22 use std::mem::MaybeUninit;
23 use std::sync::Once;
24 
25 use crate::builder::multi_thread_builder::GLOBAL_BUILDER;
26 use crate::builder::RuntimeBuilder;
27 #[cfg(feature = "current_thread_runtime")]
28 use crate::executor::current_thread::CurrentThreadSpawner;
29 use crate::task::{JoinHandle, Task, TaskBuilder};
30 mod driver_handle;
31 pub(crate) use driver_handle::Handle;
32 
33 use crate::executor::worker::WorkerHandle;
34 cfg_ffrt! {
35     use crate::ffrt::spawner::spawn;
36 }
37 cfg_not_ffrt! {
38     mod parker;
39     pub(crate) mod async_pool;
40     pub(crate) mod blocking_pool;
41     pub(crate) mod queue;
42     mod sleeper;
43     pub(crate) mod worker;
44     pub(crate) mod driver;
45     use crate::builder::{initialize_blocking_spawner, initialize_async_spawner};
46     use crate::executor::async_pool::AsyncPoolSpawner;
47     use crate::executor::blocking_pool::BlockPoolSpawner;
48 }
49 
50 pub(crate) trait Schedule {
schedule(&self, task: Task, lifo: bool)51     fn schedule(&self, task: Task, lifo: bool);
52 }
53 
54 pub(crate) struct PlaceholderScheduler;
55 
56 impl Schedule for PlaceholderScheduler {
schedule(&self, _task: Task, _lifo: bool)57     fn schedule(&self, _task: Task, _lifo: bool) {
58         panic!("no scheduler can be called");
59     }
60 }
61 
62 pub(crate) enum AsyncHandle {
63     #[cfg(feature = "current_thread_runtime")]
64     CurrentThread(CurrentThreadSpawner),
65     #[cfg(not(feature = "ffrt"))]
66     MultiThread(AsyncPoolSpawner),
67     #[cfg(feature = "ffrt")]
68     FfrtMultiThread,
69 }
70 
71 /// Runtime struct.
72 ///
73 /// # If `multi_instance_runtime` feature is turned on
74 /// There will be multiple runtime executors, initializing from user settings in
75 /// `RuntimeBuilder`.
76 ///
77 /// # If `multi_instance_runtime` feature is turned off
78 /// There will be only *ONE* runtime executor singleton inside one process.
79 /// The async and blocking pools working when calling methods of this struct are
80 /// stored in the global static executor instance. Here, keep the empty struct
81 /// for compatibility and possibility for function extension in the future.
82 pub struct Runtime {
83     pub(crate) async_spawner: AsyncHandle,
84 }
85 
86 #[cfg(not(feature = "ffrt"))]
87 impl Runtime {
get_handle(&self) -> std::sync::Arc<Handle>88     pub(crate) fn get_handle(&self) -> std::sync::Arc<Handle> {
89         match &self.async_spawner {
90             #[cfg(feature = "current_thread_runtime")]
91             AsyncHandle::CurrentThread(s) => s.handle.clone(),
92             AsyncHandle::MultiThread(s) => s.exe_mng_info.handle.clone(),
93         }
94     }
95 }
96 
global_default_async() -> &'static Runtime97 pub(crate) fn global_default_async() -> &'static Runtime {
98     static mut GLOBAL_DEFAULT_ASYNC: MaybeUninit<Runtime> = MaybeUninit::uninit();
99     static ONCE: Once = Once::new();
100 
101     unsafe {
102         ONCE.call_once(|| {
103             let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
104 
105             if global_builder.is_none() {
106                 *global_builder = Some(RuntimeBuilder::new_multi_thread());
107             }
108 
109             #[cfg(not(feature = "ffrt"))]
110             // we have just made sure the global builder is a some, so this unwrap_unchecked is safe
111             let runtime = match initialize_async_spawner(global_builder.as_ref().unwrap_unchecked())
112             {
113                 Ok(s) => Runtime {
114                     async_spawner: AsyncHandle::MultiThread(s),
115                 },
116                 Err(e) => panic!("initialize runtime failed: {e:?}"),
117             };
118             #[cfg(feature = "ffrt")]
119             let runtime = Runtime {
120                 async_spawner: AsyncHandle::FfrtMultiThread,
121             };
122             GLOBAL_DEFAULT_ASYNC = MaybeUninit::new(runtime);
123         });
124         &*GLOBAL_DEFAULT_ASYNC.as_ptr()
125     }
126 }
127 
128 #[cfg(not(feature = "ffrt"))]
global_default_blocking() -> &'static BlockPoolSpawner129 pub(crate) fn global_default_blocking() -> &'static BlockPoolSpawner {
130     static mut GLOBAL_DEFAULT_BLOCKING: MaybeUninit<BlockPoolSpawner> = MaybeUninit::uninit();
131     static ONCE: Once = Once::new();
132 
133     unsafe {
134         ONCE.call_once(|| {
135             let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
136 
137             if global_builder.is_none() {
138                 *global_builder = Some(RuntimeBuilder::new_multi_thread());
139             }
140             // we have just made sure the global builder is a some, so this unwrap_unchecked
141             // is safe
142             match initialize_blocking_spawner(&global_builder.as_ref().unwrap_unchecked().common) {
143                 Ok(bps) => GLOBAL_DEFAULT_BLOCKING = MaybeUninit::new(bps),
144                 Err(e) => panic!("initialize blocking pool failed: {e:?}"),
145             }
146         });
147         &*GLOBAL_DEFAULT_BLOCKING.as_ptr()
148     }
149 }
150 
#[cfg(all(feature = "multi_instance_runtime", not(feature = "ffrt")))]
impl Runtime {
    /// Builds a new multi-thread runtime from the default builder settings.
    ///
    /// # Errors
    /// Returns the `std::io::Error` reported by the builder's `build()` call.
    pub fn new() -> std::io::Result<Runtime> {
        let default_builder = RuntimeBuilder::new_multi_thread();
        default_builder.build()
    }
}
158 
159 impl Runtime {
160     /// Spawns a future onto the runtime, returning a [`JoinHandle`] for it.
161     ///
162     /// The future will be later polled by the executor, which is usually
163     /// implemented as a thread pool. The executor will run the future util
164     /// finished.
165     ///
166     /// Awaits on the JoinHandle will return the result of the future, but users
167     /// don't have to `.await` the `JoinHandle` in order to run the future,
168     /// since the future will be executed in the background once it's
169     /// spawned. Dropping the JoinHandle will throw away the returned value.
170     ///
171     /// # Examples
172     ///
173     /// ```no run
174     /// use ylong_runtime::task::*;
175     /// use ylong_runtime::builder::RuntimeBuilder;
176     /// use ylong_runtime::executor::Runtime;
177     ///
178     /// async fn test_future(num: usize) -> usize {
179     ///     num
180     /// }
181     ///
182     /// let core_pool_size = 4;
183     ///
184     /// let runtime = RuntimeBuilder::new_multi_thread()
185     ///     .worker_num(core_pool_size)
186     ///     .build()
187     ///     .unwrap();
188     ///
189     /// runtime.spawn(test_future(1));
190     /// ```
spawn<T, R>(&self, task: T) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,191     pub fn spawn<T, R>(&self, task: T) -> JoinHandle<R>
192     where
193         T: Future<Output = R> + Send + 'static,
194         R: Send + 'static,
195     {
196         self.spawn_with_attr(task, &TaskBuilder::default())
197     }
198 
199     #[inline]
spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,200     pub(crate) fn spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R>
201     where
202         T: Future<Output = R> + Send + 'static,
203         R: Send + 'static,
204     {
205         match &self.async_spawner {
206             #[cfg(feature = "current_thread_runtime")]
207             AsyncHandle::CurrentThread(current_thread) => current_thread.spawn(builder, task),
208             #[cfg(not(feature = "ffrt"))]
209             AsyncHandle::MultiThread(async_spawner) => async_spawner.spawn(builder, task),
210             #[cfg(feature = "ffrt")]
211             AsyncHandle::FfrtMultiThread => spawn(task, builder),
212         }
213     }
214 
215     /// Spawns the provided function or closure onto the runtime.
216     ///
217     /// It's usually used for cpu-bounded computation that does not return
218     /// pending and takes a relatively long time.
219     ///
220     /// # Examples
221     ///
222     /// ```no run
223     /// use ylong_runtime::builder::RuntimeBuilder;
224     ///
225     /// use std::time;
226     /// use std::thread::sleep;
227     ///
228     /// let runtime = RuntimeBuilder::new_multi_thread()
229     ///     .build()
230     ///     .unwrap();
231     ///
232     /// runtime.spawn_blocking(move || {
233     ///     sleep(time::Duration::from_millis(1));
234     ///     10
235     /// });
236     /// ```
spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R> where T: FnOnce() -> R + Send + 'static, R: Send + 'static,237     pub fn spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R>
238     where
239         T: FnOnce() -> R + Send + 'static,
240         R: Send + 'static,
241     {
242         crate::spawn::spawn_blocking(&TaskBuilder::new(), task)
243     }
244 
245     /// Blocks the current thread and runs the given future (usually a
246     /// JoinHandle) to completion, and gets its return value.
247     ///
248     /// Any code after the `block_on` will be executed once the future is done.
249     ///
250     /// # Panics
251     /// 1. This function panics if it gets called in a runtime asynchronous
252     ///    context. To be specific, this function cannot be called inside
253     ///    `ylong_runtime::block_on` or `ylong_runtime::spawn`.
254     /// 2. This function panics if the provided Future panics.
255     ///
256     /// # Examples
257     ///
258     /// ```no run
259     /// use ylong_runtime::builder::RuntimeBuilder;
260     ///
261     /// let core_pool_size = 4;
262     /// async fn test_future(num: usize) -> usize {
263     ///     num
264     /// }
265     ///
266     /// let runtime = RuntimeBuilder::new_multi_thread()
267     ///     .worker_num(core_pool_size)
268     ///     .build()
269     ///     .unwrap();
270     ///
271     /// let handle = runtime.spawn(test_future(4));
272     ///
273     /// let result = runtime.block_on(handle);
274     ///
275     /// assert_eq!(result.unwrap(), 4);
276     /// ```
block_on<T, R>(&self, task: T) -> R where T: Future<Output = R>,277     pub fn block_on<T, R>(&self, task: T) -> R
278     where
279         T: Future<Output = R>,
280     {
281         self.block_on_inner(task)
282     }
283 
284     #[cfg(not(feature = "ffrt"))]
block_on_inner<T, R>(&self, task: T) -> R where T: Future<Output = R>,285     fn block_on_inner<T, R>(&self, task: T) -> R
286     where
287         T: Future<Output = R>,
288     {
289         worker::CURRENT_HANDLE.with(|ctx| {
290             if !ctx.get().is_null() {
291                 panic!(
292                     "Cannot block_on an asynchronous function in a runtime context. \
293                     This happens because a block_on call tries to block the current \
294                     thread which is being used to drive asynchronous tasks."
295                 );
296             }
297         });
298 
299         // Registers handle to the current thread when block_on().
300         // so that async_source can get the handle and register it.
301         let cur_context = worker::WorkerHandle {
302             _handle: self.get_handle(),
303         };
304 
305         worker::CURRENT_HANDLE.with(|ctx| {
306             ctx.set((&cur_context as *const WorkerHandle).cast::<()>());
307         });
308 
309         let ret = match &self.async_spawner {
310             #[cfg(feature = "current_thread_runtime")]
311             AsyncHandle::CurrentThread(current_thread) => current_thread.block_on(task),
312             AsyncHandle::MultiThread(_) => block_on::block_on(task),
313         };
314 
315         // Sets the current thread variable to null,
316         // otherwise the worker's CURRENT_WORKER can not be set under MultiThread.
317         worker::CURRENT_HANDLE.with(|ctx| {
318             ctx.set(std::ptr::null());
319         });
320 
321         ret
322     }
323 
324     #[cfg(feature = "ffrt")]
block_on_inner<T, R>(&self, task: T) -> R where T: Future<Output = R>,325     fn block_on_inner<T, R>(&self, task: T) -> R
326     where
327         T: Future<Output = R>,
328     {
329         match &self.async_spawner {
330             #[cfg(feature = "current_thread_runtime")]
331             AsyncHandle::CurrentThread(current_thread) => current_thread.block_on(task),
332             AsyncHandle::FfrtMultiThread => block_on::block_on(task),
333         }
334     }
335 }
336 
337 cfg_metrics!(
338     use crate::metrics::Metrics;
339     impl Runtime {
340         /// User can get some message from Runtime during running.
341         ///
342         /// # Example
343         /// ```
344         /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread().build().unwrap();
345         /// let _metrics = runtime.metrics();
346         /// ```
347         pub fn metrics(&self) -> Metrics {
348             Metrics::new(self)
349         }
350     }
351 
352     /// Gets metrics of the global Runtime.
353     /// # Example
354     /// ```
355     /// use ylong_runtime::executor::get_global_runtime_metrics;
356     ///
357     /// let metrics = get_global_runtime_metrics();
358     /// ```
359     pub fn get_global_runtime_metrics() -> Metrics<'static> {
360         Metrics::new(global_default_async())
361     }
362 );
363 
#[cfg(test)]
mod test {

    /// UT test cases for block_on inside a spawn
    ///
    /// # Brief
    /// 1. Spawn a task that calls block_on from inside the runtime
    /// 2. Block on the join handle, unwrap it and check that the test panics
    #[test]
    #[should_panic]
    fn ut_block_on_panic_in_spawn() {
        let join_handle = crate::spawn(async move {
            let value = crate::block_on(async move { 1 });
            assert_eq!(value, 1);
        });
        let outcome = crate::block_on(join_handle);
        outcome.unwrap();
    }

    /// UT test cases for block_on inside another block_on
    ///
    /// # Brief
    /// 1. Call block_on inside the future driven by an outer block_on
    /// 2. Check if the test panics
    #[test]
    #[should_panic]
    fn ut_block_on_panic_in_block_on() {
        crate::block_on(async move {
            let value = crate::block_on(async move { 1 });
            assert_eq!(value, 1);
        });
    }
}
396