1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sdependence_manager.h"
17 #include "dfx/trace_record/ffrt_trace_record.h"
18 #include "tm/queue_task.h"
19 #include "util/worker_monitor.h"
20 #include "util/ffrt_facade.h"
21 #include "util/slab.h"
22 
23 #ifdef FFRT_ASYNC_STACKTRACE
24 #include "dfx/async_stack/ffrt_async_stack.h"
25 #endif
26 
27 namespace ffrt {
28 
SDependenceManager()29 SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
30 {
31     // control construct sequences of singletons
32 #ifdef FFRT_OH_TRACE_ENABLE
33     TraceAdapter::Instance();
34 #endif
35     SimpleAllocator<CPUEUTask>::Instance();
36     SimpleAllocator<SCPUEUTask>::Instance();
37     SimpleAllocator<QueueTask>::Instance();
38     SimpleAllocator<VersionCtx>::Instance();
39     SimpleAllocator<WaitUntilEntry>::Instance();
40     CoStackAttr::Instance();
41     PollerProxy::Instance();
42     FFRTScheduler::Instance();
43     ExecuteUnit::Instance();
44     TaskState::RegisterOps(TaskState::EXITED,
45         [this](CPUEUTask* task) { return this->onTaskDone(static_cast<SCPUEUTask*>(task)), true; });
46 
47 #ifdef FFRT_WORKER_MONITOR
48     WorkerMonitor::GetInstance();
49 #endif
50 #ifdef FFRT_OH_TRACE_ENABLE
51     _StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
52     _FinishTrace(HITRACE_TAG_FFRT);
53 #endif
54     QueueMonitor::GetInstance();
55     GetIOPoller();
56     DelayedWorker::GetInstance();
57 }
58 
~SDependenceManager()59 SDependenceManager::~SDependenceManager()
60 {
61 }
62 
RemoveRepeatedDeps(std::vector<CPUEUTask * > & in_handles,const ffrt_deps_t * ins,const ffrt_deps_t * outs,std::vector<const void * > & insNoDup,std::vector<const void * > & outsNoDup)63 void SDependenceManager::RemoveRepeatedDeps(std::vector<CPUEUTask*>& in_handles, const ffrt_deps_t* ins, const ffrt_deps_t* outs,
64     std::vector<const void *>& insNoDup, std::vector<const void *>& outsNoDup)
65 {
66     // signature去重:1)outs去重
67     if (outs) {
68         OutsDedup(outsNoDup, outs);
69     }
70 
71     // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
72     if (ins) {
73         InsDedup(in_handles, insNoDup, outsNoDup, ins);
74     }
75 }
76 
onSubmit(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)77 void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
78     const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
79 {
80     // 0 check outs handle
81     if (!CheckOutsHandle(outs)) {
82         FFRT_LOGE("outs contain handles error");
83         return;
84     }
85 
86     // 1 Init eu and scheduler
87     auto ctx = ExecuteCtx::Cur();
88 
89     // 2 Get current task's parent
90     auto parent = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
91 
92     // 2.1 Create task ctx
93     SCPUEUTask* task = nullptr;
94     {
95         task = reinterpret_cast<SCPUEUTask*>(static_cast<uintptr_t>(
96             static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(SCPUEUTask, func_storage)));
97         new (task)SCPUEUTask(attr, parent, ++parent->childNum, QoS());
98     }
99     FFRT_TRACE_BEGIN(("submit|" + std::to_string(task->gid)).c_str());
100 #ifdef FFRT_ASYNC_STACKTRACE
101     {
102         task->stackId = FFRTCollectAsyncStack();
103     }
104 #endif
105     QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
106     FFRTTraceRecord::TaskSubmit<ffrt_normal_task>(qos, &(task->createTime), &(task->fromTid));
107 
108     std::vector<const void*> insNoDup;
109     std::vector<const void*> outsNoDup;
110     RemoveRepeatedDeps(task->in_handles, ins, outs, insNoDup, outsNoDup);
111 
112 #ifdef FFRT_OH_WATCHDOG_ENABLE
113     if (attr != nullptr && IsValidTimeout(task->gid, attr->timeout_)) {
114         task->isWatchdogEnable = true;
115         AddTaskToWatchdog(task->gid);
116         SendTimeoutWatchdog(task->gid, attr->timeout_, attr->delay_);
117     }
118 #endif
119     if (has_handle) {
120         task->IncDeleteRef();
121         handle = static_cast<ffrt_task_handle_t>(task);
122         outsNoDup.push_back(handle); // handle作为任务的输出signature
123     }
124     task->SetQos(qos);
125     /* The parent's number of subtasks to be completed increases by one,
126         * and decreases by one after the subtask is completed
127         */
128     task->IncChildRef();
129 
130     std::vector<std::pair<VersionCtx*, NestType>> inDatas;
131     std::vector<std::pair<VersionCtx*, NestType>> outDatas;
132 
133     if (!(insNoDup.empty() && outsNoDup.empty())) {
134         // 3 Put the submitted task into Entity
135         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
136 
137         MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
138 
139         {
140             // 3.1 Process input dependencies
141             for (auto& i : std::as_const(inDatas)) {
142                 i.first->AddConsumer(task, i.second);
143             }
144         }
145 
146         {
147             // 3.2 Process output dependencies
148             for (auto& o : std::as_const(outDatas)) {
149                 o.first->AddProducer(task);
150             }
151         }
152         if (task->dataRefCnt.submitDep != 0) {
153             FFRT_BLOCK_TRACER(task->gid, dep);
154             FFRT_TRACE_END();
155             return;
156         }
157     }
158 
159     if (attr != nullptr) {
160         task->notifyWorker_ = attr->notifyWorker_;
161     }
162 
163     task->UpdateState(TaskState::READY);
164     FFRTTraceRecord::TaskEnqueue<ffrt_normal_task>(qos);
165     FFRT_TRACE_END();
166 }
167 
onWait()168 void SDependenceManager::onWait()
169 {
170     auto ctx = ExecuteCtx::Cur();
171     auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
172     auto task = static_cast<SCPUEUTask*>(baseTask);
173 
174     if (ThreadWaitMode(task)) {
175         std::unique_lock<std::mutex> lck(task->mutex_);
176         task->MultiDepenceAdd(Denpence::CALL_DEPENCE);
177         FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
178         if (FFRT_UNLIKELY(LegacyMode(task))) {
179             task->blockType = BlockType::BLOCK_THREAD;
180         }
181         task->waitCond_.wait(lck, [task] { return task->childRefCnt == 0; });
182         return;
183     }
184 
185     auto childDepFun = [&](ffrt::CPUEUTask* task) -> bool {
186         auto sTask = static_cast<SCPUEUTask*>(task);
187         std::unique_lock<std::mutex> lck(sTask->mutex_);
188         if (sTask->childRefCnt == 0) {
189             return false;
190         }
191         sTask->MultiDepenceAdd(Denpence::CALL_DEPENCE);
192         sTask->UpdateState(ffrt::TaskState::BLOCKED);
193         return true;
194     };
195     FFRT_BLOCK_TRACER(task->gid, chd);
196     CoWait(childDepFun);
197 }
198 
199 #ifdef QOS_DEPENDENCY
onWait(const ffrt_deps_t * deps,int64_t deadline=-1)200 void SDependenceManager::onWait(const ffrt_deps_t* deps, int64_t deadline = -1)
201 #else
202 void SDependenceManager::onWait(const ffrt_deps_t* deps)
203 #endif
204 {
205     auto ctx = ExecuteCtx::Cur();
206     auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
207     auto task = static_cast<SCPUEUTask*>(baseTask);
208     task->dataRefCnt.waitDep = 0;
209 
210     auto dataDepFun = [&]() {
211         std::vector<VersionCtx*> waitDatas;
212         waitDatas.reserve(deps->len);
213         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
214 
215         for (uint32_t i = 0; i < deps->len; ++i) {
216             auto d = deps->items[i].ptr;
217             auto it = std::as_const(Entity::Instance()->vaMap).find(d);
218             if (it != Entity::Instance()->vaMap.end()) {
219                 auto waitData = it->second;
220                 // Find the VersionCtx of the parent task level
221                 for (auto out : std::as_const(task->outs)) {
222                     if (waitData->signature == out->signature) {
223                         waitData = out;
224                         break;
225                     }
226                 }
227                 waitDatas.push_back(waitData);
228             }
229         }
230 #ifdef QOS_DEPENDENCY
231         if (deadline != -1) {
232             Scheduler::Instance()->onWait(waitDatas, deadline);
233         }
234 #endif
235         for (auto data : std::as_const(waitDatas)) {
236             data->AddDataWaitTaskByThis(task);
237         }
238     };
239 
240     if (ThreadWaitMode(task)) {
241         dataDepFun();
242         std::unique_lock<std::mutex> lck(task->mutex_);
243         task->MultiDepenceAdd(Denpence::DATA_DEPENCE);
244         FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
245         if (FFRT_UNLIKELY(LegacyMode(task))) {
246             task->blockType = BlockType::BLOCK_THREAD;
247         }
248         task->waitCond_.wait(lck, [task] { return task->dataRefCnt.waitDep == 0; });
249         return;
250     }
251 
252     auto pendDataDepFun = [&](ffrt::CPUEUTask* task) -> bool {
253         auto sTask = static_cast<SCPUEUTask*>(task);
254         dataDepFun();
255         FFRT_LOGD("onWait name:%s gid=%lu", sTask->label.c_str(), sTask->gid);
256         std::unique_lock<std::mutex> lck(sTask->mutex_);
257         if (sTask->dataRefCnt.waitDep == 0) {
258             return false;
259         }
260         sTask->MultiDepenceAdd(Denpence::DATA_DEPENCE);
261         sTask->UpdateState(ffrt::TaskState::BLOCKED);
262         return true;
263     };
264     FFRT_BLOCK_TRACER(task->gid, dat);
265     CoWait(pendDataDepFun);
266 }
267 
onExecResults(const ffrt_deps_t * deps)268 int SDependenceManager::onExecResults(const ffrt_deps_t *deps)
269 {
270     return 0;
271 }
272 
onTaskDone(CPUEUTask * task)273 void SDependenceManager::onTaskDone(CPUEUTask* task)
274 {
275     auto sTask = static_cast<SCPUEUTask*>(task);
276     FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos());
277     FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos(),  task);
278     FFRT_TRACE_SCOPE(1, ontaskDone);
279     sTask->DecChildRef();
280     if (!(sTask->ins.empty() && sTask->outs.empty())) {
281         std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
282         FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
283 
284         // Production data
285         for (auto out : std::as_const(sTask->outs)) {
286             out->onProduced();
287         }
288         // Consumption data
289         for (auto in : std::as_const(sTask->ins)) {
290             in->onConsumed(sTask);
291         }
292         for (auto in : std::as_const(sTask->in_handles)) {
293             in->DecDeleteRef();
294         }
295         // VersionCtx recycling
296         Entity::Instance()->RecycleVersion();
297     }
298     if (task->isWatchdogEnable) {
299         RemoveTaskFromWatchdog(task->gid);
300     }
301     sTask->RecycleTask();
302 }
303 
MapSignature2Deps(SCPUEUTask * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)304 void SDependenceManager::MapSignature2Deps(SCPUEUTask* task, const std::vector<const void*>& inDeps,
305     const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
306     std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
307 {
308     auto en = Entity::Instance();
309     // scene description:
310     for (auto signature : inDeps) {
311         VersionCtx* version = nullptr;
312         NestType type = NestType::DEFAULT;
313         // scene 1|2
314         for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
315             if (parentOut->signature == signature) {
316                 version = parentOut;
317                 type = NestType::PARENTOUT;
318                 goto add_inversion;
319             }
320         }
321         // scene 3
322         for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
323             if (parentIn->signature == signature) {
324                 version = parentIn;
325                 type = NestType::PARENTIN;
326                 goto add_inversion;
327             }
328         }
329         // scene 4
330         version = en->VA2Ctx(signature, task);
331     add_inversion:
332         inVersions.push_back({version, type});
333     }
334 
335     for (auto signature : outDeps) {
336         VersionCtx* version = nullptr;
337         NestType type = NestType::DEFAULT;
338         // scene 5|6
339         for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
340             if (parentOut->signature == signature) {
341                 version = parentOut;
342                 type = NestType::PARENTOUT;
343                 goto add_outversion;
344             }
345         }
346         // scene 7
347 #ifndef FFRT_RELEASE
348         for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
349             if (parentIn->signature == signature) {
350                 FFRT_LOGE("parent's indep only cannot be child's outdep");
351             }
352         }
353 #endif
354         // scene 8
355         version = en->VA2Ctx(signature, task);
356     add_outversion:
357         outVersions.push_back({version, type});
358     }
359 }
360 } // namespace ffrt