1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! The code inside a signal handler should be async-signal-safe, you can check
15 //! the definition here: <https://man7.org/linux/man-pages/man7/signal-safety.7.html.>
16 //! For short, a signal can be happened at anytime in a thread and the signal
17 //! handler will be executed on the same exact thread. Therefore, if the signal
18 //! handler function needs a resource that has been already acquired by the
19 //! thread (like a nonreentrant mutex), it could cause deadlock.
20 //!
21 //! In this crate, the signal handler needs to read the action of a signal from
22 //! a global singleton signal-manager. This signal-manager should be protected
23 //! by a lock to ensure atomicity. However, we could not use the regular
24 //! [`std::sync::RwLock`] because this lock is not async-signal-safe.
25 //!
26 //! Thus, we need to implement a spinning RwLock that provides non-block read
27 //! method for the signal handler to use.
28 
29 use std::hint;
30 use std::marker::PhantomData;
31 use std::ops::Deref;
32 use std::ptr::null_mut;
33 use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
34 use std::sync::{Mutex, MutexGuard};
35 
36 const VERSIONS: usize = 2;
37 const HOLDER_COUNT_MAX: usize = usize::MAX / 2;
38 
39 pub(crate) struct SpinningRwLock<T> {
40     version: AtomicUsize,
41     data: [AtomicPtr<T>; VERSIONS],
42     version_holder_count: [AtomicUsize; VERSIONS],
43     write_lock: Mutex<()>,
44     _phantom: PhantomData<T>,
45 }
46 
47 impl<T> SpinningRwLock<T> {
new(data: T) -> Self48     pub(crate) fn new(data: T) -> Self {
49         let val = Box::new(data);
50         let val_ptr = Box::into_raw(val);
51 
52         let datas = [AtomicPtr::new(val_ptr), Default::default()];
53 
54         SpinningRwLock {
55             data: datas,
56             version: Default::default(),
57             version_holder_count: Default::default(),
58             write_lock: Mutex::new(()),
59             _phantom: Default::default(),
60         }
61     }
62 
read(&self) -> ReadGuard<T>63     pub(crate) fn read(&self) -> ReadGuard<T> {
64         loop {
65             let version = self.version.load(Ordering::SeqCst) % VERSIONS;
66             let curr_count = &self.version_holder_count[version];
67 
68             if curr_count.fetch_add(1, Ordering::SeqCst) > HOLDER_COUNT_MAX {
69                 // read function is called inside a signal handler, so we cannot return an error
70                 // or panic directly, instead we use libc::abort
71                 unsafe { libc::abort() };
72             }
73 
74             // This data could already be nullptr in the following execution order
75             // 1. reader loads the current version
76             // 2. writer increments the version
77             // 3. writer sets old data to nullptr
78             // 4. writer blocking waits until old version counter is 0
79             // 5. reader increments the old version counter
80             // 6. reader acquires the old data using the old version
81             // In this case, reader should try again.
82             let data = self.data[version].load(Ordering::SeqCst);
83             if data.is_null() {
84                 curr_count.fetch_sub(1, Ordering::SeqCst);
85                 continue;
86             }
87             // this is safe because we just check the data is not nullptr, which means the
88             // writer has not yet released this data. The reader adds the holder
89             // count before acquire the data, the writer will not release the
90             // data until the all readers get dropped.
91             let data = unsafe { &*data };
92 
93             return ReadGuard {
94                 data,
95                 version_holder_count: curr_count,
96             };
97         }
98     }
99 
write(&self) -> WriteGuard<T>100     pub(crate) fn write(&self) -> WriteGuard<T> {
101         let guard = self.write_lock.lock().unwrap();
102         let version = self.version.load(Ordering::SeqCst);
103 
104         WriteGuard {
105             lock: self,
106             version,
107             _guard: guard,
108         }
109     }
110 
wait_version_release(&self, version: usize)111     pub(crate) fn wait_version_release(&self, version: usize) {
112         let count = &self.version_holder_count[version];
113         while count.load(Ordering::SeqCst) != 0 {
114             hint::spin_loop();
115         }
116     }
117 }
118 
119 pub(crate) struct ReadGuard<'a, T: 'a> {
120     pub(crate) data: &'a T,
121     version_holder_count: &'a AtomicUsize,
122 }
123 
124 impl<'a, T> Drop for ReadGuard<'a, T> {
drop(&mut self)125     fn drop(&mut self) {
126         self.version_holder_count.fetch_sub(1, Ordering::SeqCst);
127     }
128 }
129 
130 impl<'a, T> Deref for ReadGuard<'a, T> {
131     type Target = T;
132 
deref(&self) -> &Self::Target133     fn deref(&self) -> &Self::Target {
134         self.data
135     }
136 }
137 
138 pub(crate) struct WriteGuard<'a, T: 'a> {
139     lock: &'a SpinningRwLock<T>,
140     version: usize,
141     _guard: MutexGuard<'a, ()>,
142 }
143 
144 impl<'a, T> WriteGuard<'a, T> {
store(&mut self, val: T)145     pub(crate) fn store(&mut self, val: T) {
146         let val = Box::new(val);
147         let val_ptr = Box::into_raw(val);
148 
149         let old_version = self.version % VERSIONS;
150         let new_version = (old_version + 1) % VERSIONS;
151         self.lock.data[new_version].store(val_ptr, Ordering::SeqCst);
152         self.lock.version.store(new_version, Ordering::SeqCst);
153 
154         let old_data = self.lock.data[old_version].swap(null_mut(), Ordering::SeqCst);
155         self.lock.wait_version_release(old_version);
156         self.version = new_version;
157 
158         // the old data is valid and currently no one is holding it,
159         // therefore the drop is safe
160         unsafe {
161             drop(Box::from_raw(old_data));
162         }
163     }
164 }
165 
166 impl<'a, T> Deref for WriteGuard<'a, T> {
167     type Target = T;
168 
deref(&self) -> &Self::Target169     fn deref(&self) -> &Self::Target {
170         let data = self.lock.data[self.version].load(Ordering::SeqCst);
171         // the write guard always points to a valid data ptr
172         unsafe { &*data }
173     }
174 }
175