1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <climits>
17 #include <cstring>
18 #include <sys/stat.h>
19 #include "dfx/perf/ffrt_perf.h"
20 #include "eu/co_routine_factory.h"
21 #include "eu/cpu_manager_strategy.h"
22 #include "eu/qos_interface.h"
23 #include "eu/scpu_monitor.h"
24 #include "sched/scheduler.h"
25 #include "sched/workgroup_internal.h"
26 #include "util/ffrt_facade.h"
27 #include "util/slab.h"
28 #include "eu/scpuworker_manager.h"
29 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
30 #include "eu/blockaware.h"
31 #endif
32
namespace {
// Timeout, in seconds, of the first (timed) wait an idle worker performs in
// WorkerIdleAction. Builds with IDLE_WORKER_DESTRUCT use the shorter timeout.
#if defined(IDLE_WORKER_DESTRUCT)
constexpr int waiting_seconds = 5;
#else
constexpr int waiting_seconds = 10;
#endif
} // namespace
40
41 namespace ffrt {
// Upper bound on the number of wake-and-check rounds the destructor performs
// per QoS level while waiting for that level's worker set to become empty.
constexpr int MANAGER_DESTRUCT_TIMESOUT = 1'000;
// Delay, in microseconds, before the delayed task scheduled by AddDelayedTask fires.
constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL = 5'000'000;
SCPUWorkerManager()44 SCPUWorkerManager::SCPUWorkerManager()
45 {
46 monitor = CPUManagerStrategy::CreateCPUMonitor(this);
47 (void)monitor->StartMonitor();
48 }
49
~SCPUWorkerManager()50 SCPUWorkerManager::~SCPUWorkerManager()
51 {
52 tearDown = true;
53 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
54 int try_cnt = MANAGER_DESTRUCT_TIMESOUT;
55 while (try_cnt-- > 0) {
56 pollersMtx[qos].unlock();
57 FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
58 {
59 auto& ctl = sleepCtl[qos];
60 std::lock_guard lk(ctl.mutex);
61 sleepCtl[qos].cv.notify_all();
62 }
63 {
64 usleep(1000);
65 std::shared_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
66 if (groupCtl[qos].threads.empty()) {
67 break;
68 }
69 }
70 }
71
72 if (try_cnt <= 0) {
73 FFRT_LOGE("erase qos[%d] threads failed", qos);
74 }
75 }
76 delete monitor;
77 }
78
WorkerRetiredSimplified(WorkerThread * thread)79 void SCPUWorkerManager::WorkerRetiredSimplified(WorkerThread* thread)
80 {
81 pid_t pid = thread->Id();
82 int qos = static_cast<int>(thread->GetQos());
83
84 bool isEmptyQosThreads = false;
85 {
86 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
87 thread->SetExited(true);
88 thread->Detach();
89 auto worker = std::move(groupCtl[qos].threads[thread]);
90 int ret = groupCtl[qos].threads.erase(thread);
91 if (ret != 1) {
92 FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
93 }
94 isEmptyQosThreads = groupCtl[qos].threads.empty();
95 WorkerLeaveTg(QoS(qos), pid);
96 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
97 if (IsBlockAwareInit()) {
98 ret = BlockawareUnregister();
99 if (ret != 0) {
100 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
101 }
102 }
103 #endif
104 worker = nullptr;
105 }
106
107 // qos has no worker, start delay worker to monitor task
108 if (isEmptyQosThreads) {
109 std::shared_mutex& exitMtx = GetExitMtx();
110 exitMtx.lock_shared();
111 if (GetExitFlag()) {
112 exitMtx.unlock_shared();
113 return;
114 }
115 FFRT_LOGI("qos has no worker, start delay worker to monitor task, qos %d", qos);
116 AddDelayedTask(qos);
117 exitMtx.unlock_shared();
118 }
119 }
120
AddDelayedTask(int qos)121 void SCPUWorkerManager::AddDelayedTask(int qos)
122 {
123 WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
124 we->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(DELAYED_WAKED_UP_TASK_TIME_INTERVAL);
125 we->cb = ([this, qos](WaitEntry* we) {
126 int taskCount = GetTaskCount(QoS(qos));
127 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
128 bool isEmpty = groupCtl[qos].threads.empty();
129 lck.unlock();
130
131 if (!isEmpty) {
132 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
133 FFRT_LOGW("qos[%d] has worker, no need add delayed task", qos);
134 return;
135 }
136
137 if (taskCount != 0) {
138 FFRT_LOGI("notify task, qos %d", qos);
139 FFRTFacade::GetEUInstance().NotifyTaskAdded(QoS(qos));
140 } else {
141 AddDelayedTask(qos);
142 }
143 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
144 });
145
146 if (!DelayedWakeup(we->tp, we, we->cb)) {
147 SimpleAllocator<WaitUntilEntry>::FreeMem(we);
148 FFRT_LOGW("add delyaed task failed, qos %d", qos);
149 }
150 }
151
WorkerIdleAction(const WorkerThread * thread)152 WorkerAction SCPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
153 {
154 if (tearDown) {
155 return WorkerAction::RETIRE;
156 }
157
158 auto& ctl = sleepCtl[thread->GetQos()];
159 std::unique_lock lk(ctl.mutex);
160 monitor->IntoSleep(thread->GetQos());
161 FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
162 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
163 BlockawareEnterSleeping();
164 #endif
165 if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
166 bool taskExistence = GetTaskCount(thread->GetQos());
167 bool needPoll = !FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap() &&
168 (polling_[thread->GetQos()] == 0);
169 return tearDown || taskExistence || needPoll;
170 })) {
171 monitor->WakeupSleep(thread->GetQos());
172 FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
173 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
174 BlockawareLeaveSleeping();
175 #endif
176 return WorkerAction::RETRY;
177 } else {
178 #if !defined(IDLE_WORKER_DESTRUCT)
179 monitor->IntoDeepSleep(thread->GetQos());
180 CoStackFree();
181 if (monitor->IsExceedDeepSleepThreshold()) {
182 ffrt::CoRoutineReleaseMem();
183 }
184 ctl.cv.wait(lk, [this, thread] {
185 return tearDown || GetTaskCount(thread->GetQos()) ||
186 reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
187 reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
188 });
189 monitor->WakeupDeepSleep(thread->GetQos());
190 return WorkerAction::RETRY;
191 #else
192 monitor->TimeoutCount(thread->GetQos());
193 return WorkerAction::RETIRE;
194 #endif
195 }
196 }
197
WorkerPrepare(WorkerThread * thread)198 void SCPUWorkerManager::WorkerPrepare(WorkerThread* thread)
199 {
200 WorkerJoinTg(thread->GetQos(), thread->Id());
201 }
202
WakeupWorkers(const QoS & qos)203 void SCPUWorkerManager::WakeupWorkers(const QoS& qos)
204 {
205 if (tearDown) {
206 FFRT_LOGE("CPU Worker Manager exit");
207 return;
208 }
209
210 auto& ctl = sleepCtl[qos()];
211 ctl.cv.notify_one();
212 FFRT_PERF_WORKER_WAKE(static_cast<int>(qos));
213 }
214 } // namespace ffrt
215