1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod apps;
15 mod direction;
16 mod rss;
17 
18 use apps::SortedApps;
19 pub(crate) use direction::{QosChanges, QosDirection, QosLevel};
20 pub(crate) use rss::RssCapacity;
21 
22 use super::state;
23 use crate::manage::database::TaskQosInfo;
24 use crate::task::config::Action;
25 
26 pub(crate) struct Qos {
27     pub(crate) apps: SortedApps,
28     capacity: RssCapacity,
29 }
30 
31 impl Qos {
new() -> Self32     pub(crate) fn new() -> Self {
33         Self {
34             apps: SortedApps::init(),
35             capacity: RssCapacity::LEVEL0,
36         }
37     }
38 
39     // qos 里包含upload和download,通过empty确认哪些需要更新。
start_task(&mut self, uid: u64, task: TaskQosInfo)40     pub(crate) fn start_task(&mut self, uid: u64, task: TaskQosInfo) {
41         // Only tasks that can run automatically can be added to the qos queue.
42         self.apps.insert_task(uid, task);
43     }
44 
remove_task(&mut self, uid: u64, task_id: u32) -> bool45     pub(crate) fn remove_task(&mut self, uid: u64, task_id: u32) -> bool {
46         self.apps.remove_task(uid, task_id)
47     }
48 
reload_all_tasks(&mut self)49     pub(crate) fn reload_all_tasks(&mut self) {
50         self.apps.reload_all_tasks();
51     }
52 
change_rss(&mut self, rss: RssCapacity)53     pub(crate) fn change_rss(&mut self, rss: RssCapacity) {
54         self.capacity = rss;
55     }
56 }
57 
58 impl Qos {
59     // Reschedule qos queue and get directions.
reschedule(&mut self, state: &state::Handler) -> QosChanges60     pub(crate) fn reschedule(&mut self, state: &state::Handler) -> QosChanges {
61         self.apps.sort(state.top_uid(), state.top_user());
62         let mut changes = QosChanges::new();
63         changes.download = Some(self.reschedule_inner(Action::Download));
64         changes.upload = Some(self.reschedule_inner(Action::Upload));
65         changes
66     }
67 
reschedule_inner(&mut self, action: Action) -> Vec<QosDirection>68     fn reschedule_inner(&mut self, action: Action) -> Vec<QosDirection> {
69         let m1 = self.capacity.m1();
70         let m1_speed = self.capacity.m1_speed();
71         let m2 = self.capacity.m2();
72         let m2_speed = self.capacity.m2_speed();
73         let m3 = self.capacity.m3();
74         let m3_speed = self.capacity.m3_speed();
75 
76         let mut count = 0;
77         let mut app_i = 0;
78         let mut task_i = 0;
79 
80         let mut qos_vec = Vec::new();
81 
82         for (i, task) in self.apps.iter().enumerate().flat_map(|(i, app)| {
83             if !app.is_empty() {
84                 app_i = i;
85             }
86             app.iter().enumerate()
87         }) {
88             if task.action() != action {
89                 continue;
90             }
91             if count < m1 {
92                 qos_vec.push(QosDirection::new(task.uid(), task.task_id(), m1_speed));
93             } else if count < m1 + m2 {
94                 qos_vec.push(QosDirection::new(task.uid(), task.task_id(), m2_speed));
95             }
96             count += 1;
97             if count == m1 + m2 {
98                 task_i = i;
99                 break;
100             }
101         }
102 
103         // Here if the number of all uncompleted tasks is less than `m1 + m2`,
104         // we don not need to adjust `m3` position.
105         if count < m1 + m2 {
106             return qos_vec;
107         }
108 
109         // The filtering logic for fair position is executed as follows:
110         // Each app will take turns taking one task to execute until the
111         // fair position is filled.
112         let mut i = 0;
113 
114         loop {
115             let mut no_tasks_left = true;
116 
117             for tasks in self.apps.iter().skip(app_i + 1).map(|app| &app[..]) {
118                 let task = match tasks.get(i) {
119                     Some(task) => {
120                         no_tasks_left = false;
121                         task
122                     }
123                     None => continue,
124                 };
125 
126                 if task.action() != action {
127                     continue;
128                 }
129 
130                 if count < m1 + m2 + m3 {
131                     qos_vec.push(QosDirection::new(task.uid(), task.task_id(), m3_speed));
132                 } else {
133                     return qos_vec;
134                 }
135 
136                 count += 1;
137             }
138 
139             if no_tasks_left {
140                 break;
141             }
142             i += 1;
143         }
144 
145         // supplement fair position with remaining tasks
146         for task in self
147             .apps
148             .iter()
149             .skip(app_i)
150             .take(1)
151             .flat_map(|app| app.iter().skip(task_i + 1))
152         {
153             if task.action() != action {
154                 continue;
155             }
156 
157             if count < m1 + m2 + m3 {
158                 qos_vec.push(QosDirection::new(task.uid(), task.task_id(), m3_speed));
159             } else {
160                 return qos_vec;
161             }
162             count += 1;
163         }
164         qos_vec
165     }
166 }
167