1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <climits>
17 #include <cstring>
18 #include <sys/stat.h>
19 #include "dfx/perf/ffrt_perf.h"
20 #include "eu/co_routine_factory.h"
21 #include "eu/cpu_manager_strategy.h"
22 #include "eu/qos_interface.h"
23 #include "eu/scpu_monitor.h"
24 #include "sched/scheduler.h"
25 #include "sched/workgroup_internal.h"
26 #include "util/ffrt_facade.h"
27 #include "util/slab.h"
28 #include "eu/scpuworker_manager.h"
29 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
30 #include "eu/blockaware.h"
31 #endif
32 
33 namespace {
34 #if !defined(IDLE_WORKER_DESTRUCT)
35 constexpr int waiting_seconds = 10;
36 #else
37 constexpr int waiting_seconds = 5;
38 #endif
39 }
40 
41 namespace ffrt {
42 constexpr int MANAGER_DESTRUCT_TIMESOUT = 1000;
43 constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL = 5 * 1000 * 1000;
SCPUWorkerManager()44 SCPUWorkerManager::SCPUWorkerManager()
45 {
46     monitor = CPUManagerStrategy::CreateCPUMonitor(this);
47     (void)monitor->StartMonitor();
48 }
49 
~SCPUWorkerManager()50 SCPUWorkerManager::~SCPUWorkerManager()
51 {
52     tearDown = true;
53     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
54         int try_cnt = MANAGER_DESTRUCT_TIMESOUT;
55         while (try_cnt-- > 0) {
56             pollersMtx[qos].unlock();
57             FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
58             {
59                 auto& ctl = sleepCtl[qos];
60                 std::lock_guard lk(ctl.mutex);
61                 sleepCtl[qos].cv.notify_all();
62             }
63             {
64                 usleep(1000);
65                 std::shared_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
66                 if (groupCtl[qos].threads.empty()) {
67                     break;
68                 }
69             }
70         }
71 
72         if (try_cnt <= 0) {
73             FFRT_LOGE("erase qos[%d] threads failed", qos);
74         }
75     }
76     delete monitor;
77 }
78 
WorkerRetiredSimplified(WorkerThread * thread)79 void SCPUWorkerManager::WorkerRetiredSimplified(WorkerThread* thread)
80 {
81     pid_t pid = thread->Id();
82     int qos = static_cast<int>(thread->GetQos());
83 
84     bool isEmptyQosThreads = false;
85     {
86         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
87         thread->SetExited(true);
88         thread->Detach();
89         auto worker = std::move(groupCtl[qos].threads[thread]);
90         int ret = groupCtl[qos].threads.erase(thread);
91         if (ret != 1) {
92             FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
93         }
94         isEmptyQosThreads = groupCtl[qos].threads.empty();
95         WorkerLeaveTg(QoS(qos), pid);
96 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
97         if (IsBlockAwareInit()) {
98             ret = BlockawareUnregister();
99             if (ret != 0) {
100                 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
101             }
102         }
103 #endif
104         worker = nullptr;
105     }
106 
107     // qos has no worker, start delay worker to monitor task
108     if (isEmptyQosThreads) {
109         std::shared_mutex& exitMtx = GetExitMtx();
110         exitMtx.lock_shared();
111         if (GetExitFlag()) {
112             exitMtx.unlock_shared();
113             return;
114         }
115         FFRT_LOGI("qos has no worker, start delay worker to monitor task, qos %d", qos);
116         AddDelayedTask(qos);
117         exitMtx.unlock_shared();
118     }
119 }
120 
AddDelayedTask(int qos)121 void SCPUWorkerManager::AddDelayedTask(int qos)
122 {
123     WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
124     we->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(DELAYED_WAKED_UP_TASK_TIME_INTERVAL);
125     we->cb = ([this, qos](WaitEntry* we) {
126         int taskCount = GetTaskCount(QoS(qos));
127         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
128         bool isEmpty = groupCtl[qos].threads.empty();
129         lck.unlock();
130 
131         if (!isEmpty) {
132             SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
133             FFRT_LOGW("qos[%d] has worker, no need add delayed task", qos);
134             return;
135         }
136 
137         if (taskCount != 0) {
138             FFRT_LOGI("notify task, qos %d", qos);
139             FFRTFacade::GetEUInstance().NotifyTaskAdded(QoS(qos));
140         } else {
141             AddDelayedTask(qos);
142         }
143         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
144     });
145 
146     if (!DelayedWakeup(we->tp, we, we->cb)) {
147         SimpleAllocator<WaitUntilEntry>::FreeMem(we);
148         FFRT_LOGW("add delyaed task failed, qos %d", qos);
149     }
150 }
151 
WorkerIdleAction(const WorkerThread * thread)152 WorkerAction SCPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
153 {
154     if (tearDown) {
155         return WorkerAction::RETIRE;
156     }
157 
158     auto& ctl = sleepCtl[thread->GetQos()];
159     std::unique_lock lk(ctl.mutex);
160     monitor->IntoSleep(thread->GetQos());
161     FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
162 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
163     BlockawareEnterSleeping();
164 #endif
165     if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
166         bool taskExistence = GetTaskCount(thread->GetQos());
167         bool needPoll = !FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap() &&
168             (polling_[thread->GetQos()] == 0);
169         return tearDown || taskExistence || needPoll;
170         })) {
171         monitor->WakeupSleep(thread->GetQos());
172         FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
173 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
174         BlockawareLeaveSleeping();
175 #endif
176         return WorkerAction::RETRY;
177     } else {
178 #if !defined(IDLE_WORKER_DESTRUCT)
179         monitor->IntoDeepSleep(thread->GetQos());
180         CoStackFree();
181         if (monitor->IsExceedDeepSleepThreshold()) {
182             ffrt::CoRoutineReleaseMem();
183         }
184         ctl.cv.wait(lk, [this, thread] {
185             return tearDown || GetTaskCount(thread->GetQos()) ||
186             reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
187             reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
188             });
189         monitor->WakeupDeepSleep(thread->GetQos());
190         return WorkerAction::RETRY;
191 #else
192         monitor->TimeoutCount(thread->GetQos());
193         return WorkerAction::RETIRE;
194 #endif
195     }
196 }
197 
WorkerPrepare(WorkerThread * thread)198 void SCPUWorkerManager::WorkerPrepare(WorkerThread* thread)
199 {
200     WorkerJoinTg(thread->GetQos(), thread->Id());
201 }
202 
WakeupWorkers(const QoS & qos)203 void SCPUWorkerManager::WakeupWorkers(const QoS& qos)
204 {
205     if (tearDown) {
206         FFRT_LOGE("CPU Worker Manager exit");
207         return;
208     }
209 
210     auto& ctl = sleepCtl[qos()];
211     ctl.cv.notify_one();
212     FFRT_PERF_WORKER_WAKE(static_cast<int>(qos));
213 }
214 } // namespace ffrt
215