1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "send_task_scheduler.h"
17 #include <algorithm>
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "serial_buffer.h"
21 
22 namespace DistributedDB {
23 // In current parameters, the scheduler will hold 160 MB in extreme situation.
24 // In actual runtime situation, the scheduler will hold no more than 100 MB.
// Base byte budget shared by all priorities, plus the additional headroom granted to
// NORMAL and HIGH priority tasks on top of that base.
static constexpr uint32_t MAX_CAPACITY = 64U * 1024U * 1024U; // 64 M bytes
static constexpr uint32_t EXTRA_CAPACITY_FOR_NORMAL_PRIORITY = 32U * 1024U * 1024U; // 32 M bytes
static constexpr uint32_t EXTRA_CAPACITY_FOR_HIGH_PRIORITY = 64U * 1024U * 1024U; // 64 M bytes
28 
~SendTaskScheduler()29 SendTaskScheduler::~SendTaskScheduler()
30 {
31     Finalize();
32 }
33 
Initialize()34 void SendTaskScheduler::Initialize()
35 {
36     priorityOrder_.clear();
37     priorityOrder_.push_back(Priority::HIGH);
38     priorityOrder_.push_back(Priority::NORMAL);
39     priorityOrder_.push_back(Priority::LOW);
40     for (const auto &prio : priorityOrder_) {
41         extraCapacityInByteByPrio_[prio] = 0;
42         taskCountByPrio_[prio] = 0;
43         taskDelayCountByPrio_[prio] = 0;
44         taskGroupByPrio_[prio] = TaskListByTarget();
45     }
46     extraCapacityInByteByPrio_[Priority::NORMAL] = EXTRA_CAPACITY_FOR_NORMAL_PRIORITY;
47     extraCapacityInByteByPrio_[Priority::HIGH] = EXTRA_CAPACITY_FOR_HIGH_PRIORITY;
48 }
49 
Finalize()50 void SendTaskScheduler::Finalize()
51 {
52     while (GetTotalTaskCount() != 0) {
53         SendTask task;
54         SendTaskInfo taskInfo;
55         uint32_t totalLength = 0;
56         int errCode = ScheduleOutSendTask(task, taskInfo, totalLength);
57         if (errCode != E_OK) {
58             LOGE("[Scheduler][Final] INTERNAL ERROR.");
59             break; // Not possible to happen
60         }
61         LOGW("[Scheduler][Finalize] dstTarget=%s{private}, delayFlag=%d, taskPrio=%d", task.dstTarget.c_str(),
62             taskInfo.delayFlag, static_cast<int>(taskInfo.taskPrio));
63         FinalizeLastScheduleTask();
64     }
65 }
66 
AddSendTaskIntoSchedule(const SendTask & inTask,Priority inPrio)67 int SendTaskScheduler::AddSendTaskIntoSchedule(const SendTask &inTask, Priority inPrio)
68 {
69     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
70     if (curTotalSizeByByte_ >= MAX_CAPACITY + extraCapacityInByteByPrio_[inPrio]) {
71         return -E_CONTAINER_FULL;
72     }
73 
74     uint32_t taskSizeByByte = inTask.buffer->GetSize();
75     curTotalSizeByByte_ += taskSizeByByte;
76     curTotalSizeByTask_++;
77     totalBytesByTarget_[inTask.dstTarget] += taskSizeByByte;
78     if (policyMap_.count(inTask.dstTarget) == 0) {
79         policyMap_[inTask.dstTarget] = TargetPolicy::NO_DELAY;
80     }
81     if (policyMap_[inTask.dstTarget] == TargetPolicy::DELAY) {
82         delayTaskCount_++;
83         taskDelayCountByPrio_[inPrio]++;
84     }
85 
86     taskCountByPrio_[inPrio]++;
87     taskOrderByPrio_[inPrio].push_back(inTask.dstTarget);
88     taskGroupByPrio_[inPrio][inTask.dstTarget].push_back(inTask);
89     return E_OK;
90 }
91 
ScheduleOutSendTask(SendTask & outTask,uint32_t & totalLength)92 int SendTaskScheduler::ScheduleOutSendTask(SendTask &outTask, uint32_t &totalLength)
93 {
94     SendTaskInfo taskInfo;
95     int errCode = ScheduleOutSendTask(outTask, taskInfo, totalLength);
96     if (errCode == E_OK) {
97         LOGI("[Scheduler][OutTask] dstTarget=%s{private}, delayFlag=%d, taskPrio=%d", outTask.dstTarget.c_str(),
98             taskInfo.delayFlag, static_cast<int>(taskInfo.taskPrio));
99     }
100     return errCode;
101 }
102 
ScheduleOutSendTask(SendTask & outTask,SendTaskInfo & outTaskInfo,uint32_t & totalLength)103 int SendTaskScheduler::ScheduleOutSendTask(SendTask &outTask, SendTaskInfo &outTaskInfo, uint32_t &totalLength)
104 {
105     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
106     if (curTotalSizeByTask_ == 0) {
107         return -E_CONTAINER_EMPTY;
108     }
109 
110     if (delayTaskCount_ == curTotalSizeByTask_) {
111         // Tasks are all in delay status
112         int errCode = ScheduleDelayTask(outTask, outTaskInfo);
113         if (errCode == E_OK) {
114             // Update last schedule location
115             totalLength = totalBytesByTarget_[outTask.dstTarget];
116             lastScheduleTarget_ = outTask.dstTarget;
117             lastSchedulePriority_ = outTaskInfo.taskPrio;
118             scheduledFlag_ = true;
119         }
120         return errCode;
121     } else {
122         // There are some tasks not in delay status
123         int errCode = ScheduleNoDelayTask(outTask, outTaskInfo);
124         if (errCode == E_OK) {
125             // Update last schedule location
126             totalLength = totalBytesByTarget_[outTask.dstTarget];
127             lastScheduleTarget_ = outTask.dstTarget;
128             lastSchedulePriority_ = outTaskInfo.taskPrio;
129             scheduledFlag_ = true;
130         }
131         return errCode;
132     }
133 }
134 
FinalizeLastScheduleTask()135 int SendTaskScheduler::FinalizeLastScheduleTask()
136 {
137     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
138     if (curTotalSizeByTask_ == 0) {
139         return -E_CONTAINER_EMPTY;
140     }
141     if (!scheduledFlag_) {
142         return -E_NOT_PERMIT;
143     }
144 
145     // Retrieve last scheduled task
146     SendTask task = taskGroupByPrio_[lastSchedulePriority_][lastScheduleTarget_].front();
147 
148     bool isFullBefore = (curTotalSizeByByte_ >= MAX_CAPACITY);
149     uint32_t taskSize = task.buffer->GetSize();
150     curTotalSizeByByte_ -= taskSize;
151     bool isFullAfter = (curTotalSizeByByte_ >= MAX_CAPACITY);
152 
153     totalBytesByTarget_[lastScheduleTarget_] -= taskSize;
154 
155     curTotalSizeByTask_--;
156     taskCountByPrio_[lastSchedulePriority_]--;
157     if (policyMap_[lastScheduleTarget_] == TargetPolicy::DELAY) {
158         delayTaskCount_--;
159         taskDelayCountByPrio_[lastSchedulePriority_]--;
160     }
161 
162     for (auto iter = taskOrderByPrio_[lastSchedulePriority_].begin();
163         iter != taskOrderByPrio_[lastSchedulePriority_].end(); ++iter) {
164         if (*iter == lastScheduleTarget_) {
165             taskOrderByPrio_[lastSchedulePriority_].erase(iter);
166             break;
167         }
168     }
169 
170     taskGroupByPrio_[lastSchedulePriority_][lastScheduleTarget_].pop_front();
171     delete task.buffer;
172     task.buffer = nullptr;
173     scheduledFlag_ = false;
174 
175     if (isFullBefore && !isFullAfter) {
176         return -E_CONTAINER_FULL_TO_NOTFULL;
177     }
178     if (curTotalSizeByTask_ == 0) {
179         return -E_CONTAINER_NOTEMPTY_TO_EMPTY;
180     }
181     if (curTotalSizeByTask_ == delayTaskCount_) {
182         return -E_CONTAINER_ONLY_DELAY_TASK;
183     }
184 
185     return E_OK;
186 }
187 
DelayTaskByTarget(const std::string & inTarget)188 int SendTaskScheduler::DelayTaskByTarget(const std::string &inTarget)
189 {
190     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
191     if (policyMap_.count(inTarget) == 0) {
192         LOGE("[Scheduler][DelayTask] Not found inTarget=%s{private}", inTarget.c_str());
193         return -E_NOT_FOUND;
194     }
195     if (policyMap_[inTarget] == TargetPolicy::DELAY) {
196         return E_OK;
197     }
198 
199     policyMap_[inTarget] = TargetPolicy::DELAY;
200     for (auto &prio : priorityOrder_) {
201         size_t count = taskGroupByPrio_[prio][inTarget].size();
202         taskDelayCountByPrio_[prio] += static_cast<uint32_t>(count);
203         delayTaskCount_ += static_cast<uint32_t>(count);
204     }
205     return E_OK;
206 }
207 
NoDelayTaskByTarget(const std::string & inTarget)208 int SendTaskScheduler::NoDelayTaskByTarget(const std::string &inTarget)
209 {
210     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
211     if (policyMap_.count(inTarget) == 0) {
212         LOGE("[Scheduler][NoDelayTask] Not found inTarget=%s{private}", inTarget.c_str());
213         return -E_NOT_FOUND;
214     }
215     if (policyMap_[inTarget] == TargetPolicy::NO_DELAY) {
216         return E_OK;
217     }
218 
219     policyMap_[inTarget] = TargetPolicy::NO_DELAY;
220     for (auto &prio : priorityOrder_) {
221         size_t count = taskGroupByPrio_[prio][inTarget].size();
222         // Logic guarantee that former not smaller than latter
223         taskDelayCountByPrio_[prio] -= static_cast<uint32_t>(count);
224         delayTaskCount_ -= static_cast<uint32_t>(count);
225     }
226     return E_OK;
227 }
228 
GetTotalTaskCount() const229 uint32_t SendTaskScheduler::GetTotalTaskCount() const
230 {
231     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
232     return curTotalSizeByTask_;
233 }
234 
GetNoDelayTaskCount() const235 uint32_t SendTaskScheduler::GetNoDelayTaskCount() const
236 {
237     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
238     // delayTaskCount_ never greater than curTotalSizeByTask_
239     return curTotalSizeByTask_ - delayTaskCount_;
240 }
241 
ScheduleDelayTask(SendTask & outTask,SendTaskInfo & outTaskInfo)242 int SendTaskScheduler::ScheduleDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
243 {
244     for (const auto &prio : priorityOrder_) {
245         if (taskCountByPrio_[prio] == 0) {
246             // No task of this priority
247             continue;
248         }
249         // Logic guarantee that lists access below will not be empty
250         std::string dstTarget = taskOrderByPrio_[prio].front();
251         outTask = taskGroupByPrio_[prio][dstTarget].front();
252         outTaskInfo.delayFlag = true;
253         outTaskInfo.taskPrio = prio;
254         return E_OK;
255     }
256     LOGE("[Scheduler][ScheduleDelay] INTERNAL ERROR : NO TASK.");
257     return -E_INTERNAL_ERROR;
258 }
259 
ScheduleNoDelayTask(SendTask & outTask,SendTaskInfo & outTaskInfo)260 int SendTaskScheduler::ScheduleNoDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
261 {
262     for (const auto &prio : priorityOrder_) {
263         if (taskCountByPrio_[prio] == 0 || taskCountByPrio_[prio] == taskDelayCountByPrio_[prio]) {
264             // No no_delay_task of this priority
265             continue;
266         }
267         // Logic guarantee that lists accessed below will not be empty
268         std::string dstTarget;
269         bool findFlag = false; // Not necessary in fact
270         for (auto iter = taskOrderByPrio_[prio].begin(); iter != taskOrderByPrio_[prio].end(); ++iter) {
271             // Logic guarantee that there is at least one target in orderList that is NO_DELAY
272             dstTarget = *iter;
273             if (policyMap_[dstTarget] == TargetPolicy::NO_DELAY) {
274                 findFlag = true;
275                 break;
276             }
277         }
278         if (!findFlag) {
279             LOGE("[Scheduler][ScheduleNoDelay] INTERNAL ERROR : NO_DELAY NOT FOUND.");
280             return -E_INTERNAL_ERROR;
281         }
282 
283         outTask = taskGroupByPrio_[prio][dstTarget].front();
284         outTaskInfo.delayFlag = false;
285         outTaskInfo.taskPrio = prio;
286         return E_OK;
287     }
288     LOGE("[Scheduler][ScheduleNoDelay] INTERNAL ERROR : NO TASK.");
289     return -E_INTERNAL_ERROR;
290 }
291 
InvalidSendTask(const std::string & target)292 void SendTaskScheduler::InvalidSendTask(const std::string &target)
293 {
294     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
295     for (const auto &priority : priorityOrder_) {
296         if (taskCountByPrio_[priority] == 0) {
297             // No task of this priority
298             continue;
299         }
300         for (auto &sendTask : taskGroupByPrio_[priority][target]) {
301             sendTask.isValid = false;
302             LOGI("[Scheduler][InvalidSendTask] invalid frameId=%" PRIu32, sendTask.frameId);
303             if ((softBusErrCodeMap_.count(target) == 0) || (softBusErrCodeMap_[target] == E_OK)) {
304                 continue;
305             }
306             LOGE("[Scheduler][InvalidSendTask] target=%.3s, errCode=%d", target.c_str(), softBusErrCodeMap_[target]);
307             if (sendTask.onEnd) {
308                 LOGI("[Scheduler][InvalidSendTask] On Send End.");
309                 sendTask.onEnd(softBusErrCodeMap_[target], false);
310                 sendTask.onEnd = nullptr;
311             }
312         }
313     }
314     softBusErrCodeMap_.erase(target);
315 }
316 
SetSoftBusErrCode(const std::string & target,int softBusErrCode)317 void SendTaskScheduler::SetSoftBusErrCode(const std::string &target, int softBusErrCode)
318 {
319     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
320     softBusErrCodeMap_[target] = softBusErrCode;
321 }
322 }