/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "co_routine.h"
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <securec.h>
#include <string>
#include <sys/mman.h>
#include "ffrt_trace.h"
#include "dm/dependence_manager.h"
#include "core/entity.h"
#include "tm/queue_task.h"
#include "sched/scheduler.h"
#include "sync/sync.h"
#include "util/slab.h"
#include "sched/sched_deadline.h"
#include "sync/perf_counter.h"
#include "sync/io_poller.h"
#include "dfx/bbox/bbox.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "co_routine_factory.h"
#include "util/ffrt_facade.h"
#ifdef FFRT_TASK_LOCAL_ENABLE
#include "pthread_ffrt.h"
#endif
#ifdef FFRT_ASYNC_STACKTRACE
#include "dfx/async_stack/ffrt_async_stack.h"
#ifdef FFRT_TASK_LOCAL_ENABLE
#include "pthread_ffrt.h"
#endif
#endif

using namespace ffrt;

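// Verify the coroutine stack's magic word; if it has been clobbered, the task
// has overflowed its stack, so log the stack-pointer offset and a hint.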
static inline void CoStackCheck(CoRoutine* co)
{
    if (unlikely(co->stkMem.magic != STACK_MAGIC)) {
        FFRT_LOGE("sp offset:%p.\n", co->stkMem.stk +
            co->stkMem.size - co->ctx.regs[REG_SP]);
        FFRT_LOGE("stack overflow, check local variables in your tasks or use the api 'ffrt_task_attr_set_stack_size'.\n");
    }
}

extern pthread_key_t g_executeCtxTlsKey;
pthread_key_t g_coThreadTlsKey = 0;
pthread_once_t g_coThreadTlsKeyOnce = PTHREAD_ONCE_INIT;
void CoEnvDestructor(void* args)
{
    auto coEnv = static_cast<CoRoutineEnv*>(args);
    if (coEnv) {
        delete coEnv;
    }
}

void MakeCoEnvTlsKey()
{
    pthread_key_create(&g_coThreadTlsKey, CoEnvDestructor);
}

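// Return the calling thread's CoRoutineEnv, lazily creating it (and the TLS key
// that owns it) on first use; CoEnvDestructor above frees it at thread exit.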
CoRoutineEnv* GetCoEnv()
{
    CoRoutineEnv* coEnv = nullptr;
    pthread_once(&g_coThreadTlsKeyOnce, MakeCoEnvTlsKey);

    void *curTls = pthread_getspecific(g_coThreadTlsKey);
    if (curTls != nullptr) {
        coEnv = reinterpret_cast<CoRoutineEnv *>(curTls);
    } else {
        coEnv = new CoRoutineEnv();
        pthread_setspecific(g_coThreadTlsKey, coEnv);
    }
    return coEnv;
}

#ifdef FFRT_TASK_LOCAL_ENABLE
namespace {
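// Task-local storage applies only to normal tasks submitted with task_local
// enabled; such a task must also carry a valid tsd array.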
bool IsTaskLocalEnable(ffrt::CPUEUTask* task)
{
    if ((task->type != ffrt_normal_task) || (!task->taskLocal)) {
        return false;
    }

    if (task->tsd == nullptr) {
        FFRT_LOGE("taskLocal enabled but task tsd invalid");
        return false;
    }

    return true;
}

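// Copy the worker thread's execute-context and coroutine-env TLS slots into the
// task's TSD array so they stay reachable while the task's TSD is installed.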
void InitWorkerTsdValueToTask(void** taskTsd)
{
    const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
    auto threadTsd = pthread_gettsd();
    for (const auto& key : updKeyMap) {
        FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "key[%d] invalid", key);
        auto addr = threadTsd[key];
        if (addr) {
            taskTsd[key] = addr;
        }
    }
}

void SwitchTsdAddrToTask(ffrt::CPUEUTask* task)
{
    auto threadTsd = pthread_gettsd();
    task->threadTsd = threadTsd;
    pthread_settsd(task->tsd);
}

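// Make the task's TSD active on this worker: copy the tracked worker TLS slots
// into it, remember the thread's own TSD, install the task's TSD, and record
// which thread is currently running the task.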
void SwitchTsdToTask(ffrt::CPUEUTask* task)
{
    if (!IsTaskLocalEnable(task)) {
        return;
    }

    InitWorkerTsdValueToTask(task->tsd);

    SwitchTsdAddrToTask(task);

    task->runningTid.store(pthread_self());
    FFRT_LOGD("switch tsd to task Success");
}

bool SwitchTsdAddrToThread(ffrt::CPUEUTask* task)
{
    if (!task->threadTsd) {
        return false;
    }
    pthread_settsd(task->threadTsd);
    task->threadTsd = nullptr;
    return true;
}

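// Merge the task's TSD values for the tracked keys back into the worker
// thread's TSD and clear them from the task; conflicting non-null values abort.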
void UpdateWorkerTsdValueToThread(void** taskTsd)
{
    const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
    auto threadTsd = pthread_gettsd();
    for (const auto& key : updKeyMap) {
        FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "key[%d] invalid", key);
        auto threadVal = threadTsd[key];
        auto taskVal = taskTsd[key];
        if (!threadVal && taskVal) {
            threadTsd[key] = taskVal;
        } else {
            FFRT_UNLIKELY_COND_DO_ABORT((threadVal && taskVal && (threadVal != taskVal)), "mismatch key=[%d]", key);
            FFRT_UNLIKELY_COND_DO_ABORT((threadVal && !taskVal),
                "unexpected: thread value exists but task value does not, key=[%d]", key);
        }
        taskTsd[key] = nullptr;
    }
}

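// Restore the worker thread's own TSD after a task-local task has run, then
// propagate the task's values back and clear the running tid.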
void SwitchTsdToThread(ffrt::CPUEUTask* task)
{
    if (!IsTaskLocalEnable(task)) {
        return;
    }

    if (!SwitchTsdAddrToThread(task)) {
        return;
    }

    UpdateWorkerTsdValueToThread(task->tsd);

    task->runningTid.store(0);
    FFRT_LOGD("switch tsd to thread Success");
}

void TaskTsdRunDtors(ffrt::CPUEUTask* task)
{
    SwitchTsdAddrToTask(task);
    pthread_tsd_run_dtors();
    SwitchTsdAddrToThread(task);
}
} // namespace

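// Run the task's TSD destructors with the task's TSD installed, then release
// the tsd array and drop the task-local flag.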
void TaskTsdDeconstruct(ffrt::CPUEUTask* task)
{
    if (!IsTaskLocalEnable(task)) {
        return;
    }

    TaskTsdRunDtors(task);
    if (task->tsd != nullptr) {
        free(task->tsd);
        task->tsd = nullptr;
        task->taskLocal = false;
    }
    FFRT_LOGD("tsd deconstruct done, task[%lu], name[%s]", task->gid, task->label.c_str());
}
#endif

static inline void CoSwitch(CoCtx* from, CoCtx* to)
{
    co2_switch_context(from, to);
}

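// Leave the current coroutine: restore the worker's TSD for normal tasks, check
// the stack guard, and switch back to the scheduler context.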
static inline void CoExit(CoRoutine* co, bool isNormalTask)
{
#ifdef FFRT_TASK_LOCAL_ENABLE
    if (isNormalTask) {
        SwitchTsdToThread(co->task);
    }
#endif
    CoStackCheck(co);
    CoSwitch(&co->ctx, &co->thEnv->schCtx);
}

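// Entry point of every coroutine: execute the bound task (queue tasks take an
// extra delete-ref so the head node survives the batch), then mark the
// coroutine uninitialized and exit back to the scheduler.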
static inline void CoStartEntry(void* arg)
{
    CoRoutine* co = reinterpret_cast<CoRoutine*>(arg);
    ffrt::CPUEUTask* task = co->task;
    bool isNormalTask = false;
    switch (task->type) {
        case ffrt_normal_task: {
            isNormalTask = true;
            task->Execute();
            break;
        }
        case ffrt_queue_task: {
            QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
            // Before the batch execution is complete, the head node cannot be released.
            sTask->IncDeleteRef();
            sTask->Execute();
            sTask->DecDeleteRef();
            break;
        }
        default: {
            FFRT_LOGE("CoStart unsupported task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
            break;
        }
    }

    co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
    CoExit(co, isNormalTask);
}

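// Toggle the protection of the lowest-addressed page of the coroutine stack
// (the first page an overflow would hit, since the stack grows downward); used
// when strong stack protection is configured.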
static void CoSetStackProt(CoRoutine* co, int prot)
{
    /* Set the page nearest the top of the user stack to read-only as a guard page;
     * this wastes one to two pages of stack space.
     */
    size_t p_size = getpagesize();
    uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
    mp = (mp + p_size - 1) / p_size * p_size;
    int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
    FFRT_UNLIKELY_COND_DO_ABORT(ret < 0, "coroutine size:%lu, mp:0x%lx, page_size:%zu,result:%d,prot:%d, err:%d,%s",
        static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
        p_size, ret, prot, errno, strerror(errno));
}

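// Allocate a coroutine together with its stack: the default stack size is
// served by CoRoutineAllocMem, any other size falls back to an anonymous mmap.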
static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
{
    std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
    CoRoutine* co = nullptr;
    if (likely(stackSize == defaultStackSize)) {
        co = ffrt::CoRoutineAllocMem(stackSize);
    } else {
        co = static_cast<CoRoutine*>(mmap(nullptr, stackSize,
            PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
        if (co == reinterpret_cast<CoRoutine*>(MAP_FAILED)) {
            FFRT_LOGE("memory mmap failed.");
            return nullptr;
        }
    }
    if (!co) {
        FFRT_LOGE("memory not enough");
        return nullptr;
    }
    co->allocatedSize = stackSize;
    co->stkMem.size = static_cast<uint64_t>(stackSize - sizeof(CoRoutine) + 8);
    co->stkMem.magic = STACK_MAGIC;
    if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
        CoSetStackProt(co, PROT_READ);
    }
    co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
    return co;
}

static inline void CoMemFree(CoRoutine* co)
{
    if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
        CoSetStackProt(co, PROT_WRITE | PROT_READ);
    }
    std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
    if (likely(co->allocatedSize == defaultStackSize)) {
        ffrt::CoRoutineFreeMem(co);
    } else {
        int ret = munmap(co, co->allocatedSize);
        if (ret != 0) {
            FFRT_LOGE("munmap failed with errno: %d", errno);
        }
    }
}

void CoStackFree(void)
{
    if (GetCoEnv()) {
        if (GetCoEnv()->runningCo) {
            CoMemFree(GetCoEnv()->runningCo);
            GetCoEnv()->runningCo = nullptr;
        }
    }
}

void CoWorkerExit(void)
{
    CoStackFree();
}

static inline void BindNewCoRoutione(ffrt::CPUEUTask* task)
{
    task->coRoutine = GetCoEnv()->runningCo;
    task->coRoutine->task = task;
    task->coRoutine->thEnv = GetCoEnv();
}

static inline void UnbindCoRoutione(ffrt::CPUEUTask* task)
{
    task->coRoutine->task = nullptr;
    task->coRoutine = nullptr;
}

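// Pick the coroutine stack the task will run on: prefer a stack already bound
// to the task, otherwise reuse the worker's cached stack when its size matches,
// or allocate a fresh one.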
static inline int CoAlloc(ffrt::CPUEUTask* task)
{
    if (task->coRoutine) { // use the already-allocated coroutine stack
        if (GetCoEnv()->runningCo) { // free the cached stack if one exists
            CoMemFree(GetCoEnv()->runningCo);
        }
        GetCoEnv()->runningCo = task->coRoutine;
    } else {
        if (!GetCoEnv()->runningCo) { // no cached stack, allocate one
            GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
        } else { // a cached stack exists
            if (GetCoEnv()->runningCo->allocatedSize != task->stack_size) { // size mismatch, allocate a new one
                CoMemFree(GetCoEnv()->runningCo); // free the cached stack
                GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
            }
        }
    }
    return 0;
}

// called when the task is created
static inline int CoCreat(ffrt::CPUEUTask* task)
{
    CoAlloc(task);
    if (GetCoEnv()->runningCo == nullptr) {
        return -1;
    }
    BindNewCoRoutione(task);
    auto co = task->coRoutine;
    if (co->status.load() == static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
        co2_init_context(&co->ctx, CoStartEntry, static_cast<void*>(co), co->stkMem.stk, co->stkMem.size);
    }
    return 0;
}

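// CoSwitchInTransaction/CoSwitchOutTransaction reopen and close the task's user
// trace tags around a coroutine switch so the trace spans stay balanced.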
static inline void CoSwitchInTransaction(ffrt::CPUEUTask* task)
{
    if (task->coRoutine->status == static_cast<int>(CoStatus::CO_NOT_FINISH)) {
        for (auto& name : task->traceTag) {
            FFRT_TRACE_BEGIN(name.c_str());
        }
    }
    FFRT_FAKE_TRACE_MARKER(task->gid);
}

static inline void CoSwitchOutTransaction(ffrt::CPUEUTask* task)
{
    FFRT_FAKE_TRACE_MARKER(task->gid);
    int traceTagNum = static_cast<int>(task->traceTag.size());
    for (int i = 0; i < traceTagNum; ++i) {
        FFRT_TRACE_END();
    }
}

// called by the worker thread
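// Drive a task on its coroutine: create and bind the coroutine if needed, then
// switch into it in a loop. The loop returns when the task finishes or when a
// pending wait predicate hands ownership of the task to another owner
// (cv/mutex/epoll); a fast-path wake re-enters the coroutine immediately.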
int CoStart(ffrt::CPUEUTask* task, CoRoutineEnv* coRoutineEnv)
{
    if (task->coRoutine) {
        int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
        if (ret == static_cast<int>(CoStatus::CO_RUNNING) && GetBboxEnableState() != 0) {
            FFRT_LOGE("executed by worker suddenly, ignore backtrace");
            return 0;
        }
    }

    if (CoCreat(task) != 0) {
        return -1;
    }
    auto co = task->coRoutine;

    FFRTTraceRecord::TaskRun(task->GetQos(), task);

    for (;;) {
        ffrt::TaskLoadTracking::Begin(task);
#ifdef FFRT_ASYNC_STACKTRACE
        FFRTSetStackId(task->stackId);
#endif
        FFRT_TASK_BEGIN(task->label, task->gid);
        if (task->type == ffrt_normal_task) {
            task->UpdateState(ffrt::TaskState::RUNNING);
        }
        CoSwitchInTransaction(task);
#ifdef FFRT_TASK_LOCAL_ENABLE
        SwitchTsdToTask(co->task);
#endif
        CoSwitch(&co->thEnv->schCtx, &co->ctx);
        FFRT_TASK_END();
        ffrt::TaskLoadTracking::End(task); // Todo: deal with CoWait()
        CoStackCheck(co);

        // 1. coroutine task done, exit normally, need to exec next coroutine task
        if (co->isTaskDone) {
            task->UpdateState(ffrt::TaskState::EXITED);
            co->isTaskDone = false;
            return 0;
        }

        // 2. coroutine task blocked, switch to thread
        // need to suspend the coroutine task or continue to execute it.
        auto pending = coRoutineEnv->pending;
        if (pending == nullptr) {
            return 0;
        }
        coRoutineEnv->pending = nullptr;
        FFRTTraceRecord::TaskCoSwitchOut(task);
        // Fast path: skip state transition
        if ((*pending)(task)) {
            // Ownership of the task now belongs to another host (cv/mutex/epoll etc.)
            // and the task cannot be accessed any more.
            return 0;
        }
        FFRT_WAKE_TRACER(task->gid); // fast path wake
        coRoutineEnv->runningCo = co;
    }
    return 0;
}

// called by the worker thread
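// Yield the current coroutine back to the scheduler: mark it not finished,
// detach it from the env, close the trace tags, restore the thread's TSD and
// switch contexts. If the blackbox (crash dump) is active, non-crashing threads
// freeze here while the crashing thread records a backtrace and exits the
// coroutine.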
void CoYield(void)
{
    CoRoutine* co = static_cast<CoRoutine*>(GetCoEnv()->runningCo);
    co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH));
    GetCoEnv()->runningCo = nullptr;
    CoSwitchOutTransaction(co->task);
    if (co->task->type == ffrt_normal_task) {
        co->task->UpdateState(ffrt::TaskState::BLOCKED);
    }
    FFRT_BLOCK_MARKER(co->task->gid);
#ifdef FFRT_TASK_LOCAL_ENABLE
    SwitchTsdToThread(co->task);
#endif
    CoStackCheck(co);
    CoSwitch(&co->ctx, &GetCoEnv()->schCtx);
    while (GetBboxEnableState() != 0) {
        if (GetBboxEnableState() != gettid()) {
            BboxFreeze(); // freeze non-crash thread
            return;
        }
        const int IGNORE_DEPTH = 3;
        backtrace(IGNORE_DEPTH);
        co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH)); // restore the old state
        CoExit(co, co->task->type == ffrt_normal_task);
    }
}

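// Block the current task on a predicate: record it as the env's pending wait
// and yield; CoStart evaluates the predicate after the switch-out.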
void CoWait(const std::function<bool(ffrt::CPUEUTask*)>& pred)
{
    GetCoEnv()->pending = &pred;
    CoYield();
}

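// Wake a blocked task: normal tasks go back to the READY state, queue tasks are
// handed back to their queue handler.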
void CoWake(ffrt::CPUEUTask* task, bool timeOut)
{
    if (task == nullptr) {
        FFRT_LOGE("task is nullptr");
        return;
    }
    // Fast path: state transition without lock
    task->wakeupTimeOut = timeOut;
    FFRT_WAKE_TRACER(task->gid);
    switch (task->type) {
        case ffrt_normal_task: {
            task->UpdateState(ffrt::TaskState::READY);
            break;
        }
        case ffrt_queue_task: {
            QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
            auto handle = sTask->GetHandler();
            handle->TransferTask(sTask);
            break;
        }
        default: {
            FFRT_LOGE("CoWake unsupported task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
            break;
        }
    }
}

CoRoutineFactory &CoRoutineFactory::Instance()
{
    static CoRoutineFactory fac;
    return fac;
}