// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
#[cfg(feature = "metrics")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
use std::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
use std::sync::{Arc, Mutex};
use std::{cmp, ptr};

/// Scheduling strategy implementation. It covers FIFO, LIFO and priority
/// scheduling as well as work-stealing, where a worker either steals half of
/// another worker's tasks or steals from the worker with the most tasks.
use crate::task::{Header, Task};
use crate::util::linked_list::LinkedList;

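/// Loads the value without an atomic instruction.
///
/// # Safety
///
/// Added note (assumption from how the call sites use it): the queue is
/// single-producer, so only the owning worker writes `rear`. The plain read is
/// therefore only sound when it is performed by that owner on its own `rear`,
/// as the callers below do; calling it while another thread could write the
/// value would be a data race.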
unsafe fn non_atomic_load(data: &AtomicU16) -> u16 {
    ptr::read((data as *const AtomicU16).cast::<u16>())
}

/// Capacity of the local queue
pub(crate) const LOCAL_QUEUE_CAP: usize = 256;
const MASK: u16 = LOCAL_QUEUE_CAP as u16 - 1;
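// Note: `LOCAL_QUEUE_CAP` must stay a power of two so that `index & MASK` is
// equivalent to `index % LOCAL_QUEUE_CAP` when positions wrap around.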

/// Local queue of the worker
pub(crate) struct LocalQueue {
    pub(crate) inner: Arc<InnerBuffer>,
}

unsafe impl Send for LocalQueue {}
unsafe impl Sync for LocalQueue {}

unsafe impl Send for InnerBuffer {}
unsafe impl Sync for InnerBuffer {}

impl LocalQueue {
    pub(crate) fn new() -> Self {
        LocalQueue {
            inner: Arc::new(InnerBuffer::new(LOCAL_QUEUE_CAP as u16)),
        }
    }

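    /// Returns `true` if more than half of the capacity is occupied, measured
    /// from the steal position (slots that a stealer has not yet released are
    /// still counted as occupied).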
    fn is_half_full(&self, rear: u16) -> bool {
        let (steal_pos, _) = unwrap(self.inner.front.load(Acquire));
        if rear.wrapping_sub(steal_pos) > LOCAL_QUEUE_CAP as u16 / 2 {
            return true;
        }
        false
    }
}

#[inline]
fn unwrap(num: u32) -> (u16, u16) {
    let head_pos = num & u16::MAX as u32;
    let steal_pos = num >> 16;
    (steal_pos as u16, head_pos as u16)
}

#[inline]
fn wrap(steal_pos: u16, head_pos: u16) -> u32 {
    (head_pos as u32) | ((steal_pos as u32) << 16)
}
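// Illustrative example (added): the packed front word keeps the steal position
// in the high 16 bits and the head position in the low 16 bits, so
// `wrap(3, 5) == 0x0003_0005` and `unwrap(0x0003_0005) == (3, 5)`.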

impl LocalQueue {
    #[inline]
    pub(crate) fn pop_front(&self) -> Option<Task> {
        self.inner.pop_front()
    }

    #[inline]
    pub(crate) fn push_back(&self, task: Task, global: &GlobalQueue) {
        self.inner.push_back(task, global);
    }

    #[inline]
    pub(crate) fn steal_into(&self, dst: &LocalQueue) -> Option<Task> {
        self.inner.steal_into(dst)
    }

    #[inline]
    pub(crate) fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    #[inline]
    pub(crate) fn remaining(&self) -> u16 {
        self.inner.remaining()
    }
}

#[cfg(feature = "metrics")]
impl LocalQueue {
    #[inline]
    pub(crate) fn len(&self) -> u16 {
        self.inner.len()
    }

    #[inline]
    pub(crate) fn count(&self) -> u64 {
        self.inner.count()
    }

    #[inline]
    pub(crate) fn task_from_global_count(&self) -> u64 {
        self.inner.task_from_global_count()
    }

    #[inline]
    pub(crate) fn task_to_global_count(&self) -> u64 {
        self.inner.task_to_global_count()
    }
}

pub(crate) struct InnerBuffer {
    /// Front stores the position of both head and steal
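    /// (high 16 bits: steal position, low 16 bits: head position; see `wrap`
    /// and `unwrap`)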
    front: AtomicU32,
    rear: AtomicU16,
    cap: u16,
    buffer: Box<[UnsafeCell<MaybeUninit<Task>>]>,
    #[cfg(feature = "metrics")]
    metrics: InnerBufferMetrics,
}

/// Metrics of InnerBuffer
#[cfg(feature = "metrics")]
struct InnerBufferMetrics {
    /// The total number of tasks that have entered this LocalQueue
    count: AtomicU64,
    /// The total number of tasks that have entered this LocalQueue from the
    /// GlobalQueue
    task_from_global_count: AtomicU64,
    /// The total number of tasks that have entered the GlobalQueue from this
    /// LocalQueue
    task_to_global_count: AtomicU64,
}

#[cfg(feature = "metrics")]
impl InnerBuffer {
    /// Returns the queue's length.
    fn len(&self) -> u16 {
        let rear = self.rear.load(Acquire);
        let (_, head) = unwrap(self.front.load(Acquire));
        rear.wrapping_sub(head)
    }

    /// Returns the total number of tasks that have entered this LocalQueue
    fn count(&self) -> u64 {
        self.metrics.count.load(Acquire)
    }

    /// Returns the total number of tasks that have entered this LocalQueue
    /// from the GlobalQueue
    fn task_from_global_count(&self) -> u64 {
        self.metrics.task_from_global_count.load(Acquire)
    }

    /// Returns the total number of tasks that have entered the GlobalQueue
    /// from this LocalQueue
    fn task_to_global_count(&self) -> u64 {
        self.metrics.task_to_global_count.load(Acquire)
    }
}

impl InnerBuffer {
    fn new(cap: u16) -> Self {
        let mut buffer = Vec::with_capacity(cap as usize);

        for _ in 0..cap {
            buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
        }
        InnerBuffer {
            front: AtomicU32::new(0),
            rear: AtomicU16::new(0),
            cap,
            buffer: buffer.into(),
            #[cfg(feature = "metrics")]
            metrics: InnerBufferMetrics {
                count: AtomicU64::new(0),
                task_from_global_count: AtomicU64::new(0),
                task_to_global_count: AtomicU64::new(0),
            },
        }
    }

    /// Checks whether the queue is empty
    fn is_empty(&self) -> bool {
        let (_, head) = unwrap(self.front.load(Acquire));
        let rear = self.rear.load(Acquire);
        head == rear
    }

    pub(crate) fn pop_front(&self) -> Option<Task> {
        let mut head = self.front.load(Acquire);

        let pos = loop {
            let (steal_pos, real_pos) = unwrap(head);

            // it's a spmc queue, so the queue could read its own tail non-atomically
            let tail_pos = unsafe { non_atomic_load(&self.rear) };

            // return none if the queue is empty
            if real_pos == tail_pos {
                return None;
            }

            let next_real = real_pos.wrapping_add(1);
            let next = if steal_pos == real_pos {
                wrap(next_real, next_real)
            } else {
                wrap(steal_pos, next_real)
            };

            let res = self.front.compare_exchange(head, next, AcqRel, Acquire);
            match res {
                Ok(_) => break real_pos,
                Err(actual) => head = actual,
            }
        };

        let task = self.buffer[(pos & MASK) as usize].get();

        Some(unsafe { ptr::read(task).assume_init() })
    }

    pub(crate) fn remaining(&self) -> u16 {
        let front = self.front.load(Acquire);

        let (steal_pos, _real_pos) = unwrap(front);
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let rear = unsafe { non_atomic_load(&self.rear) };

        self.cap - (rear.wrapping_sub(steal_pos))
    }

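    /// Publishes the new steal position: once an in-flight steal (or overflow
    /// move) has finished copying tasks out of the buffer, the steal position
    /// is advanced to match the current head again. Retries if the owner moved
    /// the head concurrently; the assertion below checks that no other thread
    /// has already equalized the two positions.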
    fn sync_steal_pos(&self, mut prev: u32) {
        loop {
            let (_front_steal, front_real) = unwrap(prev);
            let next = wrap(front_real, front_real);
            let res = self.front.compare_exchange(prev, next, AcqRel, Acquire);

            if let Err(actual) = res {
                let (actual_steal_pos, actual_real_pos) = unwrap(actual);
                assert_ne!(
                    actual_steal_pos, actual_real_pos,
                    "steal pos: {}, real_pos: {}, they should not be the same",
                    actual_steal_pos, actual_real_pos
                );
                prev = actual;
            } else {
                return;
            }
        }
    }

    pub(crate) fn push_back(&self, mut task: Task, global: &GlobalQueue) {
        loop {
            let front = self.front.load(Acquire);

            let (steal_pos, _) = unwrap(front);
            // it's a spmc queue, so the queue could read its own tail non-atomically
            let rear = unsafe { non_atomic_load(&self.rear) };

            // if the local queue is not full, push the task into it; otherwise move
            // half of the local queue along with this task into the global queue
            if rear.wrapping_sub(steal_pos) < self.cap {
                let idx = (rear & MASK) as usize;
                let ptr = self.buffer[idx].get();
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
                self.rear.store(rear.wrapping_add(1), SeqCst);
                #[cfg(feature = "metrics")]
                self.metrics.count.fetch_add(1, AcqRel);
                return;
            } else {
                match self.push_overflowed(task, global, steal_pos) {
                    Ok(_) => return,
                    Err(ret) => task = ret,
                }
            }
        }
    }

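    /// Moves half of the local buffer plus `task` into the global queue.
    ///
    /// Returns `Err(task)` when the CAS on `front` fails, which indicates that
    /// another worker stole from (or is still stealing from) this queue in the
    /// meantime; `push_back` then retries the fast path.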
    #[allow(unused_assignments)]
    pub(crate) fn push_overflowed(
        &self,
        task: Task,
        global: &GlobalQueue,
        front: u16,
    ) -> Result<(), Task> {
        // the number of tasks to move into the global queue: half of the capacity
        let count = LOCAL_QUEUE_CAP / 2;
        let prev = wrap(front, front);
        let next = wrap(front, front.wrapping_add(count as u16));

        match self.front.compare_exchange(prev, next, Release, Acquire) {
            Ok(_) => {}
            Err(_) => return Err(task),
        }

        let (mut src_front_steal, _src_front_real) = unwrap(prev);

        let mut tmp_buf = Vec::with_capacity(count);
        for _ in 0..count {
            tmp_buf.push(UnsafeCell::new(MaybeUninit::uninit()));
        }

        for dst_ptr in tmp_buf.iter().take(count) {
            let src_idx = (src_front_steal & MASK) as usize;
            let task_ptr = self.buffer[src_idx].get();
            let task = unsafe { ptr::read(task_ptr).assume_init() };
            unsafe {
                ptr::write((*dst_ptr.get()).as_mut_ptr(), task);
            }
            src_front_steal = src_front_steal.wrapping_add(1);
        }

        self.sync_steal_pos(next);

        #[cfg(feature = "metrics")]
        self.metrics
            .task_to_global_count
            .fetch_add(tmp_buf.len() as u64 + 1, AcqRel);

        global.push_batch(tmp_buf, task);

        Ok(())
    }

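    /// Steals roughly half of the tasks in `self` into `dst` and returns the
    /// last stolen task for the caller to run directly. Returns `None` if
    /// `dst` is already more than half full, if `self` is empty, or if another
    /// steal on `self` is still in progress.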
    pub(crate) fn steal_into(&self, dst: &LocalQueue) -> Option<Task> {
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let mut dst_rear = unsafe { non_atomic_load(&dst.inner.rear) };
        if dst.is_half_full(dst_rear) {
            return None;
        }

        let mut src_next_front;
        let mut src_prev_front = self.front.load(Acquire);

        // compute the number of tasks to steal
        let mut count = loop {
            let (src_front_steal, src_front_real) = unwrap(src_prev_front);

            // if these two values are not equal, it means another worker has stolen from
            // this queue, therefore abort this steal.
            if src_front_steal != src_front_real {
                return None;
            };

            let src_rear = self.rear.load(Acquire);

            // steal half of the tasks from the queue
            let mut n = src_rear.wrapping_sub(src_front_real);
            n = n - n / 2;
            if n == 0 {
                return None;
            }

            let src_steal_to = src_front_real.wrapping_add(n);
            src_next_front = wrap(src_front_steal, src_steal_to);

            let res = self
                .front
                .compare_exchange(src_prev_front, src_next_front, AcqRel, Acquire);
            match res {
                Ok(_) => break n,
                Err(actual) => src_prev_front = actual,
            }
        };

        // transfer the tasks
        let (mut src_front_steal, _src_front_real) = unwrap(src_next_front);
        count -= 1;
        for _ in 0..count {
            let src_idx = (src_front_steal & MASK) as usize;
            let des_idx = (dst_rear & MASK) as usize;

            let task_ptr = self.buffer[src_idx].get();

            let task = unsafe { ptr::read(task_ptr).assume_init() };
            let ptr = dst.inner.buffer[des_idx].get();

            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
            src_front_steal = src_front_steal.wrapping_add(1);
            dst_rear = dst_rear.wrapping_add(1);
        }

        let src_idx = (src_front_steal & MASK) as usize;

        let task_ptr = self.buffer[src_idx].get();
        let task = unsafe { ptr::read(task_ptr).assume_init() };
        if count != 0 {
            dst.inner.rear.store(dst_rear, SeqCst);
        }

        self.sync_steal_pos(src_next_front);

        Some(task)
    }
}

impl Drop for InnerBuffer {
    fn drop(&mut self) {
        let mut head = self.pop_front();
        while let Some(task) = head {
            task.shutdown();
            head = self.pop_front();
        }
    }
}

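/// Global queue shared by all workers, backed by a mutex-protected linked list
/// of task headers. Workers push overflow batches into it via `push_batch` and
/// take work back out via `pop_batch` and `pop_front`.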
pub(crate) struct GlobalQueue {
    /// Current number of tasks
    len: AtomicUsize,
    /// The total number of tasks that have entered the global queue.
    #[cfg(feature = "metrics")]
    count: AtomicU64,
    globals: Mutex<LinkedList<Header>>,
}

impl Drop for GlobalQueue {
    fn drop(&mut self) {
        while !self.is_empty() {
            // we just checked that the queue is not empty, so the unwrap is safe
            let task = self.pop_front().unwrap();
            task.shutdown();
        }
    }
}

impl GlobalQueue {
    pub(crate) fn new() -> Self {
        GlobalQueue {
            len: AtomicUsize::new(0_usize),
            #[cfg(feature = "metrics")]
            count: AtomicU64::new(0),
            globals: Mutex::new(LinkedList::new()),
        }
    }

    pub(super) fn is_empty(&self) -> bool {
        self.len.load(Acquire) == 0
    }

    pub(super) fn push_batch(&self, tasks: Vec<UnsafeCell<MaybeUninit<Task>>>, task: Task) {
        let mut list = self.globals.lock().unwrap();
        let len = tasks.len() + 1;
        for task_ptr in tasks {
            let task = unsafe { ptr::read(task_ptr.get()).assume_init() };
            list.push_front(task.into_header());
        }
        list.push_front(task.into_header());
        self.len.fetch_add(len, AcqRel);
        #[cfg(feature = "metrics")]
        self.count.fetch_add(len as u64, AcqRel);
    }

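    /// Pops a batch of up to `min(len / worker_num, limit)` tasks (at least
    /// one): the first popped task is returned to the caller and the rest are
    /// written directly into the given worker's local queue. Returns `None` if
    /// the global list is empty.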
    pub(super) fn pop_batch(
        &self,
        worker_num: usize,
        queue: &LocalQueue,
        limit: usize,
    ) -> Option<Task> {
        let len = self.len.load(Acquire);
        let num = cmp::min(len / worker_num, limit);

        let inner_buf = &queue.inner;
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let rear = unsafe { non_atomic_load(&inner_buf.rear) };
        let mut curr = rear;

        let mut list = self.globals.lock().unwrap();
        let first_task = unsafe { Task::from_raw(list.pop_back()?) };

        let mut count = 1;

        for _ in 1..num {
            if let Some(task) = list.pop_back() {
                let idx = (curr & MASK) as usize;
                let ptr = inner_buf.buffer[idx].get();
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), Task::from_raw(task));
                }
                curr = curr.wrapping_add(1);
                count += 1;
            } else {
                break;
            }
        }
        drop(list);
        self.len.fetch_sub(count, AcqRel);
        inner_buf.rear.store(curr, Release);

        #[cfg(feature = "metrics")]
        inner_buf
            .metrics
            .task_from_global_count
            .fetch_add(1, AcqRel);

        Some(first_task)
    }

    pub(super) fn pop_front(&self) -> Option<Task> {
        if self.is_empty() {
            return None;
        }
        let mut list = self.globals.lock().unwrap();
        let task = list
            .pop_back()
            .map(|header| unsafe { Task::from_raw(header) });
        if task.is_some() {
            self.len.fetch_sub(1, AcqRel);
        }
        drop(list);
        task
    }

    pub(super) fn push_back(&self, task: Task) {
        let mut list = self.globals.lock().unwrap();
        let header = task.into_header();
        list.push_front(header);
        self.len.fetch_add(1, AcqRel);
        drop(list);
        #[cfg(feature = "metrics")]
        self.count.fetch_add(1, AcqRel);
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_len(&self) -> usize {
        self.len.load(Acquire)
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_count(&self) -> u64 {
        self.count.load(Acquire)
    }
}

#[cfg(test)]
pub(crate) mod test {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::Ordering::Acquire;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::thread::park;

    use crate::executor::async_pool::test::create_task;
    use crate::executor::async_pool::MultiThreadScheduler;
    use crate::executor::driver::Driver;
    use crate::executor::queue::{GlobalQueue, InnerBuffer, LocalQueue, LOCAL_QUEUE_CAP};
    use crate::task::{TaskBuilder, VirtualTableType};

    #[cfg(any(not(feature = "metrics"), feature = "ffrt"))]
    impl InnerBuffer {
        fn len(&self) -> u16 {
            let front = self.front.load(Acquire);
            let (_, real_pos) = crate::executor::queue::unwrap(front);

            let rear = self.rear.load(Acquire);
            rear.wrapping_sub(real_pos)
        }
    }

    #[cfg(any(not(feature = "metrics"), feature = "ffrt"))]
    impl LocalQueue {
        pub fn len(&self) -> u16 {
            self.inner.len()
        }
    }

    pub struct TestFuture {
        value: usize,
        total: usize,
    }

    pub fn create_new() -> TestFuture {
        TestFuture {
            value: 0,
            total: 1000,
        }
    }

    impl Future for TestFuture {
        type Output = usize;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.total > self.value {
                self.get_mut().value += 1;
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(self.total)
            }
        }
    }

    async fn test_future() -> usize {
        create_new().await
    }

    impl LocalQueue {
        fn pop_front_and_release(&self) {
            let task = self.pop_front();
            if let Some(task) = task {
                task.shutdown();
            }
        }

        fn steal_into_and_release(&self, other: &LocalQueue) {
            let task = self.steal_into(other);
            if let Some(task) = task {
                task.shutdown();
            }
        }
    }

    /// UT test cases for InnerBuffer::new()
    ///
    /// # Brief
    /// 1. Checking the parameters after initialization is completed
    #[test]
    fn ut_inner_buffer_new() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.cap, LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.buffer.len(), LOCAL_QUEUE_CAP);
    }

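    /// UT test case for wrap()/unwrap() (added illustration of the packing
    /// scheme used by `front`)
    ///
    /// # Brief
    /// 1. Pack a (steal, head) pair, unpack it again and check that both
    ///    values and the u16 wrap-around survive the round trip
    #[test]
    fn ut_wrap_unwrap_round_trip() {
        use crate::executor::queue::{unwrap, wrap};

        let packed = wrap(3, 5);
        assert_eq!(packed, 0x0003_0005);
        assert_eq!(unwrap(packed), (3, 5));

        // positions wrap around u16::MAX, not LOCAL_QUEUE_CAP; indices are
        // reduced with MASK only when the buffer is accessed
        let packed = wrap(u16::MAX, u16::MAX.wrapping_add(1));
        assert_eq!(unwrap(packed), (u16::MAX, 0));
    }
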
    /// InnerBuffer::is_empty() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Checking the parameters after initialization is completed
    /// 2. Push a task into the queue and check again; the queue should now be
    ///    non-empty
    #[test]
    fn ut_inner_buffer_is_empty() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert!(inner_buffer.is_empty());

        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );
        let global_queue = GlobalQueue::new();
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        inner_buffer.push_back(task, &global_queue);
        assert!(!inner_buffer.is_empty());
    }

    /// InnerBuffer::len() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Checking the parameters after initialization is completed
    /// 2. Insert tasks up to the capacity into the local queue, checking the
    ///    local queue length
    /// 3. Insert tasks into the local queue that exceed its capacity, checking
    ///    both the local queue length and the global queue length
    #[test]
    fn ut_inner_buffer_len() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.len(), 0);

        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        let global_queue = GlobalQueue::new();
        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        let exe_scheduler =
            Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
        let (task, _) = create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );
        inner_buffer.push_back(task, &global_queue);
        assert_eq!(inner_buffer.len(), 1);

        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        let global_queue = GlobalQueue::new();
        for _ in 0..LOCAL_QUEUE_CAP + 1 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            inner_buffer.push_back(task, &global_queue);
        }
        assert_eq!(
            inner_buffer.len() as usize,
            LOCAL_QUEUE_CAP - LOCAL_QUEUE_CAP / 2
        );
        assert_eq!(global_queue.len.load(Acquire), 1 + LOCAL_QUEUE_CAP / 2);
    }

    /// InnerBuffer::push_back() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Insert tasks up to the capacity into the local queue, verifying that
    ///    they are handled correctly
    /// 2. Insert tasks that exceed the capacity into the local queue and
    ///    verify that the overflow into the global queue works correctly
    #[test]
    fn ut_inner_buffer_push_back() {
        // 1. Insert tasks up to the capacity into the local queue, verifying that
        // they are handled correctly
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();

        let builder = TaskBuilder::new();
        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        // 2. Insert tasks that exceed the capacity into the local queue and verify
        // that the overflow into the global queue works correctly
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP / 2 + 1 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        assert_eq!(
            local_queue.len() as usize,
            LOCAL_QUEUE_CAP - LOCAL_QUEUE_CAP / 2
        );
        assert_eq!(global_queue.len.load(Acquire), 1 + LOCAL_QUEUE_CAP / 2);
    }

    /// InnerBuffer::pop_front() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Pop from an empty local queue and check that nothing is returned
    /// 2. With a non-empty local queue, pop no more tasks than it contains on
    ///    multiple threads and check that the function behaves correctly
    /// 3. With a non-empty local queue, pop more tasks than it contains on
    ///    multiple threads and check that the function behaves correctly
    #[test]
    fn ut_inner_buffer_pop_front() {
        // 1. Pop from an empty local queue and check that nothing is returned
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();
        assert!(local_queue.pop_front().is_none());

        // 2. With a non-empty local queue, pop no more tasks than it contains on
        // multiple threads and check that the function behaves correctly
        let local_queue = Arc::new(LocalQueue::new());
        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }
        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        let local_queue_clone_one = local_queue.clone();
        let local_queue_clone_two = local_queue.clone();

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                local_queue_clone_one.pop_front_and_release();
            }
        });

        let thread_two = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                local_queue_clone_two.pop_front_and_release();
            }
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");
        assert!(local_queue.is_empty());

        // 3. With a non-empty local queue, pop more tasks than it contains on multiple
        // threads and check that the function behaves correctly
        let local_queue = Arc::new(LocalQueue::new());

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }
        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        let local_queue_clone_one = local_queue.clone();
        let local_queue_clone_two = local_queue.clone();

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP {
                local_queue_clone_one.pop_front_and_release();
            }
        });

        let thread_two = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP {
                local_queue_clone_two.pop_front_and_release();
            }
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");
        assert!(local_queue.is_empty());
    }

    /// InnerBuffer::steal_into() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Single-threaded case: the destination queue is less than half full
    ///    and the source queue holds no tasks, so nothing can be stolen; check
    ///    that the function handles this correctly
    #[test]
    fn ut_inner_buffer_steal_into_zero() {
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();

        assert!(other_local_queue.steal_into(&local_queue).is_none());
    }

    /// InnerBuffer::steal_into() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Single-threaded case: the destination queue is less than half full
    ///    and the source queue holds tasks, so a non-zero number of tasks is
    ///    stolen; check that the function works correctly
    #[test]
    fn ut_inner_buffer_steal_into_less_than_half() {
        let builder = TaskBuilder::new();
        let (arc_handle, _) = Driver::initialize();
        let multi_scheduler = Arc::new(MultiThreadScheduler::new(1, arc_handle));

        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        other_local_queue.steal_into_and_release(&local_queue);

        assert_eq!(other_local_queue.len(), (LOCAL_QUEUE_CAP / 2) as u16);
        assert_eq!(local_queue.len(), (LOCAL_QUEUE_CAP / 2 - 1) as u16);
    }

    /// InnerBuffer::steal_into() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Multi-threaded case: one thread pops tasks from a queue while
    ///    another thread steals from it; check that the function works
    ///    correctly
    #[test]
    fn ut_inner_buffer_steal_into_multi_thread() {
        let builder = TaskBuilder::new();
        let (arc_handle, _) = Driver::initialize();
        let multi_scheduler = Arc::new(MultiThreadScheduler::new(1, arc_handle));

        let local_queue = Arc::new(LocalQueue::new());
        let local_queue_clone = local_queue.clone();

        let other_local_queue = Arc::new(LocalQueue::new());
        let other_local_queue_clone_one = other_local_queue.clone();
        let other_local_queue_clone_two = other_local_queue.clone();

        let global_queue = GlobalQueue::new();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                other_local_queue_clone_one.pop_front_and_release();
            }
        });

        let thread_two = std::thread::spawn(move || {
            other_local_queue_clone_two.steal_into_and_release(&local_queue_clone);
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");

        assert_eq!(
            other_local_queue.len() + local_queue.len() + 1,
            (LOCAL_QUEUE_CAP / 2) as u16
        );
    }

    /// InnerBuffer::steal_into() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Multi-threaded case: two threads steal from the same queue one after
    ///    another into different destination queues; check that the function
    ///    works correctly
    #[test]
    fn ut_inner_buffer_steal_into_multi_threaded_complex() {
        let global_queue = GlobalQueue::new();

        let builder = TaskBuilder::new();
        let (arc_handle, _) = Driver::initialize();
        let multi_scheduler = Arc::new(MultiThreadScheduler::new(1, arc_handle));

        let local_queue_one = Arc::new(LocalQueue::new());
        let local_queue_one_clone = local_queue_one.clone();

        let local_queue_two = Arc::new(LocalQueue::new());
        let local_queue_two_clone = local_queue_two.clone();

        let other_local_queue = Arc::new(LocalQueue::new());
        let other_local_queue_clone_one = other_local_queue.clone();
        let other_local_queue_clone_two = other_local_queue.clone();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        let thread_one = std::thread::spawn(move || {
            park();
            other_local_queue_clone_one.steal_into_and_release(&local_queue_one_clone);
        });

        let thread_two = std::thread::spawn(move || {
            other_local_queue_clone_two.steal_into_and_release(&local_queue_two_clone);
        });

        thread_two.join().expect("failed");
        thread_one.thread().unpark();
        thread_one.join().expect("failed");

        assert_eq!(local_queue_two.len(), (LOCAL_QUEUE_CAP / 2 - 1) as u16);
        assert_eq!(local_queue_one.len(), (LOCAL_QUEUE_CAP / 4 - 1) as u16);
    }

    /// InnerBuffer::steal_into() UT test cases
    ///
    /// # Brief
    /// case execution
    /// 1. Single-threaded case: the destination queue is already more than
    ///    half full, so stealing from another local queue moves nothing; check
    ///    that the function handles this correctly
    #[test]
    fn ut_inner_buffer_steal_into_more_than_half() {
        // 1. Single-threaded case: the destination queue is already more than half
        // full, so stealing from another local queue moves nothing
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let builder = TaskBuilder::new();
        let (arc_handle, _) = Driver::initialize();
        let multi_scheduler = Arc::new(MultiThreadScheduler::new(1, arc_handle));

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        assert!(other_local_queue.steal_into(&local_queue).is_none());
    }
}