1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "co_routine.h"
17 #include <cstdio>
18 #include <cstdlib>
19 #include <cstring>
20 #include <securec.h>
21 #include <string>
22 #include <sys/mman.h>
23 #include "ffrt_trace.h"
24 #include "dm/dependence_manager.h"
25 #include "core/entity.h"
26 #include "tm/queue_task.h"
27 #include "sched/scheduler.h"
28 #include "sync/sync.h"
29 #include "util/slab.h"
30 #include "sched/sched_deadline.h"
31 #include "sync/perf_counter.h"
32 #include "sync/io_poller.h"
33 #include "dfx/bbox/bbox.h"
34 #include "dfx/trace_record/ffrt_trace_record.h"
35 #include "co_routine_factory.h"
36 #include "util/ffrt_facade.h"
37 #ifdef FFRT_TASK_LOCAL_ENABLE
38 #include "pthread_ffrt.h"
39 #endif
40 #ifdef FFRT_ASYNC_STACKTRACE
41 #include "dfx/async_stack/ffrt_async_stack.h"
42 #ifdef FFRT_TASK_LOCAL_ENABLE
43 #include "pthread_ffrt.h"
44 #endif
45 #endif
46 
47 using namespace ffrt;
48 
CoStackCheck(CoRoutine * co)49 static inline void CoStackCheck(CoRoutine* co)
50 {
51     if (unlikely(co->stkMem.magic != STACK_MAGIC)) {
52         FFRT_LOGE("sp offset:%p.\n", co->stkMem.stk +
53             co->stkMem.size - co->ctx.regs[REG_SP]);
54         FFRT_LOGE("stack over flow, check local variable in you tasks or use api 'ffrt_task_attr_set_stack_size'.\n");
55     }
56 }
57 
58 extern pthread_key_t g_executeCtxTlsKey;
59 pthread_key_t g_coThreadTlsKey = 0;
60 pthread_once_t g_coThreadTlsKeyOnce = PTHREAD_ONCE_INIT;
CoEnvDestructor(void * args)61 void CoEnvDestructor(void* args)
62 {
63     auto coEnv = static_cast<CoRoutineEnv*>(args);
64     if (coEnv) {
65         delete coEnv;
66     }
67 }
68 
MakeCoEnvTlsKey()69 void MakeCoEnvTlsKey()
70 {
71     pthread_key_create(&g_coThreadTlsKey, CoEnvDestructor);
72 }
73 
GetCoEnv()74 CoRoutineEnv* GetCoEnv()
75 {
76     CoRoutineEnv* coEnv = nullptr;
77     pthread_once(&g_coThreadTlsKeyOnce, MakeCoEnvTlsKey);
78 
79     void *curTls = pthread_getspecific(g_coThreadTlsKey);
80     if (curTls != nullptr) {
81         coEnv = reinterpret_cast<CoRoutineEnv *>(curTls);
82     } else {
83         coEnv = new CoRoutineEnv();
84         pthread_setspecific(g_coThreadTlsKey, coEnv);
85     }
86     return coEnv;
87 }
88 
89 #ifdef FFRT_TASK_LOCAL_ENABLE
90 namespace {
IsTaskLocalEnable(ffrt::CPUEUTask * task)91 bool IsTaskLocalEnable(ffrt::CPUEUTask* task)
92 {
93     if ((task->type != ffrt_normal_task) || (!task->taskLocal)) {
94         return false;
95     }
96 
97     if (task->tsd == nullptr) {
98         FFRT_LOGE("taskLocal enabled but task tsd invalid");
99         return false;
100     }
101 
102     return true;
103 }
104 
InitWorkerTsdValueToTask(void ** taskTsd)105 void InitWorkerTsdValueToTask(void** taskTsd)
106 {
107     const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
108     auto threadTsd = pthread_gettsd();
109     for (const auto& key : updKeyMap) {
110         FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "key[%d] invalid", key);
111         auto addr = threadTsd[key];
112         if (addr) {
113             taskTsd[key] = addr;
114         }
115     }
116 }
117 
SwitchTsdAddrToTask(ffrt::CPUEUTask * task)118 void SwitchTsdAddrToTask(ffrt::CPUEUTask* task)
119 {
120     auto threadTsd = pthread_gettsd();
121     task->threadTsd = threadTsd;
122     pthread_settsd(task->tsd);
123 }
124 
SwitchTsdToTask(ffrt::CPUEUTask * task)125 void SwitchTsdToTask(ffrt::CPUEUTask* task)
126 {
127     if (!IsTaskLocalEnable(task)) {
128         return;
129     }
130 
131     InitWorkerTsdValueToTask(task->tsd);
132 
133     SwitchTsdAddrToTask(task);
134 
135     task->runningTid.store(pthread_self());
136     FFRT_LOGD("switch tsd to task Success");
137 }
138 
SwitchTsdAddrToThread(ffrt::CPUEUTask * task)139 bool SwitchTsdAddrToThread(ffrt::CPUEUTask* task)
140 {
141     if (!task->threadTsd) {
142         return false;
143     }
144     pthread_settsd(task->threadTsd);
145     task->threadTsd = nullptr;
146     return true;
147 }
148 
UpdateWorkerTsdValueToThread(void ** taskTsd)149 void UpdateWorkerTsdValueToThread(void** taskTsd)
150 {
151     const pthread_key_t updKeyMap[] = {g_executeCtxTlsKey, g_coThreadTlsKey};
152     auto threadTsd = pthread_gettsd();
153     for (const auto& key : updKeyMap) {
154         FFRT_UNLIKELY_COND_DO_ABORT(key <= 0, "key[%d] invalid", key);
155         auto threadVal = threadTsd[key];
156         auto taskVal = taskTsd[key];
157         if (!threadVal && taskVal) {
158             threadTsd[key] = taskVal;
159         } else {
160             FFRT_UNLIKELY_COND_DO_ABORT((threadVal && taskVal && (threadVal != taskVal)), "mismatch key=[%d]", key);
161             FFRT_UNLIKELY_COND_DO_ABORT((threadVal && !taskVal),
162                                         "unexpected: thread exists but task not exists, key=[%d]", key);
163         }
164         taskTsd[key] = nullptr;
165     }
166 }
167 
SwitchTsdToThread(ffrt::CPUEUTask * task)168 void SwitchTsdToThread(ffrt::CPUEUTask* task)
169 {
170     if (!IsTaskLocalEnable(task)) {
171         return;
172     }
173 
174     if (!SwitchTsdAddrToThread(task)) {
175         return;
176     }
177 
178     UpdateWorkerTsdValueToThread(task->tsd);
179 
180     task->runningTid.store(0);
181     FFRT_LOGD("switch tsd to thread Success");
182 }
183 
TaskTsdRunDtors(ffrt::CPUEUTask * task)184 void TaskTsdRunDtors(ffrt::CPUEUTask* task)
185 {
186     SwitchTsdAddrToTask(task);
187     pthread_tsd_run_dtors();
188     SwitchTsdAddrToThread(task);
189 }
190 } // namespace
191 
TaskTsdDeconstruct(ffrt::CPUEUTask * task)192 void TaskTsdDeconstruct(ffrt::CPUEUTask* task)
193 {
194     if (!IsTaskLocalEnable(task)) {
195         return;
196     }
197 
198     TaskTsdRunDtors(task);
199     if (task->tsd != nullptr) {
200         free(task->tsd);
201         task->tsd = nullptr;
202         task->taskLocal = false;
203     }
204     FFRT_LOGD("tsd deconstruct done, task[%lu], name[%s]", task->gid, task->label.c_str());
205 }
206 #endif
207 
CoSwitch(CoCtx * from,CoCtx * to)208 static inline void CoSwitch(CoCtx* from, CoCtx* to)
209 {
210     co2_switch_context(from, to);
211 }
212 
CoExit(CoRoutine * co,bool isNormalTask)213 static inline void CoExit(CoRoutine* co, bool isNormalTask)
214 {
215 #ifdef FFRT_TASK_LOCAL_ENABLE
216     if (isNormalTask) {
217         SwitchTsdToThread(co->task);
218     }
219 #endif
220     CoStackCheck(co);
221     CoSwitch(&co->ctx, &co->thEnv->schCtx);
222 }
223 
CoStartEntry(void * arg)224 static inline void CoStartEntry(void* arg)
225 {
226     CoRoutine* co = reinterpret_cast<CoRoutine*>(arg);
227     ffrt::CPUEUTask* task = co->task;
228     bool isNormalTask = false;
229     switch (task->type) {
230         case ffrt_normal_task: {
231             isNormalTask = true;
232             task->Execute();
233             break;
234         }
235         case ffrt_queue_task: {
236             QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
237             // Before the batch execution is complete, head node cannot be released.
238             sTask->IncDeleteRef();
239             sTask->Execute();
240             sTask->DecDeleteRef();
241             break;
242         }
243         default: {
244             FFRT_LOGE("CoStart unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
245             break;
246         }
247     }
248 
249     co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
250     CoExit(co, isNormalTask);
251 }
252 
CoSetStackProt(CoRoutine * co,int prot)253 static void CoSetStackProt(CoRoutine* co, int prot)
254 {
255     /* set the attribute of the page table closest to the stack top in the user stack to read-only,
256      * and 1~2 page table space will be wasted
257      */
258     size_t p_size = getpagesize();
259     uint64_t mp = reinterpret_cast<uint64_t>(co->stkMem.stk);
260     mp = (mp + p_size - 1) / p_size * p_size;
261     int ret = mprotect(reinterpret_cast<void *>(static_cast<uintptr_t>(mp)), p_size, prot);
262     FFRT_UNLIKELY_COND_DO_ABORT(ret < 0, "coroutine size:%lu, mp:0x%lx, page_size:%zu,result:%d,prot:%d, err:%d,%s",
263                                 static_cast<unsigned long>(sizeof(struct CoRoutine)), static_cast<unsigned long>(mp),
264                                 p_size, ret, prot, errno, strerror(errno));
265 }
266 
AllocNewCoRoutine(size_t stackSize)267 static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
268 {
269     std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
270     CoRoutine* co = nullptr;
271     if (likely(stackSize == defaultStackSize)) {
272         co = ffrt::CoRoutineAllocMem(stackSize);
273     } else {
274         co = static_cast<CoRoutine*>(mmap(nullptr, stackSize,
275             PROT_READ | PROT_WRITE,  MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
276         if (co == reinterpret_cast<CoRoutine*>(MAP_FAILED)) {
277             FFRT_LOGE("memory mmap failed.");
278             return nullptr;
279         }
280     }
281     if (!co) {
282         FFRT_LOGE("memory not enough");
283         return nullptr;
284     }
285     co->allocatedSize = stackSize;
286     co->stkMem.size = static_cast<uint64_t>(stackSize - sizeof(CoRoutine) + 8);
287     co->stkMem.magic = STACK_MAGIC;
288     if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
289         CoSetStackProt(co, PROT_READ);
290     }
291     co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
292     return co;
293 }
294 
CoMemFree(CoRoutine * co)295 static inline void CoMemFree(CoRoutine* co)
296 {
297     if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
298         CoSetStackProt(co, PROT_WRITE | PROT_READ);
299     }
300     std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
301     if (likely(co->allocatedSize == defaultStackSize)) {
302         ffrt::CoRoutineFreeMem(co);
303     } else {
304         int ret = munmap(co, co->allocatedSize);
305         if (ret != 0) {
306             FFRT_LOGE("munmap failed with errno: %d", errno);
307         }
308     }
309 }
310 
CoStackFree(void)311 void CoStackFree(void)
312 {
313     if (GetCoEnv()) {
314         if (GetCoEnv()->runningCo) {
315             CoMemFree(GetCoEnv()->runningCo);
316             GetCoEnv()->runningCo = nullptr;
317         }
318     }
319 }
320 
CoWorkerExit(void)321 void CoWorkerExit(void)
322 {
323     CoStackFree();
324 }
325 
BindNewCoRoutione(ffrt::CPUEUTask * task)326 static inline void BindNewCoRoutione(ffrt::CPUEUTask* task)
327 {
328     task->coRoutine = GetCoEnv()->runningCo;
329     task->coRoutine->task = task;
330     task->coRoutine->thEnv = GetCoEnv();
331 }
332 
UnbindCoRoutione(ffrt::CPUEUTask * task)333 static inline void UnbindCoRoutione(ffrt::CPUEUTask* task)
334 {
335     task->coRoutine->task = nullptr;
336     task->coRoutine = nullptr;
337 }
338 
CoAlloc(ffrt::CPUEUTask * task)339 static inline int CoAlloc(ffrt::CPUEUTask* task)
340 {
341     if (task->coRoutine) { // use allocated coroutine stack
342         if (GetCoEnv()->runningCo) { // free cached stack if it exist
343             CoMemFree(GetCoEnv()->runningCo);
344         }
345         GetCoEnv()->runningCo = task->coRoutine;
346     } else {
347         if (!GetCoEnv()->runningCo) { // if no cached stack, alloc one
348             GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
349         } else { // exist cached stack
350             if (GetCoEnv()->runningCo->allocatedSize != task->stack_size) { // stack size not match, alloc one
351                 CoMemFree(GetCoEnv()->runningCo); // free cached stack
352                 GetCoEnv()->runningCo = AllocNewCoRoutine(task->stack_size);
353             }
354         }
355     }
356     return 0;
357 }
358 
359 // call CoCreat when task creat
CoCreat(ffrt::CPUEUTask * task)360 static inline int CoCreat(ffrt::CPUEUTask* task)
361 {
362     CoAlloc(task);
363     if (GetCoEnv()->runningCo == nullptr) {
364         return -1;
365     }
366     BindNewCoRoutione(task);
367     auto co = task->coRoutine;
368     if (co->status.load() == static_cast<int>(CoStatus::CO_UNINITIALIZED)) {
369         co2_init_context(&co->ctx, CoStartEntry, static_cast<void*>(co), co->stkMem.stk, co->stkMem.size);
370     }
371     return 0;
372 }
373 
CoSwitchInTransaction(ffrt::CPUEUTask * task)374 static inline void CoSwitchInTransaction(ffrt::CPUEUTask* task)
375 {
376     if (task->coRoutine->status == static_cast<int>(CoStatus::CO_NOT_FINISH)) {
377         for (auto& name : task->traceTag) {
378             FFRT_TRACE_BEGIN(name.c_str());
379         }
380     }
381     FFRT_FAKE_TRACE_MARKER(task->gid);
382 }
383 
CoSwitchOutTransaction(ffrt::CPUEUTask * task)384 static inline void CoSwitchOutTransaction(ffrt::CPUEUTask* task)
385 {
386     FFRT_FAKE_TRACE_MARKER(task->gid);
387     int traceTagNum = static_cast<int>(task->traceTag.size());
388     for (int i = 0; i < traceTagNum; ++i) {
389         FFRT_TRACE_END();
390     }
391 }
392 
393 // called by thread work
CoStart(ffrt::CPUEUTask * task,CoRoutineEnv * coRoutineEnv)394 int CoStart(ffrt::CPUEUTask* task, CoRoutineEnv* coRoutineEnv)
395 {
396     if (task->coRoutine) {
397         int ret = task->coRoutine->status.exchange(static_cast<int>(CoStatus::CO_RUNNING));
398         if (ret == static_cast<int>(CoStatus::CO_RUNNING) && GetBboxEnableState() != 0) {
399             FFRT_LOGE("executed by worker suddenly, ignore backtrace");
400             return 0;
401         }
402     }
403 
404     if (CoCreat(task) != 0) {
405         return -1;
406     }
407     auto co = task->coRoutine;
408 
409     FFRTTraceRecord::TaskRun(task->GetQos(), task);
410 
411     for (;;) {
412         ffrt::TaskLoadTracking::Begin(task);
413 #ifdef FFRT_ASYNC_STACKTRACE
414         FFRTSetStackId(task->stackId);
415 #endif
416         FFRT_TASK_BEGIN(task->label, task->gid);
417         if (task->type == ffrt_normal_task) {
418             task->UpdateState(ffrt::TaskState::RUNNING);
419         }
420         CoSwitchInTransaction(task);
421 #ifdef FFRT_TASK_LOCAL_ENABLE
422         SwitchTsdToTask(co->task);
423 #endif
424         CoSwitch(&co->thEnv->schCtx, &co->ctx);
425         FFRT_TASK_END();
426         ffrt::TaskLoadTracking::End(task); // Todo: deal with CoWait()
427         CoStackCheck(co);
428 
429         // 1. coroutine task done, exit normally, need to exec next coroutine task
430         if (co->isTaskDone) {
431             task->UpdateState(ffrt::TaskState::EXITED);
432             co->isTaskDone = false;
433             return 0;
434         }
435 
436         // 2. couroutine task block, switch to thread
437         // need suspend the coroutine task or continue to execute the coroutine task.
438         auto pending = coRoutineEnv->pending;
439         if (pending == nullptr) {
440             return 0;
441         }
442         coRoutineEnv->pending = nullptr;
443         FFRTTraceRecord::TaskCoSwitchOut(task);
444         // Fast path: skip state transition
445         if ((*pending)(task)) {
446             // The ownership of the task belongs to other host(cv/mutex/epoll etc)
447             // And the task cannot be accessed any more.
448             return 0;
449         }
450         FFRT_WAKE_TRACER(task->gid); // fast path wk
451         coRoutineEnv->runningCo = co;
452     }
453     return 0;
454 }
455 
456 // called by thread work
CoYield(void)457 void CoYield(void)
458 {
459     CoRoutine* co = static_cast<CoRoutine*>(GetCoEnv()->runningCo);
460     co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH));
461     GetCoEnv()->runningCo = nullptr;
462     CoSwitchOutTransaction(co->task);
463     if (co->task->type == ffrt_normal_task) {
464         co->task->UpdateState(ffrt::TaskState::BLOCKED);
465     }
466     FFRT_BLOCK_MARKER(co->task->gid);
467 #ifdef FFRT_TASK_LOCAL_ENABLE
468     SwitchTsdToThread(co->task);
469 #endif
470     CoStackCheck(co);
471     CoSwitch(&co->ctx, &GetCoEnv()->schCtx);
472     while (GetBboxEnableState() != 0) {
473         if (GetBboxEnableState() != gettid()) {
474             BboxFreeze(); // freeze non-crash thread
475             return;
476         }
477         const int IGNORE_DEPTH = 3;
478         backtrace(IGNORE_DEPTH);
479         co->status.store(static_cast<int>(CoStatus::CO_NOT_FINISH)); // recovery to old state
480         CoExit(co, co->task->type == ffrt_normal_task);
481     }
482 }
483 
CoWait(const std::function<bool (ffrt::CPUEUTask *)> & pred)484 void CoWait(const std::function<bool(ffrt::CPUEUTask*)>& pred)
485 {
486     GetCoEnv()->pending = &pred;
487     CoYield();
488 }
489 
CoWake(ffrt::CPUEUTask * task,bool timeOut)490 void CoWake(ffrt::CPUEUTask* task, bool timeOut)
491 {
492     if (task == nullptr) {
493         FFRT_LOGE("task is nullptr");
494         return;
495     }
496     // Fast path: state transition without lock
497     task->wakeupTimeOut = timeOut;
498     FFRT_WAKE_TRACER(task->gid);
499     switch (task->type) {
500         case ffrt_normal_task: {
501             task->UpdateState(ffrt::TaskState::READY);
502             break;
503         }
504         case ffrt_queue_task: {
505             QueueTask* sTask = reinterpret_cast<QueueTask*>(task);
506             auto handle = sTask->GetHandler();
507             handle->TransferTask(sTask);
508             break;
509         }
510         default: {
511             FFRT_LOGE("CoWake unsupport task[%lu], type=%d, name[%s]", task->gid, task->type, task->label.c_str());
512             break;
513         }
514     }
515 }
516 
Instance()517 CoRoutineFactory &CoRoutineFactory::Instance()
518 {
519     static CoRoutineFactory fac;
520     return fac;
521 }
522