1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod qos;
15 mod queue;
16 pub(crate) mod state;
17 use std::collections::HashMap;
18 use std::sync::Arc;
19 mod sql;
20 use qos::Qos;
21 use queue::RunningQueue;
22 use state::sql::SqlList;
23 
24 use super::events::TaskManagerEvent;
25 use crate::config::Mode;
26 use crate::error::ErrorCode;
27 use crate::info::TaskInfo;
28 use crate::manage::database::RequestDb;
29 use crate::manage::notifier::Notifier;
30 use crate::manage::task_manager::TaskManagerTx;
31 use crate::service::client::ClientManagerEntry;
32 use crate::service::run_count::RunCountManagerEntry;
33 use crate::task::config::Action;
34 use crate::task::info::State;
35 use crate::task::reason::Reason;
36 use crate::task::request_task::RequestTask;
37 use crate::utils::get_current_timestamp;
38 
/// Length of a 30-day month in milliseconds; tasks older than this are
/// treated as timed out by `Scheduler::clear_timeout_tasks`.
const MILLISECONDS_IN_ONE_MONTH: u64 = 1000 * 60 * 60 * 24 * 30;
40 
// The basic processing logic of the Scheduler is as follows:
//
// 1. The Scheduler maintains a priority queue (`scheduler.qos`) of all tasks
//    that are currently running or waiting to run. This queue only stores the
//    priority information and basic information of each task. When the
//    environment changes, the priority queue is re-sorted, which yields a
//    series of priority adjustment directives (`QosChange`). These directives
//    tell the running queue which tasks satisfy the priority ordering and
//    should be switched to the running state.
//
// 2. Once the directives are available they are applied to the task queue
//    (`scheduler.running_queue`). The task queue holds the list of tasks that
//    are currently running, so it follows the directives: the tasks they
//    designate are placed in the running list and their rates are adjusted.
//    Tasks that are currently executing but did not obtain permission to run
//    this time are changed to the `Waiting` state. After that the running
//    queue is up to date.
//
// Note: tasks that are not in the running state do not stay in memory.

/// Coordinates task priorities (`qos`) with the set of running tasks
/// (`running_queue`) and reacts to environment changes.
pub(crate) struct Scheduler {
    /// Priority bookkeeping for tasks that are running or waiting to run.
    qos: Qos,
    /// Queue holding the tasks that are currently running.
    running_queue: RunningQueue,
    /// Handle passed to `Notifier` when task notifications are sent.
    client_manager: ClientManagerEntry,
    /// Environment state queried for network, top uid and RSS level.
    state_handler: state::Handler,
    /// `true` while a `TaskManagerEvent::Reschedule` has been sent and
    /// `reschedule` has not run yet; prevents sending duplicate events.
    pub(crate) resort_scheduled: bool,
    /// Sender used to post events back to the task manager.
    task_manager: TaskManagerTx,
}
64 
65 impl Scheduler {
init( tx: TaskManagerTx, runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, ) -> Scheduler66     pub(crate) fn init(
67         tx: TaskManagerTx,
68         runcount_manager: RunCountManagerEntry,
69         client_manager: ClientManagerEntry,
70     ) -> Scheduler {
71         let mut state_handler = state::Handler::new(tx.clone());
72         let sql_list = state_handler.init();
73         let db = RequestDb::get_instance();
74         for sql in sql_list {
75             if let Err(e) = db.execute(&sql) {
76                 error!("TaskManager update network failed {:?}", e);
77             };
78         }
79 
80         Self {
81             qos: Qos::new(),
82             running_queue: RunningQueue::new(tx.clone(), runcount_manager, client_manager.clone()),
83             client_manager,
84             state_handler,
85             resort_scheduled: false,
86             task_manager: tx,
87         }
88     }
89 
get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>>90     pub(crate) fn get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>> {
91         self.running_queue.get_task(uid, task_id)
92     }
93 
tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>>94     pub(crate) fn tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>> {
95         self.running_queue.tasks()
96     }
97 
running_tasks(&self) -> usize98     pub(crate) fn running_tasks(&self) -> usize {
99         self.running_queue.running_tasks()
100     }
101 
dump_tasks(&self)102     pub(crate) fn dump_tasks(&self) {
103         self.running_queue.dump_tasks();
104     }
105 
restore_all_tasks(&mut self)106     pub(crate) fn restore_all_tasks(&mut self) {
107         info!("reschedule restore all tasks");
108         // Reschedule tasks based on the current `QOS` status.
109         self.schedule_if_not_scheduled();
110     }
111 
start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>112     pub(crate) fn start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
113         self.start_inner(uid, task_id, false)
114     }
115 
resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>116     pub(crate) fn resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
117         self.start_inner(uid, task_id, true)
118     }
119 
start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode>120     fn start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode> {
121         let database = RequestDb::get_instance();
122         let info = RequestDb::get_instance()
123             .get_task_info(task_id)
124             .ok_or(ErrorCode::TaskNotFound)?;
125 
126         if (is_resume && info.progress.common_data.state != State::Paused.repr)
127             || (!is_resume && info.progress.common_data.state == State::Paused.repr)
128         {
129             return Err(ErrorCode::TaskStateErr);
130         }
131         database.change_status(task_id, State::Waiting)?;
132 
133         let info = RequestDb::get_instance()
134             .get_task_info(task_id)
135             .ok_or(ErrorCode::TaskNotFound)?;
136         if is_resume {
137             Notifier::resume(&self.client_manager, info.build_notify_data());
138         }
139 
140         if info.progress.is_finish() {
141             database.update_task_state(task_id, State::Completed, Reason::Default);
142             if let Some(info) = database.get_task_info(task_id) {
143                 Notifier::complete(&self.client_manager, info.build_notify_data());
144             }
145         }
146 
147         if !self.check_config_satisfy(task_id)? {
148             return Ok(());
149         };
150         let qos_info = database
151             .get_task_qos_info(task_id)
152             .ok_or(ErrorCode::TaskNotFound)?;
153         self.qos.start_task(uid, qos_info);
154         self.schedule_if_not_scheduled();
155         Ok(())
156     }
157 
pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>158     pub(crate) fn pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
159         let database = RequestDb::get_instance();
160         database.change_status(task_id, State::Paused)?;
161 
162         if let Some(info) = database.get_task_info(task_id) {
163             Notifier::pause(&self.client_manager, info.build_notify_data());
164         }
165         self.running_queue.upload_resume.insert(task_id);
166         if self.qos.remove_task(uid, task_id) {
167             self.schedule_if_not_scheduled();
168         }
169         Ok(())
170     }
171 
remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>172     pub(crate) fn remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
173         let database = RequestDb::get_instance();
174         database.change_status(task_id, State::Removed)?;
175         let info = database
176             .get_task_info(task_id)
177             .ok_or(ErrorCode::TaskNotFound)?;
178 
179         Notifier::remove(&self.client_manager, info.build_notify_data());
180 
181         if self.qos.remove_task(uid, task_id) {
182             self.schedule_if_not_scheduled();
183         }
184         Ok(())
185     }
186 
stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>187     pub(crate) fn stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
188         let database = RequestDb::get_instance();
189         database.change_status(task_id, State::Stopped)?;
190 
191         if self.qos.remove_task(uid, task_id) {
192             self.schedule_if_not_scheduled();
193         }
194         Ok(())
195     }
196 
task_completed(&mut self, uid: u64, task_id: u32)197     pub(crate) fn task_completed(&mut self, uid: u64, task_id: u32) {
198         info!("scheduler task {} completed", task_id);
199         self.running_queue.task_finish(uid, task_id);
200 
201         let database = RequestDb::get_instance();
202         if self.qos.remove_task(uid, task_id) {
203             self.schedule_if_not_scheduled();
204         }
205 
206         if let Some(info) = database.get_task_qos_info(task_id) {
207             if info.state != State::Running.repr && info.state != State::Waiting.repr {
208                 return;
209             }
210         }
211 
212         database.update_task_state(task_id, State::Completed, Reason::Default);
213         if let Some(info) = database.get_task_info(task_id) {
214             Notifier::complete(&self.client_manager, info.build_notify_data());
215         }
216     }
217 
task_cancel( &mut self, uid: u64, task_id: u32, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )218     pub(crate) fn task_cancel(
219         &mut self,
220         uid: u64,
221         task_id: u32,
222         mode: Mode,
223         task_count: &mut HashMap<u64, (usize, usize)>,
224     ) {
225         info!("scheduler task {} canceled", task_id);
226         self.running_queue.task_finish(uid, task_id);
227 
228         let database = RequestDb::get_instance();
229         let Some(info) = database.get_task_info(task_id) else {
230             error!("task {} not found in database", task_id);
231             return;
232         };
233         match State::from(info.progress.common_data.state) {
234             State::Running | State::Retrying => {
235                 if !self.running_queue.try_restart(uid, task_id) {
236                     info!("task {} waiting for task limits", task_id);
237                     RequestDb::get_instance().update_task_state(
238                         task_id,
239                         State::Waiting,
240                         Reason::RunningTaskMeetLimits,
241                     );
242                 }
243             }
244             State::Failed => {
245                 info!("task {} cancel with state Failed", task_id);
246                 if let Some((front, back)) = task_count.get_mut(&uid) {
247                     match mode {
248                         Mode::FrontEnd => {
249                             if *front > 0 {
250                                 *front -= 1;
251                             }
252                         }
253                         _ => {
254                             if *back > 0 {
255                                 *back -= 1;
256                             }
257                         }
258                     }
259                 }
260                 Notifier::fail(&self.client_manager, info.build_notify_data());
261                 #[cfg(feature = "oh")]
262                 {
263                     let reason = Reason::from(info.common_data.reason);
264                     Self::sys_event(info, reason);
265                 }
266             }
267             state => {
268                 info!(
269                     "task {} cancel state {:?} reason {:?}",
270                     task_id,
271                     state,
272                     Reason::from(info.common_data.reason)
273                 );
274                 self.running_queue.try_restart(uid, task_id);
275             }
276         }
277     }
278 
task_failed(&mut self, uid: u64, task_id: u32, reason: Reason)279     pub(crate) fn task_failed(&mut self, uid: u64, task_id: u32, reason: Reason) {
280         info!("scheduler task {} failed", task_id);
281         self.running_queue.task_finish(uid, task_id);
282 
283         let database = RequestDb::get_instance();
284 
285         if self.qos.remove_task(uid, task_id) {
286             self.schedule_if_not_scheduled();
287         }
288 
289         if let Some(info) = database.get_task_qos_info(task_id) {
290             if info.state != State::Running.repr && info.state != State::Waiting.repr {
291                 return;
292             }
293         }
294 
295         database.update_task_state(task_id, State::Failed, reason);
296 
297         if let Some(info) = database.get_task_info(task_id) {
298             Notifier::fail(&self.client_manager, info.build_notify_data());
299             #[cfg(feature = "oh")]
300             Self::sys_event(info, reason);
301         }
302     }
303     #[cfg(feature = "oh")]
sys_event(info: TaskInfo, reason: Reason)304     pub(crate) fn sys_event(info: TaskInfo, reason: Reason) {
305         use hisysevent::{build_number_param, build_str_param};
306 
307         use crate::sys_event::SysEvent;
308 
309         let index = info.progress.common_data.index;
310         let size = info.file_specs.len();
311         let action = match info.action() {
312             Action::Download => "DOWNLOAD",
313             Action::Upload => "UPLOAD",
314             _ => "UNKNOWN",
315         };
316 
317         SysEvent::task_fault()
318             .param(build_str_param!(crate::sys_event::TASKS_TYPE, action))
319             .param(build_number_param!(
320                 crate::sys_event::TOTAL_FILE_NUM,
321                 size as i32
322             ))
323             .param(build_number_param!(
324                 crate::sys_event::FAIL_FILE_NUM,
325                 (size - index) as i32
326             ))
327             .param(build_number_param!(
328                 crate::sys_event::SUCCESS_FILE_NUM,
329                 index as i32
330             ))
331             .param(build_number_param!(
332                 crate::sys_event::ERROR_INFO,
333                 reason.repr as i32
334             ))
335             .write();
336     }
337 
on_state_change<T, F>(&mut self, f: F, t: T) where F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,338     pub(crate) fn on_state_change<T, F>(&mut self, f: F, t: T)
339     where
340         F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,
341     {
342         let Some(sql_list) = f(&mut self.state_handler, t) else {
343             return;
344         };
345         let db = RequestDb::get_instance();
346         for sql in sql_list {
347             if let Err(e) = db.execute(&sql) {
348                 error!("TaskManager update network failed {:?}", e);
349             };
350         }
351         self.reload_all_tasks();
352     }
353 
reload_all_tasks(&mut self)354     pub(crate) fn reload_all_tasks(&mut self) {
355         self.qos.reload_all_tasks();
356         self.schedule_if_not_scheduled();
357     }
358 
on_rss_change(&mut self, level: i32)359     pub(crate) fn on_rss_change(&mut self, level: i32) {
360         if let Some(new_rss) = self.state_handler.update_rss_level(level) {
361             self.qos.change_rss(new_rss);
362             self.schedule_if_not_scheduled();
363         }
364     }
365 
schedule_if_not_scheduled(&mut self)366     fn schedule_if_not_scheduled(&mut self) {
367         if self.resort_scheduled {
368             return;
369         }
370         self.resort_scheduled = true;
371         let task_manager = self.task_manager.clone();
372         task_manager.send_event(TaskManagerEvent::Reschedule);
373     }
374 
reschedule(&mut self)375     pub(crate) fn reschedule(&mut self) {
376         self.resort_scheduled = false;
377         let changes = self.qos.reschedule(&self.state_handler);
378         let mut qos_remove_queue = vec![];
379         self.running_queue
380             .reschedule(changes, &mut qos_remove_queue);
381         for (uid, task_id) in qos_remove_queue.iter() {
382             self.qos.apps.remove_task(*uid, *task_id);
383         }
384         if !qos_remove_queue.is_empty() {
385             self.reload_all_tasks();
386         }
387     }
388 
check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode>389     pub(crate) fn check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode> {
390         let database = RequestDb::get_instance();
391         let config = database
392             .get_task_config(task_id)
393             .ok_or(ErrorCode::TaskNotFound)?;
394 
395         if let Err(reason) = config.satisfy_network(self.state_handler.network()) {
396             info!(
397                 "task {} started, waiting for network {:?}",
398                 task_id,
399                 self.state_handler.network()
400             );
401 
402             database.update_task_state(task_id, State::Waiting, reason);
403             return Ok(false);
404         }
405 
406         if !config.satisfy_foreground(self.state_handler.top_uid()) {
407             info!(
408                 "task {} started, waiting for app {}",
409                 task_id, config.common_data.uid
410             );
411             database.update_task_state(task_id, State::Waiting, Reason::AppBackgroundOrTerminate);
412             return Ok(false);
413         }
414         Ok(true)
415     }
416 
clear_timeout_tasks(&mut self)417     pub(crate) fn clear_timeout_tasks(&mut self) {
418         let current_time = get_current_timestamp();
419         let timeout_tasks = self
420             .tasks()
421             .filter(|task| current_time - task.ctime > MILLISECONDS_IN_ONE_MONTH)
422             .cloned()
423             .collect::<Vec<_>>();
424         if timeout_tasks.is_empty() {
425             return;
426         }
427         let database = RequestDb::get_instance();
428         for task in timeout_tasks {
429             if database
430                 .change_status(task.task_id(), State::Stopped)
431                 .is_ok()
432             {
433                 self.qos.apps.remove_task(task.uid(), task.task_id());
434             }
435         }
436         self.schedule_if_not_scheduled();
437     }
438 }
439 
440 impl RequestDb {
change_status(&self, task_id: u32, state: State) -> Result<(), ErrorCode>441     fn change_status(&self, task_id: u32, state: State) -> Result<(), ErrorCode> {
442         let info = RequestDb::get_instance()
443             .get_task_info(task_id)
444             .ok_or(ErrorCode::TaskNotFound)?;
445         if info.progress.common_data.state == state.repr {
446             if state == State::Removed {
447                 return Err(ErrorCode::TaskNotFound);
448             } else {
449                 return Err(ErrorCode::TaskStateErr);
450             }
451         }
452         let sql = match state {
453             State::Paused => sql::pause_task(task_id),
454             State::Running => sql::start_task(task_id),
455             State::Stopped => sql::stop_task(task_id),
456             State::Removed => sql::remove_task(task_id),
457             State::Waiting => sql::start_task(task_id),
458             _ => return Err(ErrorCode::Other),
459         };
460 
461         RequestDb::get_instance()
462             .execute(&sql)
463             .map_err(|_| ErrorCode::SystemApi)?;
464 
465         let info = RequestDb::get_instance()
466             .get_task_info(task_id)
467             .ok_or(ErrorCode::SystemApi)?;
468         if info.progress.common_data.state != state.repr {
469             Err(ErrorCode::TaskStateErr)
470         } else {
471             Ok(())
472         }
473     }
474 }
475