1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 //! Tests of asynchronous scheduling.
17 
18 use std::ffi::{ c_void, c_char, c_int, CString };
19 use std::io::Error;
20 use std::os::fd::RawFd;
21 use std::sync::{ Arc, Condvar, Mutex };
22 use std::sync::atomic::{ AtomicI32, Ordering };
23 use std::time::Duration;
24 
25 use hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType };
26 
27 use fusion_scheduler_rust::{ Handler, IEpollHandler, LIBC_EPOLLIN };
28 use fusion_utils_rust::call_debug_enter;
29 
30 const LOG_LABEL: HiLogLabel = HiLogLabel {
31     log_type: LogType::LogCore,
32     domain: 0xD002220,
33     tag: "FusionSchedulerTest",
34 };
35 
/// Mutable state of the test epoll handler: a descriptor pair plus the last value read.
struct EpollHandlerImpl {
    /// Descriptor pair; index 0 is the one read from in `dispatch` and reported by `fd`,
    /// index 1 is the one written to in `signal`.
    fds: [RawFd; 2],
    /// Value most recently stored by `dispatch`.
    data: i32,
}
40 
41 impl EpollHandlerImpl {
signal(&self, data: i32)42     fn signal(&self, data: i32)
43     {
44         error!(LOG_LABEL, "EpollHandlerImpl::signal once");
45         let ret = unsafe {
46             libc::write(self.fds[1], std::ptr::addr_of!(data) as *const c_void, std::mem::size_of_val(&data))
47         };
48         if ret == -1 {
49             error!(LOG_LABEL, "libc::write fail");
50         }
51     }
52 
fd(&self) -> RawFd53     fn fd(&self) -> RawFd
54     {
55         self.fds[0]
56     }
57 
dispatch(&mut self, events: u32)58     fn dispatch(&mut self, events: u32)
59     {
60         call_debug_enter!("EpollHandlerImpl::dispatch");
61         if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN {
62             let data: i32 = 0;
63 
64             let ret = unsafe {
65                 libc::read(self.fds[0], std::ptr::addr_of!(data) as *mut c_void, std::mem::size_of_val(&data))
66             };
67             if ret == -1 {
68                 error!(LOG_LABEL, "libc::read fail");
69             }
70             info!(LOG_LABEL, "EpollHandlerImpl::dispatch({}), data:{}", @public(self.fds[0]), @public(data));
71             self.data = data;
72         }
73     }
74 
data(&self) -> i3275     fn data(&self) -> i32
76     {
77         self.data
78     }
79 }
80 
81 impl Drop for EpollHandlerImpl {
drop(&mut self)82     fn drop(&mut self)
83     {
84         for fd in &mut self.fds {
85             if *fd != -1 {
86                 unsafe { libc::close(*fd) };
87                 *fd = -1;
88             }
89         }
90     }
91 }
92 
93 struct EpollHandler {
94     inner: Mutex<EpollHandlerImpl>,
95     var: Condvar,
96 }
97 
98 impl EpollHandler {
new() -> Self99     fn new() -> Self
100     {
101         let mut fds: [c_int; 2] = [-1; 2];
102 
103         let ret = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
104         if ret != 0 {
105             error!(LOG_LABEL, "In EpollHandler::new, libc::pipe2 fail:{:?}", @public(Error::last_os_error()));
106         }
107         debug!(LOG_LABEL, "In EpollHandler::new, fds:({},{})", @public(fds[0]), @public(fds[1]));
108         Self {
109             inner: Mutex::new(EpollHandlerImpl {
110                 fds,
111                 data: -1,
112             }),
113             var: Condvar::new(),
114         }
115     }
116 
signal(&self, data: i32)117     fn signal(&self, data: i32)
118     {
119         let guard = self.inner.lock().unwrap();
120         guard.signal(data);
121     }
122 
data(&self) -> i32123     fn data(&self) -> i32
124     {
125         let guard = self.inner.lock().unwrap();
126         guard.data()
127     }
128 
wait(&self, dur: Duration) -> bool129     fn wait(&self, dur: Duration) -> bool
130     {
131         call_debug_enter!("EpollHandler::wait");
132         let guard = self.inner.lock().unwrap();
133         let (_, ret) = self.var.wait_timeout(guard, dur).unwrap();
134         if ret.timed_out() {
135             info!(LOG_LABEL, "In EpollHandler::wait, timeout");
136             false
137         } else {
138             true
139         }
140     }
141 }
142 
143 impl IEpollHandler for EpollHandler {
fd(&self) -> RawFd144     fn fd(&self) -> RawFd
145     {
146         let guard = self.inner.lock().unwrap();
147         guard.fd()
148     }
149 
dispatch(&self, events: u32)150     fn dispatch(&self, events: u32)
151     {
152         call_debug_enter!("EpollHandler::dispatch");
153         let mut guard = self.inner.lock().unwrap();
154         guard.dispatch(events);
155         self.var.notify_one();
156     }
157 }
158 
/// Registers an epoll handler, signals a value through it and checks that the same value is
/// readable from the handler after `wait` reports success within 100 ms.
#[test]
fn test_add_epoll_handler()
{
    const PAYLOAD: i32 = 13574;

    let handler = Arc::<Handler>::default();
    let epoll = Arc::new(EpollHandler::new());
    assert!(handler.add_epoll_handler(epoll.clone()).is_ok());

    epoll.signal(PAYLOAD);
    let notified = epoll.wait(Duration::from_millis(100));
    assert!(notified);
    info!(LOG_LABEL, "In test_add_epoll_handler, data:{}", @public(epoll.data()));
    assert_eq!(epoll.data(), PAYLOAD);

    assert!(handler.remove_epoll_handler(epoll).is_ok());
}
173 
/// Returns `param` XOR-ed with the fixed mask `0xAAAAAAAA`.
///
/// The tests use it as a cheap deterministic computation whose result can be recomputed on
/// the calling thread for comparison.
fn hash(param: usize) -> usize
{
    const MASK: usize = 0xAAAA_AAAA;
    param ^ MASK
}
179 
/// Checks that `post_sync_task` hands back the value computed by the posted closure.
#[test]
fn test_post_sync_task()
{
    const INPUT: usize = 0xAB1807;

    let handler = Arc::<Handler>::default();
    let expected = hash(INPUT);

    let ret = handler.post_sync_task(move || hash(INPUT));
    assert_eq!(ret, expected);
}
192 
/// Checks that the handle returned by `post_async_task` yields the value computed by the
/// posted closure.
#[test]
fn test_post_async_task()
{
    const INPUT: usize = 0xAB1807;

    let handler = Arc::<Handler>::default();
    let expected = hash(INPUT);

    let mut task_handle = handler.post_async_task(move || hash(INPUT));
    let ret = task_handle.result().unwrap();
    assert_eq!(ret, expected);
}
206 
/// Posts a task through `post_perioric_task` that signals an increasing id (starting at 1) to
/// the epoll handler, sleeps for one second, then asserts the last dispatched id is at least 10.
///
/// NOTE(review): the arguments `None`, 100 ms and `Some(10)` are presumably "no initial delay",
/// "period" and "repeat count" — confirm against `Handler::post_perioric_task`.
#[test]
fn test_post_perioric_task()
{
    let handler = Arc::<Handler>::default();
    let epoll = Arc::new(EpollHandler::new());
    let signaller = Arc::clone(&epoll);
    assert!(handler.add_epoll_handler(epoll.clone()).is_ok());

    let tick = move || {
        // Function-local static: the counter is shared by every invocation of the closure.
        static ID_RADIX: AtomicI32 = AtomicI32::new(1);
        let id = ID_RADIX.fetch_add(1, Ordering::Relaxed);
        signaller.signal(id);
    };
    let _ = handler.post_perioric_task(tick, None, Duration::from_millis(100), Some(10));

    std::thread::sleep(Duration::from_secs(1));
    info!(LOG_LABEL, "In test_post_perioric_task, data:{}", @public(epoll.data()));
    assert!(epoll.data() >= 10);
    assert!(handler.remove_epoll_handler(epoll).is_ok());
}
225 
/// Posts a task through `post_delayed_task` (10 ms argument) that signals a value to the epoll
/// handler, then checks the value is dispatched back within 100 ms.
#[test]
fn test_post_delayed_task()
{
    const PAYLOAD: i32 = 13547;

    let handler = Arc::<Handler>::default();
    let epoll = Arc::new(EpollHandler::new());
    assert!(handler.add_epoll_handler(epoll.clone()).is_ok());
    let signaller = Arc::clone(&epoll);

    let delayed = move || {
        signaller.signal(PAYLOAD);
    };
    let _ = handler.post_delayed_task(delayed, Duration::from_millis(10));

    let notified = epoll.wait(Duration::from_millis(100));
    assert!(notified);
    info!(LOG_LABEL, "In test_post_delayed_task, data:{}", @public(epoll.data()));
    assert_eq!(epoll.data(), PAYLOAD);
    assert!(handler.remove_epoll_handler(epoll).is_ok());
}
244 
/// Checks that the handle returned by `post_blocking_task` yields the closure's value even
/// though the closure sleeps for 100 ms before computing it.
#[test]
fn test_post_blocking_task()
{
    const INPUT: usize = 0xAB1807;

    let handler = Arc::<Handler>::default();
    let expected = hash(INPUT);

    let mut task_handle = handler.post_blocking_task(move || {
        let pause = Duration::from_millis(100);
        std::thread::sleep(pause);
        hash(INPUT)
    });
    let ret = task_handle.result().unwrap();
    assert_eq!(ret, expected);
}
259