1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sdependence_manager.h"
17 #include "dfx/trace_record/ffrt_trace_record.h"
18 #include "tm/queue_task.h"
19 #include "util/worker_monitor.h"
20 #include "util/ffrt_facade.h"
21 #include "util/slab.h"
22
23 #ifdef FFRT_ASYNC_STACKTRACE
24 #include "dfx/async_stack/ffrt_async_stack.h"
25 #endif
26
27 namespace ffrt {
28
SDependenceManager()29 SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
30 {
31 // control construct sequences of singletons
32 #ifdef FFRT_OH_TRACE_ENABLE
33 TraceAdapter::Instance();
34 #endif
35 SimpleAllocator<CPUEUTask>::Instance();
36 SimpleAllocator<SCPUEUTask>::Instance();
37 SimpleAllocator<QueueTask>::Instance();
38 SimpleAllocator<VersionCtx>::Instance();
39 SimpleAllocator<WaitUntilEntry>::Instance();
40 CoStackAttr::Instance();
41 PollerProxy::Instance();
42 FFRTScheduler::Instance();
43 ExecuteUnit::Instance();
44 TaskState::RegisterOps(TaskState::EXITED,
45 [this](CPUEUTask* task) { return this->onTaskDone(static_cast<SCPUEUTask*>(task)), true; });
46
47 #ifdef FFRT_WORKER_MONITOR
48 WorkerMonitor::GetInstance();
49 #endif
50 #ifdef FFRT_OH_TRACE_ENABLE
51 _StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
52 _FinishTrace(HITRACE_TAG_FFRT);
53 #endif
54 QueueMonitor::GetInstance();
55 GetIOPoller();
56 DelayedWorker::GetInstance();
57 }
58
~SDependenceManager()59 SDependenceManager::~SDependenceManager()
60 {
61 }
62
RemoveRepeatedDeps(std::vector<CPUEUTask * > & in_handles,const ffrt_deps_t * ins,const ffrt_deps_t * outs,std::vector<const void * > & insNoDup,std::vector<const void * > & outsNoDup)63 void SDependenceManager::RemoveRepeatedDeps(std::vector<CPUEUTask*>& in_handles, const ffrt_deps_t* ins, const ffrt_deps_t* outs,
64 std::vector<const void *>& insNoDup, std::vector<const void *>& outsNoDup)
65 {
66 // signature去重:1)outs去重
67 if (outs) {
68 OutsDedup(outsNoDup, outs);
69 }
70
71 // signature去重:2)ins去重(不影响功能,skip);3)ins不和outs重复(当前不支持weak signature)
72 if (ins) {
73 InsDedup(in_handles, insNoDup, outsNoDup, ins);
74 }
75 }
76
onSubmit(bool has_handle,ffrt_task_handle_t & handle,ffrt_function_header_t * f,const ffrt_deps_t * ins,const ffrt_deps_t * outs,const task_attr_private * attr)77 void SDependenceManager::onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
78 const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr)
79 {
80 // 0 check outs handle
81 if (!CheckOutsHandle(outs)) {
82 FFRT_LOGE("outs contain handles error");
83 return;
84 }
85
86 // 1 Init eu and scheduler
87 auto ctx = ExecuteCtx::Cur();
88
89 // 2 Get current task's parent
90 auto parent = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
91
92 // 2.1 Create task ctx
93 SCPUEUTask* task = nullptr;
94 {
95 task = reinterpret_cast<SCPUEUTask*>(static_cast<uintptr_t>(
96 static_cast<size_t>(reinterpret_cast<uintptr_t>(f)) - OFFSETOF(SCPUEUTask, func_storage)));
97 new (task)SCPUEUTask(attr, parent, ++parent->childNum, QoS());
98 }
99 FFRT_TRACE_BEGIN(("submit|" + std::to_string(task->gid)).c_str());
100 #ifdef FFRT_ASYNC_STACKTRACE
101 {
102 task->stackId = FFRTCollectAsyncStack();
103 }
104 #endif
105 QoS qos = (attr == nullptr ? QoS() : QoS(attr->qos_));
106 FFRTTraceRecord::TaskSubmit<ffrt_normal_task>(qos, &(task->createTime), &(task->fromTid));
107
108 std::vector<const void*> insNoDup;
109 std::vector<const void*> outsNoDup;
110 RemoveRepeatedDeps(task->in_handles, ins, outs, insNoDup, outsNoDup);
111
112 #ifdef FFRT_OH_WATCHDOG_ENABLE
113 if (attr != nullptr && IsValidTimeout(task->gid, attr->timeout_)) {
114 task->isWatchdogEnable = true;
115 AddTaskToWatchdog(task->gid);
116 SendTimeoutWatchdog(task->gid, attr->timeout_, attr->delay_);
117 }
118 #endif
119 if (has_handle) {
120 task->IncDeleteRef();
121 handle = static_cast<ffrt_task_handle_t>(task);
122 outsNoDup.push_back(handle); // handle作为任务的输出signature
123 }
124 task->SetQos(qos);
125 /* The parent's number of subtasks to be completed increases by one,
126 * and decreases by one after the subtask is completed
127 */
128 task->IncChildRef();
129
130 std::vector<std::pair<VersionCtx*, NestType>> inDatas;
131 std::vector<std::pair<VersionCtx*, NestType>> outDatas;
132
133 if (!(insNoDup.empty() && outsNoDup.empty())) {
134 // 3 Put the submitted task into Entity
135 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
136
137 MapSignature2Deps(task, insNoDup, outsNoDup, inDatas, outDatas);
138
139 {
140 // 3.1 Process input dependencies
141 for (auto& i : std::as_const(inDatas)) {
142 i.first->AddConsumer(task, i.second);
143 }
144 }
145
146 {
147 // 3.2 Process output dependencies
148 for (auto& o : std::as_const(outDatas)) {
149 o.first->AddProducer(task);
150 }
151 }
152 if (task->dataRefCnt.submitDep != 0) {
153 FFRT_BLOCK_TRACER(task->gid, dep);
154 FFRT_TRACE_END();
155 return;
156 }
157 }
158
159 if (attr != nullptr) {
160 task->notifyWorker_ = attr->notifyWorker_;
161 }
162
163 task->UpdateState(TaskState::READY);
164 FFRTTraceRecord::TaskEnqueue<ffrt_normal_task>(qos);
165 FFRT_TRACE_END();
166 }
167
onWait()168 void SDependenceManager::onWait()
169 {
170 auto ctx = ExecuteCtx::Cur();
171 auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
172 auto task = static_cast<SCPUEUTask*>(baseTask);
173
174 if (ThreadWaitMode(task)) {
175 std::unique_lock<std::mutex> lck(task->mutex_);
176 task->MultiDepenceAdd(Denpence::CALL_DEPENCE);
177 FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
178 if (FFRT_UNLIKELY(LegacyMode(task))) {
179 task->blockType = BlockType::BLOCK_THREAD;
180 }
181 task->waitCond_.wait(lck, [task] { return task->childRefCnt == 0; });
182 return;
183 }
184
185 auto childDepFun = [&](ffrt::CPUEUTask* task) -> bool {
186 auto sTask = static_cast<SCPUEUTask*>(task);
187 std::unique_lock<std::mutex> lck(sTask->mutex_);
188 if (sTask->childRefCnt == 0) {
189 return false;
190 }
191 sTask->MultiDepenceAdd(Denpence::CALL_DEPENCE);
192 sTask->UpdateState(ffrt::TaskState::BLOCKED);
193 return true;
194 };
195 FFRT_BLOCK_TRACER(task->gid, chd);
196 CoWait(childDepFun);
197 }
198
199 #ifdef QOS_DEPENDENCY
onWait(const ffrt_deps_t * deps,int64_t deadline=-1)200 void SDependenceManager::onWait(const ffrt_deps_t* deps, int64_t deadline = -1)
201 #else
202 void SDependenceManager::onWait(const ffrt_deps_t* deps)
203 #endif
204 {
205 auto ctx = ExecuteCtx::Cur();
206 auto baseTask = (ctx->task && ctx->task->type == ffrt_normal_task) ? ctx->task : DependenceManager::Root();
207 auto task = static_cast<SCPUEUTask*>(baseTask);
208 task->dataRefCnt.waitDep = 0;
209
210 auto dataDepFun = [&]() {
211 std::vector<VersionCtx*> waitDatas;
212 waitDatas.reserve(deps->len);
213 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
214
215 for (uint32_t i = 0; i < deps->len; ++i) {
216 auto d = deps->items[i].ptr;
217 auto it = std::as_const(Entity::Instance()->vaMap).find(d);
218 if (it != Entity::Instance()->vaMap.end()) {
219 auto waitData = it->second;
220 // Find the VersionCtx of the parent task level
221 for (auto out : std::as_const(task->outs)) {
222 if (waitData->signature == out->signature) {
223 waitData = out;
224 break;
225 }
226 }
227 waitDatas.push_back(waitData);
228 }
229 }
230 #ifdef QOS_DEPENDENCY
231 if (deadline != -1) {
232 Scheduler::Instance()->onWait(waitDatas, deadline);
233 }
234 #endif
235 for (auto data : std::as_const(waitDatas)) {
236 data->AddDataWaitTaskByThis(task);
237 }
238 };
239
240 if (ThreadWaitMode(task)) {
241 dataDepFun();
242 std::unique_lock<std::mutex> lck(task->mutex_);
243 task->MultiDepenceAdd(Denpence::DATA_DEPENCE);
244 FFRT_LOGD("onWait name:%s gid=%lu", task->label.c_str(), task->gid);
245 if (FFRT_UNLIKELY(LegacyMode(task))) {
246 task->blockType = BlockType::BLOCK_THREAD;
247 }
248 task->waitCond_.wait(lck, [task] { return task->dataRefCnt.waitDep == 0; });
249 return;
250 }
251
252 auto pendDataDepFun = [&](ffrt::CPUEUTask* task) -> bool {
253 auto sTask = static_cast<SCPUEUTask*>(task);
254 dataDepFun();
255 FFRT_LOGD("onWait name:%s gid=%lu", sTask->label.c_str(), sTask->gid);
256 std::unique_lock<std::mutex> lck(sTask->mutex_);
257 if (sTask->dataRefCnt.waitDep == 0) {
258 return false;
259 }
260 sTask->MultiDepenceAdd(Denpence::DATA_DEPENCE);
261 sTask->UpdateState(ffrt::TaskState::BLOCKED);
262 return true;
263 };
264 FFRT_BLOCK_TRACER(task->gid, dat);
265 CoWait(pendDataDepFun);
266 }
267
onExecResults(const ffrt_deps_t * deps)268 int SDependenceManager::onExecResults(const ffrt_deps_t *deps)
269 {
270 return 0;
271 }
272
onTaskDone(CPUEUTask * task)273 void SDependenceManager::onTaskDone(CPUEUTask* task)
274 {
275 auto sTask = static_cast<SCPUEUTask*>(task);
276 FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos());
277 FFRTTraceRecord::TaskDone<ffrt_normal_task>(task->GetQos(), task);
278 FFRT_TRACE_SCOPE(1, ontaskDone);
279 sTask->DecChildRef();
280 if (!(sTask->ins.empty() && sTask->outs.empty())) {
281 std::lock_guard<decltype(criticalMutex_)> lg(criticalMutex_);
282 FFRT_TRACE_SCOPE(1, taskDoneAfterLock);
283
284 // Production data
285 for (auto out : std::as_const(sTask->outs)) {
286 out->onProduced();
287 }
288 // Consumption data
289 for (auto in : std::as_const(sTask->ins)) {
290 in->onConsumed(sTask);
291 }
292 for (auto in : std::as_const(sTask->in_handles)) {
293 in->DecDeleteRef();
294 }
295 // VersionCtx recycling
296 Entity::Instance()->RecycleVersion();
297 }
298 if (task->isWatchdogEnable) {
299 RemoveTaskFromWatchdog(task->gid);
300 }
301 sTask->RecycleTask();
302 }
303
MapSignature2Deps(SCPUEUTask * task,const std::vector<const void * > & inDeps,const std::vector<const void * > & outDeps,std::vector<std::pair<VersionCtx *,NestType>> & inVersions,std::vector<std::pair<VersionCtx *,NestType>> & outVersions)304 void SDependenceManager::MapSignature2Deps(SCPUEUTask* task, const std::vector<const void*>& inDeps,
305 const std::vector<const void*>& outDeps, std::vector<std::pair<VersionCtx*, NestType>>& inVersions,
306 std::vector<std::pair<VersionCtx*, NestType>>& outVersions)
307 {
308 auto en = Entity::Instance();
309 // scene description:
310 for (auto signature : inDeps) {
311 VersionCtx* version = nullptr;
312 NestType type = NestType::DEFAULT;
313 // scene 1|2
314 for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
315 if (parentOut->signature == signature) {
316 version = parentOut;
317 type = NestType::PARENTOUT;
318 goto add_inversion;
319 }
320 }
321 // scene 3
322 for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
323 if (parentIn->signature == signature) {
324 version = parentIn;
325 type = NestType::PARENTIN;
326 goto add_inversion;
327 }
328 }
329 // scene 4
330 version = en->VA2Ctx(signature, task);
331 add_inversion:
332 inVersions.push_back({version, type});
333 }
334
335 for (auto signature : outDeps) {
336 VersionCtx* version = nullptr;
337 NestType type = NestType::DEFAULT;
338 // scene 5|6
339 for (auto parentOut : std::as_const(static_cast<SCPUEUTask*>(task->parent)->outs)) {
340 if (parentOut->signature == signature) {
341 version = parentOut;
342 type = NestType::PARENTOUT;
343 goto add_outversion;
344 }
345 }
346 // scene 7
347 #ifndef FFRT_RELEASE
348 for (auto parentIn : std::as_const(static_cast<SCPUEUTask*>(task->parent)->ins)) {
349 if (parentIn->signature == signature) {
350 FFRT_LOGE("parent's indep only cannot be child's outdep");
351 }
352 }
353 #endif
354 // scene 8
355 version = en->VA2Ctx(signature, task);
356 add_outversion:
357 outVersions.push_back({version, type});
358 }
359 }
360 } // namespace ffrt