1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::cell::{Cell, RefCell};
15 use std::ptr;
16 use std::sync::Arc;
17 use std::task::Waker;
18
// Worker structures and the methods that drive a worker thread.
20 use crate::executor::async_pool::MultiThreadScheduler;
21 use crate::executor::driver::Handle;
22 use crate::executor::parker::Parker;
23 use crate::executor::queue::LocalQueue;
24 use crate::task::Task;
25
thread_local! {
    /// Type-erased pointer to the `WorkerContext` registered on this thread.
    /// Null until a worker stores its context here.
    pub(crate) static CURRENT_WORKER: Cell<*const ()> = Cell::new(ptr::null::<()>());
    /// Type-erased pointer to the `WorkerHandle` registered on this thread.
    /// Null until a worker stores its handle here.
    pub(crate) static CURRENT_HANDLE: Cell<*const ()> = Cell::new(ptr::null::<()>());
}
30
31 pub(crate) struct WorkerContext {
32 pub(crate) worker: Arc<Worker>,
33 }
34
35 impl WorkerContext {
36 #[inline]
run(&mut self)37 fn run(&mut self) {
38 let worker_ref = &self.worker;
39 worker_ref.run(self);
40 }
41
wake_yield(&self) -> bool42 pub(crate) fn wake_yield(&self) -> bool {
43 let mut yielded = self.worker.yielded.borrow_mut();
44 if yielded.is_empty() {
45 return false;
46 }
47 for waker in yielded.drain(..) {
48 waker.wake();
49 }
50 true
51 }
52
53 #[inline]
release(&mut self)54 fn release(&mut self) {
55 self.worker.release();
56 }
57 }
58
59 pub(crate) struct WorkerHandle {
60 pub(crate) _handle: Arc<Handle>,
61 }
62
/// Returns the `WorkerHandle` registered for the current thread.
///
/// Yields `None` when `CURRENT_HANDLE` holds a null pointer, i.e. no handle
/// has been registered on this thread.
#[cfg(all(not(feature = "ffrt"), any(feature = "net", feature = "time")))]
#[inline]
pub(crate) fn get_current_handle() -> Option<&'static WorkerHandle> {
    CURRENT_HANDLE.with(|slot| {
        let handle_ptr = slot.get().cast::<WorkerHandle>();
        // SAFETY: a non-null value in this slot is written by `run_worker`
        // and points at its local `WorkerHandle`; `as_ref` maps null to `None`.
        // NOTE(review): the `'static` lifetime is only nominal - the reference
        // must not be used after `run_worker` has returned on this thread.
        unsafe { handle_ptr.as_ref() }
    })
}
76
77 /// Gets the worker context of the current thread
78 #[inline]
get_current_ctx() -> Option<&'static WorkerContext>79 pub(crate) fn get_current_ctx() -> Option<&'static WorkerContext> {
80 CURRENT_WORKER.with(|ctx| {
81 let val = ctx.get();
82 if val.is_null() {
83 None
84 } else {
85 Some(unsafe { &*(val.cast::<WorkerContext>()) })
86 }
87 })
88 }
89
90 /// Runs the worker thread
run_worker(worker: Arc<Worker>, handle: Arc<Handle>)91 pub(crate) fn run_worker(worker: Arc<Worker>, handle: Arc<Handle>) {
92 let mut cur_context = WorkerContext { worker };
93
94 let cur_handle = WorkerHandle { _handle: handle };
95
96 struct Reset(*const (), *const ());
97
98 impl Drop for Reset {
99 fn drop(&mut self) {
100 CURRENT_WORKER.with(|ctx| ctx.set(self.0));
101 CURRENT_HANDLE.with(|handle| handle.set(self.1));
102 }
103 }
104 // store the worker to tls
105 let _guard = CURRENT_WORKER.with(|cur| {
106 let prev_ctx = cur.get();
107 cur.set((&cur_context as *const WorkerContext).cast::<()>());
108
109 let handle = CURRENT_HANDLE.with(|handle| {
110 let prev_handle = handle.get();
111 handle.set((&cur_handle as *const WorkerHandle).cast::<()>());
112 prev_handle
113 });
114
115 Reset(prev_ctx, handle)
116 });
117
118 cur_context.run();
119 cur_context.release();
120 drop(cur_handle);
121 }
122
123 pub(crate) struct Worker {
124 pub(crate) index: usize,
125 pub(crate) scheduler: Arc<MultiThreadScheduler>,
126 pub(crate) inner: RefCell<Box<Inner>>,
127 pub(crate) lifo: RefCell<Option<Task>>,
128 pub(crate) yielded: RefCell<Vec<Waker>>,
129 }
130
131 unsafe impl Send for Worker {}
132 unsafe impl Sync for Worker {}
133
134 impl Worker {
run(&self, worker_ctx: &WorkerContext)135 fn run(&self, worker_ctx: &WorkerContext) {
136 let mut inner = self.inner.borrow_mut();
137 let inner = inner.as_mut();
138
139 while !inner.is_cancel() {
140 inner.increment_count();
141 inner.periodic_check(self);
142
143 if let Some(task) = self.get_task(inner, worker_ctx) {
144 if inner.is_searching {
145 inner.is_searching = false;
146 self.scheduler.wake_up_rand_one_if_last_search();
147 }
148 task.run();
149 continue;
150 }
151
152 // if there is no task, park the worker
153 self.park_timeout(inner, worker_ctx);
154 self.check_cancel(inner);
155
156 if !inner.is_searching && self.scheduler.is_waked_by_last_search(self.index) {
157 inner.is_searching = true;
158 }
159 }
160 }
161
get_task(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> Option<Task>162 fn get_task(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> Option<Task> {
163 // schedule lifo task first
164 let mut lifo_slot = worker_ctx.worker.lifo.borrow_mut();
165 if let Some(task) = lifo_slot.take() {
166 return Some(task);
167 }
168
169 self.scheduler.dequeue(inner, worker_ctx)
170 }
171
172 #[inline]
check_cancel(&self, inner: &mut Inner)173 fn check_cancel(&self, inner: &mut Inner) {
174 inner.check_cancel(self)
175 }
176
has_work(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> bool177 fn has_work(&self, inner: &mut Inner, worker_ctx: &WorkerContext) -> bool {
178 worker_ctx.worker.lifo.borrow().is_some() || !inner.run_queue.is_empty()
179 }
180
park_timeout(&self, inner: &mut Inner, worker_ctx: &WorkerContext)181 fn park_timeout(&self, inner: &mut Inner, worker_ctx: &WorkerContext) {
182 // still has works to do, go back to work
183 if self.has_work(inner, worker_ctx) {
184 return;
185 }
186 self.scheduler.turn_to_sleep(inner, self.index);
187 inner.is_searching = false;
188
189 while !inner.is_cancel {
190 inner.parker.park();
191
192 if self.has_work(inner, worker_ctx) {
193 self.scheduler.turn_from_sleep(&self.index);
194 break;
195 }
196
197 if self.scheduler.is_parked(&self.index) {
198 self.check_cancel(inner);
199 continue;
200 }
201 break;
202 }
203 }
204
205 /// Gets Worker's Inner with ptr.
206 ///
207 /// # Safety
208 /// We can't get Inner with `RefCell::borrow()`, because the worker will
209 /// always hold the borrow_mut until drop. So we can only get Inner by ptr.
210 /// This method can only be used to obtain values
211 #[cfg(feature = "metrics")]
get_inner_ptr(&self) -> &Inner212 pub(crate) unsafe fn get_inner_ptr(&self) -> &Inner {
213 let ptr = self.inner.as_ptr();
214 &*ptr
215 }
216
217 #[inline]
release(&self)218 fn release(&self) {
219 // wait for tasks in queue to finish
220 while !self.scheduler.has_no_work() {}
221 }
222 }
223
224 pub(crate) struct Inner {
225 /// A counter to define whether schedule global queue or local queue
226 pub(crate) count: u32,
227 /// Whether the workers are canceled
228 is_cancel: bool,
229 /// local queue
230 pub(crate) run_queue: LocalQueue,
231 pub(crate) parker: Parker,
232 pub(crate) is_searching: bool,
233 }
234
235 impl Inner {
new(run_queues: LocalQueue, parker: Parker) -> Self236 pub(crate) fn new(run_queues: LocalQueue, parker: Parker) -> Self {
237 Inner {
238 count: 0,
239 is_cancel: false,
240 run_queue: run_queues,
241 parker,
242 is_searching: false,
243 }
244 }
245 }
246
247 const GLOBAL_PERIODIC_INTERVAL: u8 = 61;
248
249 impl Inner {
250 #[inline]
increment_count(&mut self)251 fn increment_count(&mut self) {
252 self.count = self.count.wrapping_add(1);
253 }
254
255 // checks if the worker is canceled
256 #[inline]
check_cancel(&mut self, worker: &Worker)257 fn check_cancel(&mut self, worker: &Worker) {
258 if !self.is_cancel {
259 self.is_cancel = worker.scheduler.is_cancel();
260 }
261 }
262
263 #[inline]
periodic_check(&mut self, worker: &Worker)264 fn periodic_check(&mut self, worker: &Worker) {
265 if self.count & GLOBAL_PERIODIC_INTERVAL as u32 == 0 {
266 self.check_cancel(worker);
267 if let Ok(mut driver) = self.parker.get_driver().try_lock() {
268 driver.run_once();
269 }
270 }
271 }
272
273 #[inline]
is_cancel(&self) -> bool274 fn is_cancel(&self) -> bool {
275 self.is_cancel
276 }
277 }
278