// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
#[cfg(feature = "metrics")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
use std::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
use std::sync::{Arc, Mutex};
use std::{cmp, ptr};

/// Schedule strategy implementation: includes the FIFO/LIFO priority strategy
/// and the work-stealing strategy, where a worker either steals half of
/// another worker's tasks or steals from the worker with the largest amount
/// of work.
use crate::task::{Header, Task};
use crate::util::linked_list::LinkedList;

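/// Reads the value of an `AtomicU16` with a plain, non-atomic load.
///
/// This is a single-producer queue: only the owning worker writes `rear`, so
/// that worker may read its own `rear` without an atomic instruction (the
/// call sites below note this as well).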
unsafe fn non_atomic_load(data: &AtomicU16) -> u16 {
    ptr::read((data as *const AtomicU16).cast::<u16>())
}

/// Capacity of the local queue
pub(crate) const LOCAL_QUEUE_CAP: usize = 256;
const MASK: u16 = LOCAL_QUEUE_CAP as u16 - 1;
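// `LOCAL_QUEUE_CAP` is a power of two, so a logical position is mapped to a
// buffer slot with `pos & MASK`; for example, position 300 wraps to slot
// 300 & 255 == 44.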

/// Local queue of the worker
pub(crate) struct LocalQueue {
    pub(crate) inner: Arc<InnerBuffer>,
}

unsafe impl Send for LocalQueue {}
unsafe impl Sync for LocalQueue {}

unsafe impl Send for InnerBuffer {}
unsafe impl Sync for InnerBuffer {}

impl LocalQueue {
    pub(crate) fn new() -> Self {
        LocalQueue {
            inner: Arc::new(InnerBuffer::new(LOCAL_QUEUE_CAP as u16)),
        }
    }

    fn is_half_full(&self, rear: u16) -> bool {
        let (steal_pos, _) = unwrap(self.inner.front.load(Acquire));
        if rear.wrapping_sub(steal_pos) > LOCAL_QUEUE_CAP as u16 / 2 {
            return true;
        }
        false
    }
}

#[inline]
fn unwrap(num: u32) -> (u16, u16) {
    let head_pos = num & u16::MAX as u32;
    let steal_pos = num >> 16;
    (steal_pos as u16, head_pos as u16)
}

#[inline]
fn wrap(steal_pos: u16, head_pos: u16) -> u32 {
    (head_pos as u32) | ((steal_pos as u32) << 16)
}
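// Layout of the packed `front` word: the high 16 bits hold the steal position
// and the low 16 bits hold the head position, so wrap(3, 7) == 0x0003_0007 and
// unwrap(0x0003_0007) == (3, 7).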

impl LocalQueue {
    #[inline]
    pub(crate) fn pop_front(&self) -> Option<Task> {
        self.inner.pop_front()
    }

    #[inline]
    pub(crate) fn push_back(&self, task: Task, global: &GlobalQueue) {
        self.inner.push_back(task, global);
    }

    #[inline]
    pub(crate) fn steal_into(&self, dst: &LocalQueue) -> Option<Task> {
        self.inner.steal_into(dst)
    }

    #[inline]
    pub(crate) fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    #[inline]
    pub(crate) fn remaining(&self) -> u16 {
        self.inner.remaining()
    }
}

#[cfg(feature = "metrics")]
impl LocalQueue {
    #[inline]
    pub(crate) fn len(&self) -> u16 {
        self.inner.len()
    }

    #[inline]
    pub(crate) fn count(&self) -> u64 {
        self.inner.count()
    }

    #[inline]
    pub(crate) fn task_from_global_count(&self) -> u64 {
        self.inner.task_from_global_count()
    }

    #[inline]
    pub(crate) fn task_to_global_count(&self) -> u64 {
        self.inner.task_to_global_count()
    }
}

pub(crate) struct InnerBuffer {
    /// Front stores both the head position and the steal position
    front: AtomicU32,
    rear: AtomicU16,
    cap: u16,
    buffer: Box<[UnsafeCell<MaybeUninit<Task>>]>,
    #[cfg(feature = "metrics")]
    metrics: InnerBufferMetrics,
}

/// Metrics of InnerBuffer
#[cfg(feature = "metrics")]
struct InnerBufferMetrics {
    /// The total number of tasks that have entered this LocalQueue
    count: AtomicU64,
    /// The total number of tasks that have entered this LocalQueue from the
    /// GlobalQueue
    task_from_global_count: AtomicU64,
    /// The total number of tasks that have entered the GlobalQueue from this
    /// LocalQueue
    task_to_global_count: AtomicU64,
}

#[cfg(feature = "metrics")]
impl InnerBuffer {
    /// Returns the length of the queue.
    fn len(&self) -> u16 {
        let rear = self.rear.load(Acquire);
        let (_, head) = unwrap(self.front.load(Acquire));
        rear.wrapping_sub(head)
    }

    /// Returns the total number of tasks that have entered this LocalQueue
    fn count(&self) -> u64 {
        self.metrics.count.load(Acquire)
    }

    /// Returns the total number of tasks that have entered this LocalQueue
    /// from the GlobalQueue
    fn task_from_global_count(&self) -> u64 {
        self.metrics.task_from_global_count.load(Acquire)
    }

    /// Returns the total number of tasks that have entered the GlobalQueue
    /// from this LocalQueue
    fn task_to_global_count(&self) -> u64 {
        self.metrics.task_to_global_count.load(Acquire)
    }
}

impl InnerBuffer {
    fn new(cap: u16) -> Self {
        let mut buffer = Vec::with_capacity(cap as usize);

        for _ in 0..cap {
            buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
        }
        InnerBuffer {
            front: AtomicU32::new(0),
            rear: AtomicU16::new(0),
            cap,
            buffer: buffer.into(),
            #[cfg(feature = "metrics")]
            metrics: InnerBufferMetrics {
                count: AtomicU64::new(0),
                task_from_global_count: AtomicU64::new(0),
                task_to_global_count: AtomicU64::new(0),
            },
        }
    }

    /// Checks whether the queue is empty
    fn is_empty(&self) -> bool {
        let (_, head) = unwrap(self.front.load(Acquire));
        let rear = self.rear.load(Acquire);
        head == rear
    }

    pub(crate) fn pop_front(&self) -> Option<Task> {
        let mut head = self.front.load(Acquire);

        let pos = loop {
            let (steal_pos, real_pos) = unwrap(head);

            // it's a spmc queue, so the queue could read its own tail non-atomically
            let tail_pos = unsafe { non_atomic_load(&self.rear) };

            // return None if the queue is empty
            if real_pos == tail_pos {
                return None;
            }

            let next_real = real_pos.wrapping_add(1);
            let next = if steal_pos == real_pos {
                wrap(next_real, next_real)
            } else {
                wrap(steal_pos, next_real)
            };

            let res = self.front.compare_exchange(head, next, AcqRel, Acquire);
            match res {
                Ok(_) => break real_pos,
                Err(actual) => head = actual,
            }
        };

        let task = self.buffer[(pos & MASK) as usize].get();

        Some(unsafe { ptr::read(task).assume_init() })
    }

    pub(crate) fn remaining(&self) -> u16 {
        let front = self.front.load(Acquire);

        let (steal_pos, _real_pos) = unwrap(front);
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let rear = unsafe { non_atomic_load(&self.rear) };

        self.cap - (rear.wrapping_sub(steal_pos))
    }

    fn sync_steal_pos(&self, mut prev: u32) {
        loop {
            let (_front_steal, front_real) = unwrap(prev);
            let next = wrap(front_real, front_real);
            let res = self.front.compare_exchange(prev, next, AcqRel, Acquire);

            if let Err(actual) = res {
                let (actual_steal_pos, actual_real_pos) = unwrap(actual);
                assert_ne!(
                    actual_steal_pos, actual_real_pos,
                    "steal pos: {}, real_pos: {}, they should not be the same",
                    actual_steal_pos, actual_real_pos
                );
                prev = actual;
            } else {
                return;
            }
        }
    }

    pub(crate) fn push_back(&self, mut task: Task, global: &GlobalQueue) {
        loop {
            let front = self.front.load(Acquire);

            let (steal_pos, _) = unwrap(front);
            // it's a spmc queue, so the queue could read its own tail non-atomically
            let rear = unsafe { non_atomic_load(&self.rear) };

            // if there is room in the local queue, write the task into its buffer;
            // otherwise move half of the local queue into the global queue first
            if rear.wrapping_sub(steal_pos) < self.cap {
                let idx = (rear & MASK) as usize;
                let ptr = self.buffer[idx].get();
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
                self.rear.store(rear.wrapping_add(1), SeqCst);
                #[cfg(feature = "metrics")]
                self.metrics.count.fetch_add(1, AcqRel);
                return;
            } else {
                match self.push_overflowed(task, global, steal_pos) {
                    Ok(_) => return,
                    Err(ret) => task = ret,
                }
            }
        }
    }

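    /// Moves half of the local queue into the global queue when the local
    /// queue is full.
    ///
    /// With `LOCAL_QUEUE_CAP` of 256, a full queue hands 128 buffered tasks
    /// plus the task being pushed (129 in total) over to the global queue in
    /// one batch. If another thread raced on `front` and the CAS failed, the
    /// task is handed back to the caller via `Err`.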
    #[allow(unused_assignments)]
    pub(crate) fn push_overflowed(
        &self,
        task: Task,
        global: &GlobalQueue,
        front: u16,
    ) -> Result<(), Task> {
        // the number of tasks to move into the global queue: half of the capacity
        let count = LOCAL_QUEUE_CAP / 2;
        let prev = wrap(front, front);
        let next = wrap(front, front.wrapping_add(count as u16));

        match self.front.compare_exchange(prev, next, Release, Acquire) {
            Ok(_) => {}
            Err(_) => return Err(task),
        }

        let (mut src_front_steal, _src_front_real) = unwrap(prev);

        let mut tmp_buf = Vec::with_capacity(count);
        for _ in 0..count {
            tmp_buf.push(UnsafeCell::new(MaybeUninit::uninit()));
        }

        for dst_ptr in tmp_buf.iter().take(count) {
            let src_idx = (src_front_steal & MASK) as usize;
            let task_ptr = self.buffer[src_idx].get();
            let task = unsafe { ptr::read(task_ptr).assume_init() };
            unsafe {
                ptr::write((*dst_ptr.get()).as_mut_ptr(), task);
            }
            src_front_steal = src_front_steal.wrapping_add(1);
        }

        self.sync_steal_pos(next);

        #[cfg(feature = "metrics")]
        self.metrics
            .task_to_global_count
            .fetch_add(tmp_buf.len() as u64 + 1, AcqRel);

        global.push_batch(tmp_buf, task);

        Ok(())
    }

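    /// Steals tasks from this queue into `dst`.
    ///
    /// The steal is skipped if `dst` is already more than half full, or if
    /// another worker is currently stealing from this queue (its steal
    /// position and head position differ). Otherwise roughly half of the
    /// pending tasks (rounded up) are taken; the last stolen task is returned
    /// directly to the caller instead of being written into `dst`.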
    pub(crate) fn steal_into(&self, dst: &LocalQueue) -> Option<Task> {
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let mut dst_rear = unsafe { non_atomic_load(&dst.inner.rear) };
        if dst.is_half_full(dst_rear) {
            return None;
        }

        let mut src_next_front;
        let mut src_prev_front = self.front.load(Acquire);

        // get the number of tasks to steal
        let mut count = loop {
            let (src_front_steal, src_front_real) = unwrap(src_prev_front);

            // if these two values are not equal, another worker is already stealing
            // from this queue, therefore abort this steal.
            if src_front_steal != src_front_real {
                return None;
            };

            let src_rear = self.rear.load(Acquire);

            // steal half (rounded up) of the tasks from the queue
            let mut n = src_rear.wrapping_sub(src_front_real);
            n = n - n / 2;
            if n == 0 {
                return None;
            }

            let src_steal_to = src_front_real.wrapping_add(n);
            src_next_front = wrap(src_front_steal, src_steal_to);

            let res = self
                .front
                .compare_exchange(src_prev_front, src_next_front, AcqRel, Acquire);
            match res {
                Ok(_) => break n,
                Err(actual) => src_prev_front = actual,
            }
        };

        // transfer the tasks
        let (mut src_front_steal, _src_front_real) = unwrap(src_next_front);
        count -= 1;
        for _ in 0..count {
            let src_idx = (src_front_steal & MASK) as usize;
            let des_idx = (dst_rear & MASK) as usize;

            let task_ptr = self.buffer[src_idx].get();

            let task = unsafe { ptr::read(task_ptr).assume_init() };
            let ptr = dst.inner.buffer[des_idx].get();

            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
            src_front_steal = src_front_steal.wrapping_add(1);
            dst_rear = dst_rear.wrapping_add(1);
        }

        let src_idx = (src_front_steal & MASK) as usize;

        let task_ptr = self.buffer[src_idx].get();
        let task = unsafe { ptr::read(task_ptr).assume_init() };
        if count != 0 {
            dst.inner.rear.store(dst_rear, SeqCst);
        }

        self.sync_steal_pos(src_next_front);

        Some(task)
    }
}

impl Drop for InnerBuffer {
    fn drop(&mut self) {
        let mut head = self.pop_front();
        while let Some(task) = head {
            task.shutdown();
            head = self.pop_front();
        }
    }
}

pub(crate) struct GlobalQueue {
    /// Current number of tasks
    len: AtomicUsize,
    /// The total number of tasks that have entered the global queue.
    #[cfg(feature = "metrics")]
    count: AtomicU64,
    globals: Mutex<LinkedList<Header>>,
}

impl Drop for GlobalQueue {
    fn drop(&mut self) {
        while !self.is_empty() {
            // the loop condition guarantees the queue is not empty, so this unwrap is safe
            let task = self.pop_front().unwrap();
            task.shutdown();
        }
    }
}

impl GlobalQueue {
    pub(crate) fn new() -> Self {
        GlobalQueue {
            len: AtomicUsize::new(0_usize),
            #[cfg(feature = "metrics")]
            count: AtomicU64::new(0),
            globals: Mutex::new(LinkedList::new()),
        }
    }

    pub(super) fn is_empty(&self) -> bool {
        self.len.load(Acquire) == 0
    }

    pub(super) fn push_batch(&self, tasks: Vec<UnsafeCell<MaybeUninit<Task>>>, task: Task) {
        let mut list = self.globals.lock().unwrap();
        let len = tasks.len() + 1;
        for task_ptr in tasks {
            let task = unsafe { ptr::read(task_ptr.get()).assume_init() };
            list.push_front(task.into_header());
        }
        list.push_front(task.into_header());
        self.len.fetch_add(len, AcqRel);
        #[cfg(feature = "metrics")]
        self.count.fetch_add(len as u64, AcqRel);
    }

    pub(super) fn pop_batch(
        &self,
        worker_num: usize,
        queue: &LocalQueue,
        limit: usize,
    ) -> Option<Task> {
        let len = self.len.load(Acquire);
        let num = cmp::min(len / worker_num, limit);

        let inner_buf = &queue.inner;
        // it's a spmc queue, so the queue could read its own tail non-atomically
        let rear = unsafe { non_atomic_load(&inner_buf.rear) };
        let mut curr = rear;

        let mut list = self.globals.lock().unwrap();
        let first_task = unsafe { Task::from_raw(list.pop_back()?) };

        let mut count = 1;

        for _ in 1..num {
            if let Some(task) = list.pop_back() {
                let idx = (curr & MASK) as usize;
                let ptr = inner_buf.buffer[idx].get();
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), Task::from_raw(task));
                }
                curr = curr.wrapping_add(1);
                count += 1;
            } else {
                break;
            }
        }
        drop(list);
        self.len.fetch_sub(count, AcqRel);
        inner_buf.rear.store(curr, Release);

        #[cfg(feature = "metrics")]
        inner_buf
            .metrics
            .task_from_global_count
            .fetch_add(1, AcqRel);

        Some(first_task)
    }

    pub(super) fn pop_front(&self) -> Option<Task> {
        if self.is_empty() {
            return None;
        }
        let mut list = self.globals.lock().unwrap();
        let task = list
            .pop_back()
            .map(|header| unsafe { Task::from_raw(header) });
        if task.is_some() {
            self.len.fetch_sub(1, AcqRel);
        }
        drop(list);
        task
    }

    pub(super) fn push_back(&self, task: Task) {
        let mut list = self.globals.lock().unwrap();
        let header = task.into_header();
        list.push_front(header);
        self.len.fetch_add(1, AcqRel);
        drop(list);
        #[cfg(feature = "metrics")]
        self.count.fetch_add(1, AcqRel);
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_len(&self) -> usize {
        self.len.load(Acquire)
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_count(&self) -> u64 {
        self.count.load(Acquire)
    }
}

#[cfg(test)]
pub(crate) mod test {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::Ordering::Acquire;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::thread::park;

    use crate::executor::async_pool::test::create_task;
    use crate::executor::async_pool::MultiThreadScheduler;
    use crate::executor::driver::Driver;
    use crate::executor::queue::{GlobalQueue, InnerBuffer, LocalQueue, LOCAL_QUEUE_CAP};
    use crate::task::{TaskBuilder, VirtualTableType};

    #[cfg(any(not(feature = "metrics"), feature = "ffrt"))]
    impl InnerBuffer {
        fn len(&self) -> u16 {
            let front = self.front.load(Acquire);
            let (_, real_pos) = crate::executor::queue::unwrap(front);

            let rear = self.rear.load(Acquire);
            rear.wrapping_sub(real_pos)
        }
    }

    #[cfg(any(not(feature = "metrics"), feature = "ffrt"))]
    impl LocalQueue {
        pub fn len(&self) -> u16 {
            self.inner.len()
        }
    }

    pub struct TestFuture {
        value: usize,
        total: usize,
    }

    pub fn create_new() -> TestFuture {
        TestFuture {
            value: 0,
            total: 1000,
        }
    }

    impl Future for TestFuture {
        type Output = usize;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.total > self.value {
                self.get_mut().value += 1;
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(self.total)
            }
        }
    }

    async fn test_future() -> usize {
        create_new().await
    }

    impl LocalQueue {
        fn pop_front_and_release(&self) {
            let task = self.pop_front();
            if let Some(task) = task {
                task.shutdown();
            }
        }

        fn steal_into_and_release(&self, other: &LocalQueue) {
            let task = self.steal_into(other);
            if let Some(task) = task {
                task.shutdown();
            }
        }
    }

    /// UT test cases for InnerBuffer::new()
    ///
    /// # Brief
    /// 1. Checking the parameters after initialization is completed
    #[test]
    fn ut_inner_buffer_new() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.cap, LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.buffer.len(), LOCAL_QUEUE_CAP);
    }
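
    /// UT test cases for wrap() and unwrap()
    ///
    /// # Brief
    /// 1. A small round-trip sketch added for illustration: packing a steal
    ///    position and a head position with wrap() and unpacking the result
    ///    with unwrap() should return the original pair
    #[test]
    fn ut_wrap_unwrap_roundtrip() {
        let packed = crate::executor::queue::wrap(3, 7);
        let (steal_pos, head_pos) = crate::executor::queue::unwrap(packed);
        assert_eq!(packed, 0x0003_0007);
        assert_eq!(steal_pos, 3);
        assert_eq!(head_pos, 7);
    }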

    /// UT test cases for InnerBuffer::is_empty()
    ///
    /// # Brief
    /// case execution
    /// 1. Checking the parameters after initialization is completed
    /// 2. After pushing a task into the queue, check again whether it is
    ///    empty; it should be non-empty, and the state should match the
    ///    entries made after initialization
    #[test]
    fn ut_inner_buffer_is_empty() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert!(inner_buffer.is_empty());

        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        let exe_scheduler = Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle)));
        let (task, _) = create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );
        let global_queue = GlobalQueue::new();
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        inner_buffer.push_back(task, &global_queue);
        assert!(!inner_buffer.is_empty());
    }

    /// UT test cases for InnerBuffer::len()
    ///
    /// # Brief
    /// case execution
    /// 1. Checking the parameters after initialization is completed
    /// 2. Insert tasks up to the capacity into the local queue, checking the
    ///    local queue length
    /// 3. Insert tasks into the local queue that exceed its capacity, checking
    ///    the local queue length as well as the global queue length; no
    ///    exception branch is taken, and the state should match the entries
    ///    made after initialization
    #[test]
    fn ut_inner_buffer_len() {
        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        assert_eq!(inner_buffer.len(), 0);

        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        let global_queue = GlobalQueue::new();
        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        let exe_scheduler =
            Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
        let (task, _) = create_task(
            &builder,
            exe_scheduler,
            test_future(),
            VirtualTableType::Ylong,
        );
        inner_buffer.push_back(task, &global_queue);
        assert_eq!(inner_buffer.len(), 1);

        let inner_buffer = InnerBuffer::new(LOCAL_QUEUE_CAP as u16);
        let global_queue = GlobalQueue::new();
        for _ in 0..LOCAL_QUEUE_CAP + 1 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(1, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            inner_buffer.push_back(task, &global_queue);
        }
        assert_eq!(
            inner_buffer.len() as usize,
            LOCAL_QUEUE_CAP - LOCAL_QUEUE_CAP / 2
        );
        assert_eq!(global_queue.len.load(Acquire), 1 + LOCAL_QUEUE_CAP / 2);
    }

    /// UT test cases for InnerBuffer::push_back()
    ///
    /// # Brief
    /// case execution
    /// 1. Insert tasks up to the capacity into the local queue, verifying that
    ///    the function works correctly
    /// 2. Insert tasks that exceed the capacity into the local queue and
    ///    verify that the function works correctly; the exception branch is
    ///    taken, and the state should match the entries made after
    ///    initialization
    #[test]
    fn ut_inner_buffer_push_back() {
        // 1. Insert tasks up to capacity into the local queue, verifying that they are
        // functionally correct
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();

        let builder = TaskBuilder::new();
        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        assert_eq!(local_queue.len(), 256);

        // 2. Insert tasks that exceed the capacity into the local queue and verify that
        // they are functionally correct
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP / 2 + 1 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP / 2 {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        assert_eq!(
            local_queue.len() as usize,
            LOCAL_QUEUE_CAP - LOCAL_QUEUE_CAP / 2
        );
        assert_eq!(global_queue.len.load(Acquire), 1 + LOCAL_QUEUE_CAP / 2);
    }

    /// UT test cases for InnerBuffer::pop_front()
    ///
    /// # Brief
    /// case execution
    /// 1. Multi-threaded take-out operation with an empty local queue, check
    ///    whether the function is correct
    /// 2. If the local queue is not empty, run multi-threaded take-out
    ///    operations up to the number of existing tasks and check whether the
    ///    function is correct
    /// 3. If the local queue is not empty, run multi-threaded take-out
    ///    operations exceeding the number of existing tasks and check whether
    ///    the function is correct; the state should match the entries made
    ///    after initialization
    #[test]
    fn ut_inner_buffer_pop_front() {
        // 1. Multi-threaded take out task operation with empty local queue, check if
        // the function is correct
        let local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();
        assert!(local_queue.pop_front().is_none());

        // 2. If the local queue is not empty, multi-threaded take out operations up to
        // the number of existing tasks and check if the function is correct
        let local_queue = Arc::new(LocalQueue::new());
        let builder = TaskBuilder::new();

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }
        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        let local_queue_clone_one = local_queue.clone();
        let local_queue_clone_two = local_queue.clone();

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                local_queue_clone_one.pop_front_and_release();
            }
        });

        let thread_two = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                local_queue_clone_two.pop_front_and_release();
            }
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");
        assert!(local_queue.is_empty());

        // 3. If the local queue is not empty, the multi-threaded operation to take out
        // more than the number of existing tasks, check whether the function is correct
        let local_queue = Arc::new(LocalQueue::new());

        let (arc_handle, _) = Driver::initialize();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler =
                Arc::downgrade(&Arc::new(MultiThreadScheduler::new(2, arc_handle.clone())));
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }
        assert_eq!(local_queue.len(), LOCAL_QUEUE_CAP as u16);

        let local_queue_clone_one = local_queue.clone();
        let local_queue_clone_two = local_queue.clone();

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP {
                local_queue_clone_one.pop_front_and_release();
            }
        });

        let thread_two = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP {
                local_queue_clone_two.pop_front_and_release();
            }
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");
        assert!(local_queue.is_empty());
    }

    /// UT test cases for InnerBuffer::steal_into()
    ///
    /// # Brief
    /// case execution
    /// 1. In the single-threaded case, the number of tasks already in the local
    ///    queue is not more than half, steal from other local queues, the
    ///    number of steals is 0, check whether the function is completed
    #[test]
    fn ut_inner_buffer_steal_into_zero() {
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();

        assert!(other_local_queue.steal_into(&local_queue).is_none());
    }

    /// UT test cases for InnerBuffer::steal_into()
    ///
    /// # Brief
    /// case execution
    /// 1. In the single-threaded case, the number of tasks already in the local
    ///    queue is not more than half, steal from other local queues, the
    ///    number of steals is not 0, check whether the function is completed
    #[test]
    fn ut_inner_buffer_steal_into_less_than_half() {
        let builder = TaskBuilder::new();
        let (arc_handle, _) = Driver::initialize();
        let multi_scheduler = Arc::new(MultiThreadScheduler::new(1, arc_handle));

        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        other_local_queue.steal_into_and_release(&local_queue);

        assert_eq!(other_local_queue.len(), (LOCAL_QUEUE_CAP / 2) as u16);
        assert_eq!(local_queue.len(), (LOCAL_QUEUE_CAP / 2 - 1) as u16);
    }

    /// UT test cases for InnerBuffer::steal_into()
    ///
    /// # Brief
    /// case execution
    /// 1. Multi-threaded case, other queues are doing take-out operations, but
    ///    steal from this queue to see if the function is completed
    #[test]
    fn ut_inner_buffer_steal_into_multi_thread() {
        let builder = TaskBuilder::new();
        let (arc_handle, _) = Driver::initialize();
        let multi_scheduler = Arc::new(MultiThreadScheduler::new(1, arc_handle));

        let local_queue = Arc::new(LocalQueue::new());
        let local_queue_clone = local_queue.clone();

        let other_local_queue = Arc::new(LocalQueue::new());
        let other_local_queue_clone_one = other_local_queue.clone();
        let other_local_queue_clone_two = other_local_queue.clone();

        let global_queue = GlobalQueue::new();
        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        let thread_one = std::thread::spawn(move || {
            for _ in 0..LOCAL_QUEUE_CAP / 2 {
                other_local_queue_clone_one.pop_front_and_release();
            }
        });

        let thread_two = std::thread::spawn(move || {
            other_local_queue_clone_two.steal_into_and_release(&local_queue_clone);
        });

        thread_one.join().expect("failed");
        thread_two.join().expect("failed");

        assert_eq!(
            other_local_queue.len() + local_queue.len() + 1,
            (LOCAL_QUEUE_CAP / 2) as u16
        );
    }

    /// UT test cases for InnerBuffer::steal_into()
    ///
    /// # Brief
    /// case execution
    /// 1. In the multi-threaded case, while another queue is being stolen from
    ///    by a different local queue, steal from that same queue and check
    ///    whether the function is completed; the state should match the
    ///    entries made after initialization
    #[test]
    fn ut_inner_buffer_steal_into_multi_threaded_complex() {
        let global_queue = GlobalQueue::new();

        let builder = TaskBuilder::new();
        let (arc_handle, _) = Driver::initialize();
        let multi_scheduler = Arc::new(MultiThreadScheduler::new(1, arc_handle));

        let local_queue_one = Arc::new(LocalQueue::new());
        let local_queue_one_clone = local_queue_one.clone();

        let local_queue_two = Arc::new(LocalQueue::new());
        let local_queue_two_clone = local_queue_two.clone();

        let other_local_queue = Arc::new(LocalQueue::new());
        let other_local_queue_clone_one = other_local_queue.clone();
        let other_local_queue_clone_two = other_local_queue.clone();

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        let thread_one = std::thread::spawn(move || {
            park();
            other_local_queue_clone_one.steal_into_and_release(&local_queue_one_clone);
        });

        let thread_two = std::thread::spawn(move || {
            other_local_queue_clone_two.steal_into_and_release(&local_queue_two_clone);
        });

        thread_two.join().expect("failed");
        thread_one.thread().unpark();
        thread_one.join().expect("failed");

        assert_eq!(local_queue_two.len(), (LOCAL_QUEUE_CAP / 2 - 1) as u16);
        assert_eq!(local_queue_one.len(), (LOCAL_QUEUE_CAP / 4 - 1) as u16);
    }

    /// UT test cases for InnerBuffer::steal_into()
    ///
    /// # Brief
    /// case execution
    /// 1. In the single-threaded case, the local queue has more than half the
    ///    number of tasks, steal from other local queues, the number of steals
    ///    is 0, check whether the function is completed
    #[test]
    fn ut_inner_buffer_steal_into_more_than_half() {
        // 1. In the single-threaded case, the local queue has more than half the number
        // of tasks, steal from other local queues, the number of steals is 0, check
        // whether the function is completed
        let local_queue = LocalQueue::new();
        let other_local_queue = LocalQueue::new();
        let global_queue = GlobalQueue::new();

        let builder = TaskBuilder::new();
        let (arc_handle, _) = Driver::initialize();
        let multi_scheduler = Arc::new(MultiThreadScheduler::new(1, arc_handle));

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            local_queue.push_back(task, &global_queue);
        }

        for _ in 0..LOCAL_QUEUE_CAP {
            let exe_scheduler = Arc::downgrade(&multi_scheduler);
            let (task, _) = create_task(
                &builder,
                exe_scheduler,
                test_future(),
                VirtualTableType::Ylong,
            );
            other_local_queue.push_back(task, &global_queue);
        }

        assert!(other_local_queue.steal_into(&local_queue).is_none());
    }
}