1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io; 15 use std::sync::Mutex; 16 17 cfg_ffrt!( 18 use ylong_ffrt::{ffrt_set_cpu_worker_max_num, ffrt_set_worker_stack_size, Qos}; 19 use std::collections::HashMap; 20 use libc::{c_uint, c_ulong}; 21 use std::time::Duration; 22 use crate::builder::ScheduleAlgo; 23 ); 24 25 #[cfg(not(feature = "ffrt"))] 26 use crate::builder::common_builder::impl_common; 27 use crate::builder::CommonBuilder; 28 #[cfg(feature = "multi_instance_runtime")] 29 use crate::executor::{AsyncHandle, Runtime}; 30 31 pub(crate) static GLOBAL_BUILDER: Mutex<Option<MultiThreadBuilder>> = Mutex::new(None); 32 33 /// Runtime builder that configures a multi-threaded runtime, or the global 34 /// runtime. 35 pub struct MultiThreadBuilder { 36 pub(crate) common: CommonBuilder, 37 38 #[cfg(not(feature = "ffrt"))] 39 /// Maximum thread number for core thread pool 40 pub(crate) core_thread_size: Option<usize>, 41 42 #[cfg(feature = "ffrt")] 43 /// Thread number for each qos 44 pub(crate) thread_num_by_qos: HashMap<Qos, u32>, 45 } 46 47 impl MultiThreadBuilder { new() -> Self48 pub(crate) fn new() -> Self { 49 MultiThreadBuilder { 50 common: CommonBuilder::new(), 51 #[cfg(not(feature = "ffrt"))] 52 core_thread_size: None, 53 #[cfg(feature = "ffrt")] 54 thread_num_by_qos: HashMap::new(), 55 } 56 } 57 58 /// Configures the global runtime. 59 /// 60 /// # Error 61 /// If the global runtime is already running or this method has been called 62 /// before, then it will return an `AlreadyExists` error. build_global(self) -> io::Result<()>63 pub fn build_global(self) -> io::Result<()> { 64 let mut builder = GLOBAL_BUILDER.lock().unwrap(); 65 if builder.is_some() { 66 return Err(io::ErrorKind::AlreadyExists.into()); 67 } 68 69 #[cfg(feature = "ffrt")] 70 unsafe { 71 for (qos, worker_num) in self.thread_num_by_qos.iter() { 72 ffrt_set_cpu_worker_max_num(*qos, *worker_num as c_uint); 73 } 74 75 for (qos, stack_size) in self.common.stack_size_by_qos.iter() { 76 ffrt_set_worker_stack_size(*qos, *stack_size as c_ulong); 77 } 78 } 79 80 *builder = Some(self); 81 Ok(()) 82 } 83 } 84 85 #[cfg(feature = "ffrt")] 86 impl MultiThreadBuilder { 87 /// Sets the maximum worker number for a specific qos group. 88 /// 89 /// If a worker number has already been set for a qos, calling the method 90 /// with the same qos will overwrite the old value. 91 /// 92 /// # Error 93 /// The accepted worker number range for each qos is [1, 20]. If 0 is passed 94 /// in, then the maximum worker number will be set to 1. If a number 95 /// greater than 20 is passed in, then the maximum worker number will be 96 /// set to 20. max_worker_num_by_qos(mut self, qos: Qos, num: u32) -> Self97 pub fn max_worker_num_by_qos(mut self, qos: Qos, num: u32) -> Self { 98 let worker = match num { 99 0 => 1, 100 n if n > 20 => 20, 101 n => n, 102 }; 103 self.thread_num_by_qos.insert(qos, worker); 104 self 105 } 106 107 /// Sets the name prefix for all worker threads. worker_name(mut self, name: String) -> Self108 pub fn worker_name(mut self, name: String) -> Self { 109 self.common.worker_name = Some(name); 110 self 111 } 112 113 /// Sets the number of core worker threads. 114 /// 115 /// 116 /// The boundary of thread number is 1-64: 117 /// If sets a number smaller than 1, then thread number would be set to 1. 118 /// If sets a number larger than 64, then thread number would be set to 64. 119 /// The default value is the number of cores of the cpu. 120 /// 121 /// # Examples 122 /// ``` 123 /// use crate::ylong_runtime::builder::RuntimeBuilder; 124 /// 125 /// let runtime = RuntimeBuilder::new_multi_thread().worker_num(8); 126 /// ``` worker_num(self, core_pool_size: usize) -> Self127 pub fn worker_num(self, core_pool_size: usize) -> Self { 128 self.max_worker_num_by_qos(Qos::Default, core_pool_size as u32) 129 } 130 131 /// Sets the core affinity of the worker threads 132 /// 133 /// # Note 134 /// This method does nothing now under ffrt feature. is_affinity(self, _is_affinity: bool) -> Self135 pub fn is_affinity(self, _is_affinity: bool) -> Self { 136 self 137 } 138 139 /// Sets the schedule policy. 140 /// 141 /// # Note 142 /// This method does nothing now under ffrt feature. schedule_algo(self, _schedule_algo: ScheduleAlgo) -> Self143 pub fn schedule_algo(self, _schedule_algo: ScheduleAlgo) -> Self { 144 self 145 } 146 147 /// Sets the callback function to be called when a worker thread starts. 148 /// 149 /// # Note 150 /// This method does nothing now under ffrt feature. after_start<F>(self, _f: F) -> Self where F: Fn() + Send + Sync + 'static,151 pub fn after_start<F>(self, _f: F) -> Self 152 where 153 F: Fn() + Send + Sync + 'static, 154 { 155 self 156 } 157 158 /// Sets the callback function to be called when a worker thread stops. 159 /// 160 /// # Note 161 /// This method does nothing now under ffrt feature. before_stop<F>(self, _f: F) -> Self where F: Fn() + Send + Sync + 'static,162 pub fn before_stop<F>(self, _f: F) -> Self 163 where 164 F: Fn() + Send + Sync + 'static, 165 { 166 self 167 } 168 169 /// Sets the maximum number of permanent threads in blocking thread pool 170 /// 171 /// # Note 172 /// This method does nothing now under ffrt feature. blocking_permanent_thread_num(self, _blocking_permanent_thread_num: u8) -> Self173 pub fn blocking_permanent_thread_num(self, _blocking_permanent_thread_num: u8) -> Self { 174 self 175 } 176 177 /// Sets the number of threads that the runtime could spawn additionally 178 /// besides the core thread pool. 179 /// 180 /// The boundary is 1-64. 181 /// 182 /// # Note 183 /// This method does nothing now under ffrt feature. max_blocking_pool_size(self, _max_blocking_pool_size: u8) -> Self184 pub fn max_blocking_pool_size(self, _max_blocking_pool_size: u8) -> Self { 185 self 186 } 187 188 /// Sets how long will the thread be kept alive inside the blocking pool 189 /// after it becomes idle. 190 /// 191 /// # Note 192 /// This method does nothing now under ffrt feature. keep_alive_time(self, _keep_alive_time: Duration) -> Self193 pub fn keep_alive_time(self, _keep_alive_time: Duration) -> Self { 194 self 195 } 196 197 /// Sets the thread stack size for a specific qos group. 198 /// 199 /// If a stack size has already been set for a qos, calling the method 200 /// with the same qos will overwrite the old value 201 /// 202 /// # Error 203 /// The lowest accepted stack size is 16k. If a value under 16k is passed 204 /// in, then the stack size will be set to 16k instead. stack_size_by_qos(mut self, qos: Qos, stack_size: usize) -> Self205 pub fn stack_size_by_qos(mut self, qos: Qos, stack_size: usize) -> Self { 206 const PTHREAD_STACK_MIN: usize = 16 * 1000; 207 208 let stack_size = match stack_size { 209 n if n < PTHREAD_STACK_MIN => PTHREAD_STACK_MIN, 210 n => n, 211 }; 212 self.common.stack_size_by_qos.insert(qos, stack_size); 213 self 214 } 215 216 /// Sets the stack size for every worker thread that gets spawned by the 217 /// runtime. The minimum stack size is 1. worker_stack_size(self, stack_size: usize) -> Self218 pub fn worker_stack_size(self, stack_size: usize) -> Self { 219 self.stack_size_by_qos(Qos::Default, stack_size) 220 } 221 } 222 223 #[cfg(not(feature = "ffrt"))] 224 impl MultiThreadBuilder { 225 /// Initializes the runtime and returns its instance. 226 #[cfg(feature = "multi_instance_runtime")] build(&mut self) -> io::Result<Runtime>227 pub fn build(&mut self) -> io::Result<Runtime> { 228 use crate::builder::initialize_async_spawner; 229 let async_spawner = initialize_async_spawner(self)?; 230 231 Ok(Runtime { 232 async_spawner: AsyncHandle::MultiThread(async_spawner), 233 }) 234 } 235 236 /// Sets the number of core worker threads. 237 /// 238 /// 239 /// The boundary of thread number is 1-64: 240 /// If sets a number smaller than 1, then thread number would be set to 1. 241 /// If sets a number larger than 64, then thread number would be set to 64. 242 /// The default value is the number of cores of the cpu. 243 /// 244 /// # Examples 245 /// ``` 246 /// use crate::ylong_runtime::builder::RuntimeBuilder; 247 /// 248 /// let runtime = RuntimeBuilder::new_multi_thread().worker_num(8); 249 /// ``` worker_num(mut self, core_pool_size: usize) -> Self250 pub fn worker_num(mut self, core_pool_size: usize) -> Self { 251 if core_pool_size < 1 { 252 self.core_thread_size = Some(1); 253 } else if core_pool_size > 64 { 254 self.core_thread_size = Some(64); 255 } else { 256 self.core_thread_size = Some(core_pool_size); 257 } 258 self 259 } 260 } 261 262 #[cfg(not(feature = "ffrt"))] 263 impl_common!(MultiThreadBuilder); 264 265 #[cfg(feature = "full")] 266 #[cfg(test)] 267 mod test { 268 use crate::builder::RuntimeBuilder; 269 use crate::executor::{global_default_async, AsyncHandle}; 270 271 /// UT test cases for blocking on a time sleep without initializing the 272 /// runtime. 273 /// 274 /// # Brief 275 /// 1. Configure the global runtime to make it have six core threads 276 /// 2. Get the global runtime 277 /// 3. Check the core thread number of the runtime 278 /// 4. Call build_global once more 279 /// 5. Check the error 280 #[test] ut_build_global()281 fn ut_build_global() { 282 let ret = RuntimeBuilder::new_multi_thread() 283 .worker_num(6) 284 .max_blocking_pool_size(3) 285 .build_global(); 286 assert!(ret.is_ok()); 287 288 let async_pool = global_default_async(); 289 match &async_pool.async_spawner { 290 AsyncHandle::CurrentThread(_) => unreachable!(), 291 AsyncHandle::MultiThread(x) => { 292 assert_eq!(x.inner.total, 6); 293 } 294 } 295 296 let ret = RuntimeBuilder::new_multi_thread() 297 .worker_num(2) 298 .max_blocking_pool_size(3) 299 .build_global(); 300 assert!(ret.is_err()); 301 } 302 } 303 304 #[cfg(feature = "ffrt")] 305 #[cfg(test)] 306 mod ffrt_test { 307 use ylong_ffrt::Qos::{Default, UserInitiated, UserInteractive}; 308 309 use crate::builder::MultiThreadBuilder; 310 311 /// UT test cases for max_worker_num_by_qos 312 /// 313 /// # Brief 314 /// 1. Sets UserInteractive qos group to have 0 maximum worker number. 315 /// 2. Checks if the actual value is 1 316 /// 3. Sets UserInteractive qos group to have 21 maximum worker number. 317 /// 4. Checks if the actual value is 20 318 /// 5. Set Default qos group to have 8 maximum worker number. 319 /// 6. Checks if the actual value is 8. 320 /// 7. Calls build_global on the builder, check if the return value is Ok 321 #[test] ut_set_max_worker()322 fn ut_set_max_worker() { 323 let builder = MultiThreadBuilder::new(); 324 let builder = builder.max_worker_num_by_qos(UserInteractive, 0); 325 let num = builder.thread_num_by_qos.get(&UserInteractive).unwrap(); 326 assert_eq!(*num, 1); 327 328 let builder = builder.max_worker_num_by_qos(UserInteractive, 21); 329 let num = builder.thread_num_by_qos.get(&UserInteractive).unwrap(); 330 assert_eq!(*num, 20); 331 332 let builder = MultiThreadBuilder::new().max_worker_num_by_qos(Default, 8); 333 let num = builder.thread_num_by_qos.get(&Default).unwrap(); 334 assert_eq!(*num, 8); 335 } 336 337 /// UT cases for stack_size_by_qos 338 /// 339 /// # Brief 340 /// 1. Sets UserInitiated qos group's stack size to 16k - 1 341 /// 2. Checks if the actual stack size is 16k 342 /// 3. Sets UserInteractive qos group's stack size to 16k 343 /// 4. Checks if the actual stack size is 16k 344 /// 5. Sets Default qos group's stack size to 16M 345 /// 6. Checks if the actual stack size is 16M 346 /// 7. Sets UserInteractive qos group's stack size to 16k + 1 347 /// 8. Checks if the actual stack size is 16k + 1 348 #[test] ut_set_stack_size()349 fn ut_set_stack_size() { 350 let builder = MultiThreadBuilder::new(); 351 let builder = builder.stack_size_by_qos(UserInitiated, 16 * 1000 - 1); 352 let num = builder 353 .common 354 .stack_size_by_qos 355 .get(&UserInitiated) 356 .unwrap(); 357 assert_eq!(*num, 16 * 1000); 358 359 let builder = MultiThreadBuilder::new(); 360 let builder = builder.stack_size_by_qos(UserInteractive, 16 * 1000); 361 let num = builder 362 .common 363 .stack_size_by_qos 364 .get(&UserInteractive) 365 .unwrap(); 366 assert_eq!(*num, 16 * 1000); 367 368 let builder = MultiThreadBuilder::new(); 369 let builder = builder.stack_size_by_qos(Default, 16 * 1000 * 1000); 370 let num = builder.common.stack_size_by_qos.get(&Default).unwrap(); 371 assert_eq!(*num, 16 * 1000 * 1000); 372 373 let builder = MultiThreadBuilder::new(); 374 let builder = builder.stack_size_by_qos(UserInteractive, 16 * 1000 + 1); 375 let num = builder 376 .common 377 .stack_size_by_qos 378 .get(&UserInteractive) 379 .unwrap(); 380 assert_eq!(*num, 16 * 1000 + 1); 381 } 382 } 383