1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <cstring>
17 #include <sys/stat.h>
18 #include "qos.h"
19 #include "dfx/perf/ffrt_perf.h"
20 #include "dfx/trace_record/ffrt_trace_record.h"
21 #include "eu/cpu_monitor.h"
22 #include "eu/cpu_manager_strategy.h"
23 #include "sched/scheduler.h"
24 #include "sched/workgroup_internal.h"
25 #include "eu/qos_interface.h"
26 #include "eu/cpuworker_manager.h"
27 #include "util/ffrt_facade.h"
28 #ifdef FFRT_WORKER_MONITOR
29 #include "util/ffrt_facade.h"
30 #endif
31 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
32 #include "eu/blockaware.h"
33 #endif
34 
35 namespace {
InsertTask(void * task,int qos)36 void InsertTask(void* task, int qos)
37 {
38     ffrt_executor_task_t* executorTask = reinterpret_cast<ffrt_executor_task_t*>(task);
39     ffrt::LinkedList* node = reinterpret_cast<ffrt::LinkedList*>(&executorTask->wq);
40     if (!ffrt::FFRTFacade::GetSchedInstance()->InsertNode(node, qos)) {
41         FFRT_LOGE("Insert task failed.");
42     }
43 }
44 }
45 
46 namespace ffrt {
IncWorker(const QoS & qos)47 bool CPUWorkerManager::IncWorker(const QoS& qos)
48 {
49     QoS localQos = qos;
50     int workerQos = localQos();
51     if (workerQos < 0 || workerQos >= QoS::MaxNum()) {
52         FFRT_LOGE("IncWorker qos:%d is invaild", workerQos);
53         return false;
54     }
55     std::unique_lock<std::shared_mutex> lock(groupCtl[workerQos].tgMutex);
56     if (tearDown) {
57         FFRT_LOGE("CPU Worker Manager exit");
58         return false;
59     }
60 
61     auto worker = CPUManagerStrategy::CreateCPUWorker(localQos, this);
62     auto uniqueWorker = std::unique_ptr<WorkerThread>(worker);
63     if (uniqueWorker == nullptr || uniqueWorker->Exited()) {
64         FFRT_LOGE("IncWorker failed: worker is nullptr or has exited\n");
65         return false;
66     }
67     uniqueWorker->WorkerSetup(worker);
68     auto result = groupCtl[workerQos].threads.emplace(worker, std::move(uniqueWorker));
69     if (!result.second) {
70         FFRT_LOGE("qos:%d worker insert fail:%d", workerQos, result.second);
71         return false;
72     }
73     FFRT_PERF_WORKER_WAKE(workerQos);
74     lock.unlock();
75 #ifdef FFRT_WORKER_MONITOR
76     FFRTFacade::GetWMInstance().SubmitTask();
77 #endif
78     FFRTTraceRecord::UseFfrt();
79     return true;
80 }
81 
GetTaskCount(const QoS & qos)82 int CPUWorkerManager::GetTaskCount(const QoS& qos)
83 {
84     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(qos);
85     return sched.RQSize();
86 }
87 
GetWorkerCount(const QoS & qos)88 int CPUWorkerManager::GetWorkerCount(const QoS& qos)
89 {
90     std::shared_lock<std::shared_mutex> lck(groupCtl[qos()].tgMutex);
91     return groupCtl[qos()].threads.size();
92 }
93 
94 // pick task from global queue (per qos)
PickUpTaskFromGlobalQueue(WorkerThread * thread)95 CPUEUTask* CPUWorkerManager::PickUpTaskFromGlobalQueue(WorkerThread* thread)
96 {
97     if (tearDown) {
98         return nullptr;
99     }
100 
101     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
102     auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
103     std::lock_guard lg(*lock);
104     return sched.PickNextTask();
105 }
106 
107 // pick task from local queue (per worker)
PickUpTaskFromLocalQueue(WorkerThread * thread)108 CPUEUTask* CPUWorkerManager::PickUpTaskFromLocalQueue(WorkerThread* thread)
109 {
110     if (tearDown) {
111         return nullptr;
112     }
113 
114     CPUWorker* worker = reinterpret_cast<CPUWorker*>(thread);
115     void* task = worker->localFifo.PopHead();
116     return reinterpret_cast<CPUEUTask*>(task);
117 }
118 
PickUpTaskBatch(WorkerThread * thread)119 CPUEUTask* CPUWorkerManager::PickUpTaskBatch(WorkerThread* thread)
120 {
121     if (tearDown) {
122         return nullptr;
123     }
124 
125     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
126     auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
127     std::lock_guard lg(*lock);
128     CPUEUTask* task = sched.PickNextTask();
129 
130 #ifdef FFRT_LOCAL_QUEUE_ENABLE
131     if (task == nullptr) {
132         return nullptr;
133     }
134 
135     int wakedWorkerNum = monitor->WakedWorkerNum(thread->GetQos());
136     // when there is only one worker, the global queue is equivalent to the local queue
137     // prevents local queue tasks that cannot be executed due to blocking tasks
138     if (wakedWorkerNum <= 1) {
139         return task;
140     }
141 
142     SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(thread)->localFifo);
143     int expectedTask = GetTaskCount(thread->GetQos()) / wakedWorkerNum - 1;
144     for (int i = 0; i < expectedTask; i++) {
145         if (queue->GetLength() == queue->GetCapacity()) {
146             return task;
147         }
148 
149         CPUEUTask* task2local = sched.PickNextTask();
150         if (task2local == nullptr) {
151             return task;
152         }
153 
154         queue->PushTail(task2local);
155     }
156 #endif
157 
158     return task;
159 }
160 
StealTaskBatch(WorkerThread * thread)161 unsigned int CPUWorkerManager::StealTaskBatch(WorkerThread* thread)
162 {
163     if (tearDown) {
164         return 0;
165     }
166 
167     std::shared_lock<std::shared_mutex> lck(groupCtl[thread->GetQos()].tgMutex);
168     if (GetStealingWorkers(thread->GetQos()) > groupCtl[thread->GetQos()].threads.size() / 2) {
169         return 0;
170     }
171 
172     AddStealingWorker(thread->GetQos());
173     std::unordered_map<WorkerThread*, std::unique_ptr<WorkerThread>>::iterator iter =
174         groupCtl[thread->GetQos()].threads.begin();
175     while (iter != groupCtl[thread->GetQos()].threads.end()) {
176         SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(iter->first)->localFifo);
177         unsigned int queueLen = queue->GetLength();
178         if (iter->first != thread && queueLen > 0) {
179             unsigned int popLen = queue->PopHeadToAnotherQueue(
180                 reinterpret_cast<CPUWorker*>(thread)->localFifo, (queueLen + 1) / 2, thread->GetQos(), InsertTask);
181             SubStealingWorker(thread->GetQos());
182             return popLen;
183         }
184         iter++;
185     }
186     SubStealingWorker(thread->GetQos());
187     return 0;
188 }
189 
TryPoll(const WorkerThread * thread,int timeout)190 PollerRet CPUWorkerManager::TryPoll(const WorkerThread* thread, int timeout)
191 {
192     if (tearDown || FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap()) {
193         return PollerRet::RET_NULL;
194     }
195     auto& pollerMtx = pollersMtx[thread->GetQos()];
196     if (pollerMtx.try_lock()) {
197         polling_[thread->GetQos()] = 1;
198         if (timeout == -1) {
199             monitor->IntoPollWait(thread->GetQos());
200         }
201         PollerRet ret = FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).PollOnce(timeout);
202         if (timeout == -1) {
203             monitor->OutOfPollWait(thread->GetQos());
204         }
205         polling_[thread->GetQos()] = 0;
206         pollerMtx.unlock();
207         return ret;
208     }
209     return PollerRet::RET_NULL;
210 }
211 
NotifyLocalTaskAdded(const QoS & qos)212 void CPUWorkerManager::NotifyLocalTaskAdded(const QoS& qos)
213 {
214     if (stealWorkers[qos()].load(std::memory_order_relaxed) == 0) {
215         monitor->Notify(qos, TaskNotifyType::TASK_LOCAL);
216     }
217 }
218 
NotifyTaskPicked(const WorkerThread * thread)219 void CPUWorkerManager::NotifyTaskPicked(const WorkerThread* thread)
220 {
221     monitor->Notify(thread->GetQos(), TaskNotifyType::TASK_PICKED);
222 }
223 
WorkerRetired(WorkerThread * thread)224 void CPUWorkerManager::WorkerRetired(WorkerThread* thread)
225 {
226     pid_t pid = thread->Id();
227     int qos = static_cast<int>(thread->GetQos());
228 
229     {
230         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
231         thread->SetExited(true);
232         thread->Detach();
233         auto worker = std::move(groupCtl[qos].threads[thread]);
234         int ret = groupCtl[qos].threads.erase(thread);
235         if (ret != 1) {
236             FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
237         }
238         WorkerLeaveTg(qos, pid);
239 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
240         if (IsBlockAwareInit()) {
241             ret = BlockawareUnregister();
242             if (ret != 0) {
243                 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
244             }
245         }
246 #endif
247         worker = nullptr;
248     }
249 }
250 
NotifyTaskAdded(const QoS & qos)251 void CPUWorkerManager::NotifyTaskAdded(const QoS& qos)
252 {
253     monitor->Notify(qos, TaskNotifyType::TASK_ADDED);
254 }
255 
NotifyWorkers(const QoS & qos,int number)256 void CPUWorkerManager::NotifyWorkers(const QoS& qos, int number)
257 {
258     monitor->NotifyWorkers(qos, number);
259 }
260 
CPUWorkerManager()261 CPUWorkerManager::CPUWorkerManager()
262 {
263     groupCtl[qos_deadline_request].tg = std::make_unique<ThreadGroup>();
264 }
265 
WorkerJoinTg(const QoS & qos,pid_t pid)266 void CPUWorkerManager::WorkerJoinTg(const QoS& qos, pid_t pid)
267 {
268     std::shared_lock<std::shared_mutex> lock(groupCtl[qos()].tgMutex);
269     if (qos == qos_user_interactive) {
270         (void)JoinWG(pid);
271         return;
272     }
273     auto& tgwrap = groupCtl[qos()];
274     if (!tgwrap.tg) {
275         return;
276     }
277 
278     if ((tgwrap.tgRefCount) == 0) {
279         return;
280     }
281 
282     tgwrap.tg->Join(pid);
283 }
284 
WorkerLeaveTg(const QoS & qos,pid_t pid)285 void CPUWorkerManager::WorkerLeaveTg(const QoS& qos, pid_t pid)
286 {
287     if (qos == qos_user_interactive) {
288         (void)LeaveWG(pid);
289         return;
290     }
291     auto& tgwrap = groupCtl[qos()];
292     if (!tgwrap.tg) {
293         return;
294     }
295 
296     if ((tgwrap.tgRefCount) == 0) {
297         return;
298     }
299 
300     tgwrap.tg->Leave(pid);
301 }
302 
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
/**
 * @brief Asks the monitor whether the running threshold is exceeded for `thread`'s qos.
 */
bool CPUWorkerManager::IsExceedRunningThreshold(const WorkerThread* thread)
{
    auto workerQos = thread->GetQos();
    return monitor->IsExceedRunningThreshold(workerQos);
}

/**
 * @brief Asks the monitor whether block-aware support has been initialized.
 */
bool CPUWorkerManager::IsBlockAwareInit()
{
    const bool initialized = monitor->IsBlockAwareInit();
    return initialized;
}
#endif
314 } // namespace ffrt
315