1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 #![cfg(all(
15     target_os = "linux",
16     not(feature = "ffrt"),
17     feature = "multi_instance_runtime"
18 ))]
19 
20 use std::ffi::OsString;
21 use std::fs;
22 use std::mem::{size_of, zeroed};
23 
24 use libc::{
25     c_long, cpu_set_t, getpid, sched_getaffinity, sysconf, CPU_ISSET, _SC_NPROCESSORS_ONLN,
26 };
27 use ylong_runtime::builder::RuntimeBuilder;
28 
/// Simple asynchronous task: resolves to `num` unchanged.
async fn test_future(num: usize) -> usize {
    // An already-completed future, so the value is produced on the first poll.
    std::future::ready(num).await
}
33 
34 // Complex asynchronous tasks
test_multi_future(i: usize, j: usize) -> usize35 async fn test_multi_future(i: usize, j: usize) -> usize {
36     let result_one = test_future(i).await;
37     let result_two = test_future(j).await;
38 
39     result_one + result_two
40 }
41 
42 // Multi-level nested asynchronous tasks
test_nested_future(i: usize, j: usize) -> usize43 async fn test_nested_future(i: usize, j: usize) -> usize {
44     test_multi_future(i, j).await
45 }
46 
47 // Gets the pid of all current threads (including the main thread)
dump_dir() -> Vec<OsString>48 unsafe fn dump_dir() -> Vec<OsString> {
49     let current_pid = getpid();
50     let dir = format!("/proc/{}/task", current_pid.to_string().as_str());
51     let mut result = Vec::new();
52 
53     for entry in fs::read_dir(dir.as_str()).expect("read failed") {
54         result.push(entry.unwrap().file_name());
55     }
56     result
57 }
58 
59 // Get the name of the thread based on the thread pid
name_of_pid(pid: &str) -> Option<String>60 unsafe fn name_of_pid(pid: &str) -> Option<String> {
61     let current_pid = getpid();
62     let path = format!(
63         "/proc/{}/task/{}/status",
64         current_pid.to_string().as_str(),
65         pid
66     );
67 
68     match fs::read_to_string(path) {
69         Ok(mut result) => {
70             let times_one = result.find('\t').unwrap();
71             let times_two = result.find('\n').unwrap();
72 
73             Some(result.drain(times_one + 1..times_two).collect())
74         }
75         Err(_) => None,
76     }
77 }
78 
get_other_thread_affinity(pid: i32) -> Vec<usize>79 fn get_other_thread_affinity(pid: i32) -> Vec<usize> {
80     unsafe {
81         let mut vec = vec![];
82         let cpus = get_cpu_num() as usize;
83         let mut set = new_cpu_set();
84         sched_getaffinity(pid, size_of::<cpu_set_t>(), &mut set);
85         for i in 0..cpus {
86             if CPU_ISSET(i, &set) {
87                 vec.push(i);
88             }
89         }
90         vec
91     }
92 }
93 
94 /// Returns an empty cpu set
new_cpu_set() -> cpu_set_t95 fn new_cpu_set() -> cpu_set_t {
96     unsafe { zeroed::<cpu_set_t>() }
97 }
98 
get_cpu_num() -> c_long99 fn get_cpu_num() -> c_long {
100     unsafe { sysconf(_SC_NPROCESSORS_ONLN) }
101 }
102 
/// SDV test cases for asynchronous thread pool
///
/// # Brief
/// 1. Constructed environment:
///     1. Asynchronous thread pool capacity total set to 1.
///     2. Whether to tie the core is_affinity set to true.
///     3. The thread name is set to "async_pool_001".
///     4. The thread stack size is set to 10.
/// 2. Asynchronous tasks:
///     1. Simple asynchronous tasks.
///     2. Complex asynchronous tasks.
///     3. Multi-level nested asynchronous tasks.
#[test]
fn sdv_async_pool_001() {
    let runtime = RuntimeBuilder::new_multi_thread()
        .worker_name(String::from("async_pool_001"))
        .worker_stack_size(10)
        .worker_num(1)
        .is_affinity(true)
        .build()
        .unwrap();

    let handles = vec![
        runtime.spawn(test_future(1)),
        runtime.spawn(test_multi_future(1, 2)),
        runtime.spawn(test_nested_future(1, 2)),
    ];

    // Look through this process's threads for the first one whose reported
    // name is the expected worker name.
    // NOTE(review): the name read from /proc status is presumably the kernel
    // thread name, which Linux caps at 15 bytes; this 22-byte literal may
    // therefore never match, silently skipping the affinity assertion below —
    // confirm.
    // SAFETY: `dump_dir` and `name_of_pid` are `unsafe` only because they call
    // libc `getpid`.
    let first_worker = unsafe { dump_dir() }
        .iter()
        .map(|entry| entry.to_str().unwrap().parse::<i32>().unwrap())
        .find(|tid| {
            let name = unsafe { name_of_pid(&tid.to_string()) };
            name.as_deref() == Some("async-0-async_pool_001")
        });

    // With affinity enabled the worker is expected to be allowed on one CPU.
    if let Some(tid) = first_worker {
        assert_eq!(get_other_thread_affinity(tid).len(), 1);
    }

    // Results are checked in spawn order: simple, complex, nested.
    for (handle, expected) in handles.into_iter().zip([1, 3, 3]) {
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, expected);
    }
}
162 
/// SDV test cases for asynchronous thread pool
///
/// # Brief
/// 1. Constructed environment:
///     1. Asynchronous thread pool capacity total set to 64.
///     2. Whether to tie the core is_affinity set to true.
///     3. The thread name is set to "async_pool_002".
///     4. The thread stack size is set to 20.
/// 2. Asynchronous tasks:
///     1. Simple asynchronous tasks.
///     2. Complex asynchronous tasks.
///     3. Multi-level nested asynchronous tasks.
#[test]
fn sdv_async_pool_002() {
    let runtime = RuntimeBuilder::new_multi_thread()
        .worker_name(String::from("async_pool_002"))
        .worker_stack_size(20)
        .worker_num(64)
        .is_affinity(true)
        .build()
        .unwrap();

    let handles = vec![
        runtime.spawn(test_future(1)),
        runtime.spawn(test_multi_future(1, 2)),
        runtime.spawn(test_nested_future(1, 2)),
    ];

    // Look through this process's threads for the first one whose reported
    // name is the expected worker name.
    // NOTE(review): the name read from /proc status is presumably the kernel
    // thread name, which Linux caps at 15 bytes; this 22-byte literal may
    // therefore never match, silently skipping the affinity assertion below —
    // confirm.
    // SAFETY: `dump_dir` and `name_of_pid` are `unsafe` only because they call
    // libc `getpid`.
    let first_worker = unsafe { dump_dir() }
        .iter()
        .map(|entry| entry.to_str().unwrap().parse::<i32>().unwrap())
        .find(|tid| {
            let name = unsafe { name_of_pid(&tid.to_string()) };
            name.as_deref() == Some("async-0-async_pool_002")
        });

    // With affinity enabled the worker is expected to be allowed on one CPU.
    if let Some(tid) = first_worker {
        assert_eq!(get_other_thread_affinity(tid).len(), 1);
    }

    // Results are checked in spawn order: simple, complex, nested.
    for (handle, expected) in handles.into_iter().zip([1, 3, 3]) {
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, expected);
    }
}
222 
/// SDV test cases for asynchronous thread pool
///
/// # Brief
/// 1. Constructed environment:
///     1. Asynchronous thread pool capacity total set to 0.
///     2. Whether to tie the core is_affinity set to true.
///     3. The thread name is set to "async_pool_003".
///     4. The thread stack size is set to 10.
/// 2. Asynchronous tasks:
///     1. Simple asynchronous tasks.
///     2. Complex asynchronous tasks.
///     3. Multi-level nested asynchronous tasks.
#[test]
fn sdv_async_pool_003() {
    let runtime = RuntimeBuilder::new_multi_thread()
        .worker_name(String::from("async_pool_003"))
        .worker_stack_size(10)
        .worker_num(0)
        .is_affinity(true)
        .build()
        .unwrap();

    let handles = vec![
        runtime.spawn(test_future(1)),
        runtime.spawn(test_multi_future(1, 2)),
        runtime.spawn(test_nested_future(1, 2)),
    ];

    // Look through this process's threads for the first one whose reported
    // name is the expected worker name.
    // NOTE(review): the name read from /proc status is presumably the kernel
    // thread name, which Linux caps at 15 bytes; this 22-byte literal may
    // therefore never match, silently skipping the affinity assertion below —
    // confirm.
    // SAFETY: `dump_dir` and `name_of_pid` are `unsafe` only because they call
    // libc `getpid`.
    let first_worker = unsafe { dump_dir() }
        .iter()
        .map(|entry| entry.to_str().unwrap().parse::<i32>().unwrap())
        .find(|tid| {
            let name = unsafe { name_of_pid(&tid.to_string()) };
            name.as_deref() == Some("async-0-async_pool_003")
        });

    // With affinity enabled the worker is expected to be allowed on one CPU.
    if let Some(tid) = first_worker {
        assert_eq!(get_other_thread_affinity(tid).len(), 1);
    }

    // Results are checked in spawn order: simple, complex, nested.
    for (handle, expected) in handles.into_iter().zip([1, 3, 3]) {
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, expected);
    }
}
282 
/// SDV test cases for asynchronous thread pool
///
/// # Brief
/// 1. Constructed environment:
///     1. Asynchronous thread pool capacity total set to 65.
///     2. Whether to tie the core is_affinity set to true.
///     3. The thread name is set to "async_pool_004".
///     4. The thread stack size is set to 20.
/// 2. Asynchronous tasks:
///     1. Simple asynchronous tasks.
///     2. Complex asynchronous tasks.
///     3. Multi-level nested asynchronous tasks.
#[test]
fn sdv_async_pool_004() {
    let runtime = RuntimeBuilder::new_multi_thread()
        .worker_name(String::from("async_pool_004"))
        .worker_stack_size(20)
        .worker_num(65)
        .is_affinity(true)
        .build()
        .unwrap();

    let handles = vec![
        runtime.spawn(test_future(1)),
        runtime.spawn(test_multi_future(1, 2)),
        runtime.spawn(test_nested_future(1, 2)),
    ];

    // Look through this process's threads for the first one whose reported
    // name is the expected worker name.
    // NOTE(review): the name read from /proc status is presumably the kernel
    // thread name, which Linux caps at 15 bytes; this 22-byte literal may
    // therefore never match, silently skipping the affinity assertion below —
    // confirm.
    // SAFETY: `dump_dir` and `name_of_pid` are `unsafe` only because they call
    // libc `getpid`.
    let first_worker = unsafe { dump_dir() }
        .iter()
        .map(|entry| entry.to_str().unwrap().parse::<i32>().unwrap())
        .find(|tid| {
            let name = unsafe { name_of_pid(&tid.to_string()) };
            name.as_deref() == Some("async-0-async_pool_004")
        });

    // With affinity enabled the worker is expected to be allowed on one CPU.
    if let Some(tid) = first_worker {
        assert_eq!(get_other_thread_affinity(tid).len(), 1);
    }

    // Results are checked in spawn order: simple, complex, nested.
    for (handle, expected) in handles.into_iter().zip([1, 3, 3]) {
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, expected);
    }
}
342 
/// SDV test cases for asynchronous thread pool
///
/// # Brief
/// 1. Constructed environment:
///     1. Asynchronous thread pool capacity total set to 1.
///     2. Whether to tie the core is_affinity set to false.
///     3. The thread name is set to "async_pool_005".
///     4. The thread stack size is set to 10.
/// 2. Asynchronous tasks:
///     1. Simple asynchronous tasks.
///     2. Complex asynchronous tasks.
///     3. Multi-level nested asynchronous tasks.
#[test]
fn sdv_async_pool_005() {
    let runtime = RuntimeBuilder::new_multi_thread()
        .worker_name(String::from("async_pool_005"))
        .worker_stack_size(10)
        .worker_num(1)
        .is_affinity(false)
        .build()
        .unwrap();

    let handles = vec![
        runtime.spawn(test_future(1)),
        runtime.spawn(test_multi_future(1, 2)),
        runtime.spawn(test_nested_future(1, 2)),
    ];

    // Look through this process's threads for the first one whose reported
    // name is the expected worker name.
    // NOTE(review): the name read from /proc status is presumably the kernel
    // thread name, which Linux caps at 15 bytes; this 22-byte literal may
    // therefore never match, silently skipping the affinity assertion below —
    // confirm.
    // SAFETY: `dump_dir` and `name_of_pid` are `unsafe` only because they call
    // libc `getpid`.
    let first_worker = unsafe { dump_dir() }
        .iter()
        .map(|entry| entry.to_str().unwrap().parse::<i32>().unwrap())
        .find(|tid| {
            let name = unsafe { name_of_pid(&tid.to_string()) };
            name.as_deref() == Some("async-0-async_pool_005")
        });

    // Without affinity the worker is expected to be allowed on every online CPU.
    if let Some(tid) = first_worker {
        assert_eq!(
            get_other_thread_affinity(tid).len(),
            get_cpu_num() as usize
        );
    }

    // Results are checked in spawn order: simple, complex, nested.
    for (handle, expected) in handles.into_iter().zip([1, 3, 3]) {
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, expected);
    }
}
402 
/// SDV test cases for asynchronous thread pool
///
/// # Brief
/// 1. Constructed environment:
///     1. Asynchronous thread pool capacity total set to 64.
///     2. Whether to tie the core is_affinity set to false.
///     3. The thread name is set to "async_pool_006".
///     4. The thread stack size is set to 20.
/// 2. Asynchronous tasks:
///     1. Simple asynchronous tasks.
///     2. Complex asynchronous tasks.
///     3. Multi-level nested asynchronous tasks.
#[test]
fn sdv_async_pool_006() {
    let runtime = RuntimeBuilder::new_multi_thread()
        .worker_name(String::from("async_pool_006"))
        .worker_stack_size(20)
        .worker_num(64)
        .is_affinity(false)
        .build()
        .unwrap();

    let handles = vec![
        runtime.spawn(test_future(1)),
        runtime.spawn(test_multi_future(1, 2)),
        runtime.spawn(test_nested_future(1, 2)),
    ];

    // Look through this process's threads for the first one whose reported
    // name is the expected worker name.
    // NOTE(review): the name read from /proc status is presumably the kernel
    // thread name, which Linux caps at 15 bytes; this 22-byte literal may
    // therefore never match, silently skipping the affinity assertion below —
    // confirm.
    // SAFETY: `dump_dir` and `name_of_pid` are `unsafe` only because they call
    // libc `getpid`.
    let first_worker = unsafe { dump_dir() }
        .iter()
        .map(|entry| entry.to_str().unwrap().parse::<i32>().unwrap())
        .find(|tid| {
            let name = unsafe { name_of_pid(&tid.to_string()) };
            name.as_deref() == Some("async-0-async_pool_006")
        });

    // Without affinity the worker is expected to be allowed on every online CPU.
    if let Some(tid) = first_worker {
        assert_eq!(
            get_other_thread_affinity(tid).len(),
            get_cpu_num() as usize
        );
    }

    // Results are checked in spawn order: simple, complex, nested.
    for (handle, expected) in handles.into_iter().zip([1, 3, 3]) {
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, expected);
    }
}
462 
/// SDV test cases for asynchronous thread pool
///
/// # Brief
/// 1. Constructed environment:
///     1. Asynchronous thread pool capacity total set to 0.
///     2. Whether to tie the core is_affinity set to false.
///     3. The thread name is set to "async_pool_007".
///     4. The thread stack size is set to 10.
/// 2. Asynchronous tasks:
///     1. Simple asynchronous tasks.
///     2. Complex asynchronous tasks.
///     3. Multi-level nested asynchronous tasks.
#[test]
fn sdv_async_pool_007() {
    let runtime = RuntimeBuilder::new_multi_thread()
        .worker_name(String::from("async_pool_007"))
        .worker_stack_size(10)
        .worker_num(0)
        .is_affinity(false)
        .build()
        .unwrap();

    let handles = vec![
        runtime.spawn(test_future(1)),
        runtime.spawn(test_multi_future(1, 2)),
        runtime.spawn(test_nested_future(1, 2)),
    ];

    // Look through this process's threads for the first one whose reported
    // name is the expected worker name.
    // NOTE(review): the name read from /proc status is presumably the kernel
    // thread name, which Linux caps at 15 bytes; this 22-byte literal may
    // therefore never match, silently skipping the affinity assertion below —
    // confirm.
    // SAFETY: `dump_dir` and `name_of_pid` are `unsafe` only because they call
    // libc `getpid`.
    let first_worker = unsafe { dump_dir() }
        .iter()
        .map(|entry| entry.to_str().unwrap().parse::<i32>().unwrap())
        .find(|tid| {
            let name = unsafe { name_of_pid(&tid.to_string()) };
            name.as_deref() == Some("async-0-async_pool_007")
        });

    // Without affinity the worker is expected to be allowed on every online CPU.
    if let Some(tid) = first_worker {
        assert_eq!(
            get_other_thread_affinity(tid).len(),
            get_cpu_num() as usize
        );
    }

    // Results are checked in spawn order: simple, complex, nested.
    for (handle, expected) in handles.into_iter().zip([1, 3, 3]) {
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, expected);
    }
}
522 
/// SDV test cases for asynchronous thread pool
///
/// # Brief
/// 1. Constructed environment:
///     1. Asynchronous thread pool capacity total set to 65.
///     2. Whether to tie the core is_affinity set to false.
///     3. The thread name is set to "async_pool_008".
///     4. The thread stack size is set to 20.
/// 2. Asynchronous tasks:
///     1. Simple asynchronous tasks.
///     2. Complex asynchronous tasks.
///     3. Multi-level nested asynchronous tasks.
#[test]
fn sdv_async_pool_008() {
    let runtime = RuntimeBuilder::new_multi_thread()
        .worker_name(String::from("async_pool_008"))
        .worker_stack_size(20)
        .worker_num(65)
        .is_affinity(false)
        .build()
        .unwrap();

    let handles = vec![
        runtime.spawn(test_future(1)),
        runtime.spawn(test_multi_future(1, 2)),
        runtime.spawn(test_nested_future(1, 2)),
    ];

    // Look through this process's threads for the first one whose reported
    // name is the expected worker name.
    // NOTE(review): the name read from /proc status is presumably the kernel
    // thread name, which Linux caps at 15 bytes; this 22-byte literal may
    // therefore never match, silently skipping the affinity assertion below —
    // confirm.
    // SAFETY: `dump_dir` and `name_of_pid` are `unsafe` only because they call
    // libc `getpid`.
    let first_worker = unsafe { dump_dir() }
        .iter()
        .map(|entry| entry.to_str().unwrap().parse::<i32>().unwrap())
        .find(|tid| {
            let name = unsafe { name_of_pid(&tid.to_string()) };
            name.as_deref() == Some("async-0-async_pool_008")
        });

    // Without affinity the worker is expected to be allowed on every online CPU.
    if let Some(tid) = first_worker {
        assert_eq!(
            get_other_thread_affinity(tid).len(),
            get_cpu_num() as usize
        );
    }

    // Results are checked in spawn order: simple, complex, nested.
    for (handle, expected) in handles.into_iter().zip([1, 3, 3]) {
        let result = runtime.block_on(handle).unwrap();
        assert_eq!(result, expected);
    }
}
582