1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io;
15 use std::sync::Mutex;
16 
17 cfg_ffrt!(
18     use ylong_ffrt::{ffrt_set_cpu_worker_max_num, ffrt_set_worker_stack_size, Qos};
19     use std::collections::HashMap;
20     use libc::{c_uint, c_ulong};
21     use std::time::Duration;
22     use crate::builder::ScheduleAlgo;
23 );
24 
25 #[cfg(not(feature = "ffrt"))]
26 use crate::builder::common_builder::impl_common;
27 use crate::builder::CommonBuilder;
28 #[cfg(feature = "multi_instance_runtime")]
29 use crate::executor::{AsyncHandle, Runtime};
30 
/// Slot holding the configuration of the global runtime.
///
/// It starts out as `None`. `MultiThreadBuilder::build_global` stores the
/// builder here and reports `AlreadyExists` when the slot is already filled.
pub(crate) static GLOBAL_BUILDER: Mutex<Option<MultiThreadBuilder>> = Mutex::new(None);
32 
/// Runtime builder that configures a multi-threaded runtime, or the global
/// runtime.
///
/// Which thread-count field exists depends on the `ffrt` feature: without it
/// a single core pool size is stored, with it one worker limit is stored per
/// qos.
pub struct MultiThreadBuilder {
    /// Settings kept in the shared `CommonBuilder`.
    pub(crate) common: CommonBuilder,

    #[cfg(not(feature = "ffrt"))]
    /// Maximum thread number for core thread pool. Starts as `None` and is
    /// filled in by `worker_num`.
    pub(crate) core_thread_size: Option<usize>,

    #[cfg(feature = "ffrt")]
    /// Thread number for each qos. Starts empty and is filled in by
    /// `max_worker_num_by_qos`.
    pub(crate) thread_num_by_qos: HashMap<Qos, u32>,
}
46 
47 impl MultiThreadBuilder {
new() -> Self48     pub(crate) fn new() -> Self {
49         MultiThreadBuilder {
50             common: CommonBuilder::new(),
51             #[cfg(not(feature = "ffrt"))]
52             core_thread_size: None,
53             #[cfg(feature = "ffrt")]
54             thread_num_by_qos: HashMap::new(),
55         }
56     }
57 
58     /// Configures the global runtime.
59     ///
60     /// # Error
61     /// If the global runtime is already running or this method has been called
62     /// before, then it will return an `AlreadyExists` error.
build_global(self) -> io::Result<()>63     pub fn build_global(self) -> io::Result<()> {
64         let mut builder = GLOBAL_BUILDER.lock().unwrap();
65         if builder.is_some() {
66             return Err(io::ErrorKind::AlreadyExists.into());
67         }
68 
69         #[cfg(feature = "ffrt")]
70         unsafe {
71             for (qos, worker_num) in self.thread_num_by_qos.iter() {
72                 ffrt_set_cpu_worker_max_num(*qos, *worker_num as c_uint);
73             }
74 
75             for (qos, stack_size) in self.common.stack_size_by_qos.iter() {
76                 ffrt_set_worker_stack_size(*qos, *stack_size as c_ulong);
77             }
78         }
79 
80         *builder = Some(self);
81         Ok(())
82     }
83 }
84 
#[cfg(feature = "ffrt")]
impl MultiThreadBuilder {
    /// Sets the maximum worker number for a specific qos group.
    ///
    /// If a worker number has already been set for a qos, calling the method
    /// with the same qos will overwrite the old value.
    ///
    /// # Note
    /// The accepted worker number range for each qos is [1, 20]. Out-of-range
    /// values are not rejected: 0 is stored as 1, and any number greater than
    /// 20 is stored as 20.
    pub fn max_worker_num_by_qos(mut self, qos: Qos, num: u32) -> Self {
        self.thread_num_by_qos.insert(qos, num.clamp(1, 20));
        self
    }

    /// Sets the name prefix for all worker threads.
    pub fn worker_name(mut self, name: String) -> Self {
        self.common.worker_name = Some(name);
        self
    }

    /// Sets the maximum worker number of the `Qos::Default` group.
    ///
    /// Under the ffrt feature this forwards to `max_worker_num_by_qos`, so
    /// the boundary of the worker number is 1-20:
    /// If sets a number smaller than 1, then worker number would be set to 1.
    /// If sets a number larger than 20, then worker number would be set to 20.
    /// If this method is never called, `build_global` passes no limit for the
    /// default qos to ffrt.
    ///
    /// # Examples
    /// ```
    /// use crate::ylong_runtime::builder::RuntimeBuilder;
    ///
    /// let runtime = RuntimeBuilder::new_multi_thread().worker_num(8);
    /// ```
    pub fn worker_num(self, core_pool_size: usize) -> Self {
        // Saturate before narrowing: a bare `as u32` wraps on 64-bit targets
        // (`1 << 32` would become 0) and the request would then be clamped to
        // the lower bound instead of the upper one.
        let requested = core_pool_size.min(u32::MAX as usize) as u32;
        self.max_worker_num_by_qos(Qos::Default, requested)
    }

    /// Sets the core affinity of the worker threads
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn is_affinity(self, _is_affinity: bool) -> Self {
        self
    }

    /// Sets the schedule policy.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn schedule_algo(self, _schedule_algo: ScheduleAlgo) -> Self {
        self
    }

    /// Sets the callback function to be called when a worker thread starts.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn after_start<F>(self, _f: F) -> Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self
    }

    /// Sets the callback function to be called when a worker thread stops.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn before_stop<F>(self, _f: F) -> Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self
    }

    /// Sets the maximum number of permanent threads in blocking thread pool
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn blocking_permanent_thread_num(self, _blocking_permanent_thread_num: u8) -> Self {
        self
    }

    /// Sets the number of threads that the runtime could spawn additionally
    /// besides the core thread pool.
    ///
    /// The boundary is 1-64.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn max_blocking_pool_size(self, _max_blocking_pool_size: u8) -> Self {
        self
    }

    /// Sets how long will the thread be kept alive inside the blocking pool
    /// after it becomes idle.
    ///
    /// # Note
    /// This method does nothing now under ffrt feature.
    pub fn keep_alive_time(self, _keep_alive_time: Duration) -> Self {
        self
    }

    /// Sets the thread stack size for a specific qos group.
    ///
    /// If a stack size has already been set for a qos, calling the method
    /// with the same qos will overwrite the old value
    ///
    /// # Note
    /// The lowest accepted stack size is 16k (`16 * 1000` bytes). If a value
    /// under that is passed in, then the stack size will be set to
    /// `16 * 1000` instead.
    pub fn stack_size_by_qos(mut self, qos: Qos, stack_size: usize) -> Self {
        // NOTE(review): the name mirrors the libc constant, but the value is
        // 16 000 bytes rather than 16 KiB (16 384). The unit tests in this
        // file pin `16 * 1000`, so it is kept as is; confirm which is meant.
        const PTHREAD_STACK_MIN: usize = 16 * 1000;

        self.common
            .stack_size_by_qos
            .insert(qos, stack_size.max(PTHREAD_STACK_MIN));
        self
    }

    /// Sets the stack size for the worker threads of the `Qos::Default`
    /// group.
    ///
    /// This forwards to `stack_size_by_qos`, so a value under `16 * 1000`
    /// bytes is raised to `16 * 1000`.
    pub fn worker_stack_size(self, stack_size: usize) -> Self {
        self.stack_size_by_qos(Qos::Default, stack_size)
    }
}
222 
223 #[cfg(not(feature = "ffrt"))]
224 impl MultiThreadBuilder {
225     /// Initializes the runtime and returns its instance.
226     #[cfg(feature = "multi_instance_runtime")]
build(&mut self) -> io::Result<Runtime>227     pub fn build(&mut self) -> io::Result<Runtime> {
228         use crate::builder::initialize_async_spawner;
229         let async_spawner = initialize_async_spawner(self)?;
230 
231         Ok(Runtime {
232             async_spawner: AsyncHandle::MultiThread(async_spawner),
233         })
234     }
235 
236     /// Sets the number of core worker threads.
237     ///
238     ///
239     /// The boundary of thread number is 1-64:
240     /// If sets a number smaller than 1, then thread number would be set to 1.
241     /// If sets a number larger than 64, then thread number would be set to 64.
242     /// The default value is the number of cores of the cpu.
243     ///
244     /// # Examples
245     /// ```
246     /// use crate::ylong_runtime::builder::RuntimeBuilder;
247     ///
248     /// let runtime = RuntimeBuilder::new_multi_thread().worker_num(8);
249     /// ```
worker_num(mut self, core_pool_size: usize) -> Self250     pub fn worker_num(mut self, core_pool_size: usize) -> Self {
251         if core_pool_size < 1 {
252             self.core_thread_size = Some(1);
253         } else if core_pool_size > 64 {
254             self.core_thread_size = Some(64);
255         } else {
256             self.core_thread_size = Some(core_pool_size);
257         }
258         self
259     }
260 }
261 
// Only expanded when the ffrt feature is off; the ffrt build spells out its
// builder methods by hand in its own `impl` block instead.
// NOTE(review): presumably this generates the shared configuration methods
// (worker name, blocking pool size, ...) — confirm against `impl_common`.
#[cfg(not(feature = "ffrt"))]
impl_common!(MultiThreadBuilder);
264 
#[cfg(feature = "full")]
#[cfg(test)]
mod test {
    use crate::builder::RuntimeBuilder;
    use crate::executor::{global_default_async, AsyncHandle};

    /// UT test cases for `build_global`.
    ///
    /// # Brief
    /// 1. Configure the global runtime to make it have six core threads
    /// 2. Get the global runtime
    /// 3. Check the core thread number of the runtime
    /// 4. Call build_global once more
    /// 5. Check the error
    #[test]
    fn ut_build_global() {
        let first = RuntimeBuilder::new_multi_thread()
            .worker_num(6)
            .max_blocking_pool_size(3)
            .build_global();
        assert!(first.is_ok());

        let runtime = global_default_async();
        match &runtime.async_spawner {
            AsyncHandle::MultiThread(pool) => assert_eq!(pool.inner.total, 6),
            AsyncHandle::CurrentThread(_) => unreachable!(),
        }

        // A second configuration attempt must be turned down.
        let second = RuntimeBuilder::new_multi_thread()
            .worker_num(2)
            .max_blocking_pool_size(3)
            .build_global();
        assert!(second.is_err());
    }
}
303 
#[cfg(feature = "ffrt")]
#[cfg(test)]
mod ffrt_test {
    use ylong_ffrt::Qos::{Default, UserInitiated, UserInteractive};

    use crate::builder::MultiThreadBuilder;

    /// UT test cases for max_worker_num_by_qos
    ///
    /// # Brief
    /// 1. Sets UserInteractive qos group to have 0 maximum worker number.
    /// 2. Checks if the actual value is 1
    /// 3. Sets UserInteractive qos group to have 21 maximum worker number.
    /// 4. Checks if the actual value is 20
    /// 5. Set Default qos group to have 8 maximum worker number.
    /// 6. Checks if the actual value is 8.
    #[test]
    fn ut_set_max_worker() {
        let builder = MultiThreadBuilder::new().max_worker_num_by_qos(UserInteractive, 0);
        assert_eq!(builder.thread_num_by_qos.get(&UserInteractive), Some(&1));

        // Same builder again: the earlier entry gets overwritten.
        let builder = builder.max_worker_num_by_qos(UserInteractive, 21);
        assert_eq!(builder.thread_num_by_qos.get(&UserInteractive), Some(&20));

        let builder = MultiThreadBuilder::new().max_worker_num_by_qos(Default, 8);
        assert_eq!(builder.thread_num_by_qos.get(&Default), Some(&8));
    }

    /// UT cases for stack_size_by_qos
    ///
    /// # Brief
    /// Each case sets a stack size on a fresh builder and checks the value
    /// that ends up stored:
    /// 1. UserInitiated, 16k - 1 -> 16k
    /// 2. UserInteractive, 16k -> 16k
    /// 3. Default, 16M -> 16M
    /// 4. UserInteractive, 16k + 1 -> 16k + 1
    #[test]
    fn ut_set_stack_size() {
        // (qos, requested stack size, expected stored stack size)
        let cases = [
            (UserInitiated, 16 * 1000 - 1, 16 * 1000),
            (UserInteractive, 16 * 1000, 16 * 1000),
            (Default, 16 * 1000 * 1000, 16 * 1000 * 1000),
            (UserInteractive, 16 * 1000 + 1, 16 * 1000 + 1),
        ];

        for (qos, requested, expected) in cases {
            let builder = MultiThreadBuilder::new().stack_size_by_qos(qos, requested);
            assert_eq!(builder.common.stack_size_by_qos.get(&qos), Some(&expected));
        }
    }
}
383