1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "eu/cpu_monitor.h"
#include <climits>
#include <iostream>
#include <thread>
#include <utility>
#include <unistd.h>
#include <securec.h>
#include "sched/scheduler.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/sysevent/sysevent.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "internal_inc/config.h"
#include "internal_inc/osal.h"
#include "util/name_manager.h"
#include "sync/poller.h"
#include "util/ffrt_facade.h"
#include "util/spmc_queue.h"
33 
namespace {
// CPUMonitor::Poke() may suppress a non-ADDED/non-ESCAPED notification only when the QoS
// pool holds more than this many workers (sleeping + executing).
constexpr size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
// ...and more than this many of them are counted as running.
constexpr size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
// Delay in milliseconds; not referenced by the code visible in this section.
constexpr int JITTER_DELAY_MS = 5;
#endif
} // namespace
41 
42 namespace ffrt {
CPUMonitor(CpuMonitorOps && ops)43 CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
44 {
45     SetupMonitor();
46 }
47 
~CPUMonitor()48 CPUMonitor::~CPUMonitor()
49 {
50     if (monitorThread != nullptr) {
51         monitorThread->join();
52     }
53     delete monitorThread;
54     monitorThread = nullptr;
55 }
56 
SetupMonitor()57 void CPUMonitor::SetupMonitor()
58 {
59     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
60         ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
61         ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
62         setWorkerMaxNum[qos] = false;
63     }
64 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
65     memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
66     wakeupCond.check_ahead = false;
67     wakeupCond.global.low = 0;
68     wakeupCond.global.high = 0;
69     for (int i = 0; i < BLOCKAWARE_DOMAIN_ID_MAX + 1; i++) {
70         wakeupCond.local[i].low = 0;
71         if (i < qosMonitorMaxNum) {
72             wakeupCond.local[i].high = UINT_MAX;
73             wakeupCond.global.low += wakeupCond.local[i].low;
74             wakeupCond.global.high = UINT_MAX;
75         } else {
76             wakeupCond.local[i].high = 0;
77         }
78     }
79 #endif
80 }
81 
StartMonitor()82 void CPUMonitor::StartMonitor()
83 {
84 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
85     int ret = BlockawareInit(&keyPtr);
86     if (ret != 0) {
87         FFRT_LOGE("blockaware init fail, ret[%d], key[0x%lx]", ret, keyPtr);
88     } else {
89         blockAwareInit = true;
90     }
91 #else
92     monitorThread = nullptr;
93 #endif
94 }
95 
SetWorkerMaxNum(const QoS & qos,int num)96 int CPUMonitor::SetWorkerMaxNum(const QoS& qos, int num)
97 {
98     WorkerCtrl& workerCtrl = ctrlQueue[qos()];
99     workerCtrl.lock.lock();
100     if (setWorkerMaxNum[qos()]) {
101         FFRT_LOGE("qos[%d] worker num can only been setup once", qos());
102         workerCtrl.lock.unlock();
103         return -1;
104     }
105     if (num <= 0 || num > QOS_WORKER_MAXNUM) {
106         FFRT_LOGE("qos[%d] worker num[%d] is invalid.", qos(), num);
107         workerCtrl.lock.unlock();
108         return -1;
109     }
110     workerCtrl.hardLimit = num;
111     setWorkerMaxNum[qos()] = true;
112     workerCtrl.lock.unlock();
113     return 0;
114 }
115 
GetMonitorTid() const116 uint32_t CPUMonitor::GetMonitorTid() const
117 {
118     return monitorTid;
119 }
120 
121 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
WakeupCond(void)122 BlockawareWakeupCond* CPUMonitor::WakeupCond(void)
123 {
124     return &wakeupCond;
125 }
126 
MonitorMain()127 void CPUMonitor::MonitorMain()
128 {
129     (void)WorkerInit();
130     int ret = BlockawareLoadSnapshot(keyPtr, &domainInfoMonitor);
131     if (ret != 0) {
132         FFRT_LOGE("blockaware load snapshot fail, ret[%d]", ret);
133         return;
134     }
135     for (int i = 0; i < qosMonitorMaxNum; i++) {
136         if (domainInfoMonitor.localinfo[i].nrRunning <= wakeupCond.local[i].low) {
137             Notify(i, TaskNotifyType::TASK_ESCAPED);
138         }
139     }
140     stopMonitor = true;
141 }
142 
IsExceedRunningThreshold(const QoS & qos)143 bool CPUMonitor::IsExceedRunningThreshold(const QoS& qos)
144 {
145     return blockAwareInit && (BlockawareLoadSnapshotNrRunningFast(keyPtr, qos()) > ctrlQueue[qos()].maxConcurrency);
146 }
147 
IsBlockAwareInit(void)148 bool CPUMonitor::IsBlockAwareInit(void)
149 {
150     return blockAwareInit;
151 }
152 #endif
153 
TimeoutCount(const QoS & qos)154 void CPUMonitor::TimeoutCount(const QoS& qos)
155 {
156     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
157     std::lock_guard lk(workerCtrl.lock);
158     workerCtrl.sleepingWorkerNum--;
159 }
160 
WakeupSleep(const QoS & qos,bool irqWake)161 void CPUMonitor::WakeupSleep(const QoS& qos, bool irqWake)
162 {
163     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
164     std::lock_guard lk(workerCtrl.lock);
165     if (irqWake) {
166         workerCtrl.irqEnable = false;
167     }
168     workerCtrl.sleepingWorkerNum--;
169     workerCtrl.executionNum++;
170 }
171 
TotalCount(const QoS & qos)172 int CPUMonitor::TotalCount(const QoS& qos)
173 {
174     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
175     workerCtrl.lock.lock();
176     int total = workerCtrl.sleepingWorkerNum + workerCtrl.executionNum;
177     workerCtrl.lock.unlock();
178     return total;
179 }
180 
RollbackDestroy(const QoS & qos,bool irqWake)181 void CPUMonitor::RollbackDestroy(const QoS& qos, bool irqWake)
182 {
183     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
184     std::lock_guard lk(workerCtrl.lock);
185     if (irqWake) {
186         workerCtrl.irqEnable = false;
187     }
188     workerCtrl.executionNum++;
189 }
190 
TryDestroy(const QoS & qos)191 bool CPUMonitor::TryDestroy(const QoS& qos)
192 {
193     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
194     std::lock_guard lk(workerCtrl.lock);
195     workerCtrl.sleepingWorkerNum--;
196     return workerCtrl.sleepingWorkerNum > 0;
197 }
198 
SleepingWorkerNum(const QoS & qos)199 int CPUMonitor::SleepingWorkerNum(const QoS& qos)
200 {
201     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
202     std::unique_lock lk(workerCtrl.lock);
203     return workerCtrl.sleepingWorkerNum;
204 }
205 
WakedWorkerNum(const QoS & qos)206 int CPUMonitor::WakedWorkerNum(const QoS& qos)
207 {
208     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
209     std::lock_guard lk(workerCtrl.lock);
210     return workerCtrl.executionNum;
211 }
212 
IntoDeepSleep(const QoS & qos)213 void CPUMonitor::IntoDeepSleep(const QoS& qos)
214 {
215     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
216     std::lock_guard lk(workerCtrl.lock);
217     workerCtrl.deepSleepingWorkerNum++;
218 }
219 
WakeupDeepSleep(const QoS & qos,bool irqWake)220 void CPUMonitor::WakeupDeepSleep(const QoS& qos, bool irqWake)
221 {
222     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
223     std::lock_guard lk(workerCtrl.lock);
224     if (irqWake) {
225         workerCtrl.irqEnable = false;
226     }
227     workerCtrl.sleepingWorkerNum--;
228     workerCtrl.deepSleepingWorkerNum--;
229     workerCtrl.executionNum++;
230 }
231 
OutOfPollWait(const QoS & qos)232 void CPUMonitor::OutOfPollWait(const QoS& qos)
233 {
234     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
235     std::lock_guard lk(workerCtrl.lock);
236     workerCtrl.pollWaitFlag = false;
237 }
238 
IsExceedDeepSleepThreshold()239 bool CPUMonitor::IsExceedDeepSleepThreshold()
240 {
241     int totalWorker = 0;
242     int deepSleepingWorkerNum = 0;
243     for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
244         WorkerCtrl& workerCtrl = ctrlQueue[i];
245         std::lock_guard lk(workerCtrl.lock);
246         deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
247         totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
248     }
249     return deepSleepingWorkerNum * 2 > totalWorker;
250 }
251 
Poke(const QoS & qos,uint32_t taskCount,TaskNotifyType notifyType)252 void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
253 {
254     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
255     workerCtrl.lock.lock();
256     size_t runningNum = workerCtrl.executionNum;
257     size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
258 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
259     /* There is no need to update running num when executionNum < maxConcurrency */
260     if (workerCtrl.executionNum >= workerCtrl.maxConcurrency) {
261         if (blockAwareInit) {
262             auto nrBlocked = BlockawareLoadSnapshotNrBlockedFast(keyPtr, qos());
263             if (workerCtrl.executionNum >= nrBlocked) {
264                 /* nrRunning may not be updated in a timely manner */
265                 runningNum = workerCtrl.executionNum - nrBlocked;
266             } else {
267                 FFRT_LOGE("qos [%d] nrBlocked [%u] is larger than executionNum [%d].",
268                     qos(), nrBlocked, workerCtrl.executionNum);
269             }
270         }
271     }
272 #endif
273 
274     bool tiggerSuppression = (totalNum > TIGGER_SUPPRESS_WORKER_COUNT) &&
275         (runningNum > TIGGER_SUPPRESS_EXECUTION_NUM) && (taskCount < runningNum);
276 
277     if (notifyType != TaskNotifyType::TASK_ADDED && notifyType != TaskNotifyType::TASK_ESCAPED && tiggerSuppression) {
278         workerCtrl.lock.unlock();
279         return;
280     }
281     if ((static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) && (runningNum < workerCtrl.maxConcurrency)) {
282         workerCtrl.lock.unlock();
283         ops.WakeupWorkers(qos);
284     } else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
285         workerCtrl.executionNum++;
286         FFRTTraceRecord::WorkRecord((int)qos, workerCtrl.executionNum);
287         workerCtrl.lock.unlock();
288         ops.IncWorker(qos);
289 #ifdef FFRT_SEND_EVENT
290         if (!((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit))) {
291             constexpr int processNameLen = 1024;
292             static std::once_flag flag;
293             static char processName[processNameLen];
294             std::call_once(flag, []() { GetProcessName(processName, processNameLen); });
295             WorkerEscapeReport(processName, static_cast<int>(qos), totalNum);
296         }
297 #endif
298     } else {
299         if (workerCtrl.pollWaitFlag) {
300             FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
301         }
302         workerCtrl.lock.unlock();
303     }
304 }
305 
NotifyWorkers(const QoS & qos,int number)306 void CPUMonitor::NotifyWorkers(const QoS& qos, int number)
307 {
308     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
309     workerCtrl.lock.lock();
310 
311     int increasableNumber = static_cast<int>(workerCtrl.maxConcurrency) -
312         (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
313     int wakeupNumber = std::min(number, workerCtrl.sleepingWorkerNum);
314     for (int idx = 0; idx < wakeupNumber; idx++) {
315         ops.WakeupWorkers(qos);
316     }
317 
318     int incNumber = std::min(number - wakeupNumber, increasableNumber);
319     for (int idx = 0; idx < incNumber; idx++) {
320         workerCtrl.executionNum++;
321         ops.IncWorker(qos);
322     }
323 
324     workerCtrl.lock.unlock();
325     FFRT_LOGD("qos[%d] inc [%d] workers, wakeup [%d] workers", static_cast<int>(qos), incNumber, wakeupNumber);
326 }
327 
328 // default strategy which is kind of radical for poking workers
HandleTaskNotifyDefault(const QoS & qos,void * p,TaskNotifyType notifyType)329 void CPUMonitor::HandleTaskNotifyDefault(const QoS& qos, void* p, TaskNotifyType notifyType)
330 {
331     CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
332     size_t taskCount = static_cast<size_t>(monitor->GetOps().GetTaskCount(qos));
333     switch (notifyType) {
334         case TaskNotifyType::TASK_ADDED:
335         case TaskNotifyType::TASK_PICKED:
336         case TaskNotifyType::TASK_ESCAPED:
337             if (taskCount > 0) {
338                 monitor->Poke(qos, taskCount, notifyType);
339             }
340             break;
341         case TaskNotifyType::TASK_LOCAL:
342                 monitor->Poke(qos, taskCount, notifyType);
343             break;
344         default:
345             break;
346     }
347 }
348 
349 // conservative strategy for poking workers
HandleTaskNotifyConservative(const QoS & qos,void * p,TaskNotifyType notifyType)350 void CPUMonitor::HandleTaskNotifyConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
351 {
352     CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
353     int taskCount = monitor->ops.GetTaskCount(qos);
354     if (taskCount == 0) {
355         // no available task in global queue, skip
356         return;
357     }
358     constexpr double thresholdTaskPick = 1.0;
359     WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
360     workerCtrl.lock.lock();
361 
362     if (notifyType == TaskNotifyType::TASK_PICKED) {
363         int wakedWorkerCount = workerCtrl.executionNum;
364         double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
365             static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
366         if (remainingLoadRatio <= thresholdTaskPick) {
367             // for task pick, wake worker when load ratio > 1
368             workerCtrl.lock.unlock();
369             return;
370         }
371     }
372 
373     if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
374         if (workerCtrl.sleepingWorkerNum == 0) {
375             FFRT_LOGI("begin to create worker, notifyType[%d]"
376                 "execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
377                 notifyType, workerCtrl.executionNum, workerCtrl.maxConcurrency,
378                 workerCtrl.sleepingWorkerNum, workerCtrl.deepSleepingWorkerNum);
379             workerCtrl.executionNum++;
380             workerCtrl.lock.unlock();
381             monitor->ops.IncWorker(qos);
382         } else {
383             workerCtrl.lock.unlock();
384             monitor->ops.WakeupWorkers(qos);
385         }
386     } else {
387         if (workerCtrl.pollWaitFlag) {
388             FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
389         }
390         workerCtrl.lock.unlock();
391     }
392 }
393 
HandleTaskNotifyUltraConservative(const QoS & qos,void * p,TaskNotifyType notifyType)394 void CPUMonitor::HandleTaskNotifyUltraConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
395 {
396     (void)notifyType;
397     CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
398     int taskCount = monitor->ops.GetTaskCount(qos);
399     if (taskCount == 0) {
400         // no available task in global queue, skip
401         return;
402     }
403 
404     WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
405     std::lock_guard lock(workerCtrl.lock);
406 
407     int runningNum = workerCtrl.executionNum;
408 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
409     if (monitor->blockAwareInit) {
410         /* nrBlocked may not be updated in a timely manner */
411         auto nrBlocked = BlockawareLoadSnapshotNrBlockedFast(monitor->keyPtr, qos());
412         runningNum = workerCtrl.executionNum - nrBlocked;
413         if (!monitor->stopMonitor && taskCount == runningNum) {
414             BlockawareWake();
415             return;
416         }
417     }
418 #endif
419 
420     if (taskCount < runningNum) {
421         return;
422     }
423 
424     if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
425         if (workerCtrl.sleepingWorkerNum == 0) {
426             workerCtrl.executionNum++;
427             monitor->ops.IncWorker(qos);
428         } else {
429             monitor->ops.WakeupWorkers(qos);
430         }
431     }
432 }
433 }