// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::RefCell;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::Duration;
use std::{cmp, io, thread};

use super::driver::{Driver, Handle};
use super::parker::Parker;
use super::queue::{GlobalQueue, LocalQueue, LOCAL_QUEUE_CAP};
use super::sleeper::Sleeper;
use super::worker::{get_current_ctx, run_worker, Worker};
use super::{worker, Schedule};
use crate::builder::multi_thread_builder::MultiThreadBuilder;
use crate::builder::CallbackHook;
use crate::executor::worker::WorkerContext;
use crate::fastrand::fast_random;
use crate::task::{JoinHandle, Task, TaskBuilder, VirtualTableType};
#[cfg(not(target_os = "macos"))]
use crate::util::core_affinity::set_current_affinity;
use crate::util::num_cpus::get_cpu_num;

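/// Maximum time the spawner waits for worker threads to exit when the pool is
/// released (see `release_wait`).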
const ASYNC_THREAD_QUIT_WAIT_TIME: Duration = Duration::from_secs(3);
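/// A worker polls the global queue first on every `GLOBAL_POLL_INTERVAL`-th
/// task dispatch so that tasks in the global queue cannot be starved by a
/// constantly busy local queue (see `get_task_from_queues`).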
pub(crate) const GLOBAL_POLL_INTERVAL: u8 = 61;

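/// The work-stealing scheduler shared by every worker of a multi-thread
/// runtime. Tasks enter through `enqueue`, are balanced between per-worker
/// local queues and one global queue, and idle workers are parked and
/// unparked through `sleeper` and the stored `Parker` handles.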
pub(crate) struct MultiThreadScheduler {
    /// Async pool shutdown state
    is_cancel: AtomicBool,
    /// Number of total workers
    pub(crate) num_workers: usize,
    /// Parkers used to unpark each worker thread in the executor
    handles: RwLock<Vec<Parker>>,
    /// Used for idle and wakeup logic.
    pub(crate) sleeper: Sleeper,
    /// The global queue of the executor
    pub(crate) global: GlobalQueue,
    /// A set of all the local queues in the executor
    locals: Vec<LocalQueue>,
    pub(crate) handle: Arc<Handle>,
    #[cfg(feature = "metrics")]
    steal_times: std::sync::atomic::AtomicU64,
}

impl Schedule for MultiThreadScheduler {
    #[inline]
    fn schedule(&self, task: Task, lifo: bool) {
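        // `enqueue` returns true only when the task became visible to other
        // workers (global queue or a local run queue); a task parked in the
        // current worker's lifo slot is handled locally, so no wakeup is
        // needed in that case.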
        if self.enqueue(task, lifo) {
            self.wake_up_rand_one(false);
        }
    }
}

impl MultiThreadScheduler {
    pub(crate) fn new(thread_num: usize, handle: Arc<Handle>) -> Self {
        let mut locals = Vec::new();
        for _ in 0..thread_num {
            locals.push(LocalQueue::new());
        }

        Self {
            is_cancel: AtomicBool::new(false),
            num_workers: thread_num,
            handles: RwLock::new(Vec::new()),
            sleeper: Sleeper::new(thread_num),
            global: GlobalQueue::new(),
            locals,
            handle,
            #[cfg(feature = "metrics")]
            steal_times: std::sync::atomic::AtomicU64::new(0),
        }
    }

    pub(crate) fn is_cancel(&self) -> bool {
        self.is_cancel.load(Acquire)
    }

    pub(crate) fn set_cancel(&self) {
        self.is_cancel.store(true, SeqCst);
    }

    pub(crate) fn cancel(&self) {
        self.set_cancel();
        self.wake_up_all();
    }

    fn wake_up_all(&self) {
        let join_handle = self.handles.read().unwrap();
        for item in join_handle.iter() {
            item.unpark(self.handle.clone());
        }
    }

    #[inline]
    pub(crate) fn is_parked(&self, worker_index: &usize) -> bool {
        self.sleeper.is_parked(worker_index)
    }

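    /// Checks whether this worker was woken up as the designated searching
    /// worker. If so, it is re-registered as searching before it starts
    /// looking for work, so the searching-worker count stays consistent.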
    pub(crate) fn is_waked_by_last_search(&self, idx: usize) -> bool {
        let mut search_list = self.sleeper.wake_by_search.lock().unwrap();
        let is_waked_by_last_search = search_list[idx];
        search_list[idx] = false;
        if is_waked_by_last_search {
            self.sleeper.inc_searching_num();
            return true;
        }
        false
    }

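    /// If this worker was the last one in the searching state, wakes another
    /// parked worker so that at least one worker keeps looking for new tasks.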
    pub(crate) fn wake_up_rand_one_if_last_search(&self) {
        if self.sleeper.dec_searching_num() {
            self.wake_up_rand_one(true);
        }
    }

    pub(crate) fn wake_up_rand_one(&self, last_search: bool) {
        if let Some(index) = self.sleeper.pop_worker(last_search) {
            // index is bounded by total worker num
            self.handles
                .read()
                .unwrap()
                .get(index)
                .unwrap()
                .unpark(self.handle.clone());
        }
    }

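    /// Transitions a worker into the parked state. If it was the last
    /// searching or the last active worker and there is still pending work,
    /// another worker is woken up so that no wakeup is lost.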
    pub(crate) fn turn_to_sleep(&self, worker_inner: &mut worker::Inner, worker_index: usize) {
        let is_last_search = if worker_inner.is_searching {
            worker_inner.is_searching = false;
            self.sleeper.dec_searching_num()
        } else {
            false
        };
        let is_last_active = self.sleeper.push_worker(worker_index);

        if (is_last_search || is_last_active) && !self.has_no_work() {
            self.wake_up_rand_one(true);
        }
    }

    #[inline]
    pub(crate) fn turn_from_sleep(&self, worker_index: &usize) {
        self.sleeper.pop_worker_by_id(worker_index);
    }

    pub(crate) fn create_local_queue(&self, index: usize) -> LocalQueue {
        // this index is bounded by total worker num
        let local_run_queue = self.locals.get(index).unwrap();
        LocalQueue {
            inner: local_run_queue.inner.clone(),
        }
    }

    pub(crate) fn has_no_work(&self) -> bool {
        // check if local queues are empty
        for index in 0..self.num_workers {
            // this index is bounded by total worker num
            let item = self.locals.get(index).unwrap();
            if !item.is_empty() {
                return false;
            }
        }
        // then check if the global queue is empty
        self.global.is_empty()
    }

    // The returned value indicates whether or not to wake up another worker
    fn enqueue_under_ctx(&self, mut task: Task, worker_ctx: &WorkerContext, lifo: bool) -> bool {
        // If the current worker context belongs to a different runtime, push
        // the task to this scheduler's global queue
        if !std::ptr::eq(&self.global, &worker_ctx.worker.scheduler.global) {
            self.global.push_back(task);
            return true;
        }

        if lifo {
            let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
            let prev_task = lifo_slot.take();
            if let Some(prev) = prev_task {
                // the lifo slot already holds a task, so put the previous task
                // into the run queue and place the current task into the lifo slot
                *lifo_slot = Some(task);
                task = prev;
            } else {
                // the lifo slot is empty, place the task there and return immediately
                *lifo_slot = Some(task);
                return false;
            }
        }

        // this index is bounded by total worker num
        let local_run_queue = self.locals.get(worker_ctx.worker.index).unwrap();
        local_run_queue.push_back(task, &self.global);
        true
    }

    // The returned value indicates whether or not to wake up another worker.
    // We need to wake another worker under these circumstances:
    // 1. The task has been inserted into the global queue
    // 2. The lifo slot was occupied, so the old task was pushed into the local queue
    pub(crate) fn enqueue(&self, task: Task, lifo: bool) -> bool {
        let cur_worker = get_current_ctx();

        // currently we are inside a runtime's context
        if let Some(worker_ctx) = cur_worker {
            return self.enqueue_under_ctx(task, worker_ctx, lifo);
        }

        // Not called from inside a worker context, so push the task into the
        // global queue
        self.global.push_back(task);
        true
    }

    // gets a task from the global queue or the thread's own local queue
    fn get_task_from_queues(&self, worker_inner: &mut worker::Inner) -> Option<Task> {
        let count = worker_inner.count;
        let local_run_queue = &worker_inner.run_queue;

        // Every `GLOBAL_POLL_INTERVAL` executions, dequeue a task from the global
        // queue first. Otherwise, dequeue a task from the local queue; if the local
        // queue has no task, fall back to the global queue instead.
        if count % GLOBAL_POLL_INTERVAL as u32 == 0 {
            let mut limit = local_run_queue.remaining() as usize;
            // If the local queue is empty, steal a batch of tasks from the global
            // queue into the local queue. If the local queue still has tasks, only
            // dequeue a single task from the global queue and run it.
            if limit != LOCAL_QUEUE_CAP {
                limit = 0;
            }
            let task = self
                .global
                .pop_batch(self.num_workers, local_run_queue, limit);
            match task {
                Some(task) => Some(task),
                None => local_run_queue.pop_front(),
            }
        } else {
            let local_task = local_run_queue.pop_front();
            match local_task {
                Some(task) => Some(task),
                None => {
                    let limit = local_run_queue.remaining() as usize;
                    self.global
                        .pop_batch(self.num_workers, local_run_queue, limit)
                }
            }
        }
    }

    fn get_task_from_searching(&self, worker_inner: &mut worker::Inner) -> Option<Task> {
        const STEAL_TIME: usize = 3;

        // There is no task in the local queue or the global queue, so we try to
        // steal tasks from another worker's local queue.
        // The number of stealing workers should be less than half of the total
        // worker number.
        // Only increase the searching count when the worker is not already searching.
        if !worker_inner.is_searching && !self.sleeper.try_inc_searching_num() {
            return None;
        }

        worker_inner.is_searching = true;

        let local_run_queue = &worker_inner.run_queue;
        for i in 0..STEAL_TIME {
            if let Some(task) = self.steal(local_run_queue) {
                return Some(task);
            }
            if i < STEAL_TIME - 1 {
                thread::sleep(Duration::from_micros(1));
            }
        }

        None
    }

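    /// Fetches the next task for a worker: first from its local and the global
    /// queue, then runs the I/O driver once and wakes any yielded tasks, and
    /// finally falls back to stealing work from other workers.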
    pub(crate) fn dequeue(
        &self,
        worker_inner: &mut worker::Inner,
        worker_ctx: &WorkerContext,
    ) -> Option<Task> {
        // dequeues from the global queue or the thread's own local queue
        if let Some(task) = self.get_task_from_queues(worker_inner) {
            return Some(task);
        }

        if let Ok(mut driver) = worker_inner.parker.get_driver().try_lock() {
            driver.run_once();
        }
        worker_ctx.wake_yield();
        if !worker_inner.run_queue.is_empty() {
            return None;
        }

        self.get_task_from_searching(worker_inner)
    }

    fn steal(&self, destination: &LocalQueue) -> Option<Task> {
        let num = self.locals.len();
        let start = (fast_random() >> 56) as usize;

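        // Start from a random victim index (derived from the top byte of a fast
        // random number) so that concurrent stealers spread out instead of all
        // hitting the same local queue first.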
        for i in 0..num {
            let i = (start + i) % num;
            // skip the current worker's local queue
            // this index is bounded by total worker num
            let target = self.locals.get(i).unwrap();

            if std::ptr::eq(target, destination) {
                continue;
            }

            if let Some(task) = target.steal_into(destination) {
                #[cfg(feature = "metrics")]
                self.steal_times
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                return Some(task);
            }
        }

        // if there is no task to steal, check the global queue for tasks
        self.global.pop_batch(
            self.num_workers,
            destination,
            destination.remaining() as usize,
        )
    }

    cfg_metrics!(
        pub(crate) fn get_handles(&self) -> &RwLock<Vec<Parker>> {
            &self.handles
        }

        pub(crate) fn get_steal_times(&self) -> u64 {
            self.steal_times.load(Acquire)
        }
    );
}

#[derive(Clone)]
pub(crate) struct AsyncPoolSpawner {
    pub(crate) inner: Arc<Inner>,

    pub(crate) exe_mng_info: Arc<MultiThreadScheduler>,
}

impl Drop for AsyncPoolSpawner {
    fn drop(&mut self) {
        self.release()
    }
}

pub(crate) struct Inner {
    /// Number of total threads
    pub(crate) total: usize,
    /// Core-affinity setting of the threads
    #[cfg_attr(target_os = "macos", allow(unused))]
    is_affinity: bool,
    /// Handle for shutting down the pool
    shutdown_handle: Arc<(Mutex<usize>, Condvar)>,
    /// A callback function called after a worker thread starts
    after_start: Option<CallbackHook>,
    /// A callback function called before a worker thread stops
    before_stop: Option<CallbackHook>,
    /// Name of the worker threads
    worker_name: Option<String>,
    /// Stack size of each thread
    stack_size: Option<usize>,
    /// Workers
    #[cfg(feature = "metrics")]
    workers: Mutex<Vec<Arc<Worker>>>,
}

fn get_cpu_core() -> usize {
    cmp::max(1, get_cpu_num() as usize)
}

fn async_thread_proc(inner: Arc<Inner>, worker: Arc<Worker>, handle: Arc<Handle>) {
    if let Some(f) = inner.after_start.clone() {
        f();
    }

    run_worker(worker, handle);
    let (lock, cvar) = &*(inner.shutdown_handle.clone());
    let mut finished = lock.lock().unwrap();
    *finished += 1;

    // the last thread wakes up the main thread
    if *finished >= inner.total {
        cvar.notify_one();
    }

    if let Some(f) = inner.before_stop.clone() {
        f();
    }
}

impl AsyncPoolSpawner {
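    /// Creates the shared `MultiThreadScheduler` and spawns one worker thread
    /// per requested core thread (falling back to the number of CPU cores),
    /// all backed by a single I/O driver.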
    pub(crate) fn new(builder: &MultiThreadBuilder) -> io::Result<Self> {
        let (handle, driver) = Driver::initialize();

        let thread_num = builder.core_thread_size.unwrap_or_else(get_cpu_core);
        let spawner = AsyncPoolSpawner {
            inner: Arc::new(Inner {
                total: thread_num,
                is_affinity: builder.common.is_affinity,
                shutdown_handle: Arc::new((Mutex::new(0), Condvar::new())),
                after_start: builder.common.after_start.clone(),
                before_stop: builder.common.before_stop.clone(),
                worker_name: builder.common.worker_name.clone(),
                stack_size: builder.common.stack_size,
                #[cfg(feature = "metrics")]
                workers: Mutex::new(Vec::with_capacity(thread_num)),
            }),
            exe_mng_info: Arc::new(MultiThreadScheduler::new(thread_num, handle)),
        };
        spawner.create_async_thread_pool(driver)?;
        Ok(spawner)
    }

    fn create_async_thread_pool(&self, driver: Arc<Mutex<Driver>>) -> io::Result<()> {
        let mut workers = vec![];
        for index in 0..self.inner.total {
            let local_queue = self.exe_mng_info.create_local_queue(index);
            let local_run_queue =
                Box::new(worker::Inner::new(local_queue, Parker::new(driver.clone())));
            workers.push(Arc::new(Worker {
                index,
                scheduler: self.exe_mng_info.clone(),
                inner: RefCell::new(local_run_queue),
                lifo: RefCell::new(None),
                yielded: RefCell::new(Vec::new()),
            }))
        }

        for (worker_id, worker) in workers.drain(..).enumerate() {
            let work_arc_handle = self.exe_mng_info.handle.clone();
            #[cfg(feature = "metrics")]
            self.inner.workers.lock().unwrap().push(worker.clone());
            // set up thread attributes
            let mut builder = thread::Builder::new();

            if let Some(worker_name) = self.inner.worker_name.clone() {
                builder = builder.name(format!("async-{worker_id}-{worker_name}"));
            } else {
                builder = builder.name(format!("async-{worker_id}"));
            }

            if let Some(stack_size) = self.inner.stack_size {
                builder = builder.stack_size(stack_size);
            }

            let parker = worker.inner.borrow().parker.clone();
            self.exe_mng_info.handles.write().unwrap().push(parker);

            let inner = self.inner.clone();

            #[cfg(not(target_os = "macos"))]
            if self.inner.is_affinity {
                builder.spawn(move || {
                    let cpu_core_num = get_cpu_core();
                    let cpu_id = worker_id % cpu_core_num;
                    let _ = set_current_affinity(cpu_id);
                    async_thread_proc(inner, worker, work_arc_handle);
                })?;
            } else {
                builder.spawn(move || {
                    async_thread_proc(inner, worker, work_arc_handle);
                })?;
            }

            #[cfg(target_os = "macos")]
            builder.spawn(move || {
                async_thread_proc(inner, worker, work_arc_handle);
            })?;
        }
        Ok(())
    }

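    /// Spawns a `'static` future onto the multi-thread pool and returns a
    /// `JoinHandle` for awaiting its output.
    ///
    /// A minimal usage sketch through the public runtime API; the exact
    /// builder and runtime method names are assumed here for illustration:
    ///
    /// ```no_run
    /// let runtime = ylong_runtime::builder::RuntimeBuilder::new_multi_thread()
    ///     .build()
    ///     .unwrap();
    /// let handle = runtime.spawn(async { 21 * 2 });
    /// assert_eq!(runtime.block_on(handle).unwrap(), 42);
    /// ```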
    pub(crate) fn spawn<T>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let exe_scheduler = Arc::downgrade(&self.exe_mng_info);
        let (task, join_handle) =
            Task::create_task(builder, exe_scheduler, task, VirtualTableType::Ylong);

        self.exe_mng_info.schedule(task, false);
        join_handle
    }

    /// # Safety
    /// Users must guarantee either that the future carries its lifetimes so
    /// that the compiler can catch any lifetime issue, or that the future
    /// completes while everything it borrows is still valid. Otherwise the
    /// spawned task may access freed memory.
    ///
    /// ## Memory issue example
    /// No matter which runtime type (current / multi thread) is used, the
    /// following code compiles. Since the variable `slice` is released when
    /// the function returns, any handle returned from this function relies on
    /// a dangling pointer.
    ///
    /// ```no_run
    /// fn err_example(runtime: &Runtime) -> JoinHandle<()> {
    ///     let builder = TaskBuilder::default();
    ///     let mut slice = [1, 2, 3, 4, 5];
    ///     let borrow = &mut slice;
    ///     match &runtime.async_spawner {
    ///         AsyncHandle::CurrentThread(pool) => {
    ///             pool.spawn_with_ref(
    ///                 &builder,
    ///                 async { borrow.iter_mut().for_each(|x| *x *= 2) }
    ///             )
    ///         }
    ///         AsyncHandle::MultiThread(pool) => {
    ///             pool.spawn_with_ref(
    ///                 &builder,
    ///                 async { borrow.iter_mut().for_each(|x| *x *= 2) }
    ///             )
    ///         }
    ///     }
    /// }
    ///
    /// let runtime = Runtime::new().unwrap();
    /// let handle = spawn_blocking(
    ///     move || block_on(err_example(&runtime)).unwrap()
    /// );
    /// ```
    pub(crate) unsafe fn spawn_with_ref<T>(
        &self,
        builder: &TaskBuilder,
        task: T,
    ) -> JoinHandle<T::Output>
    where
        T: Future + Send,
        T::Output: Send,
    {
        let exe_scheduler = Arc::downgrade(&self.exe_mng_info);
        let raw_task = Task::create_raw_task(builder, exe_scheduler, task, VirtualTableType::Ylong);
        let handle = JoinHandle::new(raw_task);
        let task = Task(raw_task);
        self.exe_mng_info.schedule(task, false);
        handle
    }

    /// Waits 3 seconds for the worker threads to finish before releasing the
    /// async pool. If the threads cannot finish in time, a memory leak is
    /// possible.
    fn release_wait(&self) -> Result<(), ()> {
        self.exe_mng_info.cancel();
        let pair = self.inner.shutdown_handle.clone();
        let total = self.inner.total;
        let (lock, cvar) = &*pair;
        let finished = lock.lock().unwrap();
        let res = cvar
            .wait_timeout_while(finished, ASYNC_THREAD_QUIT_WAIT_TIME, |&mut finished| {
                finished < total
            })
            .unwrap();
        // if the time limit has been reached, the unfinished threads will not be released
        if res.1.timed_out() {
            Err(())
        } else {
            Ok(())
        }
    }

    pub(crate) fn release(&self) {
        if let Ok(()) = self.release_wait() {
            let mut join_handle = self.exe_mng_info.handles.write().unwrap();
            #[allow(clippy::mem_replace_with_default)]
            let mut worker_handles = std::mem::replace(join_handle.as_mut(), vec![]);
            drop(join_handle);
            for parker in worker_handles.drain(..) {
                parker.release();
            }
        }
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_worker(&self, index: usize) -> Result<Arc<Worker>, ()> {
        let vec = self.inner.workers.lock().unwrap();
        for worker in vec.iter() {
            if worker.index == index {
                return Ok(worker.clone());
            }
        }
        Err(())
    }
}

#[cfg(test)]
pub(crate) mod test {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::Ordering::{Acquire, Release};
    use std::sync::mpsc::channel;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll};
    use std::thread;

    use crate::builder::RuntimeBuilder;
    use crate::executor::async_pool::{get_cpu_core, AsyncPoolSpawner, MultiThreadScheduler};
    use crate::executor::driver::Driver;
    use crate::executor::parker::Parker;
    use crate::executor::queue::LocalQueue;
    use crate::executor::{worker, Schedule};
    use crate::task::{JoinHandle, Task, TaskBuilder, VirtualTableType};

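    /// A future that returns `Poll::Pending` (and wakes itself) `total` times
    /// before completing, used below to keep the workers busy.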
    pub struct TestFuture {
        value: usize,
        total: usize,
    }

    pub fn create_new() -> TestFuture {
        TestFuture {
            value: 0,
            total: 1000,
        }
    }

    impl Future for TestFuture {
        type Output = usize;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.total > self.value {
                self.get_mut().value += 1;
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(self.total)
            }
        }
    }

    async fn test_future() -> usize {
        create_new().await
    }

    /// UT test cases for ExecutorMngInfo::new()
    ///
    /// # Brief
    /// 1. Creates an ExecutorMngInfo with thread number 1
    /// 2. Creates an ExecutorMngInfo with thread number 64
    #[test]
    fn ut_executor_mng_info_new_001() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
        assert!(!executor_mng_info.is_cancel.load(Acquire));
        assert_eq!(executor_mng_info.handles.read().unwrap().capacity(), 0);

        let executor_mng_info = MultiThreadScheduler::new(64, arc_handle);
        assert!(!executor_mng_info.is_cancel.load(Acquire));
        assert_eq!(executor_mng_info.handles.read().unwrap().capacity(), 0);
    }

    /// UT test cases for ExecutorMngInfo::create_local_queue()
    ///
    /// # Brief
    /// 1. index set to 0, check the return value
    /// 2. index set to the last worker index (total - 1), check the return value
    #[test]
    fn ut_executor_mng_info_create_local_queues() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
        let local_run_queue_info = executor_mng_info.create_local_queue(0);
        assert!(local_run_queue_info.is_empty());

        let executor_mng_info = MultiThreadScheduler::new(64, arc_handle);
        let local_run_queue_info = executor_mng_info.create_local_queue(63);
        assert!(local_run_queue_info.is_empty());
    }

    pub(crate) fn create_task<T, S>(
        builder: &TaskBuilder,
        scheduler: std::sync::Weak<S>,
        task: T,
        virtual_table_type: VirtualTableType,
    ) -> (Task, JoinHandle<T::Output>)
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
        S: Schedule,
    {
        let (task, handle) = Task::create_task(builder, scheduler, task, virtual_table_type);
        (task, handle)
    }

    /// UT test cases for ExecutorMngInfo::enqueue()
    ///
    /// # Brief
    /// 1. Enqueues a task into the scheduler
    /// 2. Checks that the scheduler reports pending work afterwards
    #[test]
    fn ut_executor_mng_info_enqueue() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

        let builder = TaskBuilder::new();
        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );

        executor_mng_info.enqueue(task, true);
        assert!(!executor_mng_info.has_no_work());
    }

    /// UT test cases for ExecutorMngInfo::is_cancel()
    ///
    /// # Brief
    /// 1. The is_cancel value is set to false to check the return value
    /// 2. The is_cancel value is set to true to check the return value
    #[test]
    fn ut_executor_mng_info_is_cancel() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
        executor_mng_info.is_cancel.store(false, Release);
        assert!(!executor_mng_info.is_cancel());
        executor_mng_info.is_cancel.store(true, Release);
        assert!(executor_mng_info.is_cancel());
    }

    /// UT test cases for ExecutorMngInfo::set_cancel()
    ///
    /// # Brief
    /// 1. Check if the is_cancel parameter becomes true after set_cancel
    #[test]
    fn ut_executor_mng_info_set_cancel() {
        let (arc_handle, _) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
        assert!(!executor_mng_info.is_cancel.load(Acquire));
        executor_mng_info.set_cancel();
        assert!(executor_mng_info.is_cancel.load(Acquire));
    }

    /// UT test cases for ExecutorMngInfo::cancel()
    ///
    /// # Brief
    /// 1. Check that cancel() sets the shutdown flag and wakes up the parked
    ///    worker threads
    #[test]
    fn ut_executor_mng_info_cancel() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);

        let mut parker = Parker::new(arc_driver);
        let parker_cpy = parker.clone();
        let _ = thread::spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });
        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        executor_mng_info.cancel();
        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
    }

    /// UT test cases for ExecutorMngInfo::wake_up_all()
    ///
    /// # Brief
    /// 1. Constructs an environment to check if all threads are woken up and
    ///    executed via thread hooks.
    #[test]
    fn ut_executor_mng_info_wake_up_all() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);

        let mut parker = Parker::new(arc_driver);
        let parker_cpy = parker.clone();

        let _ = thread::spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });

        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        executor_mng_info.wake_up_all();
        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
    }

    /// UT test cases for ExecutorMngInfo::wake_up_rand_one()
    ///
    /// # Brief
    /// 1. Constructs an environment to check if a thread is woken up and
    ///    executed by a thread hook.
    #[test]
    fn ut_executor_mng_info_wake_up_rand_one() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let mut parker = Parker::new(arc_driver);
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
        let local_queue = LocalQueue {
            inner: executor_mng_info.locals[0].inner.clone(),
        };
        let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
        worker_inner.is_searching = true;
        executor_mng_info.sleeper.inc_searching_num();
        executor_mng_info.turn_to_sleep(&mut worker_inner, 0);

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);
        let parker_cpy = parker.clone();

        let _ = thread::spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });

        executor_mng_info.handles.write().unwrap().push(parker_cpy);
        executor_mng_info.wake_up_rand_one(false);
        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
        assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
    }

    /// UT test cases for ExecutorMngInfo::wake_up_if_one_task_left()
    ///
    /// # Brief
    /// 1. Constructs the environment, checks if there are still tasks, and if
    ///    so, wakes up a thread to continue working.
    #[test]
    fn ut_executor_mng_info_wake_up_if_one_task_left() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let mut parker = Parker::new(arc_driver);
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

        let local_queue = LocalQueue {
            inner: executor_mng_info.locals[0].inner.clone(),
        };
        let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
        executor_mng_info.turn_to_sleep(&mut worker_inner, 0);

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();

        let (flag_clone, tx) = (flag.clone(), tx);
        let parker_cpy = parker.clone();

        let _ = thread::spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });

        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        let builder = TaskBuilder::new();
        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );

        executor_mng_info.enqueue(task, true);

        if !executor_mng_info.has_no_work() {
            executor_mng_info.wake_up_rand_one(false);
        }

        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
        assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
    }

    /// UT test cases for ExecutorMngInfo::from_woken_to_sleep()
    ///
    /// # Brief
    /// 1. Construct the environment and set the state of the specified thread
    ///    to park state. If the last thread is in park state, check whether
    ///    there is a task, and if so, wake up this thread.
    #[test]
    fn ut_executor_mng_info_from_woken_to_sleep() {
        let (arc_handle, arc_driver) = Driver::initialize();
        let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());

        let flag = Arc::new(Mutex::new(0));
        let (tx, rx) = channel();
        let (flag_clone, tx) = (flag.clone(), tx);

        let mut parker = Parker::new(arc_driver);

        let local_queue = LocalQueue {
            inner: executor_mng_info.locals[0].inner.clone(),
        };
        let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
        worker_inner.is_searching = true;
        executor_mng_info.sleeper.inc_searching_num();

        let parker_cpy = parker.clone();

        let _ = thread::spawn(move || {
            parker.park();
            *flag_clone.lock().unwrap() = 1;
            tx.send(()).unwrap()
        });

        executor_mng_info.handles.write().unwrap().push(parker_cpy);

        let builder = TaskBuilder::new();
        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );

        executor_mng_info.enqueue(task, true);
        executor_mng_info.turn_to_sleep(&mut worker_inner, 0);
        rx.recv().unwrap();
        assert_eq!(*flag.lock().unwrap(), 1);
        assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
    }

    /// UT test cases for AsyncPoolSpawner::new()
    ///
    /// # Brief
    /// 1. Verify the spawner's fields after initialization completes
    #[test]
    fn ut_async_pool_spawner_new() {
        let thread_pool_builder = RuntimeBuilder::new_multi_thread();
        let async_pool_spawner = AsyncPoolSpawner::new(&thread_pool_builder).unwrap();
        assert_eq!(
            async_pool_spawner.inner.total,
            thread_pool_builder
                .core_thread_size
                .unwrap_or_else(get_cpu_core)
        );
        assert_eq!(
            async_pool_spawner.inner.worker_name,
            thread_pool_builder.common.worker_name
        );
        assert_eq!(
            async_pool_spawner.inner.stack_size,
            thread_pool_builder.common.stack_size
        );
        assert!(!async_pool_spawner.exe_mng_info.is_cancel.load(Acquire));
    }

    /// UT test cases for `create_async_thread_pool`.
    ///
    /// # Brief
    /// 1. Create an async_pool_spawner with `is_affinity` set to false
    /// 2. Call create_async_thread_pool()
    /// 3. This UT should not panic
    #[test]
    fn ut_async_pool_spawner_create_async_thread_pool_001() {
        let thread_pool_builder = RuntimeBuilder::new_multi_thread();
        let _ = AsyncPoolSpawner::new(&thread_pool_builder.is_affinity(false)).unwrap();
    }

    /// UT test cases for `create_async_thread_pool`.
    ///
    /// # Brief
    /// 1. Create an async_pool_spawner with `is_affinity` set to true
    /// 2. Call create_async_thread_pool()
    /// 3. This UT should not panic
    #[test]
    fn ut_async_pool_spawner_create_async_thread_pool_002() {
        let thread_pool_builder = RuntimeBuilder::new_multi_thread();
        let _ = AsyncPoolSpawner::new(&thread_pool_builder.is_affinity(true)).unwrap();
    }
}