1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 //! Tests of asynchronous scheduling.
17
18 use std::ffi::{ c_void, c_char, c_int, CString };
19 use std::io::Error;
20 use std::os::fd::RawFd;
21 use std::sync::{ Arc, Condvar, Mutex };
22 use std::sync::atomic::{ AtomicI32, Ordering };
23 use std::time::Duration;
24
25 use hilog_rust::{ debug, info, error, hilog, HiLogLabel, LogType };
26
27 use fusion_scheduler_rust::{ Handler, IEpollHandler, LIBC_EPOLLIN };
28 use fusion_utils_rust::call_debug_enter;
29
30 const LOG_LABEL: HiLogLabel = HiLogLabel {
31 log_type: LogType::LogCore,
32 domain: 0xD002220,
33 tag: "FusionSchedulerTest",
34 };
35
36 struct EpollHandlerImpl {
37 fds: [RawFd; 2],
38 data: i32,
39 }
40
41 impl EpollHandlerImpl {
signal(&self, data: i32)42 fn signal(&self, data: i32)
43 {
44 error!(LOG_LABEL, "EpollHandlerImpl::signal once");
45 let ret = unsafe {
46 libc::write(self.fds[1], std::ptr::addr_of!(data) as *const c_void, std::mem::size_of_val(&data))
47 };
48 if ret == -1 {
49 error!(LOG_LABEL, "libc::write fail");
50 }
51 }
52
fd(&self) -> RawFd53 fn fd(&self) -> RawFd
54 {
55 self.fds[0]
56 }
57
dispatch(&mut self, events: u32)58 fn dispatch(&mut self, events: u32)
59 {
60 call_debug_enter!("EpollHandlerImpl::dispatch");
61 if (events & LIBC_EPOLLIN) == LIBC_EPOLLIN {
62 let data: i32 = 0;
63
64 let ret = unsafe {
65 libc::read(self.fds[0], std::ptr::addr_of!(data) as *mut c_void, std::mem::size_of_val(&data))
66 };
67 if ret == -1 {
68 error!(LOG_LABEL, "libc::read fail");
69 }
70 info!(LOG_LABEL, "EpollHandlerImpl::dispatch({}), data:{}", @public(self.fds[0]), @public(data));
71 self.data = data;
72 }
73 }
74
data(&self) -> i3275 fn data(&self) -> i32
76 {
77 self.data
78 }
79 }
80
81 impl Drop for EpollHandlerImpl {
drop(&mut self)82 fn drop(&mut self)
83 {
84 for fd in &mut self.fds {
85 if *fd != -1 {
86 unsafe { libc::close(*fd) };
87 *fd = -1;
88 }
89 }
90 }
91 }
92
93 struct EpollHandler {
94 inner: Mutex<EpollHandlerImpl>,
95 var: Condvar,
96 }
97
98 impl EpollHandler {
new() -> Self99 fn new() -> Self
100 {
101 let mut fds: [c_int; 2] = [-1; 2];
102
103 let ret = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
104 if ret != 0 {
105 error!(LOG_LABEL, "In EpollHandler::new, libc::pipe2 fail:{:?}", @public(Error::last_os_error()));
106 }
107 debug!(LOG_LABEL, "In EpollHandler::new, fds:({},{})", @public(fds[0]), @public(fds[1]));
108 Self {
109 inner: Mutex::new(EpollHandlerImpl {
110 fds,
111 data: -1,
112 }),
113 var: Condvar::new(),
114 }
115 }
116
signal(&self, data: i32)117 fn signal(&self, data: i32)
118 {
119 let guard = self.inner.lock().unwrap();
120 guard.signal(data);
121 }
122
data(&self) -> i32123 fn data(&self) -> i32
124 {
125 let guard = self.inner.lock().unwrap();
126 guard.data()
127 }
128
wait(&self, dur: Duration) -> bool129 fn wait(&self, dur: Duration) -> bool
130 {
131 call_debug_enter!("EpollHandler::wait");
132 let guard = self.inner.lock().unwrap();
133 let (_, ret) = self.var.wait_timeout(guard, dur).unwrap();
134 if ret.timed_out() {
135 info!(LOG_LABEL, "In EpollHandler::wait, timeout");
136 false
137 } else {
138 true
139 }
140 }
141 }
142
143 impl IEpollHandler for EpollHandler {
fd(&self) -> RawFd144 fn fd(&self) -> RawFd
145 {
146 let guard = self.inner.lock().unwrap();
147 guard.fd()
148 }
149
dispatch(&self, events: u32)150 fn dispatch(&self, events: u32)
151 {
152 call_debug_enter!("EpollHandler::dispatch");
153 let mut guard = self.inner.lock().unwrap();
154 guard.dispatch(events);
155 self.var.notify_one();
156 }
157 }
158
159 #[test]
test_add_epoll_handler()160 fn test_add_epoll_handler()
161 {
162 let handler: Arc<Handler> = Arc::default();
163 let epoll = Arc::new(EpollHandler::new());
164 assert!(handler.add_epoll_handler(epoll.clone()).is_ok());
165
166 let data: i32 = 13574;
167 epoll.signal(data);
168 assert!(epoll.wait(Duration::from_millis(100)));
169 info!(LOG_LABEL, "In test_add_epoll_handler, data:{}", @public(epoll.data()));
170 assert_eq!(epoll.data(), data);
171 assert!(handler.remove_epoll_handler(epoll).is_ok());
172 }
173
hash(param: usize) -> usize174 fn hash(param: usize) -> usize
175 {
176 const HASHER: usize = 0xAAAAAAAA;
177 HASHER ^ param
178 }
179
180 #[test]
test_post_sync_task()181 fn test_post_sync_task()
182 {
183 let handler: Arc<Handler> = Arc::default();
184 let param: usize = 0xAB1807;
185
186 let ret = handler.post_sync_task(move || {
187 hash(param)
188 });
189 let expected = hash(param);
190 assert_eq!(ret, expected);
191 }
192
193 #[test]
test_post_async_task()194 fn test_post_async_task()
195 {
196 let handler: Arc<Handler> = Arc::default();
197 let param: usize = 0xAB1807;
198
199 let mut task_handle = handler.post_async_task(move || {
200 hash(param)
201 });
202 let ret = task_handle.result().unwrap();
203 let expected = hash(param);
204 assert_eq!(ret, expected);
205 }
206
207 #[test]
test_post_perioric_task()208 fn test_post_perioric_task()
209 {
210 let handler: Arc<Handler> = Arc::default();
211 let epoll = Arc::new(EpollHandler::new());
212 let cloned_epoll = epoll.clone();
213 assert!(handler.add_epoll_handler(epoll.clone()).is_ok());
214
215 let _ = handler.post_perioric_task(move || {
216 static ID_RADIX: AtomicI32 = AtomicI32::new(1);
217 cloned_epoll.signal(ID_RADIX.fetch_add(1, Ordering::Relaxed));
218 }, None, Duration::from_millis(100), Some(10));
219
220 std::thread::sleep(Duration::from_secs(1));
221 info!(LOG_LABEL, "In test_post_perioric_task, data:{}", @public(epoll.data()));
222 assert!(epoll.data() >= 10);
223 assert!(handler.remove_epoll_handler(epoll).is_ok());
224 }
225
226 #[test]
test_post_delayed_task()227 fn test_post_delayed_task()
228 {
229 let handler: Arc<Handler> = Arc::default();
230 let epoll = Arc::new(EpollHandler::new());
231 assert!(handler.add_epoll_handler(epoll.clone()).is_ok());
232 let data: i32 = 13547;
233 let cloned_epoll = epoll.clone();
234
235 let _ = handler.post_delayed_task(move || {
236 cloned_epoll.signal(data);
237 }, Duration::from_millis(10));
238
239 assert!(epoll.wait(Duration::from_millis(100)));
240 info!(LOG_LABEL, "In test_post_delayed_task, data:{}", @public(epoll.data()));
241 assert_eq!(epoll.data(), data);
242 assert!(handler.remove_epoll_handler(epoll).is_ok());
243 }
244
245 #[test]
test_post_blocking_task()246 fn test_post_blocking_task()
247 {
248 let handler: Arc<Handler> = Arc::default();
249 let param: usize = 0xAB1807;
250
251 let mut task_handle = handler.post_blocking_task(move || {
252 std::thread::sleep(Duration::from_millis(100));
253 hash(param)
254 });
255 let ret = task_handle.result().unwrap();
256 let expected = hash(param);
257 assert_eq!(ret, expected);
258 }
259