// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::ops::Deref;
#[cfg(feature = "metrics")]
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

use ylong_io::{Interest, Source, Token};

use crate::net::{Ready, ScheduleIO, Tick};
use crate::util::bit::{Bit, Mask};
use crate::util::slab::{Address, Ref, Slab};

cfg_ffrt! {
    #[cfg(all(feature = "signal", target_os = "linux"))]
    use crate::signal::unix::SignalDriver;
    use libc::{c_void, c_int, c_uint, c_uchar};
}

cfg_not_ffrt! {
    use ylong_io::{Events, Poll};
    use std::time::Duration;

    const EVENTS_MAX_CAPACITY: usize = 1024;
    const WAKE_TOKEN: Token = Token(1 << 31);
}

#[cfg(all(feature = "signal", target_family = "unix"))]
pub(crate) const SIGNAL_TOKEN: Token = Token((1 << 31) + 1);
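// Both special tokens set the reserved high bit (bit 31), so they can never collide
// with a token produced from a slab address and generation, which only occupy the
// low 31 bits (see the masks below).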
const DRIVER_TICK_INIT: u8 = 0;

// Token structure
// | reserved | generation | address |
// |----------|------------|---------|
// |   1 bit  |   7 bits   | 24 bits |
const GENERATION: Mask = Mask::new(7, 24);
const ADDRESS: Mask = Mask::new(24, 0);
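// A sketch with illustrative values: an entry at slab address 0x1234 with generation 3
// is encoded as (3 << 24) | 0x1234 = 0x0300_1234, and `dispatch` later recovers the
// address by applying the ADDRESS mask to the incoming epoll token.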

/// IO reactor that listens for fd events and wakes the corresponding tasks.
pub(crate) struct IoDriver {
    /// Stores every registered IO source
    resources: Option<Slab<ScheduleIO>>,

    /// Counter used to decide when the slab gets compacted
    tick: u8,

    /// Used for epoll
    #[cfg(not(feature = "ffrt"))]
    poll: Arc<Poll>,

    /// Stores IO events that need to be handled
    #[cfg(not(feature = "ffrt"))]
    events: Option<Events>,

    /// Indicates whether a signal has arrived
    #[cfg(all(not(feature = "ffrt"), feature = "signal", target_family = "unix"))]
    signal_pending: bool,

    /// Inner handle saved for metrics.
    #[cfg(feature = "metrics")]
    io_handle_inner: Arc<Inner>,
}

pub(crate) struct IoHandle {
    inner: Arc<Inner>,
    #[cfg(not(feature = "ffrt"))]
    pub(crate) waker: ylong_io::Waker,
}

cfg_ffrt!(
    use std::mem::MaybeUninit;
    static mut DRIVER: MaybeUninit<IoDriver> = MaybeUninit::uninit();
    static mut HANDLE: MaybeUninit<IoHandle> = MaybeUninit::uninit();
);

#[cfg(feature = "ffrt")]
impl IoHandle {
    fn new(inner: Arc<Inner>) -> Self {
        IoHandle { inner }
    }

    pub(crate) fn get_ref() -> &'static Self {
        IoDriver::initialize();
        unsafe { &*HANDLE.as_ptr() }
    }
}

#[cfg(not(feature = "ffrt"))]
impl IoHandle {
    fn new(inner: Arc<Inner>, waker: ylong_io::Waker) -> Self {
        IoHandle { inner, waker }
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_registered_count(&self) -> u64 {
        self.inner
            .metrics
            .registered_count
            .load(std::sync::atomic::Ordering::Acquire)
    }

    #[cfg(feature = "metrics")]
    pub(crate) fn get_ready_count(&self) -> u64 {
        self.inner
            .metrics
            .ready_count
            .load(std::sync::atomic::Ordering::Acquire)
    }
}

impl Deref for IoHandle {
    type Target = Arc<Inner>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

/// In charge of two functionalities:
///
/// 1) IO registration
/// 2) Resource management
pub(crate) struct Inner {
    /// When the driver gets dropped, its resources are transferred here. The
    /// slabs inside are then dropped once Inner's reference count reaches
    /// zero, so there is no concurrency problem when new slabs get inserted.
    resources: Mutex<Option<Slab<ScheduleIO>>>,

    /// Used to register `ScheduleIO` into the slab
    allocator: Slab<ScheduleIO>,

    /// Used to register fds
    #[cfg(not(feature = "ffrt"))]
    registry: Arc<Poll>,

    /// Metrics
    #[cfg(feature = "metrics")]
    metrics: InnerMetrics,
}

/// Metrics of Inner
#[cfg(feature = "metrics")]
struct InnerMetrics {
    /// Count of registered fds. This value only increases; it never decreases.
    registered_count: AtomicU64,

    /// Count of ready events. This value only increases; it never decreases.
    ready_count: AtomicU64,
}

impl IoDriver {
    /// IO dispatch function. Wakes the task associated with the token obtained
    /// from the epoll event.
    fn dispatch(&mut self, token: Token, ready: Ready) {
        let addr_bit = Bit::from_usize(token.0);
        let addr = addr_bit.get_by_mask(ADDRESS);

        // IoDriver has been initialized at this point, so `resources` must be `Some`
        let io = match self
            .resources
            .as_mut()
            .unwrap()
            .get(Address::from_usize(addr))
        {
            Some(io) => io,
            None => return,
        };

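        // `set_readiness` carries the full token so that the ScheduleIO can reject an
        // event whose generation no longer matches its own (a stale entry); such
        // events are dropped here instead of waking the wrong task.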
        if io
            .set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready)
            .is_err()
        {
            return;
        }

        // Wake the io task
        io.wake(ready)
    }
}

#[cfg(not(feature = "ffrt"))]
impl IoDriver {
    pub(crate) fn initialize() -> (IoHandle, IoDriver) {
        let poll =
            Poll::new().unwrap_or_else(|e| panic!("IO poller initialize failed, error: {e}"));
        let waker = ylong_io::Waker::new(&poll, WAKE_TOKEN)
            .unwrap_or_else(|e| panic!("ylong_io waker construction failed, error: {e}"));
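        // The waker above is registered with WAKE_TOKEN, so `drive` can be woken up
        // without a real fd event (see the WAKE_TOKEN check in the event loop below).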
        let arc_poll = Arc::new(poll);
        let events = Events::with_capacity(EVENTS_MAX_CAPACITY);
        let slab = Slab::new();
        let allocator = slab.handle();
        let inner = Arc::new(Inner {
            resources: Mutex::new(None),
            allocator,
            registry: arc_poll.clone(),
            #[cfg(feature = "metrics")]
            metrics: InnerMetrics {
                registered_count: AtomicU64::new(0),
                ready_count: AtomicU64::new(0),
            },
        });

        let driver = IoDriver {
            resources: Some(slab),
            events: Some(events),
            tick: DRIVER_TICK_INIT,
            poll: arc_poll,
            #[cfg(feature = "metrics")]
            io_handle_inner: inner.clone(),
            #[cfg(all(feature = "signal", target_family = "unix"))]
            signal_pending: false,
        };

        (IoHandle::new(inner, waker), driver)
    }

    /// Runs the driver. This method blocks waiting for fd events to come in and
    /// then wakes the corresponding tasks according to those events.
    ///
    /// On Linux, the driver uses epoll.
    pub(crate) fn drive(&mut self, time_out: Option<Duration>) -> io::Result<bool> {
        use ylong_io::EventTrait;

        // Every 255 ticks, the redundant entries inside the slab are cleaned up
        const COMPACT_INTERVAL: u8 = 255;

        self.tick = self.tick.wrapping_add(1);

        if self.tick == COMPACT_INTERVAL {
            unsafe {
                // IoDriver has been initialized at this point, so `resources` must be `Some`
                self.resources.as_mut().unwrap().compact();
            }
        }

        let mut events = match self.events.take() {
            Some(ev) => ev,
            None => {
                let err = io::Error::new(io::ErrorKind::Other, "driver event store missing.");
                return Err(err);
            }
        };
        match self.poll.poll(&mut events, time_out) {
            Ok(_) => {}
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
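        // An interrupted poll (EINTR) is swallowed above, so the loop below simply sees
        // whatever events, possibly none, were collected before the interruption.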
        let has_events = !events.is_empty();

        for event in events.iter() {
            let token = event.token();
            if token == WAKE_TOKEN {
                continue;
            }
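            // Like WAKE_TOKEN, SIGNAL_TOKEN has no ScheduleIO entry behind it: it only
            // records that a signal arrived, which is consumed later via `process_signal`.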
            #[cfg(all(feature = "signal", target_family = "unix"))]
            if token == SIGNAL_TOKEN {
                self.signal_pending = true;
                continue;
            }
            let ready = Ready::from_event(event);
            self.dispatch(token, ready);
        }
        #[cfg(feature = "metrics")]
        self.io_handle_inner
            .metrics
            .ready_count
            .fetch_add(events.len() as u64, std::sync::atomic::Ordering::AcqRel);

        self.events = Some(events);
        Ok(has_events)
    }

    #[cfg(all(feature = "signal", target_family = "unix"))]
    pub(crate) fn process_signal(&mut self) -> bool {
        let pending = self.signal_pending;
        self.signal_pending = false;
        pending
    }
}

#[cfg(feature = "ffrt")]
impl IoDriver {
    fn initialize() {
        static ONCE: std::sync::Once = std::sync::Once::new();
        ONCE.call_once(|| unsafe {
            let slab = Slab::new();
            let allocator = slab.handle();
            let inner = Arc::new(Inner {
                resources: Mutex::new(None),
                allocator,
            });

            let driver = IoDriver {
                resources: Some(slab),
                tick: DRIVER_TICK_INIT,
            };
            HANDLE = MaybeUninit::new(IoHandle::new(inner));
            DRIVER = MaybeUninit::new(driver);
        });
    }

    /// Gets a mutable reference to the singleton IO driver, initializing it
    /// first if necessary.
    pub(crate) fn get_mut_ref() -> &'static mut IoDriver {
        IoDriver::initialize();
        unsafe { &mut *DRIVER.as_mut_ptr() }
    }
}

#[cfg(all(feature = "ffrt", feature = "signal", target_os = "linux"))]
extern "C" fn ffrt_dispatch_signal_event(data: *const c_void, _ready: c_uint, _new_tick: c_uchar) {
    let token = Token::from_usize(data as usize);
    if token == SIGNAL_TOKEN {
        SignalDriver::get_mut_ref().broadcast();
        #[cfg(feature = "process")]
        crate::process::GlobalZombieChild::get_instance().release_zombie();
    }
}

#[cfg(feature = "ffrt")]
extern "C" fn ffrt_dispatch_event(data: *const c_void, ready: c_uint, new_tick: c_uchar) {
    const COMPACT_INTERVAL: u8 = 255;

    let driver = IoDriver::get_mut_ref();

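    // Compact the slab once per full tick cycle: only when the incoming tick reaches
    // the interval and this driver has not already observed that tick.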
    if new_tick == COMPACT_INTERVAL && driver.tick != new_tick {
        unsafe {
            driver.resources.as_mut().unwrap().compact();
        }
    }
    driver.tick = new_tick;

    let token = Token::from_usize(data as usize);
    let ready = crate::net::ready::from_event_inner(ready as i32);
    driver.dispatch(token, ready);
}

impl Inner {
    fn allocate_schedule_io_pair(&self) -> io::Result<(Ref<ScheduleIO>, usize)> {
        let (addr, schedule_io) = unsafe {
            self.allocator.allocate().ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::Other,
                    "driver at max registered I/O resources.",
                )
            })?
        };
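        // Pack the entry's generation and its slab address into a single usize with the
        // GENERATION and ADDRESS masks; this value is used as the registration token.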
        let mut base = Bit::from_usize(0);
        base.set_by_mask(GENERATION, schedule_io.generation());
        base.set_by_mask(ADDRESS, addr.as_usize());
        Ok((schedule_io, base.as_usize()))
    }
}

#[cfg(not(feature = "ffrt"))]
impl Inner {
    #[cfg(all(feature = "signal", target_family = "unix"))]
    pub(crate) fn register_source_with_token(
        &self,
        io: &mut impl Source,
        token: Token,
        interest: Interest,
    ) -> io::Result<()> {
        self.registry.register(io, token, interest)
    }

    /// Registers the fd of the `Source` object.
    pub(crate) fn register_source(
        &self,
        io: &mut impl Source,
        interest: Interest,
    ) -> io::Result<Ref<ScheduleIO>> {
        // Allocates space in the slab. If the slab has reached its maximum capacity,
        // an error is returned.
        let (schedule_io, token) = self.allocate_schedule_io_pair()?;

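        // Register the fd with the encoded token so that later epoll events can be
        // routed back to this ScheduleIO in `dispatch`.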
        self.registry
            .register(io, Token::from_usize(token), interest)?;
        #[cfg(feature = "metrics")]
        self.metrics
            .registered_count
            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        Ok(schedule_io)
    }

    /// Deregisters the fd of the `Source` object.
    pub(crate) fn deregister_source(&self, io: &mut impl Source) -> io::Result<()> {
        self.registry.deregister(io)
    }
}

#[cfg(feature = "ffrt")]
impl Inner {
    #[cfg(all(feature = "signal", target_os = "linux"))]
    pub(crate) fn register_source_with_token(
        &self,
        io: &mut impl Source,
        token: Token,
        interest: Interest,
    ) {
        let event = interest.into_io_event();
        unsafe {
            ylong_ffrt::ffrt_poller_register(
                io.get_fd() as c_int,
                event,
                token.0 as *const c_void,
                ffrt_dispatch_signal_event,
            );
        }
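        // The callback registered above is `ffrt_dispatch_signal_event`, which broadcasts
        // to the signal driver instead of waking an IO task.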
    }

    /// Registers the fd of the `Source` object.
    pub(crate) fn register_source(
        &self,
        io: &mut impl Source,
        interest: Interest,
    ) -> io::Result<Ref<ScheduleIO>> {
        // Allocates space in the slab. If the slab has reached its maximum capacity,
        // an error is returned.
        let (schedule_io, token) = self.allocate_schedule_io_pair()?;

        let event = interest.into_io_event();
        unsafe {
            ylong_ffrt::ffrt_poller_register(
                io.get_fd() as c_int,
                event,
                token as *const c_void,
                ffrt_dispatch_event,
            );
        }

        Ok(schedule_io)
    }
}

447 
448 impl Drop for Inner {
drop(&mut self)449     fn drop(&mut self) {
450         let resources = self.resources.lock().unwrap().take();
451 
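        // Shut down every ScheduleIO that is still alive; `ScheduleIO::shutdown` is
        // expected to wake any task still waiting on it so nothing is left hanging.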
        if let Some(mut slab) = resources {
            slab.for_each(|io| {
                io.shutdown();
            });
        }
    }
}