1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include <cstring>
#include <mutex>
#include <shared_mutex>
#include <sys/stat.h>
#include "qos.h"
#include "dfx/perf/ffrt_perf.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "eu/cpu_monitor.h"
#include "eu/cpu_manager_strategy.h"
#include "sched/scheduler.h"
#include "sched/workgroup_internal.h"
#include "eu/qos_interface.h"
#include "eu/cpuworker_manager.h"
#include "util/ffrt_facade.h"
#ifdef FFRT_WORKER_MONITOR
#include "util/ffrt_facade.h"
#endif
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
#include "eu/blockaware.h"
#endif
34
35 namespace {
InsertTask(void * task,int qos)36 void InsertTask(void* task, int qos)
37 {
38 ffrt_executor_task_t* executorTask = reinterpret_cast<ffrt_executor_task_t*>(task);
39 ffrt::LinkedList* node = reinterpret_cast<ffrt::LinkedList*>(&executorTask->wq);
40 if (!ffrt::FFRTFacade::GetSchedInstance()->InsertNode(node, qos)) {
41 FFRT_LOGE("Insert task failed.");
42 }
43 }
44 }
45
46 namespace ffrt {
IncWorker(const QoS & qos)47 bool CPUWorkerManager::IncWorker(const QoS& qos)
48 {
49 QoS localQos = qos;
50 int workerQos = localQos();
51 if (workerQos < 0 || workerQos >= QoS::MaxNum()) {
52 FFRT_LOGE("IncWorker qos:%d is invaild", workerQos);
53 return false;
54 }
55 std::unique_lock<std::shared_mutex> lock(groupCtl[workerQos].tgMutex);
56 if (tearDown) {
57 FFRT_LOGE("CPU Worker Manager exit");
58 return false;
59 }
60
61 auto worker = CPUManagerStrategy::CreateCPUWorker(localQos, this);
62 auto uniqueWorker = std::unique_ptr<WorkerThread>(worker);
63 if (uniqueWorker == nullptr || uniqueWorker->Exited()) {
64 FFRT_LOGE("IncWorker failed: worker is nullptr or has exited\n");
65 return false;
66 }
67 uniqueWorker->WorkerSetup(worker);
68 auto result = groupCtl[workerQos].threads.emplace(worker, std::move(uniqueWorker));
69 if (!result.second) {
70 FFRT_LOGE("qos:%d worker insert fail:%d", workerQos, result.second);
71 return false;
72 }
73 FFRT_PERF_WORKER_WAKE(workerQos);
74 lock.unlock();
75 #ifdef FFRT_WORKER_MONITOR
76 FFRTFacade::GetWMInstance().SubmitTask();
77 #endif
78 FFRTTraceRecord::UseFfrt();
79 return true;
80 }
81
GetTaskCount(const QoS & qos)82 int CPUWorkerManager::GetTaskCount(const QoS& qos)
83 {
84 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(qos);
85 return sched.RQSize();
86 }
87
GetWorkerCount(const QoS & qos)88 int CPUWorkerManager::GetWorkerCount(const QoS& qos)
89 {
90 std::shared_lock<std::shared_mutex> lck(groupCtl[qos()].tgMutex);
91 return groupCtl[qos()].threads.size();
92 }
93
94 // pick task from global queue (per qos)
PickUpTaskFromGlobalQueue(WorkerThread * thread)95 CPUEUTask* CPUWorkerManager::PickUpTaskFromGlobalQueue(WorkerThread* thread)
96 {
97 if (tearDown) {
98 return nullptr;
99 }
100
101 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
102 auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
103 std::lock_guard lg(*lock);
104 return sched.PickNextTask();
105 }
106
107 // pick task from local queue (per worker)
PickUpTaskFromLocalQueue(WorkerThread * thread)108 CPUEUTask* CPUWorkerManager::PickUpTaskFromLocalQueue(WorkerThread* thread)
109 {
110 if (tearDown) {
111 return nullptr;
112 }
113
114 CPUWorker* worker = reinterpret_cast<CPUWorker*>(thread);
115 void* task = worker->localFifo.PopHead();
116 return reinterpret_cast<CPUEUTask*>(task);
117 }
118
PickUpTaskBatch(WorkerThread * thread)119 CPUEUTask* CPUWorkerManager::PickUpTaskBatch(WorkerThread* thread)
120 {
121 if (tearDown) {
122 return nullptr;
123 }
124
125 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
126 auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
127 std::lock_guard lg(*lock);
128 CPUEUTask* task = sched.PickNextTask();
129
130 #ifdef FFRT_LOCAL_QUEUE_ENABLE
131 if (task == nullptr) {
132 return nullptr;
133 }
134
135 int wakedWorkerNum = monitor->WakedWorkerNum(thread->GetQos());
136 // when there is only one worker, the global queue is equivalent to the local queue
137 // prevents local queue tasks that cannot be executed due to blocking tasks
138 if (wakedWorkerNum <= 1) {
139 return task;
140 }
141
142 SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(thread)->localFifo);
143 int expectedTask = GetTaskCount(thread->GetQos()) / wakedWorkerNum - 1;
144 for (int i = 0; i < expectedTask; i++) {
145 if (queue->GetLength() == queue->GetCapacity()) {
146 return task;
147 }
148
149 CPUEUTask* task2local = sched.PickNextTask();
150 if (task2local == nullptr) {
151 return task;
152 }
153
154 queue->PushTail(task2local);
155 }
156 #endif
157
158 return task;
159 }
160
StealTaskBatch(WorkerThread * thread)161 unsigned int CPUWorkerManager::StealTaskBatch(WorkerThread* thread)
162 {
163 if (tearDown) {
164 return 0;
165 }
166
167 std::shared_lock<std::shared_mutex> lck(groupCtl[thread->GetQos()].tgMutex);
168 if (GetStealingWorkers(thread->GetQos()) > groupCtl[thread->GetQos()].threads.size() / 2) {
169 return 0;
170 }
171
172 AddStealingWorker(thread->GetQos());
173 std::unordered_map<WorkerThread*, std::unique_ptr<WorkerThread>>::iterator iter =
174 groupCtl[thread->GetQos()].threads.begin();
175 while (iter != groupCtl[thread->GetQos()].threads.end()) {
176 SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(iter->first)->localFifo);
177 unsigned int queueLen = queue->GetLength();
178 if (iter->first != thread && queueLen > 0) {
179 unsigned int popLen = queue->PopHeadToAnotherQueue(
180 reinterpret_cast<CPUWorker*>(thread)->localFifo, (queueLen + 1) / 2, thread->GetQos(), InsertTask);
181 SubStealingWorker(thread->GetQos());
182 return popLen;
183 }
184 iter++;
185 }
186 SubStealingWorker(thread->GetQos());
187 return 0;
188 }
189
TryPoll(const WorkerThread * thread,int timeout)190 PollerRet CPUWorkerManager::TryPoll(const WorkerThread* thread, int timeout)
191 {
192 if (tearDown || FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap()) {
193 return PollerRet::RET_NULL;
194 }
195 auto& pollerMtx = pollersMtx[thread->GetQos()];
196 if (pollerMtx.try_lock()) {
197 polling_[thread->GetQos()] = 1;
198 if (timeout == -1) {
199 monitor->IntoPollWait(thread->GetQos());
200 }
201 PollerRet ret = FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).PollOnce(timeout);
202 if (timeout == -1) {
203 monitor->OutOfPollWait(thread->GetQos());
204 }
205 polling_[thread->GetQos()] = 0;
206 pollerMtx.unlock();
207 return ret;
208 }
209 return PollerRet::RET_NULL;
210 }
211
NotifyLocalTaskAdded(const QoS & qos)212 void CPUWorkerManager::NotifyLocalTaskAdded(const QoS& qos)
213 {
214 if (stealWorkers[qos()].load(std::memory_order_relaxed) == 0) {
215 monitor->Notify(qos, TaskNotifyType::TASK_LOCAL);
216 }
217 }
218
NotifyTaskPicked(const WorkerThread * thread)219 void CPUWorkerManager::NotifyTaskPicked(const WorkerThread* thread)
220 {
221 monitor->Notify(thread->GetQos(), TaskNotifyType::TASK_PICKED);
222 }
223
WorkerRetired(WorkerThread * thread)224 void CPUWorkerManager::WorkerRetired(WorkerThread* thread)
225 {
226 pid_t pid = thread->Id();
227 int qos = static_cast<int>(thread->GetQos());
228
229 {
230 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
231 thread->SetExited(true);
232 thread->Detach();
233 auto worker = std::move(groupCtl[qos].threads[thread]);
234 int ret = groupCtl[qos].threads.erase(thread);
235 if (ret != 1) {
236 FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
237 }
238 WorkerLeaveTg(qos, pid);
239 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
240 if (IsBlockAwareInit()) {
241 ret = BlockawareUnregister();
242 if (ret != 0) {
243 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
244 }
245 }
246 #endif
247 worker = nullptr;
248 }
249 }
250
NotifyTaskAdded(const QoS & qos)251 void CPUWorkerManager::NotifyTaskAdded(const QoS& qos)
252 {
253 monitor->Notify(qos, TaskNotifyType::TASK_ADDED);
254 }
255
NotifyWorkers(const QoS & qos,int number)256 void CPUWorkerManager::NotifyWorkers(const QoS& qos, int number)
257 {
258 monitor->NotifyWorkers(qos, number);
259 }
260
CPUWorkerManager()261 CPUWorkerManager::CPUWorkerManager()
262 {
263 groupCtl[qos_deadline_request].tg = std::make_unique<ThreadGroup>();
264 }
265
WorkerJoinTg(const QoS & qos,pid_t pid)266 void CPUWorkerManager::WorkerJoinTg(const QoS& qos, pid_t pid)
267 {
268 std::shared_lock<std::shared_mutex> lock(groupCtl[qos()].tgMutex);
269 if (qos == qos_user_interactive) {
270 (void)JoinWG(pid);
271 return;
272 }
273 auto& tgwrap = groupCtl[qos()];
274 if (!tgwrap.tg) {
275 return;
276 }
277
278 if ((tgwrap.tgRefCount) == 0) {
279 return;
280 }
281
282 tgwrap.tg->Join(pid);
283 }
284
WorkerLeaveTg(const QoS & qos,pid_t pid)285 void CPUWorkerManager::WorkerLeaveTg(const QoS& qos, pid_t pid)
286 {
287 if (qos == qos_user_interactive) {
288 (void)LeaveWG(pid);
289 return;
290 }
291 auto& tgwrap = groupCtl[qos()];
292 if (!tgwrap.tg) {
293 return;
294 }
295
296 if ((tgwrap.tgRefCount) == 0) {
297 return;
298 }
299
300 tgwrap.tg->Leave(pid);
301 }
302
303 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
IsExceedRunningThreshold(const WorkerThread * thread)304 bool CPUWorkerManager::IsExceedRunningThreshold(const WorkerThread* thread)
305 {
306 return monitor->IsExceedRunningThreshold(thread->GetQos());
307 }
308
IsBlockAwareInit()309 bool CPUWorkerManager::IsBlockAwareInit()
310 {
311 return monitor->IsBlockAwareInit();
312 }
313 #endif
314 } // namespace ffrt
315