1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::cell::UnsafeCell;
15 use std::future::Future;
16 use std::io;
17 use std::marker::PhantomPinned;
18 use std::pin::Pin;
19 use std::ptr::{addr_of_mut, NonNull};
20 use std::sync::atomic::AtomicUsize;
21 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
22 use std::sync::Mutex;
23 use std::task::{Context, Poll, Waker};
24
25 use ylong_io::Interest;
26
27 use crate::futures::poll_fn;
28 use crate::net::{Ready, ReadyEvent};
29 use crate::util::bit::{Bit, Mask};
30 use crate::util::linked_list::{Link, LinkedList, Node};
31 use crate::util::slab::Entry;
32
33 const GENERATION: Mask = Mask::new(7, 24);
34 pub(crate) const DRIVER_TICK: Mask = Mask::new(8, 16);
35 pub(crate) const READINESS: Mask = Mask::new(16, 0);
36
37 // ScheduleIO::status structure
38 //
39 // | reserved | generation | driver tick | readiness |
40 // |----------|------------|-------------|-----------|
41 // | 1 bit | 7 bits | 8 bits | 16 bits |
pub(crate) struct ScheduleIO {
    /// ScheduleIO status word. Bit layout (low to high): 16-bit readiness,
    /// 8-bit driver tick, 7-bit generation, 1 reserved bit (see masks above).
    pub(crate) status: AtomicUsize,

    /// Wakers that wait for this IO: the single reader/writer wakers plus an
    /// intrusive list of `Waiter` nodes registered by `Readiness` futures.
    waiters: Mutex<Waiters>,
}
49
#[derive(Default)]
pub(crate) struct Waiters {
    // Intrusive list of `Waiter` nodes linked in by `Readiness` futures.
    list: LinkedList<Waiter>,

    // Reader & writer wakers are for AsyncRead/AsyncWriter
    reader: Option<Waker>,

    writer: Option<Waker>,

    // Set once the IO is shut down; after that, waiters are woken immediately.
    is_shutdown: bool,
}
61
pub(crate) struct Waiter {
    // Waker to invoke when the awaited readiness arrives; taken on wake.
    waker: Option<Waker>,

    // The readiness (readable/writable) this waiter waits for.
    interest: Interest,

    // Set by `wake0` once the interest has been satisfied.
    is_ready: bool,

    // Intrusive linked-list node, so `Waiters::list` holds this waiter
    // without a heap allocation.
    node: Node<Waiter>,

    // The list links waiters by address, so a `Waiter` must not move once
    // inserted; `PhantomPinned` makes the type `!Unpin`.
    _p: PhantomPinned,
}
73
// Operation on the `DRIVER_TICK` field of `ScheduleIO::status`.
pub(crate) enum Tick {
    // Unconditionally store this tick value.
    Set(u8),
    // Store this tick value only if it matches the currently stored one;
    // otherwise the readiness has already been covered by a newer event.
    Clear(u8),
}
78
79 impl Default for ScheduleIO {
default() -> Self80 fn default() -> Self {
81 ScheduleIO {
82 status: AtomicUsize::new(0),
83 waiters: Mutex::new(Default::default()),
84 }
85 }
86 }
87
88 impl Default for Waiter {
default() -> Self89 fn default() -> Self {
90 Waiter {
91 waker: None,
92 interest: Interest::READABLE,
93 is_ready: false,
94 node: Node::new(),
95 _p: PhantomPinned,
96 }
97 }
98 }
99
// SAFETY: `Waiter` embeds its own `node`, and `node()` returns a stable
// pointer to that field, which is what `LinkedList` requires of a `Link`.
unsafe impl Link for Waiter {
    unsafe fn node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>>
    where
        Self: Sized,
    {
        // `addr_of_mut!` takes the field address without creating an
        // intermediate reference to the whole `Waiter`.
        let node_ptr = addr_of_mut!(ptr.as_mut().node);
        NonNull::new_unchecked(node_ptr)
    }
}
109
110 impl Entry for ScheduleIO {
reset(&self)111 fn reset(&self) {
112 let status_bit = Bit::from_usize(self.status.load(Acquire));
113
114 let generation = status_bit.get_by_mask(GENERATION);
115 let new_generation = generation.wrapping_add(1);
116 let mut next = Bit::from_usize(0);
117 next.set_by_mask(GENERATION, new_generation);
118 self.status.store(next.as_usize(), Release);
119 }
120 }
121
122 impl ScheduleIO {
generation(&self) -> usize123 pub fn generation(&self) -> usize {
124 let base = Bit::from_usize(self.status.load(Acquire));
125 base.get_by_mask(GENERATION)
126 }
127
    /// Polls the IO for events matching `interest`.
    ///
    /// Returns `Poll::Ready` with the matching readiness if any is already
    /// set. Otherwise stores the context's waker as the single reader/writer
    /// waker and returns `Poll::Pending`, unless the IO has been shut down or
    /// an event raced in while registering.
    #[cfg(feature = "net")]
    pub(crate) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        interest: Interest,
    ) -> Poll<ReadyEvent> {
        // Get current status and check if it contains our interest
        let curr_bit = Bit::from_usize(self.status.load(Acquire));
        let ready = Ready::from_usize(curr_bit.get_by_mask(READINESS)).intersection(interest);

        if ready.is_empty() {
            let mut waiters = self.waiters.lock().unwrap();
            // Put the waker associated with the context into the waiters
            match interest {
                Interest::WRITABLE => waiters.writer = Some(cx.waker().clone()),
                Interest::READABLE => waiters.reader = Some(cx.waker().clone()),
                _ => unreachable!(),
            }

            // Check one more time to see if any event is ready. The recheck
            // runs while the waiters lock is held, so a concurrent `wake0`
            // either observes our stored waker or we observe its readiness.
            let ready_event = self.get_readiness(interest);
            if !waiters.is_shutdown && ready_event.ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ready_event)
            }
        } else {
            let tick = curr_bit.get_by_mask(DRIVER_TICK) as u8;
            Poll::Ready(ReadyEvent::new(tick, ready))
        }
    }
159
160 #[inline]
get_readiness(&self, interest: Interest) -> ReadyEvent161 pub(crate) fn get_readiness(&self, interest: Interest) -> ReadyEvent {
162 let status_bit = Bit::from_usize(self.status.load(Acquire));
163 let ready = Ready::from_usize(status_bit.get_by_mask(READINESS)).intersection(interest);
164 let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
165 ReadyEvent::new(tick, ready)
166 }
167
readiness(&self, interest: Interest) -> io::Result<ReadyEvent>168 pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
169 let mut fut = self.readiness_fut(interest);
170 let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
171
172 poll_fn(|cx| Pin::new(&mut fut).poll(cx)).await
173 }
174
readiness_fut(&self, interest: Interest) -> io::Result<ReadyEvent>175 async fn readiness_fut(&self, interest: Interest) -> io::Result<ReadyEvent> {
176 Readiness::new(self, interest).await
177 }
178
    /// Marks this IO as shut down and wakes every waiter with `Ready::ALL`.
    pub(crate) fn shutdown(&self) {
        self.wake0(Ready::ALL, true);
    }
182
clear_readiness(&self, ready: ReadyEvent)183 pub(crate) fn clear_readiness(&self, ready: ReadyEvent) {
184 let mask_no_closed = ready.get_ready() - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
185 let _ = self.set_readiness(None, Tick::Clear(ready.get_tick()), |curr| {
186 curr - mask_no_closed
187 });
188 }
189
set_readiness( &self, token: Option<usize>, tick: Tick, f: impl Fn(Ready) -> Ready, ) -> io::Result<()>190 pub(crate) fn set_readiness(
191 &self,
192 token: Option<usize>,
193 tick: Tick,
194 f: impl Fn(Ready) -> Ready,
195 ) -> io::Result<()> {
196 let mut current = self.status.load(Acquire);
197 loop {
198 let current_bit = Bit::from_usize(current);
199 let current_generation = current_bit.get_by_mask(GENERATION);
200
201 // if token's generation is different from ScheduleIO's generation,
202 // this token is already expired.
203 if let Some(token) = token {
204 if Bit::from_usize(token).get_by_mask(GENERATION) != current_generation {
205 return Err(io::Error::new(
206 io::ErrorKind::Other,
207 "Token no longer valid.",
208 ));
209 }
210 }
211
212 let current_readiness = Ready::from_usize(current_bit.get_by_mask(READINESS));
213 let new_readiness = f(current_readiness);
214 let mut new_bit = Bit::from_usize(new_readiness.as_usize());
215
216 Self::handle_tick(&tick, &mut new_bit, ¤t_bit)?;
217 new_bit.set_by_mask(GENERATION, current_generation);
218 match self
219 .status
220 .compare_exchange(current, new_bit.as_usize(), AcqRel, Acquire)
221 {
222 Ok(_) => return Ok(()),
223 // status has been changed already, so we repeats the loop
224 Err(actual) => current = actual,
225 }
226 }
227 }
228
handle_tick(tick: &Tick, new_bit: &mut Bit, current_bit: &Bit) -> io::Result<()>229 pub(crate) fn handle_tick(tick: &Tick, new_bit: &mut Bit, current_bit: &Bit) -> io::Result<()> {
230 match tick {
231 Tick::Set(t) => new_bit.set_by_mask(DRIVER_TICK, *t as usize),
232 // Check the tick to see if the event has already been covered.
233 // If yes, clear the event.
234 Tick::Clear(t) => {
235 if current_bit.get_by_mask(DRIVER_TICK) as u8 != *t {
236 return Err(io::Error::new(
237 io::ErrorKind::Other,
238 "Readiness has been covered.",
239 ));
240 }
241 new_bit.set_by_mask(DRIVER_TICK, *t as usize);
242 }
243 }
244 Ok(())
245 }
246
    /// Wakes every waiter whose interest is satisfied by `ready`.
    pub(crate) fn wake(&self, ready: Ready) {
        self.wake0(ready, false);
    }
250
    /// Wake implementation shared by `wake` and `shutdown`: collects all
    /// matching wakers under the lock, then invokes them after releasing it.
    fn wake0(&self, ready: Ready, shutdown: bool) {
        let mut wakers = Vec::new();
        let mut waiters = self.waiters.lock().unwrap();
        // Shutdown is sticky: once set it is never cleared.
        waiters.is_shutdown |= shutdown;

        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        // Unlink every waiter whose interest is satisfied, marking it ready
        // so a concurrent `set_waker` transitions it to `Done`.
        waiters.list.drain_filtered(|waiter| {
            if ready.satisfies(waiter.interest) {
                if let Some(waker) = waiter.waker.take() {
                    waiter.is_ready = true;
                    wakers.push(waker);
                }
                return true;
            }
            false
        });

        // Wake outside the lock so woken tasks can immediately re-lock
        // `waiters` without deadlocking against us.
        drop(waiters);
        for waker in wakers.iter_mut() {
            waker.wake_by_ref();
        }
    }
284 }
285
impl Drop for ScheduleIO {
    // Wake all remaining waiters so no task stays parked on a dropped IO.
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}
291
// SAFETY: shared state inside `ScheduleIO` is accessed only through the
// atomic `status` word and the `waiters` mutex.
unsafe impl Send for ScheduleIO {}
unsafe impl Sync for ScheduleIO {}
294
// Future that resolves once its `ScheduleIO` becomes ready for an interest.
pub(crate) struct Readiness<'a> {
    // The IO whose readiness is being awaited.
    schedule_io: &'a ScheduleIO,

    // Current phase of this future's state machine.
    state: State,

    // Waiter linked into `schedule_io`'s wait list while pending. Wrapped
    // in `UnsafeCell` because the list mutates it through a raw pointer
    // while `poll` also accesses it (both under the waiters lock).
    waiter: UnsafeCell<Waiter>,
}
302
// Phases of the `Readiness` future.
enum State {
    // Not yet registered in the wait list.
    Init,
    // Waiter is linked into the list, waiting to be woken.
    Waiting,
    // Readiness observed (or the waiter was woken); poll returns `Ready`.
    Done,
}
308
309 impl Readiness<'_> {
new(schedule_io: &ScheduleIO, interest: Interest) -> Readiness<'_>310 pub(crate) fn new(schedule_io: &ScheduleIO, interest: Interest) -> Readiness<'_> {
311 Readiness {
312 schedule_io,
313 state: State::Init,
314 waiter: UnsafeCell::new(Waiter {
315 waker: None,
316 interest,
317 is_ready: false,
318 node: Node::new(),
319 _p: PhantomPinned,
320 }),
321 }
322 }
323 }
324
// First poll of a `Readiness` future: returns `Ready` if the interest is
// already satisfied, otherwise links the waiter into the wait list and
// transitions the state to `Waiting`.
fn poll_init(
    schedule_io: &ScheduleIO,
    state: &mut State,
    waiter: &UnsafeCell<Waiter>,
    interest: Interest,
    cx: &mut Context<'_>,
) -> Poll<io::Result<ReadyEvent>> {
    let status_bit = Bit::from_usize(schedule_io.status.load(SeqCst));
    let readiness = Ready::from_usize(status_bit.get_by_mask(READINESS));
    let ready = readiness.intersection(interest);

    // if events are ready, change status to done
    if !ready.is_empty() {
        let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
        *state = State::Done;
        return Poll::Ready(Ok(ReadyEvent::new(tick, ready)));
    }

    let mut waiters = schedule_io.waiters.lock().unwrap();

    // Reload the status while holding the lock, so this cannot race with
    // `wake0` (which also takes the lock before draining waiters).
    let status_bit = Bit::from_usize(schedule_io.status.load(SeqCst));
    let mut readiness = Ready::from_usize(status_bit.get_by_mask(READINESS));

    if waiters.is_shutdown {
        readiness = Ready::ALL;
    }

    let ready = readiness.intersection(interest);

    // check one more time to see if events are ready
    if !ready.is_empty() {
        let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
        *state = State::Done;
        return Poll::Ready(Ok(ReadyEvent::new(tick, ready)));
    }

    unsafe {
        // SAFETY: the waiter lives inside the pinned `Readiness` future, so
        // its address stays stable while it is linked; it is unlinked again
        // in `Readiness::drop`.
        (*waiter.get()).waker = Some(cx.waker().clone());

        waiters
            .list
            .push_front(NonNull::new_unchecked(waiter.get()));
    }

    *state = State::Waiting;
    Poll::Pending
}
372
373 // return true if pending
// Poll while in `Waiting`: returns true if still pending. Refreshes the
// stored waker if the task has migrated, and transitions to `Done` once
// `wake0` has marked the waiter ready.
fn set_waker(
    schedule_io: &ScheduleIO,
    state: &mut State,
    waiter: &UnsafeCell<Waiter>,
    cx: &mut Context<'_>,
) -> bool {
    // waiters could also be accessed in other places, so get the lock
    let waiters = schedule_io.waiters.lock().unwrap();

    // SAFETY: the waiter is only ever mutated while the `waiters` lock is
    // held, and we hold it here.
    let waiter = unsafe { &mut *waiter.get() };
    if waiter.is_ready {
        *state = State::Done;
    } else {
        // We set a waker to this waiter in State::init,
        // therefore waiter.waker must be `Some` at this point
        if !waiter.waker.as_ref().unwrap().will_wake(cx.waker()) {
            waiter.waker = Some(cx.waker().clone());
        }
        return true;
    }
    drop(waiters);
    false
}
397
poll_state( schedule_io: &ScheduleIO, state: &mut State, waiter: &UnsafeCell<Waiter>, interest: Interest, cx: &mut Context<'_>, ) -> Option<Poll<io::Result<ReadyEvent>>>398 fn poll_state(
399 schedule_io: &ScheduleIO,
400 state: &mut State,
401 waiter: &UnsafeCell<Waiter>,
402 interest: Interest,
403 cx: &mut Context<'_>,
404 ) -> Option<Poll<io::Result<ReadyEvent>>> {
405 match *state {
406 State::Init => {
407 if let Poll::Ready(res) = poll_init(schedule_io, state, waiter, interest, cx) {
408 return Some(Poll::Ready(res));
409 }
410 }
411 State::Waiting => {
412 if set_waker(schedule_io, state, waiter, cx) {
413 return Some(Poll::Pending);
414 }
415 }
416 State::Done => {
417 let status_bit = Bit::from_usize(schedule_io.status.load(Acquire));
418 return Some(Poll::Ready(Ok(ReadyEvent::new(
419 status_bit.get_by_mask(DRIVER_TICK) as u8,
420 Ready::from_interest(interest),
421 ))));
422 }
423 }
424 None
425 }
426
impl Future for Readiness<'_> {
    type Output = io::Result<ReadyEvent>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: no field is moved out of the pinned future; `state` is
        // plain `Unpin` data and `waiter` is only handed out by reference.
        let (schedule_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (me.schedule_io, &mut me.state, &me.waiter)
        };
        // Safety: `waiter.interest` never changes after initialization.
        let interest = unsafe { (*waiter.get()).interest };
        // Loop so that a state transition (e.g. Init -> Waiting -> Done) is
        // acted upon within the same poll call.
        loop {
            if let Some(poll_res) = poll_state(schedule_io, state, waiter, interest, cx) {
                return poll_res;
            }
        }
    }
}
444
// SAFETY: the embedded `Waiter` is only ever accessed with the owning
// `ScheduleIO`'s waiters lock held.
unsafe impl Sync for Readiness<'_> {}
unsafe impl Send for Readiness<'_> {}
447
impl Drop for Readiness<'_> {
    // Unlinks the waiter from the wait list so the list never holds a
    // dangling pointer into a dropped future.
    fn drop(&mut self) {
        let mut waiters = self.schedule_io.waiters.lock().unwrap();
        // Safety: There is only one queue holding the node, and this is the only way
        // for the node to dequeue.
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()));
        }
    }
}
460
#[cfg(test)]
mod schedule_io_test {
    use std::io;
    use std::sync::atomic::Ordering::{Acquire, Release};

    use crate::net::{Ready, ReadyEvent, ScheduleIO, Tick};
    use crate::util::slab::Entry;

    /// UT test cases for schedule_io default
    ///
    /// # Brief
    /// 1. Call default
    /// 2. Verify the returned results
    #[test]
    fn ut_schedule_io_default() {
        let mut schedule_io = ScheduleIO::default();
        let status = schedule_io.status.load(Acquire);
        assert_eq!(status, 0);
        let is_shutdown = schedule_io.waiters.get_mut().unwrap().is_shutdown;
        assert!(!is_shutdown);
    }

    /// UT test cases for schedule_io reset
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call reset
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_reset() {
        let schedule_io = ScheduleIO::default();
        let pre_status = schedule_io.status.load(Acquire);
        assert_eq!(pre_status, 0x00);
        schedule_io.reset();
        let after_status = schedule_io.status.load(Acquire);
        // generation field (bit 24) bumped from 0 to 1
        assert_eq!(after_status, 0x1000000);
    }

    /// UT test cases for schedule_io generation
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call generation
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_generation() {
        let schedule_io = ScheduleIO::default();
        schedule_io.status.store(0x7f000000, Release);
        assert_eq!(schedule_io.generation(), 0x7f);
    }

    /// UT test cases for schedule_io shutdown
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call shutdown
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_shutdown() {
        let mut schedule_io = ScheduleIO::default();
        schedule_io.shutdown();
        assert!(schedule_io.waiters.get_mut().unwrap().is_shutdown);
    }

    /// UT test cases for schedule_io clear_readiness
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call clear_readiness
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_clear_readiness() {
        let schedule_io = ScheduleIO::default();
        schedule_io.status.store(0x0000000f, Release);
        schedule_io.clear_readiness(ReadyEvent::new(0, Ready::from_usize(0x1)));
        let status = schedule_io.status.load(Acquire);
        assert_eq!(status, 0x0000000e);
    }

    /// UT test cases for schedule_io set_readiness
    ///
    /// # Brief
    /// 1. Create a ScheduleIO
    /// 2. Call set_readiness
    /// 3. Verify the returned results
    #[test]
    fn ut_schedule_io_set_readiness() {
        ut_schedule_io_set_readiness_01();
        ut_schedule_io_set_readiness_02();
        ut_schedule_io_set_readiness_03();

        // token generation (0x7f) differs from slot generation (0) -> error
        fn ut_schedule_io_set_readiness_01() {
            let schedule_io = ScheduleIO::default();
            let token = 0x7f000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Set(1), |curr| curr);
            let err = ret.err().unwrap();
            assert_eq!(err.kind(), io::ErrorKind::Other);
            assert_eq!(
                format! {"{}", err.into_inner().unwrap()},
                "Token no longer valid."
            );
        }

        // Tick::Clear with a stale tick (1 != stored 0) -> error
        fn ut_schedule_io_set_readiness_02() {
            let schedule_io = ScheduleIO::default();
            let token = 0x00000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Clear(1), |curr| curr);
            let err = ret.err().unwrap();
            assert_eq!(err.kind(), io::ErrorKind::Other);
            assert_eq!(
                format! {"{}", err.into_inner().unwrap()},
                "Readiness has been covered."
            );
        }

        // Tick::Set stores the tick in the driver-tick field (bit 16)
        fn ut_schedule_io_set_readiness_03() {
            let schedule_io = ScheduleIO::default();
            let token = 0x00000000usize;
            let ret = schedule_io.set_readiness(Some(token), Tick::Set(1), |curr| curr);
            assert!(ret.is_ok());
            let status = schedule_io.status.load(Acquire);
            assert_eq!(status, 0x00010000);
        }
    }
}
586