1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H 18 19 #include <mutex> 20 21 #if defined(ENABLE_TASKPOOL_FFRT) 22 #include "cpp/task.h" 23 #endif 24 #include "helper/concurrent_helper.h" 25 #include "helper/error_helper.h" 26 #include "helper/napi_helper.h" 27 #include "helper/object_helper.h" 28 #include "message_queue.h" 29 #include "napi/native_api.h" 30 #include "napi/native_node_api.h" 31 #include "native_engine/native_engine.h" 32 #include "qos_helper.h" 33 #include "task.h" 34 #include "task_runner.h" 35 #include "tools/log.h" 36 37 namespace Commonlibrary::Concurrent::TaskPoolModule { 38 using namespace Commonlibrary::Concurrent::Common; 39 using namespace Commonlibrary::Concurrent::Common::Helper; 40 using namespace Commonlibrary::Platform; 41 using MsgQueue = MessageQueue<TaskResultInfo*>; 42 43 enum class WorkerState { IDLE, RUNNING, BLOCKED }; 44 45 #if defined(ENABLE_TASKPOOL_FFRT) 46 static const std::map<Priority, int> WORKERPRIORITY_FFRTQOS_MAP = { 47 {Priority::IDLE, ffrt::qos_background}, 48 {Priority::LOW, ffrt::qos_utility}, 49 {Priority::MEDIUM, ffrt::qos_default}, 50 {Priority::HIGH, ffrt::qos_user_initiated}, 51 }; 52 #endif 53 54 class Worker { 55 public: 56 using DebuggerPostTask = std::function<void()>; 57 58 static Worker* WorkerConstructor(napi_env env); 59 60 void NotifyExecuteTask(); 61 Enqueue(napi_env env,TaskResultInfo * resultInfo)62 void Enqueue(napi_env env, TaskResultInfo* resultInfo) 63 { 64 std::lock_guard<std::mutex> lock(queueMutex_); 65 msgQueueMap_[env].EnQueue(resultInfo); 66 } 67 Dequeue(napi_env env,MsgQueue * & queue)68 void Dequeue(napi_env env, MsgQueue*& queue) 69 { 70 std::lock_guard<std::mutex> lock(queueMutex_); 71 auto item = msgQueueMap_.find(env); 72 if (item != msgQueueMap_.end()) { 73 queue = &(item->second); 74 } 75 } 76 77 void NotifyTaskBegin(); 78 // the function will only be called when the task is finished or 79 // exits abnormally, so we can not put it in the scope directly 80 void NotifyTaskFinished(); 81 static void NotifyTaskResult(napi_env env, Task* task, napi_value result); 82 static void NotifyHandleTaskResult(Task* task); 83 84 #if defined(ENABLE_TASKPOOL_FFRT) 85 bool IsLoopActive(); 86 uint64_t GetWaitTime(); 87 #endif 88 89 private: Worker(napi_env env)90 explicit Worker(napi_env env) : hostEnv_(env) {}; 91 92 ~Worker() = default; 93 94 Worker(const Worker &) = delete; 95 Worker& operator=(const Worker &) = delete; 96 Worker(Worker &&) = delete; 97 Worker& operator=(Worker &&) = delete; 98 99 void NotifyIdle(); 100 void NotifyWorkerCreated(); NotifyTaskRunning()101 void NotifyTaskRunning() 102 { 103 state_ = WorkerState::RUNNING; 104 startTime_ = ConcurrentHelper::GetMilliseconds(); 105 runningCount_++; 106 } 107 108 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 109 static void HandleDebuggerTask(const uv_async_t* req); 110 void DebuggerOnPostTask(std::function<void()>&& task); 111 #endif 112 GetWorkerLoop()113 uv_loop_t* GetWorkerLoop() const 114 { 115 if (workerEnv_ != nullptr) { 116 return NapiHelper::GetLibUV(workerEnv_); 117 } 118 return nullptr; 119 } 120 RunLoop()121 void RunLoop() const 122 { 123 uv_loop_t* loop = GetWorkerLoop(); 124 if (loop != nullptr) { 125 uv_run(loop, UV_RUN_DEFAULT); 126 } else { 127 HILOG_ERROR("taskpool:: Worker loop is nullptr when start worker loop"); 128 return; 129 } 130 } 131 132 // we will use the scope to manage resources automatically, 133 // including the HandleScope and NotifyRunning/NotifyIdle 134 class RunningScope { 135 public: RunningScope(Worker * worker)136 explicit RunningScope(Worker* worker) : worker_(worker) 137 { 138 napi_open_handle_scope(worker_->workerEnv_, &scope_); 139 worker_->idleState_ = false; 140 worker->isExecutingLongTask_ = false; 141 worker_->NotifyTaskRunning(); 142 } 143 144 ~RunningScope(); 145 146 private: 147 Worker* worker_ = nullptr; 148 napi_handle_scope scope_ = nullptr; 149 }; 150 151 // use PriorityScope to manage the priority setting of workers 152 // reset qos_user_initiated when exit PriorityScope 153 class PriorityScope { 154 public: 155 PriorityScope(Worker* worker, Priority taskPriority); ~PriorityScope()156 ~PriorityScope() 157 { 158 worker_->ResetWorkerPriority(); 159 } 160 161 private: 162 Worker* worker_ = nullptr; 163 }; 164 165 void StartExecuteInThread(); 166 static void ExecuteInThread(const void* data); 167 bool PrepareForWorkerInstance(); 168 void ReleaseWorkerThreadContent(); 169 void ResetWorkerPriority(); 170 bool CheckFreeConditions(); 171 bool UpdateWorkerState(WorkerState expect, WorkerState desired); 172 void StoreTaskId(uint64_t taskId); 173 bool InitTaskPoolFunc(napi_env env, napi_value func, Task* task); 174 void UpdateExecutedInfo(); 175 void UpdateLongTaskInfo(Task* task); 176 bool IsExecutingLongTask(); 177 bool HasLongTask(); 178 void TerminateTask(uint64_t taskId); 179 180 static void HandleFunctionException(napi_env env, Task* task); 181 static void PerformTask(const uv_async_t* req); 182 static void TaskResultCallback(napi_env env, napi_value result, bool success, void* data); 183 static void ReleaseWorkerHandles(const uv_async_t* req); 184 static void TriggerGCCheck(const uv_async_t* req); 185 186 #if defined(ENABLE_TASKPOOL_FFRT) 187 void InitFfrtInfo(); 188 void InitLoopHandleNum(); 189 #endif 190 191 napi_env hostEnv_ {nullptr}; 192 napi_env workerEnv_ {nullptr}; 193 uv_async_t* performTaskSignal_ {nullptr}; 194 uv_async_t* clearWorkerSignal_ {nullptr}; 195 uv_async_t* triggerGCCheckSignal_ {nullptr}; 196 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 197 uv_async_t* debuggerOnPostTaskSignal_ {nullptr}; 198 std::mutex debuggerMutex_; 199 std::queue<DebuggerPostTask> debuggerQueue_ {}; 200 #endif 201 std::unique_ptr<TaskRunner> runner_ {nullptr}; 202 203 std::atomic<int32_t> runningCount_ = 0; 204 std::atomic<bool> idleState_ = true; // true means the worker is idle 205 std::atomic<uint64_t> idlePoint_ = ConcurrentHelper::GetMilliseconds(); 206 std::atomic<uint64_t> startTime_ = ConcurrentHelper::GetMilliseconds(); 207 std::atomic<WorkerState> state_ {WorkerState::IDLE}; 208 std::atomic<bool> hasExecuted_ = false; // false means this worker hasn't execute any tasks 209 Priority priority_ {Priority::DEFAULT}; 210 pid_t tid_ = 0; 211 std::vector<uint64_t> currentTaskId_ {}; 212 std::mutex currentTaskIdMutex_; 213 MessageQueue<TaskResultInfo*> hostMessageQueue_ {}; 214 uint64_t lastCpuTime_ = 0; 215 uint32_t idleCount_ = 0; 216 std::atomic<bool> hasLongTask_ = false; 217 std::atomic<bool> isExecutingLongTask_ = false; 218 std::mutex longMutex_; 219 std::unordered_set<uint64_t> longTasksSet_ {}; 220 std::mutex queueMutex_; // for sendData 221 std::unordered_map<napi_env, MsgQueue> msgQueueMap_ {}; 222 friend class TaskManager; 223 friend class NativeEngineTest; 224 225 #if defined(ENABLE_TASKPOOL_FFRT) 226 void* ffrtTaskHandle_ = nullptr; 227 uint32_t initActiveHandleNum_ = 0; 228 #endif 229 }; 230 } // namespace Commonlibrary::Concurrent::TaskPoolModule 231 #endif // JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H 232