1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "send_task_scheduler.h"
17 #include <algorithm>
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "serial_buffer.h"
21
22 namespace DistributedDB {
// Byte limits used when admitting tasks: AddSendTaskIntoSchedule rejects a task once the queued byte total
// has reached MAX_CAPACITY plus the extra allowance of the task's priority.
// Figures from the original author (not derivable from these constants alone):
// in current parameters, the scheduler will hold 160 MB in extreme situation;
// in actual runtime situation, the scheduler will hold no more than 100 MB.
static constexpr uint32_t MAX_CAPACITY = 64 * 1024 * 1024; // 64 M bytes
static constexpr uint32_t EXTRA_CAPACITY_FOR_NORMAL_PRIORITY = 32 * 1024 * 1024; // 32 M bytes
static constexpr uint32_t EXTRA_CAPACITY_FOR_HIGH_PRIORITY = 64 * 1024 * 1024; // 64 M bytes
28
~SendTaskScheduler()29 SendTaskScheduler::~SendTaskScheduler()
30 {
31 Finalize();
32 }
33
Initialize()34 void SendTaskScheduler::Initialize()
35 {
36 priorityOrder_.clear();
37 priorityOrder_.push_back(Priority::HIGH);
38 priorityOrder_.push_back(Priority::NORMAL);
39 priorityOrder_.push_back(Priority::LOW);
40 for (const auto &prio : priorityOrder_) {
41 extraCapacityInByteByPrio_[prio] = 0;
42 taskCountByPrio_[prio] = 0;
43 taskDelayCountByPrio_[prio] = 0;
44 taskGroupByPrio_[prio] = TaskListByTarget();
45 }
46 extraCapacityInByteByPrio_[Priority::NORMAL] = EXTRA_CAPACITY_FOR_NORMAL_PRIORITY;
47 extraCapacityInByteByPrio_[Priority::HIGH] = EXTRA_CAPACITY_FOR_HIGH_PRIORITY;
48 }
49
Finalize()50 void SendTaskScheduler::Finalize()
51 {
52 while (GetTotalTaskCount() != 0) {
53 SendTask task;
54 SendTaskInfo taskInfo;
55 uint32_t totalLength = 0;
56 int errCode = ScheduleOutSendTask(task, taskInfo, totalLength);
57 if (errCode != E_OK) {
58 LOGE("[Scheduler][Final] INTERNAL ERROR.");
59 break; // Not possible to happen
60 }
61 LOGW("[Scheduler][Finalize] dstTarget=%s{private}, delayFlag=%d, taskPrio=%d", task.dstTarget.c_str(),
62 taskInfo.delayFlag, static_cast<int>(taskInfo.taskPrio));
63 FinalizeLastScheduleTask();
64 }
65 }
66
AddSendTaskIntoSchedule(const SendTask & inTask,Priority inPrio)67 int SendTaskScheduler::AddSendTaskIntoSchedule(const SendTask &inTask, Priority inPrio)
68 {
69 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
70 if (curTotalSizeByByte_ >= MAX_CAPACITY + extraCapacityInByteByPrio_[inPrio]) {
71 return -E_CONTAINER_FULL;
72 }
73
74 uint32_t taskSizeByByte = inTask.buffer->GetSize();
75 curTotalSizeByByte_ += taskSizeByByte;
76 curTotalSizeByTask_++;
77 totalBytesByTarget_[inTask.dstTarget] += taskSizeByByte;
78 if (policyMap_.count(inTask.dstTarget) == 0) {
79 policyMap_[inTask.dstTarget] = TargetPolicy::NO_DELAY;
80 }
81 if (policyMap_[inTask.dstTarget] == TargetPolicy::DELAY) {
82 delayTaskCount_++;
83 taskDelayCountByPrio_[inPrio]++;
84 }
85
86 taskCountByPrio_[inPrio]++;
87 taskOrderByPrio_[inPrio].push_back(inTask.dstTarget);
88 taskGroupByPrio_[inPrio][inTask.dstTarget].push_back(inTask);
89 return E_OK;
90 }
91
ScheduleOutSendTask(SendTask & outTask,uint32_t & totalLength)92 int SendTaskScheduler::ScheduleOutSendTask(SendTask &outTask, uint32_t &totalLength)
93 {
94 SendTaskInfo taskInfo;
95 int errCode = ScheduleOutSendTask(outTask, taskInfo, totalLength);
96 if (errCode == E_OK) {
97 LOGI("[Scheduler][OutTask] dstTarget=%s{private}, delayFlag=%d, taskPrio=%d", outTask.dstTarget.c_str(),
98 taskInfo.delayFlag, static_cast<int>(taskInfo.taskPrio));
99 }
100 return errCode;
101 }
102
ScheduleOutSendTask(SendTask & outTask,SendTaskInfo & outTaskInfo,uint32_t & totalLength)103 int SendTaskScheduler::ScheduleOutSendTask(SendTask &outTask, SendTaskInfo &outTaskInfo, uint32_t &totalLength)
104 {
105 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
106 if (curTotalSizeByTask_ == 0) {
107 return -E_CONTAINER_EMPTY;
108 }
109
110 if (delayTaskCount_ == curTotalSizeByTask_) {
111 // Tasks are all in delay status
112 int errCode = ScheduleDelayTask(outTask, outTaskInfo);
113 if (errCode == E_OK) {
114 // Update last schedule location
115 totalLength = totalBytesByTarget_[outTask.dstTarget];
116 lastScheduleTarget_ = outTask.dstTarget;
117 lastSchedulePriority_ = outTaskInfo.taskPrio;
118 scheduledFlag_ = true;
119 }
120 return errCode;
121 } else {
122 // There are some tasks not in delay status
123 int errCode = ScheduleNoDelayTask(outTask, outTaskInfo);
124 if (errCode == E_OK) {
125 // Update last schedule location
126 totalLength = totalBytesByTarget_[outTask.dstTarget];
127 lastScheduleTarget_ = outTask.dstTarget;
128 lastSchedulePriority_ = outTaskInfo.taskPrio;
129 scheduledFlag_ = true;
130 }
131 return errCode;
132 }
133 }
134
FinalizeLastScheduleTask()135 int SendTaskScheduler::FinalizeLastScheduleTask()
136 {
137 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
138 if (curTotalSizeByTask_ == 0) {
139 return -E_CONTAINER_EMPTY;
140 }
141 if (!scheduledFlag_) {
142 return -E_NOT_PERMIT;
143 }
144
145 // Retrieve last scheduled task
146 SendTask task = taskGroupByPrio_[lastSchedulePriority_][lastScheduleTarget_].front();
147
148 bool isFullBefore = (curTotalSizeByByte_ >= MAX_CAPACITY);
149 uint32_t taskSize = task.buffer->GetSize();
150 curTotalSizeByByte_ -= taskSize;
151 bool isFullAfter = (curTotalSizeByByte_ >= MAX_CAPACITY);
152
153 totalBytesByTarget_[lastScheduleTarget_] -= taskSize;
154
155 curTotalSizeByTask_--;
156 taskCountByPrio_[lastSchedulePriority_]--;
157 if (policyMap_[lastScheduleTarget_] == TargetPolicy::DELAY) {
158 delayTaskCount_--;
159 taskDelayCountByPrio_[lastSchedulePriority_]--;
160 }
161
162 for (auto iter = taskOrderByPrio_[lastSchedulePriority_].begin();
163 iter != taskOrderByPrio_[lastSchedulePriority_].end(); ++iter) {
164 if (*iter == lastScheduleTarget_) {
165 taskOrderByPrio_[lastSchedulePriority_].erase(iter);
166 break;
167 }
168 }
169
170 taskGroupByPrio_[lastSchedulePriority_][lastScheduleTarget_].pop_front();
171 delete task.buffer;
172 task.buffer = nullptr;
173 scheduledFlag_ = false;
174
175 if (isFullBefore && !isFullAfter) {
176 return -E_CONTAINER_FULL_TO_NOTFULL;
177 }
178 if (curTotalSizeByTask_ == 0) {
179 return -E_CONTAINER_NOTEMPTY_TO_EMPTY;
180 }
181 if (curTotalSizeByTask_ == delayTaskCount_) {
182 return -E_CONTAINER_ONLY_DELAY_TASK;
183 }
184
185 return E_OK;
186 }
187
DelayTaskByTarget(const std::string & inTarget)188 int SendTaskScheduler::DelayTaskByTarget(const std::string &inTarget)
189 {
190 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
191 if (policyMap_.count(inTarget) == 0) {
192 LOGE("[Scheduler][DelayTask] Not found inTarget=%s{private}", inTarget.c_str());
193 return -E_NOT_FOUND;
194 }
195 if (policyMap_[inTarget] == TargetPolicy::DELAY) {
196 return E_OK;
197 }
198
199 policyMap_[inTarget] = TargetPolicy::DELAY;
200 for (auto &prio : priorityOrder_) {
201 size_t count = taskGroupByPrio_[prio][inTarget].size();
202 taskDelayCountByPrio_[prio] += static_cast<uint32_t>(count);
203 delayTaskCount_ += static_cast<uint32_t>(count);
204 }
205 return E_OK;
206 }
207
NoDelayTaskByTarget(const std::string & inTarget)208 int SendTaskScheduler::NoDelayTaskByTarget(const std::string &inTarget)
209 {
210 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
211 if (policyMap_.count(inTarget) == 0) {
212 LOGE("[Scheduler][NoDelayTask] Not found inTarget=%s{private}", inTarget.c_str());
213 return -E_NOT_FOUND;
214 }
215 if (policyMap_[inTarget] == TargetPolicy::NO_DELAY) {
216 return E_OK;
217 }
218
219 policyMap_[inTarget] = TargetPolicy::NO_DELAY;
220 for (auto &prio : priorityOrder_) {
221 size_t count = taskGroupByPrio_[prio][inTarget].size();
222 // Logic guarantee that former not smaller than latter
223 taskDelayCountByPrio_[prio] -= static_cast<uint32_t>(count);
224 delayTaskCount_ -= static_cast<uint32_t>(count);
225 }
226 return E_OK;
227 }
228
GetTotalTaskCount() const229 uint32_t SendTaskScheduler::GetTotalTaskCount() const
230 {
231 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
232 return curTotalSizeByTask_;
233 }
234
GetNoDelayTaskCount() const235 uint32_t SendTaskScheduler::GetNoDelayTaskCount() const
236 {
237 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
238 // delayTaskCount_ never greater than curTotalSizeByTask_
239 return curTotalSizeByTask_ - delayTaskCount_;
240 }
241
ScheduleDelayTask(SendTask & outTask,SendTaskInfo & outTaskInfo)242 int SendTaskScheduler::ScheduleDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
243 {
244 for (const auto &prio : priorityOrder_) {
245 if (taskCountByPrio_[prio] == 0) {
246 // No task of this priority
247 continue;
248 }
249 // Logic guarantee that lists access below will not be empty
250 std::string dstTarget = taskOrderByPrio_[prio].front();
251 outTask = taskGroupByPrio_[prio][dstTarget].front();
252 outTaskInfo.delayFlag = true;
253 outTaskInfo.taskPrio = prio;
254 return E_OK;
255 }
256 LOGE("[Scheduler][ScheduleDelay] INTERNAL ERROR : NO TASK.");
257 return -E_INTERNAL_ERROR;
258 }
259
ScheduleNoDelayTask(SendTask & outTask,SendTaskInfo & outTaskInfo)260 int SendTaskScheduler::ScheduleNoDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
261 {
262 for (const auto &prio : priorityOrder_) {
263 if (taskCountByPrio_[prio] == 0 || taskCountByPrio_[prio] == taskDelayCountByPrio_[prio]) {
264 // No no_delay_task of this priority
265 continue;
266 }
267 // Logic guarantee that lists accessed below will not be empty
268 std::string dstTarget;
269 bool findFlag = false; // Not necessary in fact
270 for (auto iter = taskOrderByPrio_[prio].begin(); iter != taskOrderByPrio_[prio].end(); ++iter) {
271 // Logic guarantee that there is at least one target in orderList that is NO_DELAY
272 dstTarget = *iter;
273 if (policyMap_[dstTarget] == TargetPolicy::NO_DELAY) {
274 findFlag = true;
275 break;
276 }
277 }
278 if (!findFlag) {
279 LOGE("[Scheduler][ScheduleNoDelay] INTERNAL ERROR : NO_DELAY NOT FOUND.");
280 return -E_INTERNAL_ERROR;
281 }
282
283 outTask = taskGroupByPrio_[prio][dstTarget].front();
284 outTaskInfo.delayFlag = false;
285 outTaskInfo.taskPrio = prio;
286 return E_OK;
287 }
288 LOGE("[Scheduler][ScheduleNoDelay] INTERNAL ERROR : NO TASK.");
289 return -E_INTERNAL_ERROR;
290 }
291
InvalidSendTask(const std::string & target)292 void SendTaskScheduler::InvalidSendTask(const std::string &target)
293 {
294 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
295 for (const auto &priority : priorityOrder_) {
296 if (taskCountByPrio_[priority] == 0) {
297 // No task of this priority
298 continue;
299 }
300 for (auto &sendTask : taskGroupByPrio_[priority][target]) {
301 sendTask.isValid = false;
302 LOGI("[Scheduler][InvalidSendTask] invalid frameId=%" PRIu32, sendTask.frameId);
303 if ((softBusErrCodeMap_.count(target) == 0) || (softBusErrCodeMap_[target] == E_OK)) {
304 continue;
305 }
306 LOGE("[Scheduler][InvalidSendTask] target=%.3s, errCode=%d", target.c_str(), softBusErrCodeMap_[target]);
307 if (sendTask.onEnd) {
308 LOGI("[Scheduler][InvalidSendTask] On Send End.");
309 sendTask.onEnd(softBusErrCodeMap_[target], false);
310 sendTask.onEnd = nullptr;
311 }
312 }
313 }
314 softBusErrCodeMap_.erase(target);
315 }
316
SetSoftBusErrCode(const std::string & target,int softBusErrCode)317 void SendTaskScheduler::SetSoftBusErrCode(const std::string &target, int softBusErrCode)
318 {
319 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
320 softBusErrCodeMap_[target] = softBusErrCode;
321 }
322 }