1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "cpu_worker.h"
17 #include "eu/worker_thread.h"
18 #include "ffrt_trace.h"
19 #include "sched/scheduler.h"
20 #include "eu/cpu_manager_strategy.h"
21 #include "dfx/bbox/bbox.h"
22 #include "eu/func_manager.h"
23 #include "dm/dependence_manager.h"
24 #include "dfx/perf/ffrt_perf.h"
25 #include "sync/poller.h"
26 #include "util/spmc_queue.h"
27 #include "util/ffrt_facade.h"
28 #include "tm/cpu_task.h"
29 #include "tm/queue_task.h"
30 #ifdef FFRT_ASYNC_STACKTRACE
31 #include "dfx/async_stack/ffrt_async_stack.h"
32 #endif
33 #include "eu/cpuworker_manager.h"
namespace {
// Sentinel object: its address is written into a worker's priority_task slot to mark the
// slot as occupied, so it must stay a mutable int whose address can be stored in a void*.
int PLACE_HOLDER = 0;

// While tasks keep arriving, the worker looper issues a non-blocking poll whenever
// its tick counter is a multiple of this value.
constexpr unsigned int TRY_POLL_FREQ = 51;

// Argument handed to usleep() between failed CoStart() attempts (microseconds).
constexpr int CO_CREATE_RETRY_INTERVAL = 500'000;
} // namespace
39
40 namespace ffrt {
Run(CPUEUTask * task,CoRoutineEnv * coRoutineEnv,CPUWorker * worker)41 void CPUWorker::Run(CPUEUTask* task, CoRoutineEnv* coRoutineEnv, CPUWorker* worker)
42 {
43 if constexpr(USE_COROUTINE) {
44 while (CoStart(task, coRoutineEnv) != 0) {
45 usleep(CO_CREATE_RETRY_INTERVAL);
46 }
47 return;
48 }
49
50 switch (task->type) {
51 case ffrt_normal_task: {
52 #ifdef FFRT_ASYNC_STACKTRACE
53 FFRTSetStackId(task->stackId);
54 #endif
55 task->Execute();
56 break;
57 }
58 case ffrt_queue_task: {
59 QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
60 #ifdef FFRT_ASYNC_STACKTRACE
61 FFRTSetStackId(sTask->stackId);
62 #endif
63 sTask->IncDeleteRef();
64 sTask->Execute();
65 sTask->DecDeleteRef();
66 break;
67 }
68 default: {
69 FFRT_LOGE("run unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
70 break;
71 }
72 }
73 }
74
Run(ffrt_executor_task_t * task,ffrt_qos_t qos)75 void CPUWorker::Run(ffrt_executor_task_t* task, ffrt_qos_t qos)
76 {
77 if (task == nullptr) {
78 FFRT_LOGE("task is nullptr");
79 return;
80 }
81 ffrt_executor_task_func func = nullptr;
82 ffrt_executor_task_type_t type = static_cast<ffrt_executor_task_type_t>(task->type);
83 if (type == ffrt_io_task) {
84 func = FuncManager::Instance()->getFunc(ffrt_io_task);
85 } else {
86 func = FuncManager::Instance()->getFunc(ffrt_uv_task);
87 }
88 if (func == nullptr) {
89 FFRT_LOGE("Static func is nullptr");
90 return;
91 }
92 FFRTTraceRecord::TaskExecute<ffrt_uv_task>(qos);
93 FFRT_EXECUTOR_TASK_BEGIN(task);
94 func(task, qos);
95 FFRT_EXECUTOR_TASK_END();
96 if (type != ffrt_io_task) {
97 FFRT_EXECUTOR_TASK_FINISH_MARKER(task); // task finish marker for uv task
98 }
99 FFRTTraceRecord::TaskDone<ffrt_uv_task>(qos);
100 }
101
WrapDispatch(void * worker)102 void* CPUWorker::WrapDispatch(void* worker)
103 {
104 reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
105 Dispatch(reinterpret_cast<CPUWorker*>(worker));
106 return nullptr;
107 }
108
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker)109 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker)
110 {
111 ExecuteCtx* ctx = ExecuteCtx::Cur();
112 CoRoutineEnv* coRoutineEnv = GetCoEnv();
113 RunTask(curtask, worker, ctx, coRoutineEnv);
114 }
115
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker,ExecuteCtx * ctx,CoRoutineEnv * coRoutineEnv)116 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker, ExecuteCtx* ctx, CoRoutineEnv* coRoutineEnv)
117 {
118 CPUEUTask* task = reinterpret_cast<CPUEUTask*>(curtask);
119 worker->curTask = task;
120 worker->curTaskType_ = task->type;
121 switch (curtask->type) {
122 case ffrt_normal_task:
123 case ffrt_queue_task: {
124 #ifdef WORKER_CACHE_TASKNAMEID
125 worker->curTaskLabel_ = task->label;
126 worker->curTaskGid_ = task->gid;
127 #endif
128 ctx->task = task;
129 ctx->lastGid_ = task->gid;
130 Run(task, coRoutineEnv, worker);
131 ctx->task = nullptr;
132 break;
133 }
134 default: {
135 ctx->exec_task = curtask;
136 Run(curtask, static_cast<ffrt_qos_t>(worker->GetQos()));
137 ctx->exec_task = nullptr;
138 break;
139 }
140 }
141 worker->curTask = nullptr;
142 worker->curTaskType_ = ffrt_invalid_task;
143 }
144
RunTaskLifo(ffrt_executor_task_t * task,CPUWorker * worker)145 void CPUWorker::RunTaskLifo(ffrt_executor_task_t* task, CPUWorker* worker)
146 {
147 RunTask(task, worker);
148
149 unsigned int lifoCount = 0;
150 while (worker->priority_task != nullptr && worker->priority_task != &PLACE_HOLDER) {
151 lifoCount++;
152 ffrt_executor_task_t* priorityTask = reinterpret_cast<ffrt_executor_task_t*>(worker->priority_task);
153 // set a placeholder to prevent the task from being placed in the priority again
154 worker->priority_task = (lifoCount > worker->budget) ? &PLACE_HOLDER : nullptr;
155
156 RunTask(priorityTask, worker);
157 }
158 }
159
GetTask(CPUWorker * worker)160 void* CPUWorker::GetTask(CPUWorker* worker)
161 {
162 #ifdef FFRT_LOCAL_QUEUE_ENABLE
163 // periodically pick up tasks from the global queue to prevent global queue starvation
164 if (worker->tick % worker->global_interval == 0) {
165 worker->tick = 0;
166 CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
167 // the worker is not notified when the task attribute is set not to notify worker
168 if (task != nullptr) {
169 if (task->type == ffrt_normal_task && !task->notifyWorker_) {
170 task->notifyWorker_ = true;
171 return task;
172 }
173 worker->ops.NotifyTaskPicked(worker);
174 }
175 return task;
176 }
177
178 // preferentially pick up tasks from the priority unless the priority is empty or occupied
179 if (worker->priority_task != nullptr) {
180 void* task = worker->priority_task;
181 worker->priority_task = nullptr;
182 if (task != &PLACE_HOLDER) {
183 return task;
184 }
185 }
186
187 return worker->localFifo.PopHead();
188 #else
189 CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
190 if (task != nullptr) {
191 worker->ops.NotifyTaskPicked(worker);
192 }
193
194 return task;
195 #endif
196 }
197
TryPoll(CPUWorker * worker,int timeout)198 PollerRet CPUWorker::TryPoll(CPUWorker* worker, int timeout)
199 {
200 PollerRet ret = worker->ops.TryPoll(worker, timeout);
201 if (ret == PollerRet::RET_TIMER) {
202 worker->tick = 0;
203 }
204
205 return ret;
206 }
207
LocalEmpty(CPUWorker * worker)208 bool CPUWorker::LocalEmpty(CPUWorker* worker)
209 {
210 return ((worker->priority_task == nullptr) && (worker->localFifo.GetLength() == 0));
211 }
212
Dispatch(CPUWorker * worker)213 void CPUWorker::Dispatch(CPUWorker* worker)
214 {
215 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
216 if (worker->ops.IsBlockAwareInit()) {
217 int ret = BlockawareRegister(worker->GetDomainId());
218 if (ret != 0) {
219 FFRT_LOGE("blockaware register fail, ret[%d]", ret);
220 }
221 }
222 #endif
223 auto ctx = ExecuteCtx::Cur();
224 ctx->localFifo = &(worker->localFifo);
225 ctx->priority_task_ptr = &(worker->priority_task);
226 ctx->qos = worker->GetQos();
227
228 worker->ops.WorkerPrepare(worker);
229 #ifndef OHOS_STANDARD_SYSTEM
230 FFRT_LOGI("qos[%d] thread start succ", static_cast<int>(worker->GetQos()));
231 #endif
232 FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->GetQos()));
233 worker->ops.WorkerLooper(worker);
234 CoWorkerExit();
235 worker->ops.WorkerRetired(worker);
236 }
237
238 // work looper which inherited from history
WorkerLooperDefault(WorkerThread * p)239 void CPUWorker::WorkerLooperDefault(WorkerThread* p)
240 {
241 CPUWorker* worker = reinterpret_cast<CPUWorker*>(p);
242 for (;;) {
243 // get task in the order of priority -> local queue -> global queue
244 void* local_task = GetTask(worker);
245 worker->tick++;
246 if (local_task) {
247 if (worker->tick % TRY_POLL_FREQ == 0) {
248 worker->ops.TryPoll(worker, 0);
249 }
250 ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(local_task);
251 RunTaskLifo(work, worker);
252 continue;
253 }
254
255 PollerRet ret = TryPoll(worker, 0);
256 if (ret != PollerRet::RET_NULL) {
257 continue;
258 }
259
260 #ifdef FFRT_LOCAL_QUEUE_ENABLE
261 // pick up tasks from global queue
262 CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
263 // the worker is not notified when the task attribute is set not to notify worker
264 if (task != nullptr) {
265 if (task->type == ffrt_normal_task && !task->notifyWorker_) {
266 task->notifyWorker_ = true;
267 } else {
268 worker->ops.NotifyTaskPicked(worker);
269 }
270 ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(task);
271 RunTask(work, worker);
272 continue;
273 }
274
275 // check the epoll status again to prevent fd or timer events from being missed
276 ret = TryPoll(worker, 0);
277 if (ret != PollerRet::RET_NULL) {
278 continue;
279 }
280
281 if (worker->localFifo.GetLength() == 0) {
282 worker->ops.StealTaskBatch(worker);
283 }
284
285 if (!LocalEmpty(worker)) {
286 worker->tick = 1;
287 continue;
288 }
289 #endif
290
291 // enable a worker to enter the epoll wait -1 state and continuously listen to fd or timer events
292 // only one worker enters this state at a QoS level
293 ret = TryPoll(worker, -1);
294 if (ret != PollerRet::RET_NULL) {
295 continue;
296 }
297
298 auto action = worker->ops.WaitForNewAction(worker);
299 if (action == WorkerAction::RETRY) {
300 worker->tick = 0;
301 continue;
302 } else if (action == WorkerAction::RETIRE) {
303 break;
304 }
305 }
306 }
307 } // namespace ffrt
308