1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Mutual exclusion locks 15 16 use std::cell::UnsafeCell; 17 use std::error::Error; 18 use std::fmt; 19 use std::fmt::{Display, Formatter}; 20 use std::ops::{Deref, DerefMut}; 21 22 use crate::sync::semaphore_inner::SemaphoreInner; 23 24 /// An async version of [`std::sync::Mutex`] 25 /// 26 /// Often it's considered as normal to use [`std::sync::Mutex`] on an 27 /// asynchronous environment. The primal purpose of this async mutex is to 28 /// protect shared reference of io, which contains a lot await point during 29 /// reading and writing. If you only wants to protect a data across 30 /// different threads, [`std::sync::Mutex`] will probably gain you better 31 /// performance. 32 /// 33 /// When using across different futures, users need to wrap the mutex inside an 34 /// Arc, just like the use of [`std::sync::Mutex`]. 35 pub struct Mutex<T: ?Sized> { 36 /// Semaphore to provide mutual exclusion 37 sem: SemaphoreInner, 38 /// The data protected by this mutex 39 data: UnsafeCell<T>, 40 } 41 42 unsafe impl<T: ?Sized + Send> Send for Mutex<T> {} 43 unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {} 44 45 /// Error of Mutex 46 #[derive(Debug, Eq, PartialEq, Clone)] 47 pub struct LockError; 48 49 impl Display for LockError { fmt(&self, f: &mut Formatter<'_>) -> fmt::Result50 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 51 write!(f, "Try lock error.") 52 } 53 } 54 55 impl Error for LockError {} 56 57 impl<T: Sized> Mutex<T> { 58 /// Creates a mutex protecting the data passed in. 59 /// 60 /// # Examples 61 /// 62 /// ``` 63 /// use ylong_runtime::sync::mutex::Mutex; 64 /// 65 /// let _a = Mutex::new(2); 66 /// ``` new(t: T) -> Mutex<T>67 pub fn new(t: T) -> Mutex<T> { 68 Mutex { 69 // bounded by permit::max 70 sem: SemaphoreInner::new(1).unwrap(), 71 data: UnsafeCell::new(t), 72 } 73 } 74 } 75 76 impl<T: ?Sized> Mutex<T> { 77 /// Locks the mutex. 78 /// 79 /// If the mutex is already held by others, asynchronously waits for it to 80 /// release. 81 /// 82 /// # Examples 83 /// 84 /// ``` 85 /// use std::sync::Arc; 86 /// 87 /// use ylong_runtime::sync::mutex::Mutex; 88 /// 89 /// let _res = ylong_runtime::block_on(async { 90 /// let lock = Arc::new(Mutex::new(2)); 91 /// let mut n = lock.lock().await; 92 /// *n += 1; 93 /// assert_eq!(*n, 3); 94 /// }); 95 /// ``` lock(&self) -> MutexGuard<'_, T>96 pub async fn lock(&self) -> MutexGuard<'_, T> { 97 // The result of `acquire()` will be `Err()` only when the semaphore is closed. 98 // `Mutex` will not close, so the result of `acquire()` must be `Ok(())`. 99 self.sem.acquire().await.unwrap(); 100 MutexGuard(self) 101 } 102 103 /// Attempts to get the mutex. 104 /// 105 /// If the lock is already held by others, LockError will be returned. 106 /// Otherwise, the mutex guard will be returned. 107 /// 108 /// # Examples 109 /// 110 /// ``` 111 /// use std::sync::Arc; 112 /// 113 /// use ylong_runtime::sync::mutex::Mutex; 114 /// 115 /// let _res = ylong_runtime::block_on(async { 116 /// let mutex = Arc::new(Mutex::new(0)); 117 /// match mutex.try_lock() { 118 /// Ok(lock) => println!("{}", lock), 119 /// Err(_) => {} 120 /// }; 121 /// }); 122 /// ``` try_lock(&self) -> Result<MutexGuard<'_, T>, LockError>123 pub fn try_lock(&self) -> Result<MutexGuard<'_, T>, LockError> { 124 match self.sem.try_acquire() { 125 Ok(_) => Ok(MutexGuard(self)), 126 Err(_) => Err(LockError), 127 } 128 } 129 130 /// Gets the mutable reference of the data protected by the lock without 131 /// actually holding the lock. 132 /// 133 /// This method takes the mutable reference of the mutex, so there is no 134 /// need to actually lock the mutex -- the mutable borrow statically 135 /// guarantees no locks exist. get_mut(&mut self) -> &mut T136 pub fn get_mut(&mut self) -> &mut T { 137 unsafe { &mut *self.data.get() } 138 } 139 } 140 141 /// Mutex guard to access the data after holding the mutex. 142 pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>); 143 144 impl<T: ?Sized> MutexGuard<'_, T> { 145 // Unlocks the mutex. Wakes the first future waiting for the mutex. unlock(&mut self)146 fn unlock(&mut self) { 147 self.0.sem.release(); 148 } 149 } 150 151 unsafe impl<T: ?Sized + Send + Sync> Sync for MutexGuard<'_, T> {} 152 153 /// The mutex will be released after the mutex guard is dropped. 154 impl<T: ?Sized> Drop for MutexGuard<'_, T> { drop(&mut self)155 fn drop(&mut self) { 156 self.unlock(); 157 } 158 } 159 160 impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> { fmt(&self, f: &mut Formatter<'_>) -> fmt::Result161 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 162 fmt::Debug::fmt(&**self, f) 163 } 164 } 165 166 impl<T: ?Sized + Display> Display for MutexGuard<'_, T> { fmt(&self, f: &mut Formatter<'_>) -> fmt::Result167 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 168 Display::fmt(&**self, f) 169 } 170 } 171 172 impl<T: ?Sized> Deref for MutexGuard<'_, T> { 173 type Target = T; deref(&self) -> &Self::Target174 fn deref(&self) -> &Self::Target { 175 unsafe { &*self.0.data.get() } 176 } 177 } 178 179 impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { deref_mut(&mut self) -> &mut T180 fn deref_mut(&mut self) -> &mut T { 181 unsafe { &mut *self.0.data.get() } 182 } 183 } 184 185 #[cfg(test)] 186 mod tests { 187 use std::sync::atomic::{AtomicBool, Ordering}; 188 use std::sync::Arc; 189 use std::thread; 190 use std::time::Duration; 191 192 use super::*; 193 use crate::{block_on, spawn}; 194 195 /// UT test cases for Mutex::new() interface 196 /// 197 /// # Brief 198 /// 1. Creating a Concurrent Mutual Exclusion Lock 199 /// 2. Verify the state of the lock object and the size of the internal 200 /// value of the lock object 201 #[test] ut_mutex_new_01()202 fn ut_mutex_new_01() { 203 let lock = Mutex::new(10); 204 assert_eq!(lock.data.into_inner(), 10); 205 } 206 207 /// UT test cases for Mutex::lock() interface 208 /// 209 /// # Brief 210 /// 1. Creating a concurrent read/write lock 211 /// 2. Modification of data in the concurrent mutex lock 212 /// 3. Check the value in the changed concurrent mutex lock and check the 213 /// status bit of the lock 214 #[test] ut_mutex_lock_01()215 fn ut_mutex_lock_01() { 216 let mutex = Mutex::new(10); 217 block_on(async { 218 let mut lock = mutex.lock().await; 219 *lock += 1; 220 assert_eq!(*lock, 11); 221 }); 222 } 223 224 /// UT test cases for Mutex::try_lock() interface 225 /// 226 /// # Brief 227 /// 1. Creating a Concurrent Mutual Exclusion Lock 228 /// 2. Call try_lock() to try to get the lock 229 /// 3. Operation on in-lock values 230 /// 4. Calibrate in-lock values 231 #[test] ut_mutex_try_lock_01()232 fn ut_mutex_try_lock_01() { 233 let mutex = Mutex::new(10); 234 let mut lock = mutex.try_lock().unwrap(); 235 *lock += 100; 236 assert_eq!(*lock, 110); 237 } 238 239 /// UT test cases for Mutex::try_lock() interface 240 /// 241 /// # Brief 242 /// 1. Creating a Concurrent Mutual Exclusion Lock 243 /// 2. First build a concurrent process to hold the lock and sleep after 244 /// obtaining the lock to hold the lock for a long time 245 /// 3. Call try_lock() to try to get a lock 246 /// 4. Check try_lock return value is None 247 #[test] ut_mutex_try_lock_02()248 fn ut_mutex_try_lock_02() { 249 let mutex = Arc::new(Mutex::new(10)); 250 let mutex1 = mutex.clone(); 251 let flag = Arc::new(AtomicBool::new(true)); 252 let flag_clone = flag.clone(); 253 let lock_flag = Arc::new(AtomicBool::new(true)); 254 let lock_flag_clone = lock_flag.clone(); 255 256 spawn(async move { 257 let _lock = mutex1.lock().await; 258 lock_flag_clone.store(false, Ordering::SeqCst); 259 while flag_clone.load(Ordering::SeqCst) { 260 thread::sleep(Duration::from_millis(1)); 261 } 262 }); 263 while lock_flag.load(Ordering::SeqCst) { 264 thread::sleep(Duration::from_millis(1)); 265 } 266 let lock2 = mutex.try_lock(); 267 assert!(lock2.is_err()); 268 flag.store(false, Ordering::SeqCst); 269 } 270 271 /// UT test cases for Mutex::unlock() interface 272 /// 273 /// # Brief 274 /// 1. Creating a Concurrent Mutual Exclusion Lock 275 /// 2. Perform locking operation 276 /// 3. Call the drop() method to release the lock 277 /// 4. Verify the status of the concurrent mutex lock before, after and 278 /// after unlocking 279 #[test] ut_mutex_unlock_01()280 fn ut_mutex_unlock_01() { 281 let mutex = Mutex::new(10); 282 block_on(async move { 283 let lock = mutex.lock().await; 284 assert!(mutex.try_lock().is_err()); 285 drop(lock); 286 assert!(mutex.try_lock().is_ok()); 287 }); 288 } 289 } 290