1 // Copyright (C) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod qos; 15 mod queue; 16 pub(crate) mod state; 17 use std::collections::HashMap; 18 use std::sync::Arc; 19 mod sql; 20 use qos::Qos; 21 use queue::RunningQueue; 22 use state::sql::SqlList; 23 24 use super::events::TaskManagerEvent; 25 use crate::config::Mode; 26 use crate::error::ErrorCode; 27 use crate::info::TaskInfo; 28 use crate::manage::database::RequestDb; 29 use crate::manage::notifier::Notifier; 30 use crate::manage::task_manager::TaskManagerTx; 31 use crate::service::client::ClientManagerEntry; 32 use crate::service::run_count::RunCountManagerEntry; 33 use crate::task::config::Action; 34 use crate::task::info::State; 35 use crate::task::reason::Reason; 36 use crate::task::request_task::RequestTask; 37 use crate::utils::get_current_timestamp; 38 39 const MILLISECONDS_IN_ONE_MONTH: u64 = 30 * 24 * 60 * 60 * 1000; 40 41 // Scheduler 的基本处理逻辑如下: 42 // 1. Scheduler 维护一个当前所有 运行中 和 43 // 待运行的任务优先级队列(scheduler.qos), 44 // 该队列仅保存任务的优先级信息和基础信息,当环境发生变化时, 45 // 将该优先级队列重新排序,并得到一系列优先级调节指令(QosChange), 46 // 这些指令的作用是指引运行队列将满足优先级排序的任务变为运行状态。 47 // 48 // 2. 得到指令后,将该指令作用于任务队列(scheduler.queue)。 49 // 任务队列保存当前正在运行的任务列表(scheduler.queue.running), 50 // 所以运行队列根据指令的内容, 将指令引导的那些任务置于运行任务列表, 51 // 并调节速率。对于那些当前正在执行,但此时又未得到运行权限的任务, 52 // 我们将其修改为Waiting状态,运行任务队列就更新完成了。 53 // 54 // 注意:未处于运行状态中的任务不会停留在内存中。 55 56 pub(crate) struct Scheduler { 57 qos: Qos, 58 running_queue: RunningQueue, 59 client_manager: ClientManagerEntry, 60 state_handler: state::Handler, 61 pub(crate) resort_scheduled: bool, 62 task_manager: TaskManagerTx, 63 } 64 65 impl Scheduler { init( tx: TaskManagerTx, runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, ) -> Scheduler66 pub(crate) fn init( 67 tx: TaskManagerTx, 68 runcount_manager: RunCountManagerEntry, 69 client_manager: ClientManagerEntry, 70 ) -> Scheduler { 71 let mut state_handler = state::Handler::new(tx.clone()); 72 let sql_list = state_handler.init(); 73 let db = RequestDb::get_instance(); 74 for sql in sql_list { 75 if let Err(e) = db.execute(&sql) { 76 error!("TaskManager update network failed {:?}", e); 77 }; 78 } 79 80 Self { 81 qos: Qos::new(), 82 running_queue: RunningQueue::new(tx.clone(), runcount_manager, client_manager.clone()), 83 client_manager, 84 state_handler, 85 resort_scheduled: false, 86 task_manager: tx, 87 } 88 } 89 get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>>90 pub(crate) fn get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>> { 91 self.running_queue.get_task(uid, task_id) 92 } 93 tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>>94 pub(crate) fn tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>> { 95 self.running_queue.tasks() 96 } 97 running_tasks(&self) -> usize98 pub(crate) fn running_tasks(&self) -> usize { 99 self.running_queue.running_tasks() 100 } 101 dump_tasks(&self)102 pub(crate) fn dump_tasks(&self) { 103 self.running_queue.dump_tasks(); 104 } 105 restore_all_tasks(&mut self)106 pub(crate) fn restore_all_tasks(&mut self) { 107 info!("reschedule restore all tasks"); 108 // Reschedule tasks based on the current `QOS` status. 109 self.schedule_if_not_scheduled(); 110 } 111 start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>112 pub(crate) fn start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 113 self.start_inner(uid, task_id, false) 114 } 115 resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>116 pub(crate) fn resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 117 self.start_inner(uid, task_id, true) 118 } 119 start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode>120 fn start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode> { 121 let database = RequestDb::get_instance(); 122 let info = RequestDb::get_instance() 123 .get_task_info(task_id) 124 .ok_or(ErrorCode::TaskNotFound)?; 125 126 if (is_resume && info.progress.common_data.state != State::Paused.repr) 127 || (!is_resume && info.progress.common_data.state == State::Paused.repr) 128 { 129 return Err(ErrorCode::TaskStateErr); 130 } 131 database.change_status(task_id, State::Waiting)?; 132 133 let info = RequestDb::get_instance() 134 .get_task_info(task_id) 135 .ok_or(ErrorCode::TaskNotFound)?; 136 if is_resume { 137 Notifier::resume(&self.client_manager, info.build_notify_data()); 138 } 139 140 if info.progress.is_finish() { 141 database.update_task_state(task_id, State::Completed, Reason::Default); 142 if let Some(info) = database.get_task_info(task_id) { 143 Notifier::complete(&self.client_manager, info.build_notify_data()); 144 } 145 } 146 147 if !self.check_config_satisfy(task_id)? { 148 return Ok(()); 149 }; 150 let qos_info = database 151 .get_task_qos_info(task_id) 152 .ok_or(ErrorCode::TaskNotFound)?; 153 self.qos.start_task(uid, qos_info); 154 self.schedule_if_not_scheduled(); 155 Ok(()) 156 } 157 pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>158 pub(crate) fn pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 159 let database = RequestDb::get_instance(); 160 database.change_status(task_id, State::Paused)?; 161 162 if let Some(info) = database.get_task_info(task_id) { 163 Notifier::pause(&self.client_manager, info.build_notify_data()); 164 } 165 self.running_queue.upload_resume.insert(task_id); 166 if self.qos.remove_task(uid, task_id) { 167 self.schedule_if_not_scheduled(); 168 } 169 Ok(()) 170 } 171 remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>172 pub(crate) fn remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 173 let database = RequestDb::get_instance(); 174 database.change_status(task_id, State::Removed)?; 175 let info = database 176 .get_task_info(task_id) 177 .ok_or(ErrorCode::TaskNotFound)?; 178 179 Notifier::remove(&self.client_manager, info.build_notify_data()); 180 181 if self.qos.remove_task(uid, task_id) { 182 self.schedule_if_not_scheduled(); 183 } 184 Ok(()) 185 } 186 stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>187 pub(crate) fn stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 188 let database = RequestDb::get_instance(); 189 database.change_status(task_id, State::Stopped)?; 190 191 if self.qos.remove_task(uid, task_id) { 192 self.schedule_if_not_scheduled(); 193 } 194 Ok(()) 195 } 196 task_completed(&mut self, uid: u64, task_id: u32)197 pub(crate) fn task_completed(&mut self, uid: u64, task_id: u32) { 198 info!("scheduler task {} completed", task_id); 199 self.running_queue.task_finish(uid, task_id); 200 201 let database = RequestDb::get_instance(); 202 if self.qos.remove_task(uid, task_id) { 203 self.schedule_if_not_scheduled(); 204 } 205 206 if let Some(info) = database.get_task_qos_info(task_id) { 207 if info.state != State::Running.repr && info.state != State::Waiting.repr { 208 return; 209 } 210 } 211 212 database.update_task_state(task_id, State::Completed, Reason::Default); 213 if let Some(info) = database.get_task_info(task_id) { 214 Notifier::complete(&self.client_manager, info.build_notify_data()); 215 } 216 } 217 task_cancel( &mut self, uid: u64, task_id: u32, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )218 pub(crate) fn task_cancel( 219 &mut self, 220 uid: u64, 221 task_id: u32, 222 mode: Mode, 223 task_count: &mut HashMap<u64, (usize, usize)>, 224 ) { 225 info!("scheduler task {} canceled", task_id); 226 self.running_queue.task_finish(uid, task_id); 227 228 let database = RequestDb::get_instance(); 229 let Some(info) = database.get_task_info(task_id) else { 230 error!("task {} not found in database", task_id); 231 return; 232 }; 233 match State::from(info.progress.common_data.state) { 234 State::Running | State::Retrying => { 235 if !self.running_queue.try_restart(uid, task_id) { 236 info!("task {} waiting for task limits", task_id); 237 RequestDb::get_instance().update_task_state( 238 task_id, 239 State::Waiting, 240 Reason::RunningTaskMeetLimits, 241 ); 242 } 243 } 244 State::Failed => { 245 info!("task {} cancel with state Failed", task_id); 246 if let Some((front, back)) = task_count.get_mut(&uid) { 247 match mode { 248 Mode::FrontEnd => { 249 if *front > 0 { 250 *front -= 1; 251 } 252 } 253 _ => { 254 if *back > 0 { 255 *back -= 1; 256 } 257 } 258 } 259 } 260 Notifier::fail(&self.client_manager, info.build_notify_data()); 261 #[cfg(feature = "oh")] 262 { 263 let reason = Reason::from(info.common_data.reason); 264 Self::sys_event(info, reason); 265 } 266 } 267 state => { 268 info!( 269 "task {} cancel state {:?} reason {:?}", 270 task_id, 271 state, 272 Reason::from(info.common_data.reason) 273 ); 274 self.running_queue.try_restart(uid, task_id); 275 } 276 } 277 } 278 task_failed(&mut self, uid: u64, task_id: u32, reason: Reason)279 pub(crate) fn task_failed(&mut self, uid: u64, task_id: u32, reason: Reason) { 280 info!("scheduler task {} failed", task_id); 281 self.running_queue.task_finish(uid, task_id); 282 283 let database = RequestDb::get_instance(); 284 285 if self.qos.remove_task(uid, task_id) { 286 self.schedule_if_not_scheduled(); 287 } 288 289 if let Some(info) = database.get_task_qos_info(task_id) { 290 if info.state != State::Running.repr && info.state != State::Waiting.repr { 291 return; 292 } 293 } 294 295 database.update_task_state(task_id, State::Failed, reason); 296 297 if let Some(info) = database.get_task_info(task_id) { 298 Notifier::fail(&self.client_manager, info.build_notify_data()); 299 #[cfg(feature = "oh")] 300 Self::sys_event(info, reason); 301 } 302 } 303 #[cfg(feature = "oh")] sys_event(info: TaskInfo, reason: Reason)304 pub(crate) fn sys_event(info: TaskInfo, reason: Reason) { 305 use hisysevent::{build_number_param, build_str_param}; 306 307 use crate::sys_event::SysEvent; 308 309 let index = info.progress.common_data.index; 310 let size = info.file_specs.len(); 311 let action = match info.action() { 312 Action::Download => "DOWNLOAD", 313 Action::Upload => "UPLOAD", 314 _ => "UNKNOWN", 315 }; 316 317 SysEvent::task_fault() 318 .param(build_str_param!(crate::sys_event::TASKS_TYPE, action)) 319 .param(build_number_param!( 320 crate::sys_event::TOTAL_FILE_NUM, 321 size as i32 322 )) 323 .param(build_number_param!( 324 crate::sys_event::FAIL_FILE_NUM, 325 (size - index) as i32 326 )) 327 .param(build_number_param!( 328 crate::sys_event::SUCCESS_FILE_NUM, 329 index as i32 330 )) 331 .param(build_number_param!( 332 crate::sys_event::ERROR_INFO, 333 reason.repr as i32 334 )) 335 .write(); 336 } 337 on_state_change<T, F>(&mut self, f: F, t: T) where F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,338 pub(crate) fn on_state_change<T, F>(&mut self, f: F, t: T) 339 where 340 F: FnOnce(&mut state::Handler, T) -> Option<SqlList>, 341 { 342 let Some(sql_list) = f(&mut self.state_handler, t) else { 343 return; 344 }; 345 let db = RequestDb::get_instance(); 346 for sql in sql_list { 347 if let Err(e) = db.execute(&sql) { 348 error!("TaskManager update network failed {:?}", e); 349 }; 350 } 351 self.reload_all_tasks(); 352 } 353 reload_all_tasks(&mut self)354 pub(crate) fn reload_all_tasks(&mut self) { 355 self.qos.reload_all_tasks(); 356 self.schedule_if_not_scheduled(); 357 } 358 on_rss_change(&mut self, level: i32)359 pub(crate) fn on_rss_change(&mut self, level: i32) { 360 if let Some(new_rss) = self.state_handler.update_rss_level(level) { 361 self.qos.change_rss(new_rss); 362 self.schedule_if_not_scheduled(); 363 } 364 } 365 schedule_if_not_scheduled(&mut self)366 fn schedule_if_not_scheduled(&mut self) { 367 if self.resort_scheduled { 368 return; 369 } 370 self.resort_scheduled = true; 371 let task_manager = self.task_manager.clone(); 372 task_manager.send_event(TaskManagerEvent::Reschedule); 373 } 374 reschedule(&mut self)375 pub(crate) fn reschedule(&mut self) { 376 self.resort_scheduled = false; 377 let changes = self.qos.reschedule(&self.state_handler); 378 let mut qos_remove_queue = vec![]; 379 self.running_queue 380 .reschedule(changes, &mut qos_remove_queue); 381 for (uid, task_id) in qos_remove_queue.iter() { 382 self.qos.apps.remove_task(*uid, *task_id); 383 } 384 if !qos_remove_queue.is_empty() { 385 self.reload_all_tasks(); 386 } 387 } 388 check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode>389 pub(crate) fn check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode> { 390 let database = RequestDb::get_instance(); 391 let config = database 392 .get_task_config(task_id) 393 .ok_or(ErrorCode::TaskNotFound)?; 394 395 if let Err(reason) = config.satisfy_network(self.state_handler.network()) { 396 info!( 397 "task {} started, waiting for network {:?}", 398 task_id, 399 self.state_handler.network() 400 ); 401 402 database.update_task_state(task_id, State::Waiting, reason); 403 return Ok(false); 404 } 405 406 if !config.satisfy_foreground(self.state_handler.top_uid()) { 407 info!( 408 "task {} started, waiting for app {}", 409 task_id, config.common_data.uid 410 ); 411 database.update_task_state(task_id, State::Waiting, Reason::AppBackgroundOrTerminate); 412 return Ok(false); 413 } 414 Ok(true) 415 } 416 clear_timeout_tasks(&mut self)417 pub(crate) fn clear_timeout_tasks(&mut self) { 418 let current_time = get_current_timestamp(); 419 let timeout_tasks = self 420 .tasks() 421 .filter(|task| current_time - task.ctime > MILLISECONDS_IN_ONE_MONTH) 422 .cloned() 423 .collect::<Vec<_>>(); 424 if timeout_tasks.is_empty() { 425 return; 426 } 427 let database = RequestDb::get_instance(); 428 for task in timeout_tasks { 429 if database 430 .change_status(task.task_id(), State::Stopped) 431 .is_ok() 432 { 433 self.qos.apps.remove_task(task.uid(), task.task_id()); 434 } 435 } 436 self.schedule_if_not_scheduled(); 437 } 438 } 439 440 impl RequestDb { change_status(&self, task_id: u32, state: State) -> Result<(), ErrorCode>441 fn change_status(&self, task_id: u32, state: State) -> Result<(), ErrorCode> { 442 let info = RequestDb::get_instance() 443 .get_task_info(task_id) 444 .ok_or(ErrorCode::TaskNotFound)?; 445 if info.progress.common_data.state == state.repr { 446 if state == State::Removed { 447 return Err(ErrorCode::TaskNotFound); 448 } else { 449 return Err(ErrorCode::TaskStateErr); 450 } 451 } 452 let sql = match state { 453 State::Paused => sql::pause_task(task_id), 454 State::Running => sql::start_task(task_id), 455 State::Stopped => sql::stop_task(task_id), 456 State::Removed => sql::remove_task(task_id), 457 State::Waiting => sql::start_task(task_id), 458 _ => return Err(ErrorCode::Other), 459 }; 460 461 RequestDb::get_instance() 462 .execute(&sql) 463 .map_err(|_| ErrorCode::SystemApi)?; 464 465 let info = RequestDb::get_instance() 466 .get_task_info(task_id) 467 .ok_or(ErrorCode::SystemApi)?; 468 if info.progress.common_data.state != state.repr { 469 Err(ErrorCode::TaskStateErr) 470 } else { 471 Ok(()) 472 } 473 } 474 } 475