1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "cpu_worker.h"
17 #include "eu/worker_thread.h"
18 #include "ffrt_trace.h"
19 #include "sched/scheduler.h"
20 #include "eu/cpu_manager_strategy.h"
21 #include "dfx/bbox/bbox.h"
22 #include "eu/func_manager.h"
23 #include "dm/dependence_manager.h"
24 #include "dfx/perf/ffrt_perf.h"
25 #include "sync/poller.h"
26 #include "util/spmc_queue.h"
27 #include "util/ffrt_facade.h"
28 #include "tm/cpu_task.h"
29 #include "tm/queue_task.h"
30 #ifdef FFRT_ASYNC_STACKTRACE
31 #include "dfx/async_stack/ffrt_async_stack.h"
32 #endif
33 #include "eu/cpuworker_manager.h"
34 namespace {
35 int PLACE_HOLDER = 0;
36 const unsigned int TRY_POLL_FREQ = 51;
37 constexpr int CO_CREATE_RETRY_INTERVAL = 500 * 1000;
38 }
39 
40 namespace ffrt {
Run(CPUEUTask * task,CoRoutineEnv * coRoutineEnv,CPUWorker * worker)41 void CPUWorker::Run(CPUEUTask* task, CoRoutineEnv* coRoutineEnv, CPUWorker* worker)
42 {
43     if constexpr(USE_COROUTINE) {
44         while (CoStart(task, coRoutineEnv) != 0) {
45             usleep(CO_CREATE_RETRY_INTERVAL);
46         }
47         return;
48     }
49 
50     switch (task->type) {
51         case ffrt_normal_task: {
52 #ifdef FFRT_ASYNC_STACKTRACE
53             FFRTSetStackId(task->stackId);
54 #endif
55             task->Execute();
56             break;
57         }
58         case ffrt_queue_task: {
59             QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
60 #ifdef FFRT_ASYNC_STACKTRACE
61             FFRTSetStackId(sTask->stackId);
62 #endif
63             sTask->IncDeleteRef();
64             sTask->Execute();
65             sTask->DecDeleteRef();
66             break;
67         }
68         default: {
69             FFRT_LOGE("run unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
70             break;
71         }
72     }
73 }
74 
Run(ffrt_executor_task_t * task,ffrt_qos_t qos)75 void CPUWorker::Run(ffrt_executor_task_t* task, ffrt_qos_t qos)
76 {
77     if (task == nullptr) {
78         FFRT_LOGE("task is nullptr");
79         return;
80     }
81     ffrt_executor_task_func func = nullptr;
82     ffrt_executor_task_type_t type = static_cast<ffrt_executor_task_type_t>(task->type);
83     if (type == ffrt_io_task) {
84         func = FuncManager::Instance()->getFunc(ffrt_io_task);
85     } else {
86         func = FuncManager::Instance()->getFunc(ffrt_uv_task);
87     }
88     if (func == nullptr) {
89         FFRT_LOGE("Static func is nullptr");
90         return;
91     }
92     FFRTTraceRecord::TaskExecute<ffrt_uv_task>(qos);
93     FFRT_EXECUTOR_TASK_BEGIN(task);
94     func(task, qos);
95     FFRT_EXECUTOR_TASK_END();
96     if (type != ffrt_io_task) {
97         FFRT_EXECUTOR_TASK_FINISH_MARKER(task); // task finish marker for uv task
98     }
99     FFRTTraceRecord::TaskDone<ffrt_uv_task>(qos);
100 }
101 
WrapDispatch(void * worker)102 void* CPUWorker::WrapDispatch(void* worker)
103 {
104     reinterpret_cast<CPUWorker*>(worker)->NativeConfig();
105     Dispatch(reinterpret_cast<CPUWorker*>(worker));
106     return nullptr;
107 }
108 
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker)109 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker)
110 {
111     ExecuteCtx* ctx = ExecuteCtx::Cur();
112     CoRoutineEnv* coRoutineEnv = GetCoEnv();
113     RunTask(curtask, worker, ctx, coRoutineEnv);
114 }
115 
RunTask(ffrt_executor_task_t * curtask,CPUWorker * worker,ExecuteCtx * ctx,CoRoutineEnv * coRoutineEnv)116 void CPUWorker::RunTask(ffrt_executor_task_t* curtask, CPUWorker* worker, ExecuteCtx* ctx, CoRoutineEnv* coRoutineEnv)
117 {
118     CPUEUTask* task = reinterpret_cast<CPUEUTask*>(curtask);
119     worker->curTask = task;
120     worker->curTaskType_ = task->type;
121     switch (curtask->type) {
122         case ffrt_normal_task:
123         case ffrt_queue_task: {
124 #ifdef WORKER_CACHE_TASKNAMEID
125             worker->curTaskLabel_ = task->label;
126             worker->curTaskGid_ = task->gid;
127 #endif
128             ctx->task = task;
129             ctx->lastGid_ = task->gid;
130             Run(task, coRoutineEnv, worker);
131             ctx->task = nullptr;
132             break;
133         }
134         default: {
135             ctx->exec_task = curtask;
136             Run(curtask, static_cast<ffrt_qos_t>(worker->GetQos()));
137             ctx->exec_task = nullptr;
138             break;
139         }
140     }
141     worker->curTask = nullptr;
142     worker->curTaskType_ = ffrt_invalid_task;
143 }
144 
RunTaskLifo(ffrt_executor_task_t * task,CPUWorker * worker)145 void CPUWorker::RunTaskLifo(ffrt_executor_task_t* task, CPUWorker* worker)
146 {
147     RunTask(task, worker);
148 
149     unsigned int lifoCount = 0;
150     while (worker->priority_task != nullptr && worker->priority_task != &PLACE_HOLDER) {
151         lifoCount++;
152         ffrt_executor_task_t* priorityTask = reinterpret_cast<ffrt_executor_task_t*>(worker->priority_task);
153         // set a placeholder to prevent the task from being placed in the priority again
154         worker->priority_task = (lifoCount > worker->budget) ? &PLACE_HOLDER : nullptr;
155 
156         RunTask(priorityTask, worker);
157     }
158 }
159 
GetTask(CPUWorker * worker)160 void* CPUWorker::GetTask(CPUWorker* worker)
161 {
162 #ifdef FFRT_LOCAL_QUEUE_ENABLE
163     // periodically pick up tasks from the global queue to prevent global queue starvation
164     if (worker->tick % worker->global_interval == 0) {
165         worker->tick = 0;
166         CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
167         // the worker is not notified when the task attribute is set not to notify worker
168         if (task != nullptr) {
169             if (task->type == ffrt_normal_task && !task->notifyWorker_) {
170                 task->notifyWorker_ = true;
171                 return task;
172             }
173             worker->ops.NotifyTaskPicked(worker);
174         }
175         return task;
176     }
177 
178     // preferentially pick up tasks from the priority unless the priority is empty or occupied
179     if (worker->priority_task != nullptr) {
180         void* task = worker->priority_task;
181         worker->priority_task = nullptr;
182         if (task != &PLACE_HOLDER) {
183             return task;
184         }
185     }
186 
187     return worker->localFifo.PopHead();
188 #else
189     CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
190     if (task != nullptr) {
191         worker->ops.NotifyTaskPicked(worker);
192     }
193 
194     return task;
195 #endif
196 }
197 
TryPoll(CPUWorker * worker,int timeout)198 PollerRet CPUWorker::TryPoll(CPUWorker* worker, int timeout)
199 {
200     PollerRet ret = worker->ops.TryPoll(worker, timeout);
201     if (ret == PollerRet::RET_TIMER) {
202         worker->tick = 0;
203     }
204 
205     return ret;
206 }
207 
LocalEmpty(CPUWorker * worker)208 bool CPUWorker::LocalEmpty(CPUWorker* worker)
209 {
210     return ((worker->priority_task == nullptr) && (worker->localFifo.GetLength() == 0));
211 }
212 
Dispatch(CPUWorker * worker)213 void CPUWorker::Dispatch(CPUWorker* worker)
214 {
215 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
216     if (worker->ops.IsBlockAwareInit()) {
217         int ret = BlockawareRegister(worker->GetDomainId());
218         if (ret != 0) {
219             FFRT_LOGE("blockaware register fail, ret[%d]", ret);
220         }
221     }
222 #endif
223     auto ctx = ExecuteCtx::Cur();
224     ctx->localFifo = &(worker->localFifo);
225     ctx->priority_task_ptr = &(worker->priority_task);
226     ctx->qos = worker->GetQos();
227 
228     worker->ops.WorkerPrepare(worker);
229 #ifndef OHOS_STANDARD_SYSTEM
230     FFRT_LOGI("qos[%d] thread start succ", static_cast<int>(worker->GetQos()));
231 #endif
232     FFRT_PERF_WORKER_AWAKE(static_cast<int>(worker->GetQos()));
233     worker->ops.WorkerLooper(worker);
234     CoWorkerExit();
235     worker->ops.WorkerRetired(worker);
236 }
237 
238 // work looper which inherited from history
WorkerLooperDefault(WorkerThread * p)239 void CPUWorker::WorkerLooperDefault(WorkerThread* p)
240 {
241     CPUWorker* worker = reinterpret_cast<CPUWorker*>(p);
242     for (;;) {
243         // get task in the order of priority -> local queue -> global queue
244         void* local_task = GetTask(worker);
245         worker->tick++;
246         if (local_task) {
247             if (worker->tick % TRY_POLL_FREQ == 0) {
248                 worker->ops.TryPoll(worker, 0);
249             }
250             ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(local_task);
251             RunTaskLifo(work, worker);
252             continue;
253         }
254 
255         PollerRet ret = TryPoll(worker, 0);
256         if (ret != PollerRet::RET_NULL) {
257             continue;
258         }
259 
260 #ifdef FFRT_LOCAL_QUEUE_ENABLE
261         // pick up tasks from global queue
262         CPUEUTask* task = worker->ops.PickUpTaskBatch(worker);
263         // the worker is not notified when the task attribute is set not to notify worker
264         if (task != nullptr) {
265             if (task->type == ffrt_normal_task && !task->notifyWorker_) {
266                 task->notifyWorker_ = true;
267             } else {
268                 worker->ops.NotifyTaskPicked(worker);
269             }
270             ffrt_executor_task_t* work = reinterpret_cast<ffrt_executor_task_t*>(task);
271             RunTask(work, worker);
272             continue;
273         }
274 
275         // check the epoll status again to prevent fd or timer events from being missed
276         ret = TryPoll(worker, 0);
277         if (ret != PollerRet::RET_NULL) {
278             continue;
279         }
280 
281         if (worker->localFifo.GetLength() == 0) {
282             worker->ops.StealTaskBatch(worker);
283         }
284 
285         if (!LocalEmpty(worker)) {
286             worker->tick = 1;
287             continue;
288         }
289 #endif
290 
291         // enable a worker to enter the epoll wait -1 state and continuously listen to fd or timer events
292         // only one worker enters this state at a QoS level
293         ret = TryPoll(worker, -1);
294         if (ret != PollerRet::RET_NULL) {
295             continue;
296         }
297 
298         auto action = worker->ops.WaitForNewAction(worker);
299         if (action == WorkerAction::RETRY) {
300             worker->tick = 0;
301             continue;
302         } else if (action == WorkerAction::RETIRE) {
303             break;
304         }
305     }
306 }
307 } // namespace ffrt
308