// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An asynchronous version of [`std::sync::RwLock`]

use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

use crate::sync::semaphore_inner::SemaphoreInner;
use crate::sync::LockError;

/// Offset a writer subtracts from `read_count` when it claims the lock.
///
/// Subtracting it drives the counter negative, which is how `read()` and
/// `try_read()` detect an active or pending writer. The write guard adds the
/// same amount back when it is dropped.
const MAX_READS: i64 = i64::MAX >> 2;

/// An asynchronous version of [`std::sync::RwLock`].
///
/// Rwlock allows multiple readers or a single writer to operate concurrently.
/// Readers are only allowed to read the data, but the writer is the only one
/// that can change the data inside.
///
/// This Rwlock's policy is writer first, to prevent writers from starving.
34 /// 35 /// # Examples 36 /// 37 /// ``` 38 /// use ylong_runtime::sync::rwlock::RwLock; 39 /// 40 /// ylong_runtime::block_on(async { 41 /// let lock = RwLock::new(0); 42 /// 43 /// // Can have multiple read locks at the same time 44 /// let r1 = lock.read().await; 45 /// let r2 = lock.read().await; 46 /// assert_eq!(*r1, 0); 47 /// assert_eq!(*r2, 0); 48 /// drop((r1, r2)); 49 /// 50 /// // Only one write lock at a time 51 /// let mut w = lock.write().await; 52 /// *w += 1; 53 /// assert_eq!(*w, 1); 54 /// }); 55 /// ``` 56 pub struct RwLock<T: ?Sized> { 57 read_sem: SemaphoreInner, 58 write_sem: SemaphoreInner, 59 write_mutex: SemaphoreInner, 60 read_count: AtomicI64, 61 read_wait: AtomicI64, 62 data: UnsafeCell<T>, 63 } 64 65 unsafe impl<T: ?Sized + Send> Send for RwLock<T> {} 66 unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {} 67 68 impl<T: Sized> RwLock<T> { 69 /// Creates a new RwLock. `T` is the data that needs to be protected 70 /// by this RwLock. 71 /// 72 /// # Examples 73 /// 74 /// ``` 75 /// use ylong_runtime::sync::rwlock::RwLock; 76 /// 77 /// let lock = RwLock::new(0); 78 /// ``` new(t: T) -> RwLock<T>79 pub fn new(t: T) -> RwLock<T> { 80 RwLock { 81 // bounded by permit::max 82 read_sem: SemaphoreInner::new(0).unwrap(), 83 write_sem: SemaphoreInner::new(0).unwrap(), 84 write_mutex: SemaphoreInner::new(1).unwrap(), 85 read_count: AtomicI64::new(0), 86 read_wait: AtomicI64::new(0), 87 data: UnsafeCell::new(t), 88 } 89 } 90 } 91 92 impl<T: ?Sized> RwLock<T> { 93 /// Asynchronously acquires the read lock. 94 /// 95 /// If there is a writer holding the write lock, then this method will wait 96 /// asynchronously for the write lock to get released. 97 /// 98 /// But if the write lock is not held, it's ok for multiple readers to hold 99 /// the read lock concurrently. 
100 /// 101 /// 102 /// 103 /// # Examples 104 /// 105 /// ``` 106 /// use ylong_runtime::sync::rwlock::RwLock; 107 /// 108 /// ylong_runtime::block_on(async { 109 /// let lock = RwLock::new(0); 110 /// let r1 = lock.read().await; 111 /// assert_eq!(*r1, 0); 112 /// }); 113 /// ``` read(&self) -> RwLockReadGuard<'_, T>114 pub async fn read(&self) -> RwLockReadGuard<'_, T> { 115 if self.read_count.fetch_add(1, Release) < 0 { 116 // The result of `acquire()` will be `Err()` only when the semaphore is closed. 117 // `RwLock` will not close, so the result of `acquire()` must be `Ok(())`. 118 self.read_sem.acquire().await.unwrap(); 119 } 120 RwLockReadGuard(self) 121 } 122 123 /// Attempts to get the read lock. If another writer is holding the write 124 /// lock, then None will be returned. Otherwise, the ReadMutexGuard will 125 /// be returned. 126 /// 127 /// # Examples 128 /// 129 /// ``` 130 /// use ylong_runtime::sync::rwlock::RwLock; 131 /// 132 /// let lock = RwLock::new(0); 133 /// let r1 = lock.try_read().unwrap(); 134 /// assert_eq!(*r1, 0); 135 /// ``` try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError>136 pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, LockError> { 137 let mut read_count = self.read_count.load(Acquire); 138 loop { 139 if read_count < 0 { 140 return Err(LockError); 141 } 142 match self 143 .read_count 144 .compare_exchange(read_count, read_count + 1, AcqRel, Acquire) 145 { 146 Ok(_) => return Ok(RwLockReadGuard(self)), 147 Err(curr) => read_count = curr, 148 } 149 } 150 } 151 152 /// Asynchronously acquires the write lock. 153 /// 154 /// If there is other readers or writers, then this method will wait 155 /// asynchronously for them to get released. 
156 /// 157 /// # Examples 158 /// 159 /// ``` 160 /// use ylong_runtime::sync::rwlock::RwLock; 161 /// 162 /// ylong_runtime::block_on(async { 163 /// let lock = RwLock::new(0); 164 /// let mut r1 = lock.write().await; 165 /// *r1 += 1; 166 /// assert_eq!(*r1, 1); 167 /// }); 168 /// ``` write(&self) -> RwLockWriteGuard<'_, T>169 pub async fn write(&self) -> RwLockWriteGuard<'_, T> { 170 // The result of `acquire()` will be `Err()` only when the semaphore is closed. 171 // `RwLock` will not close, so the result of `acquire()` must be `Ok(())`. 172 self.write_mutex.acquire().await.unwrap(); 173 let read_count = self.read_count.fetch_sub(MAX_READS, Release); 174 // If the `read_count` is not 0, it indicates that there is currently a reader 175 // holding a read lock. If the `read_wait` is 0 after addition, it 176 // indicates that all readers have been dropped. 177 if read_count >= 0 && self.read_wait.fetch_add(read_count, Release) != -read_count { 178 self.write_sem.acquire().await.unwrap(); 179 } 180 RwLockWriteGuard(self) 181 } 182 183 /// Attempts to acquire the write lock. 184 /// 185 /// If any other task holds the read/write lock, None will be returned. 186 /// 187 /// # Examples 188 /// 189 /// ``` 190 /// use ylong_runtime::sync::rwlock::RwLock; 191 /// 192 /// let lock = RwLock::new(0); 193 /// let mut r1 = lock.try_write().unwrap(); 194 /// *r1 += 1; 195 /// assert_eq!(*r1, 1); 196 /// ``` try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError>197 pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, LockError> { 198 if self.write_mutex.try_acquire().is_err() { 199 return Err(LockError); 200 } 201 match self 202 .read_count 203 .compare_exchange(0, -MAX_READS, AcqRel, Acquire) 204 { 205 Ok(_) => Ok(RwLockWriteGuard(self)), 206 Err(_) => { 207 self.write_mutex.release(); 208 Err(LockError) 209 } 210 } 211 } 212 213 /// Consumes the lock, and returns the data protected by it. 
214 /// 215 /// # Examples 216 /// 217 /// ``` 218 /// use ylong_runtime::sync::rwlock::RwLock; 219 /// 220 /// let lock = RwLock::new(0); 221 /// assert_eq!(lock.into_inner(), 0); 222 /// ``` into_inner(self) -> T where T: Sized,223 pub fn into_inner(self) -> T 224 where 225 T: Sized, 226 { 227 self.data.into_inner() 228 } 229 230 /// Gets the mutable reference of the data protected by the lock. 231 /// 232 /// This method takes the mutable reference of the RwLock, so there is no 233 /// need to actually lock the RwLock -- the mutable borrow statically 234 /// guarantees no locks exist. 235 /// ``` 236 /// use ylong_runtime::sync::rwlock::RwLock; 237 /// 238 /// ylong_runtime::block_on(async { 239 /// let mut lock = RwLock::new(0); 240 /// *lock.get_mut() = 10; 241 /// assert_eq!(*lock.write().await, 10); 242 /// }); 243 /// ``` get_mut(&mut self) -> &mut T244 pub fn get_mut(&mut self) -> &mut T { 245 unsafe { &mut *self.data.get() } 246 } 247 } 248 249 /// Read guard to access the data after holding the mutex. 250 pub struct RwLockReadGuard<'a, T: ?Sized>(&'a RwLock<T>); 251 252 unsafe impl<T: ?Sized + Send> Send for RwLockReadGuard<'_, T> {} 253 unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {} 254 255 /// Releases the read lock. Wakes any waiting writer if it's the last one 256 /// holding the read lock. 257 impl<T: ?Sized> RwLockReadGuard<'_, T> { unlock(&mut self)258 fn unlock(&mut self) { 259 if self.0.read_count.fetch_sub(1, Release) < 0 260 && self.0.read_wait.fetch_sub(1, Release) == 1 261 { 262 self.0.write_sem.release(); 263 } 264 } 265 } 266 267 /// Unlock the read lock when ReadGuard is dropped. 
268 impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> { drop(&mut self)269 fn drop(&mut self) { 270 self.unlock(); 271 } 272 } 273 274 impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> { 275 type Target = T; 276 deref(&self) -> &T277 fn deref(&self) -> &T { 278 unsafe { &*self.0.data.get() } 279 } 280 } 281 282 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 284 fmt::Debug::fmt(&**self, f) 285 } 286 } 287 288 impl<T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 290 fmt::Display::fmt(&**self, f) 291 } 292 } 293 294 /// RwLock write guard 295 pub struct RwLockWriteGuard<'a, T: ?Sized>(&'a RwLock<T>); 296 297 unsafe impl<T: ?Sized + Send> Send for RwLockWriteGuard<'_, T> {} 298 unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {} 299 300 /// Wakes all waiting readers first and releases the write lock when WriteGuard 301 /// is dropped. 
302 impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> { drop(&mut self)303 fn drop(&mut self) { 304 let read_count = self.0.read_count.fetch_add(MAX_READS, Release) + MAX_READS; 305 self.0.read_sem.release_multi(read_count as usize); 306 self.0.write_mutex.release(); 307 } 308 } 309 310 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result311 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 312 fmt::Debug::fmt(&**self, f) 313 } 314 } 315 316 impl<T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'_, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result317 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 318 fmt::Display::fmt(&**self, f) 319 } 320 } 321 322 impl<T: ?Sized> Deref for RwLockWriteGuard<'_, T> { 323 type Target = T; deref(&self) -> &Self::Target324 fn deref(&self) -> &Self::Target { 325 unsafe { &*self.0.data.get() } 326 } 327 } 328 329 impl<T: ?Sized> DerefMut for RwLockWriteGuard<'_, T> { deref_mut(&mut self) -> &mut T330 fn deref_mut(&mut self) -> &mut T { 331 unsafe { &mut *self.0.data.get() } 332 } 333 } 334 335 #[cfg(test)] 336 mod tests { 337 use std::sync::Arc; 338 339 use super::*; 340 use crate::{block_on, spawn}; 341 342 /// UT test cases for Rwlock::new() 343 /// 344 /// # Brief 345 /// 1. Create a concurrent read/write lock with structure and value as input 346 /// parameters 347 /// 2. Verify the contents of the read/write lock 348 #[test] ut_rwlock_new_01()349 fn ut_rwlock_new_01() { 350 pub struct Test { 351 flag: bool, 352 num: usize, 353 } 354 block_on(async { 355 let lock = RwLock::new(Test { flag: true, num: 1 }); 356 assert!(lock.read().await.flag); 357 assert_eq!(lock.read().await.num, 1); 358 let lock2 = RwLock::new(0); 359 assert_eq!(*lock2.read().await, 0); 360 }); 361 } 362 363 /// UT test cases for Rwlock::read() 364 /// 365 /// # Brief 366 /// 1. Creating a concurrent read/write lock 367 /// 2. 
Calling the read() function 368 /// 3. Verify the value of the read() function dereference 369 #[test] ut_rwlock_read_01()370 fn ut_rwlock_read_01() { 371 block_on(async { 372 let lock = RwLock::new(100); 373 let a = lock.read().await; 374 assert_eq!(*a, 100); 375 }); 376 } 377 378 /// UT test cases for Rwlock::read() 379 /// 380 /// # Brief 381 /// 1. Creating a concurrent read/write lock 382 /// 2. Call the write() function to make changes to the concurrent 383 /// read/write lock data 384 /// 3. Call the read() function to verify the value in the read/write lock 385 /// of the concurrent process 386 #[test] ut_rwlock_read_02()387 fn ut_rwlock_read_02() { 388 let lock = Arc::new(RwLock::new(100)); 389 let lock2 = lock.clone(); 390 391 block_on(spawn(async move { 392 let mut loopmun = lock2.write().await; 393 *loopmun += 1; 394 })) 395 .unwrap(); 396 block_on(async { 397 let a = lock.read().await; 398 assert_eq!(*a, 101); 399 }); 400 } 401 402 /// UT test cases for Rwlock::try_read() 403 /// 404 /// # Brief 405 /// 1. Creating a concurrent read/write lock 406 /// 2. Call try_read() 407 /// 3. Verify the value of the return value dereference 408 #[test] ut_rwlock_try_read_01()409 fn ut_rwlock_try_read_01() { 410 let lock = RwLock::new(100); 411 let res = lock.try_read().unwrap(); 412 assert_eq!(*res, 100); 413 } 414 415 /// UT test cases for Rwlock::try_read() 416 /// 417 /// # Brief 418 /// 1. Creating a concurrent read/write lock 419 /// 2. Create a thread to call the write method to hold the lock, and then 420 /// sleep to hold the lock for a long time 421 /// 3. Call try_read() to try to get a lock 422 /// 4. 
Check the try_read return value 423 #[test] ut_rwlock_try_read_02()424 fn ut_rwlock_try_read_02() { 425 let lock = Arc::new(RwLock::new(100)); 426 let mut a = lock.try_write().unwrap(); 427 *a += 1; 428 let res = lock.try_read(); 429 assert!(res.is_err()); 430 *a += 1; 431 drop(a); 432 let res2 = lock.try_read(); 433 assert!(res2.is_ok()); 434 } 435 436 /// UT test cases for Rwlock::write() 437 /// 438 /// # Brief 439 /// 1. Creating a concurrent read/write lock 440 /// 2. Create a call to the write interface to modify the value inside the 441 /// concurrent read/write lock 442 /// 3. Verify the value of the concurrent read/write lock 443 #[test] ut_rwlock_write_01()444 fn ut_rwlock_write_01() { 445 let lock = Arc::new(RwLock::new(100)); 446 block_on(async { 447 let mut a = lock.write().await; 448 *a += 100; 449 assert_eq!(*a, 200); 450 }); 451 } 452 453 /// UT test cases for Rwlock::write() 454 /// 455 /// # Brief 456 /// 1. Creating a concurrent read/write lock 457 /// 2. First create a thread to obtain a write lock, modify the data in the 458 /// concurrent read/write lock, and then hibernate to ensure that the 459 /// lock is held for a long time 460 /// 3. Create two co-processes one to get a read lock and one to get a write 461 /// lock, so that there is both a reader and a writer requesting the lock 462 /// 4. 
Verify the value inside the concurrent read/write lock when the 463 /// concurrent read/write lock is obtained 464 #[test] ut_rwlock_write_test_02()465 fn ut_rwlock_write_test_02() { 466 let lock = Arc::new(RwLock::new(100)); 467 let lock2 = lock.clone(); 468 let lock3 = lock.clone(); 469 let lock4 = lock; 470 471 let handle = spawn(async move { 472 let mut aa = lock2.write().await; 473 *aa += 100; 474 }); 475 let handle1 = spawn(async move { 476 let mut aa = lock4.write().await; 477 *aa += 100; 478 }); 479 block_on(handle).unwrap(); 480 block_on(handle1).unwrap(); 481 let handle2 = spawn(async move { 482 let aa = lock3.read().await; 483 assert_eq!(*aa, 300); 484 }); 485 block_on(handle2).unwrap(); 486 } 487 488 /// UT test cases for Rwlock::try_write() 489 /// 490 /// # Brief 491 /// 1. Creating a concurrent read/write lock 492 /// 2. Call try_write() to try to get a write lock and modify the value in 493 /// it 494 /// 3. Verify the value in the read/write lock of the concurrent process 495 #[test] ut_rwlock_try_write_01()496 fn ut_rwlock_try_write_01() { 497 let lock = RwLock::new(100); 498 let mut aa = lock.try_write().unwrap(); 499 *aa += 100; 500 assert_eq!(*aa, 200); 501 } 502 503 /// UT test cases for Rwlock::try_write() 504 /// 505 /// # Brief 506 /// 1. Creating a concurrent read/write lock 507 /// 2. Execute command cargo test ut_rwlock_try_write_02 508 #[test] ut_rwlock_try_write_02()509 fn ut_rwlock_try_write_02() { 510 let lock = Arc::new(RwLock::new(100)); 511 let mut a = lock.try_write().unwrap(); 512 *a += 1; 513 let res = lock.try_write(); 514 assert!(res.is_err()); 515 *a += 1; 516 drop(a); 517 let res2 = lock.try_write(); 518 assert!(res2.is_ok()); 519 } 520 521 /// UT test cases for Rwlock::into_inner() 522 /// 523 /// # Brief 524 /// 1. Add a temporary library path to the project directory export 525 /// LD_LIBRARY_PATH=$(pwd)/platform 526 /// 2. 
Execute command cargo test ut_rwlock_into_inner_01 527 #[test] ut_rwlock_into_inner_01()528 fn ut_rwlock_into_inner_01() { 529 let lock = RwLock::new(10); 530 assert_eq!(lock.into_inner(), 10); 531 } 532 } 533