/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "eu/cpu_monitor.h"
#include <iostream>
#include <thread>
#include <climits>
#include <unistd.h>
#include <securec.h>
#include "sched/scheduler.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/sysevent/sysevent.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "internal_inc/config.h"
#include "internal_inc/osal.h"
#include "util/name_manager.h"
#include "sync/poller.h"
#include "util/ffrt_facade.h"
#include "util/spmc_queue.h"

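// Thresholds used by CPUMonitor::Poke(): when the worker pool already holds more than
// TIGGER_SUPPRESS_WORKER_COUNT workers, more than TIGGER_SUPPRESS_EXECUTION_NUM of them are running,
// and the pending task count is below the running count, pokes for notification types other than
// TASK_ADDED and TASK_ESCAPED are suppressed.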
namespace {
const size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
const size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
constexpr int JITTER_DELAY_MS = 5;
#endif
}

namespace ffrt {
CPUMonitor::CPUMonitor(CpuMonitorOps&& ops) : ops(ops)
{
    SetupMonitor();
}

CPUMonitor::~CPUMonitor()
{
    if (monitorThread != nullptr) {
        monitorThread->join();
    }
    delete monitorThread;
    monitorThread = nullptr;
}

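// Initializes per-QoS worker control: the hard worker limit and max concurrency come from the global
// config. With dynamic scaling enabled, the blockaware wakeup condition is also reset so that only
// the first qosMonitorMaxNum domains can trigger a wakeup.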
void CPUMonitor::SetupMonitor()
{
    for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
        ctrlQueue[qos].hardLimit = DEFAULT_HARDLIMIT;
        ctrlQueue[qos].maxConcurrency = GlobalConfig::Instance().getCpuWorkerNum(qos);
        setWorkerMaxNum[qos] = false;
    }
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
    memset_s(&domainInfoMonitor, sizeof(domainInfoMonitor), 0, sizeof(domainInfoMonitor));
    wakeupCond.check_ahead = false;
    wakeupCond.global.low = 0;
    wakeupCond.global.high = 0;
    for (int i = 0; i < BLOCKAWARE_DOMAIN_ID_MAX + 1; i++) {
        wakeupCond.local[i].low = 0;
        if (i < qosMonitorMaxNum) {
            wakeupCond.local[i].high = UINT_MAX;
            wakeupCond.global.low += wakeupCond.local[i].low;
            wakeupCond.global.high = UINT_MAX;
        } else {
            wakeupCond.local[i].high = 0;
        }
    }
#endif
}

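// Starts monitoring. With dynamic scaling enabled this only initializes the blockaware facility;
// otherwise no dedicated monitor thread is created.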
void CPUMonitor::StartMonitor()
{
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
    int ret = BlockawareInit(&keyPtr);
    if (ret != 0) {
        FFRT_LOGE("blockaware init fail, ret[%d], key[0x%lx]", ret, keyPtr);
    } else {
        blockAwareInit = true;
    }
#else
    monitorThread = nullptr;
#endif
}

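// Overrides the hard worker limit for a QoS level. The limit may be set only once per QoS and must be
// within (0, QOS_WORKER_MAXNUM]; returns 0 on success and -1 otherwise.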
int CPUMonitor::SetWorkerMaxNum(const QoS& qos, int num)
{
    WorkerCtrl& workerCtrl = ctrlQueue[qos()];
    std::lock_guard lk(workerCtrl.lock);
    if (setWorkerMaxNum[qos()]) {
        FFRT_LOGE("qos[%d] worker max num can only be set once", qos());
        return -1;
    }
    if (num <= 0 || num > QOS_WORKER_MAXNUM) {
        FFRT_LOGE("qos[%d] worker num[%d] is invalid.", qos(), num);
        return -1;
    }
    workerCtrl.hardLimit = num;
    setWorkerMaxNum[qos()] = true;
    return 0;
}

uint32_t CPUMonitor::GetMonitorTid() const
{
    return monitorTid;
}

#ifdef FFRT_WORKERS_DYNAMIC_SCALING
BlockawareWakeupCond* CPUMonitor::WakeupCond(void)
{
    return &wakeupCond;
}

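// One pass of the monitor logic: loads a blockaware snapshot and, for every monitored QoS domain
// whose running-worker count has fallen to its low watermark, notifies that QoS with TASK_ESCAPED;
// finally marks the monitor as stopped.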
void CPUMonitor::MonitorMain()
{
    (void)WorkerInit();
    int ret = BlockawareLoadSnapshot(keyPtr, &domainInfoMonitor);
    if (ret != 0) {
        FFRT_LOGE("blockaware load snapshot fail, ret[%d]", ret);
        return;
    }
    for (int i = 0; i < qosMonitorMaxNum; i++) {
        if (domainInfoMonitor.localinfo[i].nrRunning <= wakeupCond.local[i].low) {
            Notify(i, TaskNotifyType::TASK_ESCAPED);
        }
    }
    stopMonitor = true;
}

bool CPUMonitor::IsExceedRunningThreshold(const QoS& qos)
{
    return blockAwareInit && (BlockawareLoadSnapshotNrRunningFast(keyPtr, qos()) > ctrlQueue[qos()].maxConcurrency);
}

bool CPUMonitor::IsBlockAwareInit(void)
{
    return blockAwareInit;
}
#endif

void CPUMonitor::TimeoutCount(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lk(workerCtrl.lock);
    workerCtrl.sleepingWorkerNum--;
}

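// Accounts for a sleeping worker that has been woken back into execution; an IRQ-triggered wakeup
// also clears the irqEnable flag.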
void CPUMonitor::WakeupSleep(const QoS& qos, bool irqWake)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lk(workerCtrl.lock);
    if (irqWake) {
        workerCtrl.irqEnable = false;
    }
    workerCtrl.sleepingWorkerNum--;
    workerCtrl.executionNum++;
}

int CPUMonitor::TotalCount(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    int total = workerCtrl.sleepingWorkerNum + workerCtrl.executionNum;
    workerCtrl.lock.unlock();
    return total;
}

void CPUMonitor::RollbackDestroy(const QoS& qos, bool irqWake)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lk(workerCtrl.lock);
    if (irqWake) {
        workerCtrl.irqEnable = false;
    }
    workerCtrl.executionNum++;
}

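// Removes one worker from the sleeping count; returns true while other sleeping workers remain,
// i.e. the caller is not the last sleeper for this QoS.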
bool CPUMonitor::TryDestroy(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lk(workerCtrl.lock);
    workerCtrl.sleepingWorkerNum--;
    return workerCtrl.sleepingWorkerNum > 0;
}

int CPUMonitor::SleepingWorkerNum(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::unique_lock lk(workerCtrl.lock);
    return workerCtrl.sleepingWorkerNum;
}

int CPUMonitor::WakedWorkerNum(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lk(workerCtrl.lock);
    return workerCtrl.executionNum;
}

void CPUMonitor::IntoDeepSleep(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lk(workerCtrl.lock);
    workerCtrl.deepSleepingWorkerNum++;
}

void CPUMonitor::WakeupDeepSleep(const QoS& qos, bool irqWake)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lk(workerCtrl.lock);
    if (irqWake) {
        workerCtrl.irqEnable = false;
    }
    workerCtrl.sleepingWorkerNum--;
    workerCtrl.deepSleepingWorkerNum--;
    workerCtrl.executionNum++;
}

void CPUMonitor::OutOfPollWait(const QoS& qos)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lk(workerCtrl.lock);
    workerCtrl.pollWaitFlag = false;
}

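// Returns true when deep-sleeping workers account for more than half of all workers across every
// QoS level.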
bool CPUMonitor::IsExceedDeepSleepThreshold()
{
    int totalWorker = 0;
    int deepSleepingWorkerNum = 0;
    for (unsigned int i = 0; i < static_cast<unsigned int>(QoS::Max()); i++) {
        WorkerCtrl& workerCtrl = ctrlQueue[i];
        std::lock_guard lk(workerCtrl.lock);
        deepSleepingWorkerNum += workerCtrl.deepSleepingWorkerNum;
        totalWorker += workerCtrl.executionNum + workerCtrl.sleepingWorkerNum;
    }
    return deepSleepingWorkerNum * 2 > totalWorker;
}

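// Core poking policy: decides, per QoS, whether to wake a sleeping worker, create a new one (bounded
// by maxConcurrency and hardLimit), or merely wake the poller. Notifications other than TASK_ADDED
// and TASK_ESCAPED are dropped when the suppression thresholds defined above are hit.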
void CPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();
    size_t runningNum = workerCtrl.executionNum;
    size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
    /* There is no need to update running num when executionNum < maxConcurrency */
    if (workerCtrl.executionNum >= workerCtrl.maxConcurrency) {
        if (blockAwareInit) {
            auto nrBlocked = BlockawareLoadSnapshotNrBlockedFast(keyPtr, qos());
            if (workerCtrl.executionNum >= nrBlocked) {
                /* nrRunning may not be updated in a timely manner */
                runningNum = workerCtrl.executionNum - nrBlocked;
            } else {
                FFRT_LOGE("qos [%d] nrBlocked [%u] is larger than executionNum [%d].",
                    qos(), nrBlocked, workerCtrl.executionNum);
            }
        }
    }
#endif

    bool tiggerSuppression = (totalNum > TIGGER_SUPPRESS_WORKER_COUNT) &&
        (runningNum > TIGGER_SUPPRESS_EXECUTION_NUM) && (taskCount < runningNum);

    if (notifyType != TaskNotifyType::TASK_ADDED && notifyType != TaskNotifyType::TASK_ESCAPED && tiggerSuppression) {
        workerCtrl.lock.unlock();
        return;
    }
    if ((static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) && (runningNum < workerCtrl.maxConcurrency)) {
        workerCtrl.lock.unlock();
        ops.WakeupWorkers(qos);
    } else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
        workerCtrl.executionNum++;
        FFRTTraceRecord::WorkRecord((int)qos, workerCtrl.executionNum);
        workerCtrl.lock.unlock();
        ops.IncWorker(qos);
#ifdef FFRT_SEND_EVENT
        if (!((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit))) {
            constexpr int processNameLen = 1024;
            static std::once_flag flag;
            static char processName[processNameLen];
            std::call_once(flag, []() { GetProcessName(processName, processNameLen); });
            WorkerEscapeReport(processName, static_cast<int>(qos), totalNum);
        }
#endif
    } else {
        if (workerCtrl.pollWaitFlag) {
            FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
        }
        workerCtrl.lock.unlock();
    }
}

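// Wakes up to `number` sleeping workers and, if that is not enough, creates additional workers up to
// maxConcurrency to cover the remainder.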
void CPUMonitor::NotifyWorkers(const QoS& qos, int number)
{
    WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();

    int increasableNumber = static_cast<int>(workerCtrl.maxConcurrency) -
        (workerCtrl.executionNum + workerCtrl.sleepingWorkerNum);
    int wakeupNumber = std::min(number, workerCtrl.sleepingWorkerNum);
    for (int idx = 0; idx < wakeupNumber; idx++) {
        ops.WakeupWorkers(qos);
    }

    int incNumber = std::min(number - wakeupNumber, increasableNumber);
    for (int idx = 0; idx < incNumber; idx++) {
        workerCtrl.executionNum++;
        ops.IncWorker(qos);
    }

    workerCtrl.lock.unlock();
    FFRT_LOGD("qos[%d] inc [%d] workers, wakeup [%d] workers", static_cast<int>(qos), incNumber, wakeupNumber);
}

// default strategy, which is relatively aggressive about poking workers
void CPUMonitor::HandleTaskNotifyDefault(const QoS& qos, void* p, TaskNotifyType notifyType)
{
    CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
    size_t taskCount = static_cast<size_t>(monitor->GetOps().GetTaskCount(qos));
    switch (notifyType) {
        case TaskNotifyType::TASK_ADDED:
        case TaskNotifyType::TASK_PICKED:
        case TaskNotifyType::TASK_ESCAPED:
            if (taskCount > 0) {
                monitor->Poke(qos, taskCount, notifyType);
            }
            break;
        case TaskNotifyType::TASK_LOCAL:
            monitor->Poke(qos, taskCount, notifyType);
            break;
        default:
            break;
    }
}

// conservative strategy for poking workers
void CPUMonitor::HandleTaskNotifyConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
{
    CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
    int taskCount = monitor->ops.GetTaskCount(qos);
    if (taskCount == 0) {
        // no available task in global queue, skip
        return;
    }
    constexpr double thresholdTaskPick = 1.0;
    WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
    workerCtrl.lock.lock();

    if (notifyType == TaskNotifyType::TASK_PICKED) {
        int wakedWorkerCount = workerCtrl.executionNum;
        double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
            static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
        if (remainingLoadRatio <= thresholdTaskPick) {
            // for task pick, wake worker only when load ratio > 1
            workerCtrl.lock.unlock();
            return;
        }
    }

    if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
        if (workerCtrl.sleepingWorkerNum == 0) {
            FFRT_LOGI("begin to create worker, notifyType[%d], "
                "execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
                notifyType, workerCtrl.executionNum, workerCtrl.maxConcurrency,
                workerCtrl.sleepingWorkerNum, workerCtrl.deepSleepingWorkerNum);
            workerCtrl.executionNum++;
            workerCtrl.lock.unlock();
            monitor->ops.IncWorker(qos);
        } else {
            workerCtrl.lock.unlock();
            monitor->ops.WakeupWorkers(qos);
        }
    } else {
        if (workerCtrl.pollWaitFlag) {
            FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
        }
        workerCtrl.lock.unlock();
    }
}

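// ultra-conservative strategy: a worker is added or woken only when the pending task count reaches
// the number of currently running (non-blocked) workers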
void CPUMonitor::HandleTaskNotifyUltraConservative(const QoS& qos, void* p, TaskNotifyType notifyType)
{
    (void)notifyType;
    CPUMonitor* monitor = reinterpret_cast<CPUMonitor*>(p);
    int taskCount = monitor->ops.GetTaskCount(qos);
    if (taskCount == 0) {
        // no available task in global queue, skip
        return;
    }

    WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
    std::lock_guard lock(workerCtrl.lock);

    int runningNum = workerCtrl.executionNum;
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
    if (monitor->blockAwareInit) {
        /* nrBlocked may not be updated in a timely manner */
        auto nrBlocked = BlockawareLoadSnapshotNrBlockedFast(monitor->keyPtr, qos());
        runningNum = workerCtrl.executionNum - nrBlocked;
        if (!monitor->stopMonitor && taskCount == runningNum) {
            BlockawareWake();
            return;
        }
    }
#endif

    if (taskCount < runningNum) {
        return;
    }

    if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
        if (workerCtrl.sleepingWorkerNum == 0) {
            workerCtrl.executionNum++;
            monitor->ops.IncWorker(qos);
        } else {
            monitor->ops.WakeupWorkers(qos);
        }
    }
}
} // namespace ffrt