1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cell::RefCell;
15 use std::future::Future;
16 use std::sync::atomic::AtomicBool;
17 use std::sync::atomic::Ordering::{Acquire, SeqCst};
18 use std::sync::{Arc, Condvar, Mutex, RwLock};
19 use std::time::Duration;
20 use std::{cmp, io, thread};
21 
22 use super::driver::{Driver, Handle};
23 use super::parker::Parker;
24 use super::queue::{GlobalQueue, LocalQueue, LOCAL_QUEUE_CAP};
25 use super::sleeper::Sleeper;
26 use super::worker::{get_current_ctx, run_worker, Worker};
27 use super::{worker, Schedule};
28 use crate::builder::multi_thread_builder::MultiThreadBuilder;
29 use crate::builder::CallbackHook;
30 use crate::executor::worker::WorkerContext;
31 use crate::fastrand::fast_random;
32 use crate::task::{JoinHandle, Task, TaskBuilder, VirtualTableType};
33 #[cfg(not(target_os = "macos"))]
34 use crate::util::core_affinity::set_current_affinity;
35 use crate::util::num_cpus::get_cpu_num;
36 
37 const ASYNC_THREAD_QUIT_WAIT_TIME: Duration = Duration::from_secs(3);
38 pub(crate) const GLOBAL_POLL_INTERVAL: u8 = 61;
39 
40 pub(crate) struct MultiThreadScheduler {
41     /// Async pool shutdown state
42     is_cancel: AtomicBool,
43     /// Number of total workers
44     pub(crate) num_workers: usize,
45     /// Join Handles for all threads in the executor
46     handles: RwLock<Vec<Parker>>,
47     /// Used for idle and wakeup logic.
48     pub(crate) sleeper: Sleeper,
49     /// The global queue of the executor
50     pub(crate) global: GlobalQueue,
51     /// A set of all the local queues in the executor
52     locals: Vec<LocalQueue>,
53     pub(crate) handle: Arc<Handle>,
54     #[cfg(feature = "metrics")]
55     steal_times: std::sync::atomic::AtomicU64,
56 }
57 
58 impl Schedule for MultiThreadScheduler {
59     #[inline]
schedule(&self, task: Task, lifo: bool)60     fn schedule(&self, task: Task, lifo: bool) {
61         if self.enqueue(task, lifo) {
62             self.wake_up_rand_one(false);
63         }
64     }
65 }
66 
67 impl MultiThreadScheduler {
new(thread_num: usize, handle: Arc<Handle>) -> Self68     pub(crate) fn new(thread_num: usize, handle: Arc<Handle>) -> Self {
69         let mut locals = Vec::new();
70         for _ in 0..thread_num {
71             locals.push(LocalQueue::new());
72         }
73 
74         Self {
75             is_cancel: AtomicBool::new(false),
76             num_workers: thread_num,
77             handles: RwLock::new(Vec::new()),
78             sleeper: Sleeper::new(thread_num),
79             global: GlobalQueue::new(),
80             locals,
81             handle,
82             #[cfg(feature = "metrics")]
83             steal_times: std::sync::atomic::AtomicU64::new(0),
84         }
85     }
86 
is_cancel(&self) -> bool87     pub(crate) fn is_cancel(&self) -> bool {
88         self.is_cancel.load(Acquire)
89     }
90 
set_cancel(&self)91     pub(crate) fn set_cancel(&self) {
92         self.is_cancel.store(true, SeqCst);
93     }
94 
cancel(&self)95     pub(crate) fn cancel(&self) {
96         self.set_cancel();
97         self.wake_up_all();
98     }
99 
wake_up_all(&self)100     fn wake_up_all(&self) {
101         let join_handle = self.handles.read().unwrap();
102         for item in join_handle.iter() {
103             item.unpark(self.handle.clone());
104         }
105     }
106 
107     #[inline]
is_parked(&self, worker_index: &usize) -> bool108     pub(crate) fn is_parked(&self, worker_index: &usize) -> bool {
109         self.sleeper.is_parked(worker_index)
110     }
111 
is_waked_by_last_search(&self, idx: usize) -> bool112     pub(crate) fn is_waked_by_last_search(&self, idx: usize) -> bool {
113         let mut search_list = self.sleeper.wake_by_search.lock().unwrap();
114         let is_waked_by_last_search = search_list[idx];
115         search_list[idx] = false;
116         if is_waked_by_last_search {
117             self.sleeper.inc_searching_num();
118             return true;
119         }
120         false
121     }
122 
wake_up_rand_one_if_last_search(&self)123     pub(crate) fn wake_up_rand_one_if_last_search(&self) {
124         if self.sleeper.dec_searching_num() {
125             self.wake_up_rand_one(true);
126         }
127     }
128 
wake_up_rand_one(&self, last_search: bool)129     pub(crate) fn wake_up_rand_one(&self, last_search: bool) {
130         if let Some(index) = self.sleeper.pop_worker(last_search) {
131             // index is bounded by total worker num
132             self.handles
133                 .read()
134                 .unwrap()
135                 .get(index)
136                 .unwrap()
137                 .unpark(self.handle.clone());
138         }
139     }
140 
turn_to_sleep(&self, worker_inner: &mut worker::Inner, worker_index: usize)141     pub(crate) fn turn_to_sleep(&self, worker_inner: &mut worker::Inner, worker_index: usize) {
142         let is_last_search = if worker_inner.is_searching {
143             worker_inner.is_searching = false;
144             self.sleeper.dec_searching_num()
145         } else {
146             false
147         };
148         let is_last_active = self.sleeper.push_worker(worker_index);
149 
150         if (is_last_search || is_last_active) && !self.has_no_work() {
151             self.wake_up_rand_one(true);
152         }
153     }
154 
155     #[inline]
turn_from_sleep(&self, worker_index: &usize)156     pub(crate) fn turn_from_sleep(&self, worker_index: &usize) {
157         self.sleeper.pop_worker_by_id(worker_index);
158     }
159 
create_local_queue(&self, index: usize) -> LocalQueue160     pub(crate) fn create_local_queue(&self, index: usize) -> LocalQueue {
161         // this index is bounded by total worker num
162         let local_run_queue = self.locals.get(index).unwrap();
163         LocalQueue {
164             inner: local_run_queue.inner.clone(),
165         }
166     }
167 
has_no_work(&self) -> bool168     pub(crate) fn has_no_work(&self) -> bool {
169         // check if local queues are empty
170         for index in 0..self.num_workers {
171             // this index is bounded by total worker num
172             let item = self.locals.get(index).unwrap();
173             if !item.is_empty() {
174                 return false;
175             }
176         }
177         // then check is global queue empty
178         self.global.is_empty()
179     }
180 
181     // The returned value indicates whether or not to wake up another worker
enqueue_under_ctx(&self, mut task: Task, worker_ctx: &WorkerContext, lifo: bool) -> bool182     fn enqueue_under_ctx(&self, mut task: Task, worker_ctx: &WorkerContext, lifo: bool) -> bool {
183         // if the current context is another runtime, push it to the global queue
184         if !std::ptr::eq(&self.global, &worker_ctx.worker.scheduler.global) {
185             self.global.push_back(task);
186             return true;
187         }
188 
189         if lifo {
190             let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
191             let prev_task = lifo_slot.take();
192             if let Some(prev) = prev_task {
193                 // there is some task in lifo slot, therefore we put the prev task
194                 // into run queue, and put the current task into the lifo slot
195                 *lifo_slot = Some(task);
196                 task = prev;
197             } else {
198                 // there is no task in lifo slot, return immediately
199                 *lifo_slot = Some(task);
200                 return false;
201             }
202         }
203 
204         // this index is bounded by total worker num
205         let local_run_queue = self.locals.get(worker_ctx.worker.index).unwrap();
206         local_run_queue.push_back(task, &self.global);
207         true
208     }
209 
210     // The returned value indicates whether or not to wake up another worker
211     // We need to wake another worker under these circumstances:
212     // 1. The task has been inserted into the global queue
213     // 2. The lifo slot is taken, we push the old task into the local queue
enqueue(&self, task: Task, lifo: bool) -> bool214     pub(crate) fn enqueue(&self, task: Task, lifo: bool) -> bool {
215         let cur_worker = get_current_ctx();
216 
217         // currently we are inside a runtime's context
218         if let Some(worker_ctx) = cur_worker {
219             return self.enqueue_under_ctx(task, worker_ctx, lifo);
220         }
221 
222         // If the local queue of the current worker is full, push the task into the
223         // global queue
224         self.global.push_back(task);
225         true
226     }
227 
228     // gets task from the global queue or the thread's own local queue
get_task_from_queues(&self, worker_inner: &mut worker::Inner) -> Option<Task>229     fn get_task_from_queues(&self, worker_inner: &mut worker::Inner) -> Option<Task> {
230         let count = worker_inner.count;
231         let local_run_queue = &worker_inner.run_queue;
232 
233         // For every 61 times of execution, dequeue a task from the global queue first.
234         // Otherwise, dequeue a task from the local queue. However, if the local queue
235         // has no task, dequeue a task from the global queue instead.
236         if count % GLOBAL_POLL_INTERVAL as u32 == 0 {
237             let mut limit = local_run_queue.remaining() as usize;
238             // If the local queue is empty, multiple tasks are stolen from the global queue
239             // to the local queue. If the local queue has tasks, only dequeue one task from
240             // the global queue and run it.
241             if limit != LOCAL_QUEUE_CAP {
242                 limit = 0;
243             }
244             let task = self
245                 .global
246                 .pop_batch(self.num_workers, local_run_queue, limit);
247             match task {
248                 Some(task) => Some(task),
249                 None => local_run_queue.pop_front(),
250             }
251         } else {
252             let local_task = local_run_queue.pop_front();
253             match local_task {
254                 Some(task) => Some(task),
255                 None => {
256                     let limit = local_run_queue.remaining() as usize;
257                     self.global
258                         .pop_batch(self.num_workers, local_run_queue, limit)
259                 }
260             }
261         }
262     }
263 
get_task_from_searching(&self, worker_inner: &mut worker::Inner) -> Option<Task>264     fn get_task_from_searching(&self, worker_inner: &mut worker::Inner) -> Option<Task> {
265         const STEAL_TIME: usize = 3;
266 
267         // There is no task in the local queue or the global queue, so we try to steal
268         // tasks from another worker's local queue.
269         // The number of stealing workers should be less than half of the total worker
270         // number.
271         // Only increases the searching number only when the worker is not searching
272         if !worker_inner.is_searching && !self.sleeper.try_inc_searching_num() {
273             return None;
274         }
275 
276         worker_inner.is_searching = true;
277 
278         let local_run_queue = &worker_inner.run_queue;
279         for i in 0..STEAL_TIME {
280             if let Some(task) = self.steal(local_run_queue) {
281                 return Some(task);
282             }
283             if i < STEAL_TIME - 1 {
284                 thread::sleep(Duration::from_micros(1));
285             }
286         }
287 
288         None
289     }
290 
dequeue( &self, worker_inner: &mut worker::Inner, worker_ctx: &WorkerContext, ) -> Option<Task>291     pub(crate) fn dequeue(
292         &self,
293         worker_inner: &mut worker::Inner,
294         worker_ctx: &WorkerContext,
295     ) -> Option<Task> {
296         // dequeues from the global queue or the thread's own local queue
297         if let Some(task) = self.get_task_from_queues(worker_inner) {
298             return Some(task);
299         }
300 
301         if let Ok(mut driver) = worker_inner.parker.get_driver().try_lock() {
302             driver.run_once();
303         }
304         worker_ctx.wake_yield();
305         if !worker_inner.run_queue.is_empty() {
306             return None;
307         }
308 
309         self.get_task_from_searching(worker_inner)
310     }
311 
steal(&self, destination: &LocalQueue) -> Option<Task>312     fn steal(&self, destination: &LocalQueue) -> Option<Task> {
313         let num = self.locals.len();
314         let start = (fast_random() >> 56) as usize;
315 
316         for i in 0..num {
317             let i = (start + i) % num;
318             // skip the current worker's local queue
319             // this index is bounded by total worker num
320             let target = self.locals.get(i).unwrap();
321 
322             if std::ptr::eq(target, destination) {
323                 continue;
324             }
325 
326             if let Some(task) = target.steal_into(destination) {
327                 #[cfg(feature = "metrics")]
328                 self.steal_times
329                     .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
330 
331                 return Some(task);
332             }
333         }
334 
335         // if there is no task to steal, we check global queue for tasks
336         self.global.pop_batch(
337             self.num_workers,
338             destination,
339             destination.remaining() as usize,
340         )
341     }
342 
343     cfg_metrics!(
344         pub(crate) fn get_handles(&self) -> &RwLock<Vec<Parker>> {
345             &self.handles
346         }
347 
348         pub(crate) fn get_steal_times(&self) -> u64 {
349             self.steal_times.load(Acquire)
350         }
351     );
352 }
353 
354 #[derive(Clone)]
355 pub(crate) struct AsyncPoolSpawner {
356     pub(crate) inner: Arc<Inner>,
357 
358     pub(crate) exe_mng_info: Arc<MultiThreadScheduler>,
359 }
360 
361 impl Drop for AsyncPoolSpawner {
drop(&mut self)362     fn drop(&mut self) {
363         self.release()
364     }
365 }
366 
367 pub(crate) struct Inner {
368     /// Number of total threads
369     pub(crate) total: usize,
370     /// Core-affinity setting of the threads
371     #[cfg_attr(target_os = "macos", allow(unused))]
372     is_affinity: bool,
373     /// Handle for shutting down the pool
374     shutdown_handle: Arc<(Mutex<usize>, Condvar)>,
375     /// A callback func to be called after thread starts
376     after_start: Option<CallbackHook>,
377     /// A callback func to be called before thread stops
378     before_stop: Option<CallbackHook>,
379     /// Name of the worker threads
380     worker_name: Option<String>,
381     /// Stack size of each thread
382     stack_size: Option<usize>,
383     /// Workers
384     #[cfg(feature = "metrics")]
385     workers: Mutex<Vec<Arc<Worker>>>,
386 }
387 
get_cpu_core() -> usize388 fn get_cpu_core() -> usize {
389     cmp::max(1, get_cpu_num() as usize)
390 }
391 
async_thread_proc(inner: Arc<Inner>, worker: Arc<Worker>, handle: Arc<Handle>)392 fn async_thread_proc(inner: Arc<Inner>, worker: Arc<Worker>, handle: Arc<Handle>) {
393     if let Some(f) = inner.after_start.clone() {
394         f();
395     }
396 
397     run_worker(worker, handle);
398     let (lock, cvar) = &*(inner.shutdown_handle.clone());
399     let mut finished = lock.lock().unwrap();
400     *finished += 1;
401 
402     // the last thread wakes up the main thread
403     if *finished >= inner.total {
404         cvar.notify_one();
405     }
406 
407     if let Some(f) = inner.before_stop.clone() {
408         f();
409     }
410 }
411 
412 impl AsyncPoolSpawner {
new(builder: &MultiThreadBuilder) -> io::Result<Self>413     pub(crate) fn new(builder: &MultiThreadBuilder) -> io::Result<Self> {
414         let (handle, driver) = Driver::initialize();
415 
416         let thread_num = builder.core_thread_size.unwrap_or_else(get_cpu_core);
417         let spawner = AsyncPoolSpawner {
418             inner: Arc::new(Inner {
419                 total: thread_num,
420                 is_affinity: builder.common.is_affinity,
421                 shutdown_handle: Arc::new((Mutex::new(0), Condvar::new())),
422                 after_start: builder.common.after_start.clone(),
423                 before_stop: builder.common.before_stop.clone(),
424                 worker_name: builder.common.worker_name.clone(),
425                 stack_size: builder.common.stack_size,
426                 #[cfg(feature = "metrics")]
427                 workers: Mutex::new(Vec::with_capacity(thread_num)),
428             }),
429             exe_mng_info: Arc::new(MultiThreadScheduler::new(thread_num, handle)),
430         };
431         spawner.create_async_thread_pool(driver)?;
432         Ok(spawner)
433     }
434 
create_async_thread_pool(&self, driver: Arc<Mutex<Driver>>) -> io::Result<()>435     fn create_async_thread_pool(&self, driver: Arc<Mutex<Driver>>) -> io::Result<()> {
436         let mut workers = vec![];
437         for index in 0..self.inner.total {
438             let local_queue = self.exe_mng_info.create_local_queue(index);
439             let local_run_queue =
440                 Box::new(worker::Inner::new(local_queue, Parker::new(driver.clone())));
441             workers.push(Arc::new(Worker {
442                 index,
443                 scheduler: self.exe_mng_info.clone(),
444                 inner: RefCell::new(local_run_queue),
445                 lifo: RefCell::new(None),
446                 yielded: RefCell::new(Vec::new()),
447             }))
448         }
449 
450         for (worker_id, worker) in workers.drain(..).enumerate() {
451             let work_arc_handle = self.exe_mng_info.handle.clone();
452             #[cfg(feature = "metrics")]
453             self.inner.workers.lock().unwrap().push(worker.clone());
454             // set up thread attributes
455             let mut builder = thread::Builder::new();
456 
457             if let Some(worker_name) = self.inner.worker_name.clone() {
458                 builder = builder.name(format!("async-{worker_id}-{worker_name}"));
459             } else {
460                 builder = builder.name(format!("async-{worker_id}"));
461             }
462 
463             if let Some(stack_size) = self.inner.stack_size {
464                 builder = builder.stack_size(stack_size);
465             }
466 
467             let parker = worker.inner.borrow().parker.clone();
468             self.exe_mng_info.handles.write().unwrap().push(parker);
469 
470             let inner = self.inner.clone();
471 
472             #[cfg(not(target_os = "macos"))]
473             if self.inner.is_affinity {
474                 builder.spawn(move || {
475                     let cpu_core_num = get_cpu_core();
476                     let cpu_id = worker_id % cpu_core_num;
477                     let _ = set_current_affinity(cpu_id);
478                     async_thread_proc(inner, worker, work_arc_handle);
479                 })?;
480             } else {
481                 builder.spawn(move || {
482                     async_thread_proc(inner, worker, work_arc_handle);
483                 })?;
484             }
485 
486             #[cfg(target_os = "macos")]
487             builder.spawn(move || {
488                 async_thread_proc(inner, worker, work_arc_handle);
489             })?;
490         }
491         Ok(())
492     }
493 
spawn<T>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<T::Output> where T: Future + Send + 'static, T::Output: Send + 'static,494     pub(crate) fn spawn<T>(&self, builder: &TaskBuilder, task: T) -> JoinHandle<T::Output>
495     where
496         T: Future + Send + 'static,
497         T::Output: Send + 'static,
498     {
499         let exe_scheduler = Arc::downgrade(&self.exe_mng_info);
500         let (task, join_handle) =
501             Task::create_task(builder, exe_scheduler, task, VirtualTableType::Ylong);
502 
503         self.exe_mng_info.schedule(task, false);
504         join_handle
505     }
506 
507     /// # Safety
508     /// Users need to guarantee that the future will remember lifetime and thus
509     /// compiler will capture lifetime issues, or the future will complete
510     /// when its context remains valid. If not, currently
511     /// runtime initialization will cause memory error.
512     ///
513     /// ## Memory issue example
514     /// No matter using which type (current / multi thread) of runtime, the
515     /// following code can compile. When the variable `slice` gets released
516     /// when the function ends, any handles returned from this function rely
517     /// on a dangled pointer.
518     ///
519     /// ```no run
520     ///  fn err_example(runtime: &Runtime) -> JoinHandle<()> {
521     ///     let builder = TaskBuilder::default();
522     ///     let mut slice = [1, 2, 3, 4, 5];
523     ///     let borrow = &mut slice;
524     ///     match &runtime.async_spawner {
525     ///         AsyncHandle::CurrentThread(pool) => {
526     ///             pool.spawn_with_ref(
527     ///                 &builder,
528     ///                 async { borrow.iter_mut().for_each(|x| *x *= 2) }
529     ///             )
530     ///        }
531     ///        AsyncHandle::MultiThread(pool) => {
532     ///             pool.spawn_with_ref(
533     ///                 &builder,
534     ///                 async { borrow.iter_mut().for_each(|x| *x *= 2) }
535     ///             )
536     ///        }
537     ///     }
538     /// }
539     ///
540     /// let runtime = Runtime::new().unwrap();
541     /// let handle = spawn_blocking(
542     ///     move || block_on(err_example(&runtime)).unwrap()
543     /// );
544     /// ```
spawn_with_ref<T>( &self, builder: &TaskBuilder, task: T, ) -> JoinHandle<T::Output> where T: Future + Send, T::Output: Send,545     pub(crate) unsafe fn spawn_with_ref<T>(
546         &self,
547         builder: &TaskBuilder,
548         task: T,
549     ) -> JoinHandle<T::Output>
550     where
551         T: Future + Send,
552         T::Output: Send,
553     {
554         let exe_scheduler = Arc::downgrade(&self.exe_mng_info);
555         let raw_task = Task::create_raw_task(builder, exe_scheduler, task, VirtualTableType::Ylong);
556         let handle = JoinHandle::new(raw_task);
557         let task = Task(raw_task);
558         self.exe_mng_info.schedule(task, false);
559         handle
560     }
561 
562     /// Waits 3 seconds for threads to finish before releasing the async pool.
563     /// If threads could not finish before releasing, there could be possible
564     /// memory leak.
release_wait(&self) -> Result<(), ()>565     fn release_wait(&self) -> Result<(), ()> {
566         self.exe_mng_info.cancel();
567         let pair = self.inner.shutdown_handle.clone();
568         let total = self.inner.total;
569         let (lock, cvar) = &*pair;
570         let finished = lock.lock().unwrap();
571         let res = cvar
572             .wait_timeout_while(finished, ASYNC_THREAD_QUIT_WAIT_TIME, |&mut finished| {
573                 finished < total
574             })
575             .unwrap();
576         // if time limit has been reached, the unfinished threads would not get released
577         if res.1.timed_out() {
578             Err(())
579         } else {
580             Ok(())
581         }
582     }
583 
release(&self)584     pub(crate) fn release(&self) {
585         if let Ok(()) = self.release_wait() {
586             let mut join_handle = self.exe_mng_info.handles.write().unwrap();
587             #[allow(clippy::mem_replace_with_default)]
588             let mut worker_handles = std::mem::replace(join_handle.as_mut(), vec![]);
589             drop(join_handle);
590             for parker in worker_handles.drain(..) {
591                 parker.release();
592             }
593         }
594     }
595 
596     #[cfg(feature = "metrics")]
get_worker(&self, index: usize) -> Result<Arc<Worker>, ()>597     pub(crate) fn get_worker(&self, index: usize) -> Result<Arc<Worker>, ()> {
598         let vec = self.inner.workers.lock().unwrap();
599         for worker in vec.iter() {
600             if worker.index == index {
601                 return Ok(worker.clone());
602             }
603         }
604         Err(())
605     }
606 }
607 
608 #[cfg(test)]
609 pub(crate) mod test {
610     use std::future::Future;
611     use std::pin::Pin;
612     use std::sync::atomic::Ordering::{Acquire, Release};
613     use std::sync::mpsc::channel;
614     use std::sync::{Arc, Mutex};
615     use std::task::{Context, Poll};
616     use std::thread;
617 
618     use crate::builder::RuntimeBuilder;
619     use crate::executor::async_pool::{get_cpu_core, AsyncPoolSpawner, MultiThreadScheduler};
620     use crate::executor::driver::Driver;
621     use crate::executor::parker::Parker;
622     use crate::executor::queue::LocalQueue;
623     use crate::executor::{worker, Schedule};
624     use crate::task::{JoinHandle, Task, TaskBuilder, VirtualTableType};
625 
626     pub struct TestFuture {
627         value: usize,
628         total: usize,
629     }
630 
create_new() -> TestFuture631     pub fn create_new() -> TestFuture {
632         TestFuture {
633             value: 0,
634             total: 1000,
635         }
636     }
637 
638     impl Future for TestFuture {
639         type Output = usize;
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>640         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
641             if self.total > self.value {
642                 self.get_mut().value += 1;
643                 cx.waker().wake_by_ref();
644                 Poll::Pending
645             } else {
646                 Poll::Ready(self.total)
647             }
648         }
649     }
650 
test_future() -> usize651     async fn test_future() -> usize {
652         create_new().await
653     }
654 
655     /// UT test cases for ExecutorMngInfo::new()
656     ///
657     /// # Brief
658     /// 1. Creates a ExecutorMsgInfo with thread number 1
659     /// 2. Creates a ExecutorMsgInfo with thread number 2
660     #[test]
ut_executor_mng_info_new_001()661     fn ut_executor_mng_info_new_001() {
662         let (arc_handle, _) = Driver::initialize();
663         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
664         assert!(!executor_mng_info.is_cancel.load(Acquire));
665         assert_eq!(executor_mng_info.handles.read().unwrap().capacity(), 0);
666 
667         let executor_mng_info = MultiThreadScheduler::new(64, arc_handle);
668         assert!(!executor_mng_info.is_cancel.load(Acquire));
669         assert_eq!(executor_mng_info.handles.read().unwrap().capacity(), 0);
670     }
671 
672     /// UT test cases for ExecutorMngInfo::create_local_queues()
673     ///
674     /// # Brief
675     /// 1. index set to 0, check the return value
676     /// 2. index set to ExecutorMngInfo.inner.total, check the return value
677     #[test]
ut_executor_mng_info_create_local_queues()678     fn ut_executor_mng_info_create_local_queues() {
679         let (arc_handle, _) = Driver::initialize();
680         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
681         let local_run_queue_info = executor_mng_info.create_local_queue(0);
682         assert!(local_run_queue_info.is_empty());
683 
684         let executor_mng_info = MultiThreadScheduler::new(64, arc_handle);
685         let local_run_queue_info = executor_mng_info.create_local_queue(63);
686         assert!(local_run_queue_info.is_empty());
687     }
688 
create_task<T, S>( builder: &TaskBuilder, scheduler: std::sync::Weak<S>, task: T, virtual_table_type: VirtualTableType, ) -> (Task, JoinHandle<T::Output>) where T: Future + Send + 'static, T::Output: Send + 'static, S: Schedule,689     pub(crate) fn create_task<T, S>(
690         builder: &TaskBuilder,
691         scheduler: std::sync::Weak<S>,
692         task: T,
693         virtual_table_type: VirtualTableType,
694     ) -> (Task, JoinHandle<T::Output>)
695     where
696         T: Future + Send + 'static,
697         T::Output: Send + 'static,
698         S: Schedule,
699     {
700         let (task, handle) = Task::create_task(builder, scheduler, task, virtual_table_type);
701         (task, handle)
702     }
703 
704     /// UT test cases for ExecutorMngInfo::enqueue()
705     ///
706     /// # Brief
707     /// 1. index set to 0, check the return value
708     /// 2. index set to ExecutorMngInfo.inner.total, check the return value
709     #[test]
ut_executor_mng_info_enqueue()710     fn ut_executor_mng_info_enqueue() {
711         let (arc_handle, _) = Driver::initialize();
712         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
713 
714         let builder = TaskBuilder::new();
715         let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
716         let (task, _) = create_task(
717             &builder,
718             exe_scheduler,
719             test_future(),
720             VirtualTableType::Ylong,
721         );
722 
723         executor_mng_info.enqueue(task, true);
724         assert!(!executor_mng_info.has_no_work());
725     }
726 
727     /// UT test cases for ExecutorMngInfo::is_cancel()
728     ///
729     /// # Brief
730     /// 1. The is_cancel value is set to true to check the return value
731     /// 2. The is_cancel value is set to false to check the return value
732     #[test]
ut_executor_mng_info_is_cancel()733     fn ut_executor_mng_info_is_cancel() {
734         let (arc_handle, _) = Driver::initialize();
735         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
736         executor_mng_info.is_cancel.store(false, Release);
737         assert!(!executor_mng_info.is_cancel());
738         executor_mng_info.is_cancel.store(true, Release);
739         assert!(executor_mng_info.is_cancel());
740     }
741 
742     /// UT test cases for ExecutorMngInfo::set_cancel()
743     ///
744     /// # Brief
745     /// 1. Check if the is_cancel parameter becomes true after set_cancel
746     #[test]
ut_executor_mng_info_set_cancel()747     fn ut_executor_mng_info_set_cancel() {
748         let (arc_handle, _) = Driver::initialize();
749         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
750         assert!(!executor_mng_info.is_cancel.load(Acquire));
751         executor_mng_info.set_cancel();
752         assert!(executor_mng_info.is_cancel.load(Acquire));
753     }
754 
755     /// UT test cases for ExecutorMngInfo::cancel()
756     ///
757     /// # Brief
758     /// 1. Check if the is_cancel parameter becomes true after set_cancel
759     #[test]
ut_executor_mng_info_cancel()760     fn ut_executor_mng_info_cancel() {
761         let (arc_handle, arc_driver) = Driver::initialize();
762         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
763 
764         let flag = Arc::new(Mutex::new(0));
765         let (tx, rx) = channel();
766 
767         let (flag_clone, tx) = (flag.clone(), tx);
768 
769         let mut parker = Parker::new(arc_driver);
770         let parker_cpy = parker.clone();
771         let _ = thread::spawn(move || {
772             parker.park();
773             *flag_clone.lock().unwrap() = 1;
774             tx.send(()).unwrap()
775         });
776         executor_mng_info.handles.write().unwrap().push(parker_cpy);
777 
778         executor_mng_info.cancel();
779         rx.recv().unwrap();
780         assert_eq!(*flag.lock().unwrap(), 1);
781     }
782 
783     /// UT test cases for ExecutorMngInfo::wake_up_all()
784     ///
785     /// # Brief
786     /// 1. Constructs an environment to check if all threads are woken up and
787     ///    executed via thread hooks.
788     #[test]
ut_executor_mng_info_wake_up_all()789     fn ut_executor_mng_info_wake_up_all() {
790         let (arc_handle, arc_driver) = Driver::initialize();
791         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
792 
793         let flag = Arc::new(Mutex::new(0));
794         let (tx, rx) = channel();
795 
796         let (flag_clone, tx) = (flag.clone(), tx);
797 
798         let mut parker = Parker::new(arc_driver);
799         let parker_cpy = parker.clone();
800 
801         let _ = thread::spawn(move || {
802             parker.park();
803             *flag_clone.lock().unwrap() = 1;
804             tx.send(()).unwrap()
805         });
806 
807         executor_mng_info.handles.write().unwrap().push(parker_cpy);
808 
809         executor_mng_info.wake_up_all();
810         rx.recv().unwrap();
811         assert_eq!(*flag.lock().unwrap(), 1);
812     }
813 
814     /// UT test cases for ExecutorMngInfo::wake_up_rand_one()
815     ///
816     /// # Brief
817     /// 1. Constructs an environment to check if a thread is woken up and
818     ///    executed by a thread hook.
819     #[test]
ut_executor_mng_info_wake_up_rand_one()820     fn ut_executor_mng_info_wake_up_rand_one() {
821         let (arc_handle, arc_driver) = Driver::initialize();
822         let mut parker = Parker::new(arc_driver);
823         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle);
824         let local_queue = LocalQueue {
825             inner: executor_mng_info.locals[0].inner.clone(),
826         };
827         let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
828         worker_inner.is_searching = true;
829         executor_mng_info.sleeper.inc_searching_num();
830         executor_mng_info.turn_to_sleep(&mut worker_inner, 0);
831 
832         let flag = Arc::new(Mutex::new(0));
833         let (tx, rx) = channel();
834 
835         let (flag_clone, tx) = (flag.clone(), tx);
836         let parker_cpy = parker.clone();
837 
838         let _ = thread::spawn(move || {
839             parker.park();
840             *flag_clone.lock().unwrap() = 1;
841             tx.send(()).unwrap()
842         });
843 
844         executor_mng_info.handles.write().unwrap().push(parker_cpy);
845         executor_mng_info.wake_up_rand_one(false);
846         rx.recv().unwrap();
847         assert_eq!(*flag.lock().unwrap(), 1);
848         assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
849     }
850 
851     /// UT test cases for ExecutorMngInfo::wake_up_if_one_task_left()
852     ///
853     /// # Brief
854     /// 1. Constructs the environment, checks if there are still tasks, and if
855     ///    so, wakes up a thread to continue working.
856     #[test]
ut_executor_mng_info_wake_up_if_one_task_left()857     fn ut_executor_mng_info_wake_up_if_one_task_left() {
858         let (arc_handle, arc_driver) = Driver::initialize();
859         let mut parker = Parker::new(arc_driver);
860         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
861 
862         let local_queue = LocalQueue {
863             inner: executor_mng_info.locals[0].inner.clone(),
864         };
865         let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
866         executor_mng_info.turn_to_sleep(&mut worker_inner, 0);
867 
868         let flag = Arc::new(Mutex::new(0));
869         let (tx, rx) = channel();
870 
871         let (flag_clone, tx) = (flag.clone(), tx);
872         let parker_cpy = parker.clone();
873 
874         let _ = thread::spawn(move || {
875             parker.park();
876             *flag_clone.lock().unwrap() = 1;
877             tx.send(()).unwrap()
878         });
879 
880         executor_mng_info.handles.write().unwrap().push(parker_cpy);
881 
882         let builder = TaskBuilder::new();
883         let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
884         let (task, _) = create_task(
885             &builder,
886             exe_scheduler,
887             test_future(),
888             VirtualTableType::Ylong,
889         );
890 
891         executor_mng_info.enqueue(task, true);
892 
893         if !executor_mng_info.has_no_work() {
894             executor_mng_info.wake_up_rand_one(false);
895         }
896 
897         rx.recv().unwrap();
898         assert_eq!(*flag.lock().unwrap(), 1);
899         assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
900     }
901 
902     /// UT test cases for ExecutorMngInfo::from_woken_to_sleep()
903     ///
904     /// # Brief
905     ///  1. Construct the environment and set the state of the specified thread
906     ///     to park state. If the last thread is in park state, check whether
907     ///     there is a task, and if so, wake up this thread.
908     #[test]
ut_executor_mng_info_from_woken_to_sleep()909     fn ut_executor_mng_info_from_woken_to_sleep() {
910         let (arc_handle, arc_driver) = Driver::initialize();
911         let executor_mng_info = MultiThreadScheduler::new(1, arc_handle.clone());
912 
913         let flag = Arc::new(Mutex::new(0));
914         let (tx, rx) = channel();
915         let (flag_clone, tx) = (flag.clone(), tx);
916 
917         let mut parker = Parker::new(arc_driver);
918 
919         let local_queue = LocalQueue {
920             inner: executor_mng_info.locals[0].inner.clone(),
921         };
922         let mut worker_inner = worker::Inner::new(local_queue, parker.clone());
923         worker_inner.is_searching = true;
924         executor_mng_info.sleeper.inc_searching_num();
925 
926         let parker_cpy = parker.clone();
927 
928         let _ = thread::spawn(move || {
929             parker.park();
930             *flag_clone.lock().unwrap() = 1;
931             tx.send(()).unwrap()
932         });
933 
934         executor_mng_info.handles.write().unwrap().push(parker_cpy);
935 
936         let builder = TaskBuilder::new();
937         let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
938         let (task, _) = create_task(
939             &builder,
940             exe_scheduler,
941             test_future(),
942             VirtualTableType::Ylong,
943         );
944 
945         executor_mng_info.enqueue(task, true);
946         executor_mng_info.turn_to_sleep(&mut worker_inner, 0);
947         rx.recv().unwrap();
948         assert_eq!(*flag.lock().unwrap(), 1);
949         assert_eq!(executor_mng_info.sleeper.pop_worker(false), None);
950     }
951 
952     /// UT test cases for AsyncPoolSpawner::new()
953     ///
954     /// # Brief
955     /// 1. Verify the parameters of the initialization completion
956     #[test]
ut_async_pool_spawner_new()957     fn ut_async_pool_spawner_new() {
958         let thread_pool_builder = RuntimeBuilder::new_multi_thread();
959         let async_pool_spawner = AsyncPoolSpawner::new(&thread_pool_builder).unwrap();
960         assert_eq!(
961             async_pool_spawner.inner.total,
962             thread_pool_builder
963                 .core_thread_size
964                 .unwrap_or_else(get_cpu_core)
965         );
966         assert_eq!(
967             async_pool_spawner.inner.worker_name,
968             thread_pool_builder.common.worker_name
969         );
970         assert_eq!(
971             async_pool_spawner.inner.stack_size,
972             thread_pool_builder.common.stack_size
973         );
974         assert!(!async_pool_spawner.exe_mng_info.is_cancel.load(Acquire));
975     }
976 
977     /// UT test cases for `create_async_thread_pool`.
978     ///
979     /// # Brief
980     /// 1. Create an async_pool_spawner with `is_affinity` setting to false
981     /// 2. Call create_async_thread_pool()
982     /// 3. This UT should not panic
983     #[test]
ut_async_pool_spawner_create_async_thread_pool_001()984     fn ut_async_pool_spawner_create_async_thread_pool_001() {
985         let thread_pool_builder = RuntimeBuilder::new_multi_thread();
986         let _ = AsyncPoolSpawner::new(&thread_pool_builder.is_affinity(false)).unwrap();
987     }
988 
989     /// UT test cases for `UnboundedSender`.
990     ///
991     /// # Brief
992     /// 1. Create an async_pool_spawner with `is_affinity` setting to true
993     /// 2. Call create_async_thread_pool()
994     /// 3. This UT should not panic
995     #[test]
ut_async_pool_spawner_create_async_thread_pool_002()996     fn ut_async_pool_spawner_create_async_thread_pool_002() {
997         let thread_pool_builder = RuntimeBuilder::new_multi_thread();
998         let _ = AsyncPoolSpawner::new(&thread_pool_builder.is_affinity(true)).unwrap();
999     }
1000 }
1001