1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Executor contains two parts:
15 //! - thread pool: how threads are started and how they run the tasks.
16 //! - schedule policy: how tasks are scheduled in the task queues.
17 pub(crate) mod block_on;
18 #[cfg(feature = "current_thread_runtime")]
19 pub(crate) mod current_thread;
20
21 use std::future::Future;
22 use std::mem::MaybeUninit;
23 use std::sync::Once;
24
25 use crate::builder::multi_thread_builder::GLOBAL_BUILDER;
26 use crate::builder::RuntimeBuilder;
27 #[cfg(feature = "current_thread_runtime")]
28 use crate::executor::current_thread::CurrentThreadSpawner;
29 use crate::task::{JoinHandle, Task, TaskBuilder};
30 mod driver_handle;
31 pub(crate) use driver_handle::Handle;
32
33 use crate::executor::worker::WorkerHandle;
34 cfg_ffrt! {
35 use crate::ffrt::spawner::spawn;
36 }
37 cfg_not_ffrt! {
38 mod parker;
39 pub(crate) mod async_pool;
40 pub(crate) mod blocking_pool;
41 pub(crate) mod queue;
42 mod sleeper;
43 pub(crate) mod worker;
44 pub(crate) mod driver;
45 use crate::builder::{initialize_blocking_spawner, initialize_async_spawner};
46 use crate::executor::async_pool::AsyncPoolSpawner;
47 use crate::executor::blocking_pool::BlockPoolSpawner;
48 }
49
50 pub(crate) trait Schedule {
schedule(&self, task: Task, lifo: bool)51 fn schedule(&self, task: Task, lifo: bool);
52 }
53
54 pub(crate) struct PlaceholderScheduler;
55
56 impl Schedule for PlaceholderScheduler {
schedule(&self, _task: Task, _lifo: bool)57 fn schedule(&self, _task: Task, _lifo: bool) {
58 panic!("no scheduler can be called");
59 }
60 }
61
62 pub(crate) enum AsyncHandle {
63 #[cfg(feature = "current_thread_runtime")]
64 CurrentThread(CurrentThreadSpawner),
65 #[cfg(not(feature = "ffrt"))]
66 MultiThread(AsyncPoolSpawner),
67 #[cfg(feature = "ffrt")]
68 FfrtMultiThread,
69 }
70
71 /// Runtime struct.
72 ///
73 /// # If `multi_instance_runtime` feature is turned on
74 /// There will be multiple runtime executors, initializing from user settings in
75 /// `RuntimeBuilder`.
76 ///
77 /// # If `multi_instance_runtime` feature is turned off
78 /// There will be only *ONE* runtime executor singleton inside one process.
79 /// The async and blocking pools working when calling methods of this struct are
80 /// stored in the global static executor instance. Here, keep the empty struct
81 /// for compatibility and possibility for function extension in the future.
82 pub struct Runtime {
83 pub(crate) async_spawner: AsyncHandle,
84 }
85
86 #[cfg(not(feature = "ffrt"))]
87 impl Runtime {
get_handle(&self) -> std::sync::Arc<Handle>88 pub(crate) fn get_handle(&self) -> std::sync::Arc<Handle> {
89 match &self.async_spawner {
90 #[cfg(feature = "current_thread_runtime")]
91 AsyncHandle::CurrentThread(s) => s.handle.clone(),
92 AsyncHandle::MultiThread(s) => s.exe_mng_info.handle.clone(),
93 }
94 }
95 }
96
global_default_async() -> &'static Runtime97 pub(crate) fn global_default_async() -> &'static Runtime {
98 static mut GLOBAL_DEFAULT_ASYNC: MaybeUninit<Runtime> = MaybeUninit::uninit();
99 static ONCE: Once = Once::new();
100
101 unsafe {
102 ONCE.call_once(|| {
103 let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
104
105 if global_builder.is_none() {
106 *global_builder = Some(RuntimeBuilder::new_multi_thread());
107 }
108
109 #[cfg(not(feature = "ffrt"))]
110 // we have just made sure the global builder is a some, so this unwrap_unchecked is safe
111 let runtime = match initialize_async_spawner(global_builder.as_ref().unwrap_unchecked())
112 {
113 Ok(s) => Runtime {
114 async_spawner: AsyncHandle::MultiThread(s),
115 },
116 Err(e) => panic!("initialize runtime failed: {e:?}"),
117 };
118 #[cfg(feature = "ffrt")]
119 let runtime = Runtime {
120 async_spawner: AsyncHandle::FfrtMultiThread,
121 };
122 GLOBAL_DEFAULT_ASYNC = MaybeUninit::new(runtime);
123 });
124 &*GLOBAL_DEFAULT_ASYNC.as_ptr()
125 }
126 }
127
128 #[cfg(not(feature = "ffrt"))]
global_default_blocking() -> &'static BlockPoolSpawner129 pub(crate) fn global_default_blocking() -> &'static BlockPoolSpawner {
130 static mut GLOBAL_DEFAULT_BLOCKING: MaybeUninit<BlockPoolSpawner> = MaybeUninit::uninit();
131 static ONCE: Once = Once::new();
132
133 unsafe {
134 ONCE.call_once(|| {
135 let mut global_builder = GLOBAL_BUILDER.lock().unwrap();
136
137 if global_builder.is_none() {
138 *global_builder = Some(RuntimeBuilder::new_multi_thread());
139 }
140 // we have just made sure the global builder is a some, so this unwrap_unchecked
141 // is safe
142 match initialize_blocking_spawner(&global_builder.as_ref().unwrap_unchecked().common) {
143 Ok(bps) => GLOBAL_DEFAULT_BLOCKING = MaybeUninit::new(bps),
144 Err(e) => panic!("initialize blocking pool failed: {e:?}"),
145 }
146 });
147 &*GLOBAL_DEFAULT_BLOCKING.as_ptr()
148 }
149 }
150
#[cfg(all(feature = "multi_instance_runtime", not(feature = "ffrt")))]
impl Runtime {
    /// Builds a multi-thread runtime using the default settings of
    /// `RuntimeBuilder::new_multi_thread`.
    ///
    /// # Errors
    /// Returns the `std::io::Error` reported by the builder when the runtime
    /// cannot be built.
    pub fn new() -> std::io::Result<Runtime> {
        let default_builder = RuntimeBuilder::new_multi_thread();
        default_builder.build()
    }
}
158
159 impl Runtime {
160 /// Spawns a future onto the runtime, returning a [`JoinHandle`] for it.
161 ///
162 /// The future will be later polled by the executor, which is usually
163 /// implemented as a thread pool. The executor will run the future util
164 /// finished.
165 ///
166 /// Awaits on the JoinHandle will return the result of the future, but users
167 /// don't have to `.await` the `JoinHandle` in order to run the future,
168 /// since the future will be executed in the background once it's
169 /// spawned. Dropping the JoinHandle will throw away the returned value.
170 ///
171 /// # Examples
172 ///
173 /// ```no run
174 /// use ylong_runtime::task::*;
175 /// use ylong_runtime::builder::RuntimeBuilder;
176 /// use ylong_runtime::executor::Runtime;
177 ///
178 /// async fn test_future(num: usize) -> usize {
179 /// num
180 /// }
181 ///
182 /// let core_pool_size = 4;
183 ///
184 /// let runtime = RuntimeBuilder::new_multi_thread()
185 /// .worker_num(core_pool_size)
186 /// .build()
187 /// .unwrap();
188 ///
189 /// runtime.spawn(test_future(1));
190 /// ```
spawn<T, R>(&self, task: T) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,191 pub fn spawn<T, R>(&self, task: T) -> JoinHandle<R>
192 where
193 T: Future<Output = R> + Send + 'static,
194 R: Send + 'static,
195 {
196 self.spawn_with_attr(task, &TaskBuilder::default())
197 }
198
199 #[inline]
spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R> where T: Future<Output = R> + Send + 'static, R: Send + 'static,200 pub(crate) fn spawn_with_attr<T, R>(&self, task: T, builder: &TaskBuilder) -> JoinHandle<R>
201 where
202 T: Future<Output = R> + Send + 'static,
203 R: Send + 'static,
204 {
205 match &self.async_spawner {
206 #[cfg(feature = "current_thread_runtime")]
207 AsyncHandle::CurrentThread(current_thread) => current_thread.spawn(builder, task),
208 #[cfg(not(feature = "ffrt"))]
209 AsyncHandle::MultiThread(async_spawner) => async_spawner.spawn(builder, task),
210 #[cfg(feature = "ffrt")]
211 AsyncHandle::FfrtMultiThread => spawn(task, builder),
212 }
213 }
214
215 /// Spawns the provided function or closure onto the runtime.
216 ///
217 /// It's usually used for cpu-bounded computation that does not return
218 /// pending and takes a relatively long time.
219 ///
220 /// # Examples
221 ///
222 /// ```no run
223 /// use ylong_runtime::builder::RuntimeBuilder;
224 ///
225 /// use std::time;
226 /// use std::thread::sleep;
227 ///
228 /// let runtime = RuntimeBuilder::new_multi_thread()
229 /// .build()
230 /// .unwrap();
231 ///
232 /// runtime.spawn_blocking(move || {
233 /// sleep(time::Duration::from_millis(1));
234 /// 10
235 /// });
236 /// ```
spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R> where T: FnOnce() -> R + Send + 'static, R: Send + 'static,237 pub fn spawn_blocking<T, R>(&self, task: T) -> JoinHandle<R>
238 where
239 T: FnOnce() -> R + Send + 'static,
240 R: Send + 'static,
241 {
242 crate::spawn::spawn_blocking(&TaskBuilder::new(), task)
243 }
244
245 /// Blocks the current thread and runs the given future (usually a
246 /// JoinHandle) to completion, and gets its return value.
247 ///
248 /// Any code after the `block_on` will be executed once the future is done.
249 ///
250 /// # Panics
251 /// 1. This function panics if it gets called in a runtime asynchronous
252 /// context. To be specific, this function cannot be called inside
253 /// `ylong_runtime::block_on` or `ylong_runtime::spawn`.
254 /// 2. This function panics if the provided Future panics.
255 ///
256 /// # Examples
257 ///
258 /// ```no run
259 /// use ylong_runtime::builder::RuntimeBuilder;
260 ///
261 /// let core_pool_size = 4;
262 /// async fn test_future(num: usize) -> usize {
263 /// num
264 /// }
265 ///
266 /// let runtime = RuntimeBuilder::new_multi_thread()
267 /// .worker_num(core_pool_size)
268 /// .build()
269 /// .unwrap();
270 ///
271 /// let handle = runtime.spawn(test_future(4));
272 ///
273 /// let result = runtime.block_on(handle);
274 ///
275 /// assert_eq!(result.unwrap(), 4);
276 /// ```
block_on<T, R>(&self, task: T) -> R where T: Future<Output = R>,277 pub fn block_on<T, R>(&self, task: T) -> R
278 where
279 T: Future<Output = R>,
280 {
281 self.block_on_inner(task)
282 }
283
284 #[cfg(not(feature = "ffrt"))]
block_on_inner<T, R>(&self, task: T) -> R where T: Future<Output = R>,285 fn block_on_inner<T, R>(&self, task: T) -> R
286 where
287 T: Future<Output = R>,
288 {
289 worker::CURRENT_HANDLE.with(|ctx| {
290 if !ctx.get().is_null() {
291 panic!(
292 "Cannot block_on an asynchronous function in a runtime context. \
293 This happens because a block_on call tries to block the current \
294 thread which is being used to drive asynchronous tasks."
295 );
296 }
297 });
298
299 // Registers handle to the current thread when block_on().
300 // so that async_source can get the handle and register it.
301 let cur_context = worker::WorkerHandle {
302 _handle: self.get_handle(),
303 };
304
305 worker::CURRENT_HANDLE.with(|ctx| {
306 ctx.set((&cur_context as *const WorkerHandle).cast::<()>());
307 });
308
309 let ret = match &self.async_spawner {
310 #[cfg(feature = "current_thread_runtime")]
311 AsyncHandle::CurrentThread(current_thread) => current_thread.block_on(task),
312 AsyncHandle::MultiThread(_) => block_on::block_on(task),
313 };
314
315 // Sets the current thread variable to null,
316 // otherwise the worker's CURRENT_WORKER can not be set under MultiThread.
317 worker::CURRENT_HANDLE.with(|ctx| {
318 ctx.set(std::ptr::null());
319 });
320
321 ret
322 }
323
324 #[cfg(feature = "ffrt")]
block_on_inner<T, R>(&self, task: T) -> R where T: Future<Output = R>,325 fn block_on_inner<T, R>(&self, task: T) -> R
326 where
327 T: Future<Output = R>,
328 {
329 match &self.async_spawner {
330 #[cfg(feature = "current_thread_runtime")]
331 AsyncHandle::CurrentThread(current_thread) => current_thread.block_on(task),
332 AsyncHandle::FfrtMultiThread => block_on::block_on(task),
333 }
334 }
335 }
336
337 cfg_metrics!(
338 use crate::metrics::Metrics;
339 impl Runtime {
340 /// User can get some message from Runtime during running.
341 ///
342 /// # Example
343 /// ```
344 /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread().build().unwrap();
345 /// let _metrics = runtime.metrics();
346 /// ```
347 pub fn metrics(&self) -> Metrics {
348 Metrics::new(self)
349 }
350 }
351
352 /// Gets metrics of the global Runtime.
353 /// # Example
354 /// ```
355 /// use ylong_runtime::executor::get_global_runtime_metrics;
356 ///
357 /// let metrics = get_global_runtime_metrics();
358 /// ```
359 pub fn get_global_runtime_metrics() -> Metrics<'static> {
360 Metrics::new(global_default_async())
361 }
362 );
363
#[cfg(test)]
mod test {

    /// UT test cases for block_on inside a spawn
    ///
    /// # Brief
    /// 1. Spawn a task whose body calls block_on
    /// 2. Block on the task's join handle and unwrap the result
    /// 3. Check if the test panics
    #[test]
    #[should_panic]
    fn ut_block_on_panic_in_spawn() {
        let spawned = crate::spawn(async move {
            let inner = crate::block_on(async move { 1 });
            assert_eq!(inner, 1);
        });
        let joined = crate::block_on(spawned);
        joined.unwrap();
    }

    /// UT test cases for block_on inside another block_on
    ///
    /// # Brief
    /// 1. Call block_on from a future that is itself being blocked on
    /// 2. Check if the test panics
    #[test]
    #[should_panic]
    fn ut_block_on_panic_in_block_on() {
        let outer = async move {
            let inner = crate::block_on(async move { 1 });
            assert_eq!(inner, 1);
        };
        crate::block_on(outer);
    }
}
396