1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! An asynchronous version of [`std::sync::RwLock`]
15 
16 use std::cell::UnsafeCell;
17 use std::fmt;
18 use std::ops::{Deref, DerefMut};
19 use std::sync::atomic::AtomicI64;
20 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
21 
22 use crate::sync::semaphore_inner::SemaphoreInner;
23 use crate::sync::LockError;
24 
/// Bias subtracted from `read_count` while a writer holds or waits for the
/// lock.
///
/// A negative `read_count` therefore means "writer present"; adding the bias
/// back yields the number of readers that registered in the meantime. The
/// scheme only works while fewer than `MAX_READS` readers are registered.
const MAX_READS: i64 = i64::MAX / 4;
26 
27 /// An asynchronous version of [`std::sync::RwLock`].
28 ///
29 /// Rwlock allows multiple readers or a single writer to operate concurrently.
30 /// Readers are only allowed to read the data, but the writer is the only one
31 /// can change the data inside.
32 ///
33 /// This Rwlock's policy is writer first, to prevent writers from starving.
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// use ylong_runtime::sync::rwlock::RwLock;
39 ///
40 /// ylong_runtime::block_on(async {
41 ///     let lock = RwLock::new(0);
42 ///
43 ///     // Can have multiple read locks at the same time
44 ///     let r1 = lock.read().await;
45 ///     let r2 = lock.read().await;
46 ///     assert_eq!(*r1, 0);
47 ///     assert_eq!(*r2, 0);
48 ///     drop((r1, r2));
49 ///
50 ///     // Only one write lock at a time
51 ///     let mut w = lock.write().await;
52 ///     *w += 1;
53 ///     assert_eq!(*w, 1);
54 /// });
55 /// ```
56 pub struct RwLock<T: ?Sized> {
57     read_sem: SemaphoreInner,
58     write_sem: SemaphoreInner,
59     write_mutex: SemaphoreInner,
60     read_count: AtomicI64,
61     read_wait: AtomicI64,
62     data: UnsafeCell<T>,
63 }
64 
65 unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
66 unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
67 
68 impl<T: Sized> RwLock<T> {
69     /// Creates a new RwLock. `T` is the data that needs to be protected
70     /// by this RwLock.
71     ///
72     /// # Examples
73     ///
74     /// ```
75     /// use ylong_runtime::sync::rwlock::RwLock;
76     ///
77     /// let lock = RwLock::new(0);
78     /// ```
new(t: T) -> RwLock<T>79     pub fn new(t: T) -> RwLock<T> {
80         RwLock {
81             // bounded by permit::max
82             read_sem: SemaphoreInner::new(0).unwrap(),
83             write_sem: SemaphoreInner::new(0).unwrap(),
84             write_mutex: SemaphoreInner::new(1).unwrap(),
85             read_count: AtomicI64::new(0),
86             read_wait: AtomicI64::new(0),
87             data: UnsafeCell::new(t),
88         }
89     }
90 }
91 
92 impl<T: ?Sized> RwLock<T> {
93     /// Asynchronously acquires the read lock.
94     ///
95     /// If there is a writer holding the write lock, then this method will wait
96     /// asynchronously for the write lock to get released.
97     ///
98     /// But if the write lock is not held, it's ok for multiple readers to hold
99     /// the read lock concurrently.
100     ///
101     ///
102     ///
103     /// # Examples
104     ///
105     /// ```
106     /// use ylong_runtime::sync::rwlock::RwLock;
107     ///
108     /// ylong_runtime::block_on(async {
109     ///     let lock = RwLock::new(0);
110     ///     let r1 = lock.read().await;
111     ///     assert_eq!(*r1, 0);
112     /// });
113     /// ```
read(&self) -> RwLockReadGuard<'_, T>114     pub async fn read(&self) -> RwLockReadGuard<'_, T> {
115         if self.read_count.fetch_add(1, Release) < 0 {
116             // The result of `acquire()` will be `Err()` only when the semaphore is closed.
117             // `RwLock` will not close, so the result of `acquire()` must be `Ok(())`.
118             self.read_sem.acquire().await.unwrap();
119         }
120         RwLockReadGuard(self)
121     }
122 
123     /// Attempts to get the read lock. If another writer is holding the write
124     /// lock, then None will be returned. Otherwise, the ReadMutexGuard will
125     /// be returned.
126     ///
127     /// # Examples
128     ///
129     /// ```
130     /// use ylong_runtime::sync::rwlock::RwLock;
131     ///
132     /// let lock = RwLock::new(0);
133     /// let r1 = lock.try_read().unwrap();
134     /// assert_eq!(*r1, 0);
135     /// ```
try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError>136     pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError> {
137         let mut read_count = self.read_count.load(Acquire);
138         loop {
139             if read_count < 0 {
140                 return Err(LockError);
141             }
142             match self
143                 .read_count
144                 .compare_exchange(read_count, read_count + 1, AcqRel, Acquire)
145             {
146                 Ok(_) => return Ok(RwLockReadGuard(self)),
147                 Err(curr) => read_count = curr,
148             }
149         }
150     }
151 
152     /// Asynchronously acquires the write lock.
153     ///
154     /// If there is other readers or writers, then this method will wait
155     /// asynchronously for them to get released.
156     ///
157     /// # Examples
158     ///
159     /// ```
160     /// use ylong_runtime::sync::rwlock::RwLock;
161     ///
162     /// ylong_runtime::block_on(async {
163     ///     let lock = RwLock::new(0);
164     ///     let mut r1 = lock.write().await;
165     ///     *r1 += 1;
166     ///     assert_eq!(*r1, 1);
167     /// });
168     /// ```
write(&self) -> RwLockWriteGuard<'_, T>169     pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
170         // The result of `acquire()` will be `Err()` only when the semaphore is closed.
171         // `RwLock` will not close, so the result of `acquire()` must be `Ok(())`.
172         self.write_mutex.acquire().await.unwrap();
173         let read_count = self.read_count.fetch_sub(MAX_READS, Release);
174         // If the `read_count` is not 0, it indicates that there is currently a reader
175         // holding a read lock. If the `read_wait` is 0 after addition, it
176         // indicates that all readers have been dropped.
177         if read_count >= 0 && self.read_wait.fetch_add(read_count, Release) != -read_count {
178             self.write_sem.acquire().await.unwrap();
179         }
180         RwLockWriteGuard(self)
181     }
182 
183     /// Attempts to acquire the write lock.
184     ///
185     /// If any other task holds the read/write lock, None will be returned.
186     ///
187     /// # Examples
188     ///
189     /// ```
190     /// use ylong_runtime::sync::rwlock::RwLock;
191     ///
192     /// let lock = RwLock::new(0);
193     /// let mut r1 = lock.try_write().unwrap();
194     /// *r1 += 1;
195     /// assert_eq!(*r1, 1);
196     /// ```
try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError>197     pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError> {
198         if self.write_mutex.try_acquire().is_err() {
199             return Err(LockError);
200         }
201         match self
202             .read_count
203             .compare_exchange(0, -MAX_READS, AcqRel, Acquire)
204         {
205             Ok(_) => Ok(RwLockWriteGuard(self)),
206             Err(_) => {
207                 self.write_mutex.release();
208                 Err(LockError)
209             }
210         }
211     }
212 
213     /// Consumes the lock, and returns the data protected by it.
214     ///
215     /// # Examples
216     ///
217     /// ```
218     /// use ylong_runtime::sync::rwlock::RwLock;
219     ///
220     /// let lock = RwLock::new(0);
221     /// assert_eq!(lock.into_inner(), 0);
222     /// ```
into_inner(self) -> T where T: Sized,223     pub fn into_inner(self) -> T
224     where
225         T: Sized,
226     {
227         self.data.into_inner()
228     }
229 
230     /// Gets the mutable reference of the data protected by the lock.
231     ///
232     /// This method takes the mutable reference of the RwLock, so there is no
233     /// need to actually lock the RwLock -- the mutable borrow statically
234     /// guarantees no locks exist.
235     /// ```
236     /// use ylong_runtime::sync::rwlock::RwLock;
237     ///
238     /// ylong_runtime::block_on(async {
239     ///     let mut lock = RwLock::new(0);
240     ///     *lock.get_mut() = 10;
241     ///     assert_eq!(*lock.write().await, 10);
242     /// });
243     /// ```
get_mut(&mut self) -> &mut T244     pub fn get_mut(&mut self) -> &mut T {
245         unsafe { &mut *self.data.get() }
246     }
247 }
248 
249 /// Read guard to access the data after holding the mutex.
250 pub struct RwLockReadGuard<'a, T: ?Sized>(&'a RwLock<T>);
251 
252 unsafe impl<T: ?Sized + Send> Send for RwLockReadGuard<'_, T> {}
253 unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {}
254 
255 /// Releases the read lock. Wakes any waiting writer if it's the last one
256 /// holding the read lock.
257 impl<T: ?Sized> RwLockReadGuard<'_, T> {
unlock(&mut self)258     fn unlock(&mut self) {
259         if self.0.read_count.fetch_sub(1, Release) < 0
260             && self.0.read_wait.fetch_sub(1, Release) == 1
261         {
262             self.0.write_sem.release();
263         }
264     }
265 }
266 
267 /// Unlock the read lock when ReadGuard is dropped.
268 impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
drop(&mut self)269     fn drop(&mut self) {
270         self.unlock();
271     }
272 }
273 
274 impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
275     type Target = T;
276 
deref(&self) -> &T277     fn deref(&self) -> &T {
278         unsafe { &*self.0.data.get() }
279     }
280 }
281 
282 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result283     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284         fmt::Debug::fmt(&**self, f)
285     }
286 }
287 
288 impl<T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result289     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290         fmt::Display::fmt(&**self, f)
291     }
292 }
293 
294 /// RwLock write guard
295 pub struct RwLockWriteGuard<'a, T: ?Sized>(&'a RwLock<T>);
296 
297 unsafe impl<T: ?Sized + Send> Send for RwLockWriteGuard<'_, T> {}
298 unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {}
299 
300 /// Wakes all waiting readers first and releases the write lock when WriteGuard
301 /// is dropped.
302 impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
drop(&mut self)303     fn drop(&mut self) {
304         let read_count = self.0.read_count.fetch_add(MAX_READS, Release) + MAX_READS;
305         self.0.read_sem.release_multi(read_count as usize);
306         self.0.write_mutex.release();
307     }
308 }
309 
310 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result311     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312         fmt::Debug::fmt(&**self, f)
313     }
314 }
315 
316 impl<T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result317     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
318         fmt::Display::fmt(&**self, f)
319     }
320 }
321 
322 impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> {
323     type Target = T;
deref(&self) -> &Self::Target324     fn deref(&self) -> &Self::Target {
325         unsafe { &*self.0.data.get() }
326     }
327 }
328 
329 impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> {
deref_mut(&mut self) -> &mut T330     fn deref_mut(&mut self) -> &mut T {
331         unsafe { &mut *self.0.data.get() }
332     }
333 }
334 
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{block_on, spawn};

    /// UT test cases for Rwlock::new()
    ///
    /// # Brief
    /// 1. Create one lock around a struct and one around an integer
    /// 2. Take read locks and verify the stored values
    #[test]
    fn ut_rwlock_new_01() {
        pub struct Payload {
            flag: bool,
            num: usize,
        }
        block_on(async {
            let struct_lock = RwLock::new(Payload { flag: true, num: 1 });
            assert!(struct_lock.read().await.flag);
            assert_eq!(struct_lock.read().await.num, 1);

            let int_lock = RwLock::new(0);
            assert_eq!(*int_lock.read().await, 0);
        });
    }

    /// UT test cases for Rwlock::read()
    ///
    /// # Brief
    /// 1. Create a lock holding 100
    /// 2. Acquire the read lock
    /// 3. Verify the value seen through the read guard
    #[test]
    fn ut_rwlock_read_01() {
        block_on(async {
            let lock = RwLock::new(100);
            let guard = lock.read().await;
            assert_eq!(*guard, 100);
        });
    }

    /// UT test cases for Rwlock::read()
    ///
    /// # Brief
    /// 1. Create a shared lock holding 100
    /// 2. Spawn a task that takes the write lock and increments the value,
    ///    and wait for it to finish
    /// 3. Acquire the read lock and verify the incremented value
    #[test]
    fn ut_rwlock_read_02() {
        let lock = Arc::new(RwLock::new(100));
        let writer_lock = lock.clone();

        let writer = spawn(async move {
            let mut guard = writer_lock.write().await;
            *guard += 1;
        });
        block_on(writer).unwrap();

        block_on(async {
            let guard = lock.read().await;
            assert_eq!(*guard, 101);
        });
    }

    /// UT test cases for Rwlock::try_read()
    ///
    /// # Brief
    /// 1. Create a lock holding 100
    /// 2. Call try_read() on the uncontended lock
    /// 3. Verify the value seen through the returned guard
    #[test]
    fn ut_rwlock_try_read_01() {
        let lock = RwLock::new(100);
        let guard = lock.try_read().unwrap();
        assert_eq!(*guard, 100);
    }

    /// UT test cases for Rwlock::try_read()
    ///
    /// # Brief
    /// 1. Create a lock and take the write lock with try_write()
    /// 2. Verify that try_read() fails while the write guard is alive
    /// 3. Drop the write guard
    /// 4. Verify that try_read() now succeeds
    #[test]
    fn ut_rwlock_try_read_02() {
        let lock = Arc::new(RwLock::new(100));
        let mut writer = lock.try_write().unwrap();
        *writer += 1;

        let while_locked = lock.try_read();
        assert!(while_locked.is_err());

        *writer += 1;
        drop(writer);

        let after_unlock = lock.try_read();
        assert!(after_unlock.is_ok());
    }

    /// UT test cases for Rwlock::write()
    ///
    /// # Brief
    /// 1. Create a lock holding 100
    /// 2. Acquire the write lock and add 100 to the value
    /// 3. Verify the value seen through the write guard
    #[test]
    fn ut_rwlock_write_01() {
        let lock = Arc::new(RwLock::new(100));
        block_on(async {
            let mut guard = lock.write().await;
            *guard += 100;
            assert_eq!(*guard, 200);
        });
    }

    /// UT test cases for Rwlock::write()
    ///
    /// # Brief
    /// 1. Create a shared lock holding 100
    /// 2. Spawn two tasks that each take the write lock and add 100
    /// 3. Wait for both writer tasks to finish
    /// 4. Spawn a task that takes the read lock and verifies the value is 300
    #[test]
    fn ut_rwlock_write_test_02() {
        let lock = Arc::new(RwLock::new(100));
        let first_writer_lock = lock.clone();
        let reader_lock = lock.clone();
        let second_writer_lock = lock;

        let first_writer = spawn(async move {
            let mut guard = first_writer_lock.write().await;
            *guard += 100;
        });
        let second_writer = spawn(async move {
            let mut guard = second_writer_lock.write().await;
            *guard += 100;
        });
        block_on(first_writer).unwrap();
        block_on(second_writer).unwrap();

        let reader = spawn(async move {
            let guard = reader_lock.read().await;
            assert_eq!(*guard, 300);
        });
        block_on(reader).unwrap();
    }

    /// UT test cases for Rwlock::try_write()
    ///
    /// # Brief
    /// 1. Create a lock holding 100
    /// 2. Call try_write() on the uncontended lock and add 100 to the value
    /// 3. Verify the value seen through the write guard
    #[test]
    fn ut_rwlock_try_write_01() {
        let lock = RwLock::new(100);
        let mut guard = lock.try_write().unwrap();
        *guard += 100;
        assert_eq!(*guard, 200);
    }

    /// UT test cases for Rwlock::try_write()
    ///
    /// # Brief
    /// 1. Create a lock and take the write lock with try_write()
    /// 2. Verify that a second try_write() fails while the guard is alive
    /// 3. Drop the write guard
    /// 4. Verify that try_write() now succeeds
    #[test]
    fn ut_rwlock_try_write_02() {
        let lock = Arc::new(RwLock::new(100));
        let mut writer = lock.try_write().unwrap();
        *writer += 1;

        let while_locked = lock.try_write();
        assert!(while_locked.is_err());

        *writer += 1;
        drop(writer);

        let after_unlock = lock.try_write();
        assert!(after_unlock.is_ok());
    }

    /// UT test cases for Rwlock::into_inner()
    ///
    /// # Brief
    /// 1. Create a lock holding 10
    /// 2. Consume the lock with into_inner() and verify the returned value
    #[test]
    fn ut_rwlock_into_inner_01() {
        let lock = RwLock::new(10);
        let value = lock.into_inner();
        assert_eq!(value, 10);
    }
}
533