1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cell::{Cell, RefCell};
15 use std::ptr;
16 use std::sync::Arc;
17 use std::task::Waker;
18 
19 /// worker struct info and method
20 use crate::executor::async_pool::MultiThreadScheduler;
21 use crate::executor::driver::Handle;
22 use crate::executor::parker::Parker;
23 use crate::executor::queue::LocalQueue;
24 use crate::task::Task;
25 
thread_local! {
    // Type-erased pointer to the current thread's `WorkerContext`.
    // Set/restored by `run_worker`, read back through `get_current_ctx`.
    pub(crate) static CURRENT_WORKER: Cell<* const ()> = Cell::new(ptr::null());
    // Type-erased pointer to the current thread's `WorkerHandle`.
    // Set/restored by `run_worker`, read back through `get_current_handle`.
    pub(crate) static CURRENT_HANDLE: Cell<* const ()> = Cell::new(ptr::null());
}
30 
/// Per-thread context binding a running executor thread to its [`Worker`].
pub(crate) struct WorkerContext {
    // The worker driven by this thread; shared with the scheduler.
    pub(crate) worker: Arc<Worker>,
}
34 
35 impl WorkerContext {
36     #[inline]
run(&mut self)37     fn run(&mut self) {
38         let worker_ref = &self.worker;
39         worker_ref.run(self);
40     }
41 
wake_yield(&self) -> bool42     pub(crate) fn wake_yield(&self) -> bool {
43         let mut yielded = self.worker.yielded.borrow_mut();
44         if yielded.is_empty() {
45             return false;
46         }
47         for waker in yielded.drain(..) {
48             waker.wake();
49         }
50         true
51     }
52 
53     #[inline]
release(&mut self)54     fn release(&mut self) {
55         self.worker.release();
56     }
57 }
58 
/// Thread-local wrapper around the runtime's driver [`Handle`].
pub(crate) struct WorkerHandle {
    // Kept only to hold the handle alive for the thread; never read here.
    pub(crate) _handle: Arc<Handle>,
}
62 
/// Gets the handle of the current thread
#[cfg(all(not(feature = "ffrt"), any(feature = "net", feature = "time")))]
#[inline]
pub(crate) fn get_current_handle() -> Option<&'static WorkerHandle> {
    CURRENT_HANDLE.with(|slot| {
        let ptr = slot.get().cast::<WorkerHandle>();
        // SAFETY: a non-null pointer was stored by `run_worker`, which keeps
        // the pointee alive while this worker thread is running.
        (!ptr.is_null()).then(|| unsafe { &*ptr })
    })
}
76 
77 /// Gets the worker context of the current thread
78 #[inline]
get_current_ctx() -> Option<&'static WorkerContext>79 pub(crate) fn get_current_ctx() -> Option<&'static WorkerContext> {
80     CURRENT_WORKER.with(|ctx| {
81         let val = ctx.get();
82         if val.is_null() {
83             None
84         } else {
85             Some(unsafe { &*(val.cast::<WorkerContext>()) })
86         }
87     })
88 }
89 
/// Runs the worker thread: publishes the worker context and driver handle in
/// thread-local storage, drives the worker's main loop, then waits for the
/// remaining work to drain.
///
/// The TLS cells hold raw pointers to the stack locals `cur_context` and
/// `cur_handle`; they are valid exactly until this function returns, and the
/// `Reset` guard restores the previous TLS values on exit (including unwind).
pub(crate) fn run_worker(worker: Arc<Worker>, handle: Arc<Handle>) {
    let mut cur_context = WorkerContext { worker };

    let cur_handle = WorkerHandle { _handle: handle };

    // Drop guard restoring the TLS pointers that were in place before this
    // call, so nested or panicking runs leave the thread-locals as found.
    struct Reset(*const (), *const ());

    impl Drop for Reset {
        fn drop(&mut self) {
            CURRENT_WORKER.with(|ctx| ctx.set(self.0));
            CURRENT_HANDLE.with(|handle| handle.set(self.1));
        }
    }
    // store the worker to tls
    let _guard = CURRENT_WORKER.with(|cur| {
        let prev_ctx = cur.get();
        cur.set((&cur_context as *const WorkerContext).cast::<()>());

        let handle = CURRENT_HANDLE.with(|handle| {
            let prev_handle = handle.get();
            handle.set((&cur_handle as *const WorkerHandle).cast::<()>());
            prev_handle
        });

        Reset(prev_ctx, handle)
    });

    // Run the loop, then block until the scheduler has no work left.
    cur_context.run();
    cur_context.release();
    drop(cur_handle);
}
122 
/// One executor thread of the multi-thread runtime.
pub(crate) struct Worker {
    /// Index of this worker within the scheduler's worker set.
    pub(crate) index: usize,
    /// Scheduler shared by all workers of the pool.
    pub(crate) scheduler: Arc<MultiThreadScheduler>,
    /// Worker-private state; `run` holds its `borrow_mut` for the whole loop
    /// (see `get_inner_ptr` for the implication).
    pub(crate) inner: RefCell<Box<Inner>>,
    /// Slot holding a task to run next, checked before the queues
    /// (see `get_task`).
    pub(crate) lifo: RefCell<Option<Task>>,
    /// Wakers of tasks that yielded; woken in batch by
    /// `WorkerContext::wake_yield`.
    pub(crate) yielded: RefCell<Vec<Waker>>,
}
130 
// SAFETY: `Worker` contains `RefCell`s, which are not `Sync`. The runtime's
// design appears to be that only the owning worker thread touches `inner`,
// `lifo` and `yielded`, while other threads go through `scheduler`.
// NOTE(review): this invariant is upheld by usage, not by the compiler —
// confirm against the scheduler/async_pool call sites.
unsafe impl Send for Worker {}
unsafe impl Sync for Worker {}
133 
impl Worker {
    /// Worker main loop: runs tasks until the scheduler cancels this worker.
    ///
    /// The mutable borrow of `inner` taken here is held for the entire loop,
    /// which is why external readers must go through `get_inner_ptr` instead
    /// of `RefCell::borrow`.
    fn run(&self, worker_ctx: &WorkerContext) {
        let mut inner = self.inner.borrow_mut();
        let inner = inner.as_mut();

        while !inner.is_cancel() {
            // Count every iteration so `periodic_check` only does its
            // housekeeping once in a while.
            inner.increment_count();
            inner.periodic_check(self);

            if let Some(task) = self.get_task(inner, worker_ctx) {
                if inner.is_searching {
                    inner.is_searching = false;
                    // Found work while searching: let the scheduler wake
                    // another worker to keep the search going.
                    self.scheduler.wake_up_rand_one_if_last_search();
                }
                task.run();
                continue;
            }

            // if there is no task, park the worker
            self.park_timeout(inner, worker_ctx);
            self.check_cancel(inner);

            if !inner.is_searching && self.scheduler.is_waked_by_last_search(self.index) {
                inner.is_searching = true;
            }
        }
    }

    /// Takes the next task: the LIFO slot first, then the scheduler's queues.
    fn get_task(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> Option<Task> {
        // schedule lifo task first
        let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
        if let Some(task) = lifo_slot.take() {
            return Some(task);
        }

        self.scheduler.dequeue(inner, worker_ctx)
    }

    /// Records in `inner` whether the scheduler has canceled this worker.
    #[inline]
    fn check_cancel(&self, inner: &mut Inner) {
        inner.check_cancel(self)
    }

    /// Returns `true` if the LIFO slot or the local run queue holds work.
    fn has_work(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> bool {
        worker_ctx.worker.lifo.borrow().is_some() || !inner.run_queue.is_empty()
    }

    /// Parks this worker until work arrives or the worker is canceled.
    fn park_timeout(&self, inner: &mut Inner, worker_ctx: &WorkerContext) {
        // still has works to do, go back to work
        if self.has_work(inner, worker_ctx) {
            return;
        }
        self.scheduler.turn_to_sleep(inner, self.index);
        inner.is_searching = false;

        while !inner.is_cancel {
            inner.parker.park();

            // Woken with work available: leave the sleep set and resume.
            if self.has_work(inner, worker_ctx) {
                self.scheduler.turn_from_sleep(&self.index);
                break;
            }

            // Still registered as parked — presumably a spurious or periodic
            // wakeup: re-check cancellation and park again.
            if self.scheduler.is_parked(&self.index) {
                self.check_cancel(inner);
                continue;
            }
            break;
        }
    }

    /// Gets Worker's Inner with ptr.
    ///
    /// # Safety
    /// We can't get Inner with `RefCell::borrow()`, because the worker will
    /// always hold the borrow_mut until drop. So we can only get Inner by ptr.
    /// This method can only be used to obtain values
    #[cfg(feature = "metrics")]
    pub(crate) unsafe fn get_inner_ptr(&self) -> &Inner {
        let ptr = self.inner.as_ptr();
        &*ptr
    }

    /// Blocks until the scheduler reports that no work remains.
    ///
    /// NOTE(review): this is a pure busy-wait — it burns a CPU core until
    /// the queues drain. Consider `std::hint::spin_loop()` or a blocking
    /// wait if this shows up in profiles.
    #[inline]
    fn release(&self) {
        // wait for tasks in queue to finish
        while !self.scheduler.has_no_work() {}
    }
}
223 
/// Worker-private mutable state, owned by the worker's `run` loop.
pub(crate) struct Inner {
    /// A counter to define whether schedule global queue or local queue;
    /// incremented once per loop iteration (wrapping) and consulted by
    /// `periodic_check`.
    pub(crate) count: u32,
    /// Whether the workers are canceled
    is_cancel: bool,
    /// local queue
    pub(crate) run_queue: LocalQueue,
    /// Parker used to sleep this worker when no work is available.
    pub(crate) parker: Parker,
    /// Whether this worker is currently in the work-search phase.
    pub(crate) is_searching: bool,
}
234 
235 impl Inner {
new(run_queues: LocalQueue, parker: Parker) -> Self236     pub(crate) fn new(run_queues: LocalQueue, parker: Parker) -> Self {
237         Inner {
238             count: 0,
239             is_cancel: false,
240             run_queue: run_queues,
241             parker,
242             is_searching: false,
243         }
244     }
245 }
246 
/// Interval constant driving `Inner::periodic_check`; applied to the loop
/// counter with a bitwise AND (see the note on `periodic_check`).
const GLOBAL_PERIODIC_INTERVAL: u8 = 61;
248 
impl Inner {
    /// Bumps the per-loop counter, wrapping on overflow.
    #[inline]
    fn increment_count(&mut self) {
        self.count = self.count.wrapping_add(1);
    }

    // checks if the worker is canceled
    #[inline]
    fn check_cancel(&mut self, worker: &Worker) {
        // Once canceled, stays canceled; only query the scheduler otherwise.
        if !self.is_cancel {
            self.is_cancel = worker.scheduler.is_cancel();
        }
    }

    /// Occasionally re-checks cancellation and drives the reactor once.
    ///
    /// NOTE(review): `count & 61` (0b11_1101) is a bit test, not a modulo —
    /// it is true at irregular counts (0, 2, 64, 66, 128, …), not every 61st
    /// iteration. If a strict period was intended, `count % 61 == 0` would be
    /// the conventional form — confirm which behavior is intended.
    #[inline]
    fn periodic_check(&mut self, worker: &Worker) {
        if self.count & GLOBAL_PERIODIC_INTERVAL as u32 == 0 {
            self.check_cancel(worker);
            // Best effort: only run the driver if no other thread holds it.
            if let Ok(mut driver) = self.parker.get_driver().try_lock() {
                driver.run_once();
            }
        }
    }

    /// Returns `true` once this worker has observed cancellation.
    #[inline]
    fn is_cancel(&self) -> bool {
        self.is_cancel
    }
}
278