1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cell::UnsafeCell;
15 use std::future::Future;
16 use std::io;
17 use std::marker::PhantomPinned;
18 use std::pin::Pin;
19 use std::ptr::{addr_of_mut, NonNull};
20 use std::sync::atomic::AtomicUsize;
21 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst};
22 use std::sync::Mutex;
23 use std::task::{Context, Poll, Waker};
24 
25 use ylong_io::Interest;
26 
27 use crate::futures::poll_fn;
28 use crate::net::{Ready, ReadyEvent};
29 use crate::util::bit::{Bit, Mask};
30 use crate::util::linked_list::{Link, LinkedList, Node};
31 use crate::util::slab::Entry;
32 
33 const GENERATION: Mask = Mask::new(7, 24);
34 pub(crate) const DRIVER_TICK: Mask = Mask::new(8, 16);
35 pub(crate) const READINESS: Mask = Mask::new(16, 0);
36 
37 // ScheduleIO::status structure
38 //
39 // | reserved | generation | driver tick | readiness |
40 // |----------|------------|-------------|-----------|
41 // |  1 bit   |   7 bits   |   8 bits    |  16 bits  |
42 pub(crate) struct ScheduleIO {
43     /// ScheduleIO status
44     pub(crate) status: AtomicUsize,
45 
46     /// Wakers that wait for this IO
47     waiters: Mutex<Waiters>,
48 }
49 
50 #[derive(Default)]
51 pub(crate) struct Waiters {
52     list: LinkedList<Waiter>,
53 
54     // Reader & writer wakers are for AsyncRead/AsyncWriter
55     reader: Option<Waker>,
56 
57     writer: Option<Waker>,
58 
59     is_shutdown: bool,
60 }
61 
62 pub(crate) struct Waiter {
63     waker: Option<Waker>,
64 
65     interest: Interest,
66 
67     is_ready: bool,
68 
69     node: Node<Waiter>,
70 
71     _p: PhantomPinned,
72 }
73 
74 pub(crate) enum Tick {
75     Set(u8),
76     Clear(u8),
77 }
78 
79 impl Default for ScheduleIO {
default() -> Self80     fn default() -> Self {
81         ScheduleIO {
82             status: AtomicUsize::new(0),
83             waiters: Mutex::new(Default::default()),
84         }
85     }
86 }
87 
88 impl Default for Waiter {
default() -> Self89     fn default() -> Self {
90         Waiter {
91             waker: None,
92             interest: Interest::READABLE,
93             is_ready: false,
94             node: Node::new(),
95             _p: PhantomPinned,
96         }
97     }
98 }
99 
100 unsafe impl Link for Waiter {
node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>> where Self: Sized,101     unsafe fn node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>>
102     where
103         Self: Sized,
104     {
105         let node_ptr = addr_of_mut!(ptr.as_mut().node);
106         NonNull::new_unchecked(node_ptr)
107     }
108 }
109 
110 impl Entry for ScheduleIO {
reset(&self)111     fn reset(&self) {
112         let status_bit = Bit::from_usize(self.status.load(Acquire));
113 
114         let generation = status_bit.get_by_mask(GENERATION);
115         let new_generation = generation.wrapping_add(1);
116         let mut next = Bit::from_usize(0);
117         next.set_by_mask(GENERATION, new_generation);
118         self.status.store(next.as_usize(), Release);
119     }
120 }
121 
122 impl ScheduleIO {
generation(&self) -> usize123     pub fn generation(&self) -> usize {
124         let base = Bit::from_usize(self.status.load(Acquire));
125         base.get_by_mask(GENERATION)
126     }
127 
128     #[cfg(feature = "net")]
poll_readiness( &self, cx: &mut Context<'_>, interest: Interest, ) -> Poll<ReadyEvent>129     pub(crate) fn poll_readiness(
130         &self,
131         cx: &mut Context<'_>,
132         interest: Interest,
133     ) -> Poll<ReadyEvent> {
134         // Get current status and check if it contains our interest
135         let curr_bit = Bit::from_usize(self.status.load(Acquire));
136         let ready = Ready::from_usize(curr_bit.get_by_mask(READINESS)).intersection(interest);
137 
138         if ready.is_empty() {
139             let mut waiters = self.waiters.lock().unwrap();
140             // Put the waker associated with the context into the waiters
141             match interest {
142                 Interest::WRITABLE => waiters.writer = Some(cx.waker().clone()),
143                 Interest::READABLE => waiters.reader = Some(cx.waker().clone()),
144                 _ => unreachable!(),
145             }
146 
147             // Check one more time to see if any event is ready
148             let ready_event = self.get_readiness(interest);
149             if !waiters.is_shutdown && ready_event.ready.is_empty() {
150                 Poll::Pending
151             } else {
152                 Poll::Ready(ready_event)
153             }
154         } else {
155             let tick = curr_bit.get_by_mask(DRIVER_TICK) as u8;
156             Poll::Ready(ReadyEvent::new(tick, ready))
157         }
158     }
159 
160     #[inline]
get_readiness(&self, interest: Interest) -> ReadyEvent161     pub(crate) fn get_readiness(&self, interest: Interest) -> ReadyEvent {
162         let status_bit = Bit::from_usize(self.status.load(Acquire));
163         let ready = Ready::from_usize(status_bit.get_by_mask(READINESS)).intersection(interest);
164         let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
165         ReadyEvent::new(tick, ready)
166     }
167 
readiness(&self, interest: Interest) -> io::Result<ReadyEvent>168     pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
169         let mut fut = self.readiness_fut(interest);
170         let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
171 
172         poll_fn(|cx| Pin::new(&mut fut).poll(cx)).await
173     }
174 
readiness_fut(&self, interest: Interest) -> io::Result<ReadyEvent>175     async fn readiness_fut(&self, interest: Interest) -> io::Result<ReadyEvent> {
176         Readiness::new(self, interest).await
177     }
178 
shutdown(&self)179     pub(crate) fn shutdown(&self) {
180         self.wake0(Ready::ALL, true);
181     }
182 
clear_readiness(&self, ready: ReadyEvent)183     pub(crate) fn clear_readiness(&self, ready: ReadyEvent) {
184         let mask_no_closed = ready.get_ready() - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
185         let _ = self.set_readiness(None, Tick::Clear(ready.get_tick()), |curr| {
186             curr - mask_no_closed
187         });
188     }
189 
set_readiness( &self, token: Option<usize>, tick: Tick, f: impl Fn(Ready) -> Ready, ) -> io::Result<()>190     pub(crate) fn set_readiness(
191         &self,
192         token: Option<usize>,
193         tick: Tick,
194         f: impl Fn(Ready) -> Ready,
195     ) -> io::Result<()> {
196         let mut current = self.status.load(Acquire);
197         loop {
198             let current_bit = Bit::from_usize(current);
199             let current_generation = current_bit.get_by_mask(GENERATION);
200 
201             // if token's generation is different from ScheduleIO's generation,
202             // this token is already expired.
203             if let Some(token) = token {
204                 if Bit::from_usize(token).get_by_mask(GENERATION) != current_generation {
205                     return Err(io::Error::new(
206                         io::ErrorKind::Other,
207                         "Token no longer valid.",
208                     ));
209                 }
210             }
211 
212             let current_readiness = Ready::from_usize(current_bit.get_by_mask(READINESS));
213             let new_readiness = f(current_readiness);
214             let mut new_bit = Bit::from_usize(new_readiness.as_usize());
215 
216             Self::handle_tick(&tick, &mut new_bit, &current_bit)?;
217             new_bit.set_by_mask(GENERATION, current_generation);
218             match self
219                 .status
220                 .compare_exchange(current, new_bit.as_usize(), AcqRel, Acquire)
221             {
222                 Ok(_) => return Ok(()),
223                 // status has been changed already, so we repeats the loop
224                 Err(actual) => current = actual,
225             }
226         }
227     }
228 
handle_tick(tick: &Tick, new_bit: &mut Bit, current_bit: &Bit) -> io::Result<()>229     pub(crate) fn handle_tick(tick: &Tick, new_bit: &mut Bit, current_bit: &Bit) -> io::Result<()> {
230         match tick {
231             Tick::Set(t) => new_bit.set_by_mask(DRIVER_TICK, *t as usize),
232             // Check the tick to see if the event has already been covered.
233             // If yes, clear the event.
234             Tick::Clear(t) => {
235                 if current_bit.get_by_mask(DRIVER_TICK) as u8 != *t {
236                     return Err(io::Error::new(
237                         io::ErrorKind::Other,
238                         "Readiness has been covered.",
239                     ));
240                 }
241                 new_bit.set_by_mask(DRIVER_TICK, *t as usize);
242             }
243         }
244         Ok(())
245     }
246 
wake(&self, ready: Ready)247     pub(crate) fn wake(&self, ready: Ready) {
248         self.wake0(ready, false);
249     }
250 
wake0(&self, ready: Ready, shutdown: bool)251     fn wake0(&self, ready: Ready, shutdown: bool) {
252         let mut wakers = Vec::new();
253         let mut waiters = self.waiters.lock().unwrap();
254         waiters.is_shutdown |= shutdown;
255 
256         if ready.is_readable() {
257             if let Some(waker) = waiters.reader.take() {
258                 wakers.push(waker);
259             }
260         }
261 
262         if ready.is_writable() {
263             if let Some(waker) = waiters.writer.take() {
264                 wakers.push(waker);
265             }
266         }
267 
268         waiters.list.drain_filtered(|waiter| {
269             if ready.satisfies(waiter.interest) {
270                 if let Some(waker) = waiter.waker.take() {
271                     waiter.is_ready = true;
272                     wakers.push(waker);
273                 }
274                 return true;
275             }
276             false
277         });
278 
279         drop(waiters);
280         for waker in wakers.iter_mut() {
281             waker.wake_by_ref();
282         }
283     }
284 }
285 
286 impl Drop for ScheduleIO {
drop(&mut self)287     fn drop(&mut self) {
288         self.wake(Ready::ALL);
289     }
290 }
291 
292 unsafe impl Send for ScheduleIO {}
293 unsafe impl Sync for ScheduleIO {}
294 
295 pub(crate) struct Readiness<'a> {
296     schedule_io: &'a ScheduleIO,
297 
298     state: State,
299 
300     waiter: UnsafeCell<Waiter>,
301 }
302 
303 enum State {
304     Init,
305     Waiting,
306     Done,
307 }
308 
309 impl Readiness<'_> {
new(schedule_io: &ScheduleIO, interest: Interest) -> Readiness<'_>310     pub(crate) fn new(schedule_io: &ScheduleIO, interest: Interest) -> Readiness<'_> {
311         Readiness {
312             schedule_io,
313             state: State::Init,
314             waiter: UnsafeCell::new(Waiter {
315                 waker: None,
316                 interest,
317                 is_ready: false,
318                 node: Node::new(),
319                 _p: PhantomPinned,
320             }),
321         }
322     }
323 }
324 
poll_init( schedule_io: &ScheduleIO, state: &mut State, waiter: &UnsafeCell<Waiter>, interest: Interest, cx: &mut Context<'_>, ) -> Poll<io::Result<ReadyEvent>>325 fn poll_init(
326     schedule_io: &ScheduleIO,
327     state: &mut State,
328     waiter: &UnsafeCell<Waiter>,
329     interest: Interest,
330     cx: &mut Context<'_>,
331 ) -> Poll<io::Result<ReadyEvent>> {
332     let status_bit = Bit::from_usize(schedule_io.status.load(SeqCst));
333     let readiness = Ready::from_usize(status_bit.get_by_mask(READINESS));
334     let ready = readiness.intersection(interest);
335 
336     // if events are ready, change status to done
337     if !ready.is_empty() {
338         let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
339         *state = State::Done;
340         return Poll::Ready(Ok(ReadyEvent::new(tick, ready)));
341     }
342 
343     let mut waiters = schedule_io.waiters.lock().unwrap();
344 
345     let status_bit = Bit::from_usize(schedule_io.status.load(SeqCst));
346     let mut readiness = Ready::from_usize(status_bit.get_by_mask(READINESS));
347 
348     if waiters.is_shutdown {
349         readiness = Ready::ALL;
350     }
351 
352     let ready = readiness.intersection(interest);
353 
354     // check one more time to see if events are ready
355     if !ready.is_empty() {
356         let tick = status_bit.get_by_mask(DRIVER_TICK) as u8;
357         *state = State::Done;
358         return Poll::Ready(Ok(ReadyEvent::new(tick, ready)));
359     }
360 
361     unsafe {
362         (*waiter.get()).waker = Some(cx.waker().clone());
363 
364         waiters
365             .list
366             .push_front(NonNull::new_unchecked(waiter.get()));
367     }
368 
369     *state = State::Waiting;
370     Poll::Pending
371 }
372 
373 // return true if pending
set_waker( schedule_io: &ScheduleIO, state: &mut State, waiter: &UnsafeCell<Waiter>, cx: &mut Context<'_>, ) -> bool374 fn set_waker(
375     schedule_io: &ScheduleIO,
376     state: &mut State,
377     waiter: &UnsafeCell<Waiter>,
378     cx: &mut Context<'_>,
379 ) -> bool {
380     // waiters could also be accessed in other places, so get the lock
381     let waiters = schedule_io.waiters.lock().unwrap();
382 
383     let waiter = unsafe { &mut *waiter.get() };
384     if waiter.is_ready {
385         *state = State::Done;
386     } else {
387         // We set a waker to this waiter in State::init,
388         // therefore waiter.waker must be a some at this point
389         if !waiter.waker.as_ref().unwrap().will_wake(cx.waker()) {
390             waiter.waker = Some(cx.waker().clone());
391         }
392         return true;
393     }
394     drop(waiters);
395     false
396 }
397 
poll_state( schedule_io: &ScheduleIO, state: &mut State, waiter: &UnsafeCell<Waiter>, interest: Interest, cx: &mut Context<'_>, ) -> Option<Poll<io::Result<ReadyEvent>>>398 fn poll_state(
399     schedule_io: &ScheduleIO,
400     state: &mut State,
401     waiter: &UnsafeCell<Waiter>,
402     interest: Interest,
403     cx: &mut Context<'_>,
404 ) -> Option<Poll<io::Result<ReadyEvent>>> {
405     match *state {
406         State::Init => {
407             if let Poll::Ready(res) = poll_init(schedule_io, state, waiter, interest, cx) {
408                 return Some(Poll::Ready(res));
409             }
410         }
411         State::Waiting => {
412             if set_waker(schedule_io, state, waiter, cx) {
413                 return Some(Poll::Pending);
414             }
415         }
416         State::Done => {
417             let status_bit = Bit::from_usize(schedule_io.status.load(Acquire));
418             return Some(Poll::Ready(Ok(ReadyEvent::new(
419                 status_bit.get_by_mask(DRIVER_TICK) as u8,
420                 Ready::from_interest(interest),
421             ))));
422         }
423     }
424     None
425 }
426 
427 impl Future for Readiness<'_> {
428     type Output = io::Result<ReadyEvent>;
429 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>430     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
431         let (schedule_io, state, waiter) = unsafe {
432             let me = self.get_unchecked_mut();
433             (me.schedule_io, &mut me.state, &me.waiter)
434         };
435         // Safety: `waiter.interest` never changes after initialization.
436         let interest = unsafe { (*waiter.get()).interest };
437         loop {
438             if let Some(poll_res) = poll_state(schedule_io, state, waiter, interest, cx) {
439                 return poll_res;
440             }
441         }
442     }
443 }
444 
445 unsafe impl Sync for Readiness<'_> {}
446 unsafe impl Send for Readiness<'_> {}
447 
448 impl Drop for Readiness<'_> {
drop(&mut self)449     fn drop(&mut self) {
450         let mut waiters = self.schedule_io.waiters.lock().unwrap();
451         // Safety: There is only one queue holding the node, and this is the only way
452         // for the node to dequeue.
453         unsafe {
454             waiters
455                 .list
456                 .remove(NonNull::new_unchecked(self.waiter.get()));
457         }
458     }
459 }
460 
461 #[cfg(test)]
462 mod schedule_io_test {
463     use std::io;
464     use std::sync::atomic::Ordering::{Acquire, Release};
465 
466     use crate::net::{Ready, ReadyEvent, ScheduleIO, Tick};
467     use crate::util::slab::Entry;
468 
469     /// UT test cases for schedule_io defalut
470     ///
471     /// # Brief
472     /// 1. Call default
473     /// 2. Verify the returned results
474     #[test]
ut_schedule_io_default()475     fn ut_schedule_io_default() {
476         let mut schedule_io = ScheduleIO::default();
477         let status = schedule_io.status.load(Acquire);
478         assert_eq!(status, 0);
479         let is_shutdown = schedule_io.waiters.get_mut().unwrap().is_shutdown;
480         assert!(!is_shutdown);
481     }
482 
483     /// UT test cases for schedule_io reset
484     ///
485     /// # Brief
486     /// 1. Create a ScheduleIO
487     /// 2. Call reset
488     /// 3. Verify the returned results
489     #[test]
ut_schedule_io_reset()490     fn ut_schedule_io_reset() {
491         let schedule_io = ScheduleIO::default();
492         let pre_status = schedule_io.status.load(Acquire);
493         assert_eq!(pre_status, 0x00);
494         schedule_io.reset();
495         let after_status = schedule_io.status.load(Acquire);
496         assert_eq!(after_status, 0x1000000);
497     }
498 
499     /// UT test cases for schedule_io generation
500     ///
501     /// # Brief
502     /// 1. Create a ScheduleIO
503     /// 2. Call generation
504     /// 3. Verify the returned results
505     #[test]
ut_schedule_io_generation()506     fn ut_schedule_io_generation() {
507         let schedule_io = ScheduleIO::default();
508         schedule_io.status.store(0x7f000000, Release);
509         assert_eq!(schedule_io.generation(), 0x7f);
510     }
511 
512     /// UT test cases for schedule_io shutdown
513     ///
514     /// # Brief
515     /// 1. Create a ScheduleIO
516     /// 2. Call shutdown
517     /// 3. Verify the returned results
518     #[test]
ut_schedule_io_shutdown()519     fn ut_schedule_io_shutdown() {
520         let mut schedule_io = ScheduleIO::default();
521         schedule_io.shutdown();
522         assert!(schedule_io.waiters.get_mut().unwrap().is_shutdown);
523     }
524 
525     /// UT test cases for schedule_io clear_readiness
526     ///
527     /// # Brief
528     /// 1. Create a ScheduleIO
529     /// 2. Call clear_readiness
530     /// 3. Verify the returned results
531     #[test]
ut_schedule_io_clear_readiness()532     fn ut_schedule_io_clear_readiness() {
533         let schedule_io = ScheduleIO::default();
534         schedule_io.status.store(0x0000000f, Release);
535         schedule_io.clear_readiness(ReadyEvent::new(0, Ready::from_usize(0x1)));
536         let status = schedule_io.status.load(Acquire);
537         assert_eq!(status, 0x0000000e);
538     }
539 
540     /// UT test cases for schedule_io set_readiness
541     ///
542     /// # Brief
543     /// 1. Create a ScheduleIO
544     /// 2. Call set_readiness
545     /// 3. Verify the returned results
546     #[test]
ut_schedule_io_set_readiness()547     fn ut_schedule_io_set_readiness() {
548         ut_schedule_io_set_readiness_01();
549         ut_schedule_io_set_readiness_02();
550         ut_schedule_io_set_readiness_03();
551 
552         fn ut_schedule_io_set_readiness_01() {
553             let schedule_io = ScheduleIO::default();
554             let token = 0x7f000000usize;
555             let ret = schedule_io.set_readiness(Some(token), Tick::Set(1), |curr| curr);
556             let err = ret.err().unwrap();
557             assert_eq!(err.kind(), io::ErrorKind::Other);
558             assert_eq!(
559                 format! {"{}", err.into_inner().unwrap()},
560                 "Token no longer valid."
561             );
562         }
563 
564         fn ut_schedule_io_set_readiness_02() {
565             let schedule_io = ScheduleIO::default();
566             let token = 0x00000000usize;
567             let ret = schedule_io.set_readiness(Some(token), Tick::Clear(1), |curr| curr);
568             let err = ret.err().unwrap();
569             assert_eq!(err.kind(), io::ErrorKind::Other);
570             assert_eq!(
571                 format! {"{}", err.into_inner().unwrap()},
572                 "Readiness has been covered."
573             );
574         }
575 
576         fn ut_schedule_io_set_readiness_03() {
577             let schedule_io = ScheduleIO::default();
578             let token = 0x00000000usize;
579             let ret = schedule_io.set_readiness(Some(token), Tick::Set(1), |curr| curr);
580             assert!(ret.is_ok());
581             let status = schedule_io.status.load(Acquire);
582             assert_eq!(status, 0x00010000);
583         }
584     }
585 }
586