/* * Copyright (c) 2023 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include "ffrt_inner.h" #include "internal_inc/osal.h" #include "sync/io_poller.h" #include "qos.h" #include "sched/task_scheduler.h" #include "task_attr_private.h" #include "internal_inc/config.h" #include "eu/osattr_manager.h" #include "eu/worker_thread.h" #include "dfx/log/ffrt_log_api.h" #include "dfx/trace_record/ffrt_trace_record.h" #include "dfx/watchdog/watchdog_util.h" #include "eu/func_manager.h" #include "util/ffrt_facade.h" #include "util/slab.h" #include "eu/sexecute_unit.h" #include "core/task_io.h" #include "sync/poller.h" #include "util/spmc_queue.h" #include "tm/task_factory.h" #include "tm/queue_task.h" namespace ffrt { inline void submit_impl(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f, const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr) { FFRTFacade::GetDMInstance().onSubmit(has_handle, handle, f, ins, outs, attr); } API_ATTRIBUTE((visibility("default"))) void sync_io(int fd) { ffrt_wait_fd(fd); } API_ATTRIBUTE((visibility("default"))) void set_trace_tag(const char* name) { CPUEUTask* curTask = ffrt::ExecuteCtx::Cur()->task; if (curTask != nullptr) { curTask->SetTraceTag(name); } } API_ATTRIBUTE((visibility("default"))) void clear_trace_tag() { CPUEUTask* curTask = ffrt::ExecuteCtx::Cur()->task; if (curTask != nullptr) { curTask->ClearTraceTag(); } } void CreateDelayDeps( ffrt_task_handle_t &handle, const ffrt_deps_t *in_deps, const ffrt_deps_t *out_deps, task_attr_private *p) { // setting dependences is not supportted for delayed task if (unlikely(((in_deps != nullptr) && (in_deps->len != 0)) || ((out_deps != nullptr) && (out_deps->len != 0)))) { FFRT_LOGE("delayed task do not support dependence, in_deps/out_deps ignored."); } // delay task uint64_t delayUs = p->delay_; std::function &&func = [delayUs]() { this_task::sleep_for(std::chrono::microseconds(delayUs)); FFRT_LOGD("submit task delay time [%d us] has ended.", delayUs); }; ffrt_function_header_t *delay_func = create_function_wrapper(std::move(func)); submit_impl(true, handle, delay_func, nullptr, nullptr, reinterpret_cast(p)); } } // namespace ffrt #ifdef __cplusplus extern "C" { #endif API_ATTRIBUTE((visibility("default"))) int ffrt_task_attr_init(ffrt_task_attr_t *attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return -1; } static_assert(sizeof(ffrt::task_attr_private) <= ffrt_task_attr_storage_size, "size must be less than ffrt_task_attr_storage_size"); new (attr)ffrt::task_attr_private(); return 0; } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_destroy(ffrt_task_attr_t *attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return; } auto p = reinterpret_cast(attr); p->~task_attr_private(); } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_set_name(ffrt_task_attr_t *attr, const char *name) { if (unlikely(!attr || !name)) { FFRT_LOGE("invalid attr or name"); return; } (reinterpret_cast(attr))->name_ = name; } API_ATTRIBUTE((visibility("default"))) const char *ffrt_task_attr_get_name(const ffrt_task_attr_t *attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return nullptr; } ffrt_task_attr_t *p = const_cast(attr); return (reinterpret_cast(p))->name_.c_str(); } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_set_qos(ffrt_task_attr_t *attr, ffrt_qos_t qos) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return; } if (ffrt::GetFuncQosMap() == nullptr) { FFRT_LOGE("FuncQosMap has not regist"); return; } (reinterpret_cast(attr))->qos_ = ffrt::GetFuncQosMap()(qos); } API_ATTRIBUTE((visibility("default"))) ffrt_qos_t ffrt_task_attr_get_qos(const ffrt_task_attr_t *attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return static_cast(ffrt_qos_default); } ffrt_task_attr_t *p = const_cast(attr); return (reinterpret_cast(p))->qos_; } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_set_delay(ffrt_task_attr_t *attr, uint64_t delay_us) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return; } (reinterpret_cast(attr))->delay_ = delay_us; } API_ATTRIBUTE((visibility("default"))) uint64_t ffrt_task_attr_get_delay(const ffrt_task_attr_t *attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return 0; } ffrt_task_attr_t *p = const_cast(attr); return (reinterpret_cast(p))->delay_; } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_set_timeout(ffrt_task_attr_t *attr, uint64_t timeout_us) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return; } (reinterpret_cast(attr))->timeout_ = timeout_us; } API_ATTRIBUTE((visibility("default"))) uint64_t ffrt_task_attr_get_timeout(const ffrt_task_attr_t *attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return 0; } ffrt_task_attr_t *p = const_cast(attr); return (reinterpret_cast(p))->timeout_; } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_set_notify_worker(ffrt_task_attr_t* attr, bool notify) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return; } (reinterpret_cast(attr))->notifyWorker_ = notify; } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_set_queue_priority(ffrt_task_attr_t* attr, ffrt_queue_priority_t priority) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return; } // eventhandler inner priority is one more than the kits priority int prio = static_cast(priority); if (prio < static_cast(ffrt_queue_priority_immediate) || prio > static_cast(ffrt_queue_priority_idle) + 1) { FFRT_LOGE("priority should be a valid priority"); return; } (reinterpret_cast(attr))->prio_ = priority; } API_ATTRIBUTE((visibility("default"))) ffrt_queue_priority_t ffrt_task_attr_get_queue_priority(const ffrt_task_attr_t* attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return ffrt_queue_priority_immediate; } ffrt_task_attr_t *p = const_cast(attr); return static_cast((reinterpret_cast(p))->prio_); } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_set_stack_size(ffrt_task_attr_t* attr, uint64_t size) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return; } (reinterpret_cast(attr))->stackSize_ = size; } API_ATTRIBUTE((visibility("default"))) uint64_t ffrt_task_attr_get_stack_size(const ffrt_task_attr_t* attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return 0; } return (reinterpret_cast(attr))->stackSize_; } // submit API_ATTRIBUTE((visibility("default"))) void *ffrt_alloc_auto_managed_function_storage_base(ffrt_function_kind_t kind) { if (kind == ffrt_function_kind_general) { return ffrt::TaskFactory::Alloc()->func_storage; } return ffrt::SimpleAllocator::AllocMem()->func_storage; } API_ATTRIBUTE((visibility("default"))) void ffrt_submit_base(ffrt_function_header_t *f, const ffrt_deps_t *in_deps, const ffrt_deps_t *out_deps, const ffrt_task_attr_t *attr) { if (unlikely(!f)) { FFRT_LOGE("function handler should not be empty"); return; } ffrt_task_handle_t handle; ffrt::task_attr_private *p = reinterpret_cast(const_cast(attr)); if (likely(attr == nullptr || ffrt_task_attr_get_delay(attr) == 0)) { ffrt::submit_impl(false, handle, f, in_deps, out_deps, p); return; } // task after delay ffrt_task_handle_t delay_handle; uint64_t timeout = p->timeout_; p->timeout_ = 0; ffrt::CreateDelayDeps(delay_handle, in_deps, out_deps, p); p->timeout_ = timeout; std::vector deps = {{ffrt_dependence_task, delay_handle}}; ffrt_deps_t delay_deps {static_cast(deps.size()), deps.data()}; ffrt::submit_impl(false, handle, f, &delay_deps, nullptr, p); ffrt_task_handle_destroy(delay_handle); } API_ATTRIBUTE((visibility("default"))) ffrt_task_handle_t ffrt_submit_h_base(ffrt_function_header_t *f, const ffrt_deps_t *in_deps, const ffrt_deps_t *out_deps, const ffrt_task_attr_t *attr) { if (unlikely(!f)) { FFRT_LOGE("function handler should not be empty"); return nullptr; } ffrt_task_handle_t handle = nullptr; ffrt::task_attr_private *p = reinterpret_cast(const_cast(attr)); if (likely(attr == nullptr || ffrt_task_attr_get_delay(attr) == 0)) { ffrt::submit_impl(true, handle, f, in_deps, out_deps, p); return handle; } // task after delay ffrt_task_handle_t delay_handle = nullptr; uint64_t timeout = p->timeout_; p->timeout_ = 0; ffrt::CreateDelayDeps(delay_handle, in_deps, out_deps, p); p->timeout_ = timeout; std::vector deps = {{ffrt_dependence_task, delay_handle}}; ffrt_deps_t delay_deps {static_cast(deps.size()), deps.data()}; ffrt::submit_impl(true, handle, f, &delay_deps, nullptr, p); ffrt_task_handle_destroy(delay_handle); return handle; } API_ATTRIBUTE((visibility("default"))) uint32_t ffrt_task_handle_inc_ref(ffrt_task_handle_t handle) { if (handle == nullptr) { FFRT_LOGE("input task handle is invalid"); return -1; } return static_cast(handle)->IncDeleteRef(); } API_ATTRIBUTE((visibility("default"))) uint32_t ffrt_task_handle_dec_ref(ffrt_task_handle_t handle) { if (handle == nullptr) { FFRT_LOGE("input task handle is invalid"); return -1; } return static_cast(handle)->DecDeleteRef(); } API_ATTRIBUTE((visibility("default"))) void ffrt_task_handle_destroy(ffrt_task_handle_t handle) { ffrt_task_handle_dec_ref(handle); } API_ATTRIBUTE((visibility("default"))) uint64_t ffrt_task_handle_get_id(ffrt_task_handle_t handle) { FFRT_COND_DO_ERR((handle == nullptr), return 0, "input task handle is invalid"); return static_cast(handle)->gid; } // wait API_ATTRIBUTE((visibility("default"))) void ffrt_wait_deps(const ffrt_deps_t *deps) { if (unlikely(!deps)) { FFRT_LOGE("deps should not be empty"); return; } std::vector v(deps->len); for (uint64_t i = 0; i < deps->len; ++i) { v[i] = deps->items[i]; } ffrt_deps_t d = { deps->len, v.data() }; ffrt::FFRTFacade::GetDMInstance().onWait(&d); } API_ATTRIBUTE((visibility("default"))) void ffrt_wait() { ffrt::FFRTFacade::GetDMInstance().onWait(); } API_ATTRIBUTE((visibility("default"))) int ffrt_set_cgroup_attr(ffrt_qos_t qos, ffrt_os_sched_attr *attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should not be empty"); return -1; } if (ffrt::GetFuncQosMap() == nullptr) { FFRT_LOGE("FuncQosMap has not regist"); return -1; } ffrt::QoS _qos = ffrt::GetFuncQosMap()(qos); return ffrt::OSAttrManager::Instance()->UpdateSchedAttr(_qos, attr); } API_ATTRIBUTE((visibility("default"))) void ffrt_restore_qos_config() { ffrt::WorkerGroupCtl *wgCtl = ffrt::FFRTFacade::GetEUInstance().GetGroupCtl(); for (auto qos = ffrt::QoS::Min(); qos < ffrt::QoS::Max(); ++qos) { std::unique_lock lck(wgCtl[qos].tgMutex); for (auto& thread : wgCtl[qos].threads) { ffrt::SetThreadAttr(thread.first, qos); } } } API_ATTRIBUTE((visibility("default"))) int ffrt_set_cpu_worker_max_num(ffrt_qos_t qos, uint32_t num) { if (ffrt::GetFuncQosMap() == nullptr) { FFRT_LOGE("FuncQosMap has not regist"); return -1; } ffrt::QoS _qos = ffrt::GetFuncQosMap()(qos); if (((qos != ffrt::qos_default) && (_qos() == ffrt::qos_default)) || (qos <= ffrt::qos_inherit)) { FFRT_LOGE("qos[%d] is invalid.", qos); return -1; } ffrt::CPUMonitor *monitor = ffrt::FFRTFacade::GetEUInstance().GetCPUMonitor(); return monitor->SetWorkerMaxNum(_qos, num); } API_ATTRIBUTE((visibility("default"))) void ffrt_notify_workers(ffrt_qos_t qos, int number) { if (qos < ffrt::QoS::Min() || qos >= ffrt::QoS::Max() || number <= 0) { FFRT_LOGE("qos [%d] or number [%d] or is invalid.", qos, number); return; } ffrt::FFRTFacade::GetEUInstance().NotifyWorkers(qos, number); } API_ATTRIBUTE((visibility("default"))) ffrt_error_t ffrt_set_worker_stack_size(ffrt_qos_t qos, size_t stack_size) { if (qos < ffrt::QoS::Min() || qos >= ffrt::QoS::Max() || stack_size < PTHREAD_STACK_MIN) { FFRT_LOGE("qos [%d] or stack size [%d] is invalid.", qos, stack_size); return ffrt_error_inval; } ffrt::WorkerGroupCtl* groupCtl = ffrt::FFRTFacade::GetEUInstance().GetGroupCtl(); std::unique_lock lck(groupCtl[qos].tgMutex); if (!groupCtl[qos].threads.empty()) { FFRT_LOGE("Stack size can be set only when there is no worker."); return ffrt_error; } int pageSize = getpagesize(); if (pageSize < 0) { FFRT_LOGE("Invalid pagesize : %d", pageSize); return ffrt_error; } groupCtl[qos].workerStackSize = (stack_size - 1 + static_cast(pageSize)) & -(static_cast(pageSize)); return ffrt_success; } API_ATTRIBUTE((visibility("default"))) int ffrt_this_task_update_qos(ffrt_qos_t qos) { if (ffrt::GetFuncQosMap() == nullptr) { FFRT_LOGE("FuncQosMap has not regist"); return 1; } ffrt::QoS _qos = ffrt::GetFuncQosMap()(qos); auto curTask = ffrt::ExecuteCtx::Cur()->task; if (curTask == nullptr) { FFRT_LOGW("task is nullptr"); return 1; } FFRT_COND_DO_ERR((curTask->type != ffrt_normal_task), return 1, "update qos task type invalid"); if (_qos() == curTask->qos) { FFRT_LOGW("the target qos is equal to current qos, no need update"); return 0; } curTask->SetQos(_qos); ffrt_yield(); return 0; } API_ATTRIBUTE((visibility("default"))) ffrt_qos_t ffrt_this_task_get_qos() { if (ffrt::ExecuteCtx::Cur()->task == nullptr) { FFRT_LOGW("task is nullptr"); return static_cast(ffrt_qos_default); } return ffrt::ExecuteCtx::Cur()->qos(); } API_ATTRIBUTE((visibility("default"))) uint64_t ffrt_this_task_get_id() { auto curTask = ffrt::ExecuteCtx::Cur()->task; if (curTask == nullptr) { return 0; } if (curTask->type == ffrt_normal_task) { return curTask->gid; } else if (curTask->type == ffrt_queue_task) { return reinterpret_cast(curTask)->GetHandler()->GetExecTaskId(); } return 0; } API_ATTRIBUTE((visibility("default"))) int64_t ffrt_this_queue_get_id() { auto curTask = ffrt::ExecuteCtx::Cur()->task; if (curTask == nullptr || curTask->type != ffrt_queue_task) { // not serial queue task return -1; } ffrt::QueueTask* task = reinterpret_cast(curTask); return task->GetQueueId(); } API_ATTRIBUTE((visibility("default"))) int ffrt_skip(ffrt_task_handle_t handle) { if (!handle) { FFRT_LOGE("input ffrt task handle is invalid."); return -1; } ffrt::CPUEUTask *task = static_cast(handle); auto exp = ffrt::SkipStatus::SUBMITTED; if (__atomic_compare_exchange_n(&task->skipped, &exp, ffrt::SkipStatus::SKIPPED, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { return 0; } return 1; } API_ATTRIBUTE((visibility("default"))) void ffrt_executor_task_submit(ffrt_executor_task_t* task, const ffrt_task_attr_t* attr) { if (task == nullptr) { FFRT_LOGE("function handler should not be empty"); return; } ffrt::task_attr_private* p = reinterpret_cast(const_cast(attr)); if (likely(attr == nullptr || ffrt_task_attr_get_delay(attr) == 0)) { ffrt::FFRTFacade::GetDMInstance().onSubmitUV(task, p); return; } FFRT_LOGE("uv function does not support delay"); } API_ATTRIBUTE((visibility("default"))) void ffrt_executor_task_register_func(ffrt_executor_task_func func, ffrt_executor_task_type_t type) { ffrt::FuncManager* func_mg = ffrt::FuncManager::Instance(); func_mg->insert(type, func); } API_ATTRIBUTE((visibility("default"))) int ffrt_executor_task_cancel(ffrt_executor_task_t* task, const ffrt_qos_t qos) { if (task == nullptr) { FFRT_LOGE("function handler should not be empty"); return 0; } ffrt::QoS _qos = qos; ffrt::LinkedList* node = reinterpret_cast(&task->wq); ffrt::FFRTScheduler* sch = ffrt::FFRTFacade::GetSchedInstance(); bool ret = sch->RemoveNode(node, _qos); if (ret) { ffrt::FFRTTraceRecord::TaskCancel(qos); } return static_cast(ret); } API_ATTRIBUTE((visibility("default"))) void* ffrt_get_cur_task() { return ffrt::ExecuteCtx::Cur()->task; } API_ATTRIBUTE((visibility("default"))) bool ffrt_get_current_coroutine_stack(void** stack_addr, size_t* size) { if (stack_addr == nullptr || size == nullptr) { return false; } if (!ffrt::USE_COROUTINE) { return false; } auto curTask = ffrt::ExecuteCtx::Cur()->task; if (curTask != nullptr) { auto co = curTask->coRoutine; if (co) { *size = co->stkMem.size; *stack_addr = static_cast(reinterpret_cast(co) + sizeof(CoRoutine) - 8); return true; } } return false; } API_ATTRIBUTE((visibility("default"))) void ffrt_task_attr_set_local(ffrt_task_attr_t* attr, bool task_local) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return; } (reinterpret_cast(attr))->taskLocal_ = task_local; } API_ATTRIBUTE((visibility("default"))) bool ffrt_task_attr_get_local(ffrt_task_attr_t* attr) { if (unlikely(!attr)) { FFRT_LOGE("attr should be a valid address"); return false; } return (reinterpret_cast(attr))->taskLocal_; } API_ATTRIBUTE((visibility("default"))) pthread_t ffrt_task_get_tid(void* task_handle) { if (task_handle == nullptr) { FFRT_LOGE("invalid task handle"); return 0; } auto task = reinterpret_cast(task_handle); return task->runningTid.load(); } API_ATTRIBUTE((visibility("default"))) uint64_t ffrt_get_cur_cached_task_id() { uint64_t gid = ffrt_this_task_get_id(); if (gid == 0) { return ffrt::ExecuteCtx::Cur()->lastGid_; } return gid; } #ifdef __cplusplus } #endif