1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::io;
15 use std::ops::Deref;
16 #[cfg(feature = "metrics")]
17 use std::sync::atomic::AtomicU64;
18 use std::sync::{Arc, Mutex};
19
20 use ylong_io::{Interest, Source, Token};
21
22 use crate::net::{Ready, ScheduleIO, Tick};
23 use crate::util::bit::{Bit, Mask};
24 use crate::util::slab::{Address, Ref, Slab};
25
26 cfg_ffrt! {
27 #[cfg(all(feature = "signal", target_os = "linux"))]
28 use crate::signal::unix::SignalDriver;
29 use libc::{c_void, c_int, c_uint, c_uchar};
30 }
31
32 cfg_not_ffrt! {
33 use ylong_io::{Events, Poll};
34 use std::time::Duration;
35
36 const EVENTS_MAX_CAPACITY: usize = 1024;
37 const WAKE_TOKEN: Token = Token(1 << 31);
38 }
39
40 #[cfg(all(feature = "signal", target_family = "unix"))]
41 pub(crate) const SIGNAL_TOKEN: Token = Token((1 << 31) + 1);
42 const DRIVER_TICK_INIT: u8 = 0;
43
44 // Token structure
45 // | reserved | generation | address |
46 // |----------|------------|---------|
47 // | 1 bit | 7 bits | 24 bits |
48 const GENERATION: Mask = Mask::new(7, 24);
49 const ADDRESS: Mask = Mask::new(24, 0);
50
51 /// IO reactor that listens to fd events and wakes corresponding tasks.
52 pub(crate) struct IoDriver {
53 /// Stores every IO source that is ready
54 resources: Option<Slab<ScheduleIO>>,
55
56 /// Counter used for slab struct to compact
57 tick: u8,
58
59 /// Used for epoll
60 #[cfg(not(feature = "ffrt"))]
61 poll: Arc<Poll>,
62
63 /// Stores IO events that need to be handled
64 #[cfg(not(feature = "ffrt"))]
65 events: Option<Events>,
66
67 /// Indicate if there is a signal coming
68 #[cfg(all(not(feature = "ffrt"), feature = "signal", target_family = "unix"))]
69 signal_pending: bool,
70
71 /// Save Handle used in metrics.
72 #[cfg(feature = "metrics")]
73 io_handle_inner: Arc<Inner>,
74 }
75
76 pub(crate) struct IoHandle {
77 inner: Arc<Inner>,
78 #[cfg(not(feature = "ffrt"))]
79 pub(crate) waker: ylong_io::Waker,
80 }
81
82 cfg_ffrt!(
83 use std::mem::MaybeUninit;
84 static mut DRIVER: MaybeUninit<IoDriver> = MaybeUninit::uninit();
85 static mut HANDLE: MaybeUninit<IoHandle> = MaybeUninit::uninit();
86 );
87
88 #[cfg(feature = "ffrt")]
89 impl IoHandle {
new(inner: Arc<Inner>) -> Self90 fn new(inner: Arc<Inner>) -> Self {
91 IoHandle { inner }
92 }
93
get_ref() -> &'static Self94 pub(crate) fn get_ref() -> &'static Self {
95 IoDriver::initialize();
96 unsafe { &*HANDLE.as_ptr() }
97 }
98 }
99
100 #[cfg(not(feature = "ffrt"))]
101 impl IoHandle {
new(inner: Arc<Inner>, waker: ylong_io::Waker) -> Self102 fn new(inner: Arc<Inner>, waker: ylong_io::Waker) -> Self {
103 IoHandle { inner, waker }
104 }
105
106 #[cfg(feature = "metrics")]
get_registered_count(&self) -> u64107 pub(crate) fn get_registered_count(&self) -> u64 {
108 self.inner
109 .metrics
110 .registered_count
111 .load(std::sync::atomic::Ordering::Acquire)
112 }
113
114 #[cfg(feature = "metrics")]
get_ready_count(&self) -> u64115 pub(crate) fn get_ready_count(&self) -> u64 {
116 self.inner
117 .metrics
118 .ready_count
119 .load(std::sync::atomic::Ordering::Acquire)
120 }
121 }
122
123 impl Deref for IoHandle {
124 type Target = Arc<Inner>;
125
deref(&self) -> &Self::Target126 fn deref(&self) -> &Self::Target {
127 &self.inner
128 }
129 }
130
131 /// In charge of two functionalities
132 ///
133 /// 1)IO registration
134 /// 2)Resource management
135 pub(crate) struct Inner {
136 /// When the driver gets dropped, the resources in the driver will be
137 /// transmitted to here. Then all the slabs inside will get dropped when
138 /// Inner's ref count clears to zero, so there is no concurrent problem
139 /// when new slabs gets inserted
140 resources: Mutex<Option<Slab<ScheduleIO>>>,
141
142 /// Used to register scheduleIO into the slab
143 allocator: Slab<ScheduleIO>,
144
145 /// Used to register fd
146 #[cfg(not(feature = "ffrt"))]
147 registry: Arc<Poll>,
148
149 /// Metrics
150 #[cfg(feature = "metrics")]
151 metrics: InnerMetrics,
152 }
153
154 /// Metrics of Inner
155 #[cfg(feature = "metrics")]
156 struct InnerMetrics {
157 /// Fd registered count. This value will only increment, not decrease.
158 registered_count: AtomicU64,
159
160 /// Ready events count. This value will only increment, not decrease.
161 ready_count: AtomicU64,
162 }
163
164 impl IoDriver {
165 /// IO dispatch function. Wakes the task through the token getting from the
166 /// epoll events.
dispatch(&mut self, token: Token, ready: Ready)167 fn dispatch(&mut self, token: Token, ready: Ready) {
168 let addr_bit = Bit::from_usize(token.0);
169 let addr = addr_bit.get_by_mask(ADDRESS);
170
171 // IoDriver at this point has been initialized, therefore resources must be some
172 let io = match self
173 .resources
174 .as_mut()
175 .unwrap()
176 .get(Address::from_usize(addr))
177 {
178 Some(io) => io,
179 None => return,
180 };
181
182 if io
183 .set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready)
184 .is_err()
185 {
186 return;
187 }
188
189 // Wake the io task
190 io.wake(ready)
191 }
192 }
193
194 #[cfg(not(feature = "ffrt"))]
195 impl IoDriver {
initialize() -> (IoHandle, IoDriver)196 pub(crate) fn initialize() -> (IoHandle, IoDriver) {
197 let poll =
198 Poll::new().unwrap_or_else(|e| panic!("IO poller initialize failed, error: {e}"));
199 let waker = ylong_io::Waker::new(&poll, WAKE_TOKEN)
200 .unwrap_or_else(|e| panic!("ylong_io waker construction failed, error: {e}"));
201 let arc_poll = Arc::new(poll);
202 let events = Events::with_capacity(EVENTS_MAX_CAPACITY);
203 let slab = Slab::new();
204 let allocator = slab.handle();
205 let inner = Arc::new(Inner {
206 resources: Mutex::new(None),
207 allocator,
208 registry: arc_poll.clone(),
209 #[cfg(feature = "metrics")]
210 metrics: InnerMetrics {
211 registered_count: AtomicU64::new(0),
212 ready_count: AtomicU64::new(0),
213 },
214 });
215
216 let driver = IoDriver {
217 resources: Some(slab),
218 events: Some(events),
219 tick: DRIVER_TICK_INIT,
220 poll: arc_poll,
221 #[cfg(feature = "metrics")]
222 io_handle_inner: inner.clone(),
223 #[cfg(all(feature = "signal", target_family = "unix"))]
224 signal_pending: false,
225 };
226
227 (IoHandle::new(inner, waker), driver)
228 }
229
230 /// Runs the driver. This method will blocking wait for fd events to come in
231 /// and then wakes the corresponding tasks through the events.
232 ///
233 /// In linux environment, the driver uses epoll.
drive(&mut self, time_out: Option<Duration>) -> io::Result<bool>234 pub(crate) fn drive(&mut self, time_out: Option<Duration>) -> io::Result<bool> {
235 use ylong_io::EventTrait;
236
237 // For every 255 ticks, cleans the redundant entries inside the slab
238 const COMPACT_INTERVAL: u8 = 255;
239
240 self.tick = self.tick.wrapping_add(1);
241
242 if self.tick == COMPACT_INTERVAL {
243 unsafe {
244 // IoDriver at this point has been initialized, therefore resources must be some
245 self.resources.as_mut().unwrap().compact();
246 }
247 }
248
249 let mut events = match self.events.take() {
250 Some(ev) => ev,
251 None => {
252 let err = io::Error::new(io::ErrorKind::Other, "driver event store missing.");
253 return Err(err);
254 }
255 };
256 match self.poll.poll(&mut events, time_out) {
257 Ok(_) => {}
258 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
259 Err(err) => return Err(err),
260 }
261 let has_events = !events.is_empty();
262
263 for event in events.iter() {
264 let token = event.token();
265 if token == WAKE_TOKEN {
266 continue;
267 }
268 #[cfg(all(feature = "signal", target_family = "unix"))]
269 if token == SIGNAL_TOKEN {
270 self.signal_pending = true;
271 continue;
272 }
273 let ready = Ready::from_event(event);
274 self.dispatch(token, ready);
275 }
276 #[cfg(feature = "metrics")]
277 self.io_handle_inner
278 .metrics
279 .ready_count
280 .fetch_add(events.len() as u64, std::sync::atomic::Ordering::AcqRel);
281
282 self.events = Some(events);
283 Ok(has_events)
284 }
285
286 #[cfg(all(feature = "signal", target_family = "unix"))]
process_signal(&mut self) -> bool287 pub(crate) fn process_signal(&mut self) -> bool {
288 let pending = self.signal_pending;
289 self.signal_pending = false;
290 pending
291 }
292 }
293
294 #[cfg(feature = "ffrt")]
295 impl IoDriver {
initialize()296 fn initialize() {
297 static ONCE: std::sync::Once = std::sync::Once::new();
298 ONCE.call_once(|| unsafe {
299 let slab = Slab::new();
300 let allocator = slab.handle();
301 let inner = Arc::new(Inner {
302 resources: Mutex::new(None),
303 allocator,
304 });
305
306 let driver = IoDriver {
307 resources: Some(slab),
308 tick: DRIVER_TICK_INIT,
309 };
310 HANDLE = MaybeUninit::new(IoHandle::new(inner));
311 DRIVER = MaybeUninit::new(driver);
312 });
313 }
314
315 /// Initializes the single instance IO driver.
get_mut_ref() -> &'static mut IoDriver316 pub(crate) fn get_mut_ref() -> &'static mut IoDriver {
317 IoDriver::initialize();
318 unsafe { &mut *DRIVER.as_mut_ptr() }
319 }
320 }
321
322 #[cfg(all(feature = "ffrt", feature = "signal", target_os = "linux"))]
ffrt_dispatch_signal_event(data: *const c_void, _ready: c_uint, _new_tick: c_uchar)323 extern "C" fn ffrt_dispatch_signal_event(data: *const c_void, _ready: c_uint, _new_tick: c_uchar) {
324 let token = Token::from_usize(data as usize);
325 if token == SIGNAL_TOKEN {
326 SignalDriver::get_mut_ref().broadcast();
327 #[cfg(feature = "process")]
328 crate::process::GlobalZombieChild::get_instance().release_zombie();
329 }
330 }
331
332 #[cfg(feature = "ffrt")]
ffrt_dispatch_event(data: *const c_void, ready: c_uint, new_tick: c_uchar)333 extern "C" fn ffrt_dispatch_event(data: *const c_void, ready: c_uint, new_tick: c_uchar) {
334 const COMPACT_INTERVAL: u8 = 255;
335
336 let driver = IoDriver::get_mut_ref();
337
338 if new_tick == COMPACT_INTERVAL && driver.tick != new_tick {
339 unsafe {
340 driver.resources.as_mut().unwrap().compact();
341 }
342 }
343 driver.tick = new_tick;
344
345 let token = Token::from_usize(data as usize);
346 let ready = crate::net::ready::from_event_inner(ready as i32);
347 driver.dispatch(token, ready);
348 }
349
350 impl Inner {
allocate_schedule_io_pair(&self) -> io::Result<(Ref<ScheduleIO>, usize)>351 fn allocate_schedule_io_pair(&self) -> io::Result<(Ref<ScheduleIO>, usize)> {
352 let (addr, schedule_io) = unsafe {
353 self.allocator.allocate().ok_or_else(|| {
354 io::Error::new(
355 io::ErrorKind::Other,
356 "driver at max registered I/O resources.",
357 )
358 })?
359 };
360 let mut base = Bit::from_usize(0);
361 base.set_by_mask(GENERATION, schedule_io.generation());
362 base.set_by_mask(ADDRESS, addr.as_usize());
363 Ok((schedule_io, base.as_usize()))
364 }
365 }
366
367 #[cfg(not(feature = "ffrt"))]
368 impl Inner {
369 #[cfg(all(feature = "signal", target_family = "unix"))]
register_source_with_token( &self, io: &mut impl Source, token: Token, interest: Interest, ) -> io::Result<()>370 pub(crate) fn register_source_with_token(
371 &self,
372 io: &mut impl Source,
373 token: Token,
374 interest: Interest,
375 ) -> io::Result<()> {
376 self.registry.register(io, token, interest)
377 }
378
379 /// Registers the fd of the `Source` object
register_source( &self, io: &mut impl Source, interest: Interest, ) -> io::Result<Ref<ScheduleIO>>380 pub(crate) fn register_source(
381 &self,
382 io: &mut impl Source,
383 interest: Interest,
384 ) -> io::Result<Ref<ScheduleIO>> {
385 // Allocates space for the slab. If reaches maximum capacity, error will be
386 // returned
387 let (schedule_io, token) = self.allocate_schedule_io_pair()?;
388
389 self.registry
390 .register(io, Token::from_usize(token), interest)?;
391 #[cfg(feature = "metrics")]
392 self.metrics
393 .registered_count
394 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
395 Ok(schedule_io)
396 }
397
398 /// Deregisters the fd of the `Source` object.
deregister_source(&self, io: &mut impl Source) -> io::Result<()>399 pub(crate) fn deregister_source(&self, io: &mut impl Source) -> io::Result<()> {
400 self.registry.deregister(io)
401 }
402 }
403
404 #[cfg(feature = "ffrt")]
405 impl Inner {
406 #[cfg(all(feature = "signal", target_os = "linux"))]
register_source_with_token( &self, io: &mut impl Source, token: Token, interest: Interest, )407 pub(crate) fn register_source_with_token(
408 &self,
409 io: &mut impl Source,
410 token: Token,
411 interest: Interest,
412 ) {
413 let event = interest.into_io_event();
414 unsafe {
415 ylong_ffrt::ffrt_poller_register(
416 io.get_fd() as c_int,
417 event,
418 token.0 as *const c_void,
419 ffrt_dispatch_signal_event,
420 );
421 }
422 }
423
424 /// Registers the fd of the `Source` object
register_source( &self, io: &mut impl Source, interest: Interest, ) -> io::Result<Ref<ScheduleIO>>425 pub(crate) fn register_source(
426 &self,
427 io: &mut impl Source,
428 interest: Interest,
429 ) -> io::Result<Ref<ScheduleIO>> {
430 // Allocates space for the slab. If reaches maximum capacity, error will be
431 // returned
432 let (schedule_io, token) = self.allocate_schedule_io_pair()?;
433
434 let event = interest.into_io_event();
435 unsafe {
436 ylong_ffrt::ffrt_poller_register(
437 io.get_fd() as c_int,
438 event,
439 token as *const c_void,
440 ffrt_dispatch_event,
441 );
442 }
443
444 Ok(schedule_io)
445 }
446 }
447
448 impl Drop for Inner {
drop(&mut self)449 fn drop(&mut self) {
450 let resources = self.resources.lock().unwrap().take();
451
452 if let Some(mut slab) = resources {
453 slab.for_each(|io| {
454 io.shutdown();
455 });
456 }
457 }
458 }
459