1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H
18 
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#if defined(ENABLE_TASKPOOL_FFRT)
#include "cpp/task.h"
#endif
#include "helper/concurrent_helper.h"
#include "helper/error_helper.h"
#include "helper/napi_helper.h"
#include "helper/object_helper.h"
#include "message_queue.h"
#include "napi/native_api.h"
#include "napi/native_node_api.h"
#include "native_engine/native_engine.h"
#include "qos_helper.h"
#include "task.h"
#include "task_runner.h"
#include "tools/log.h"
36 
37 namespace Commonlibrary::Concurrent::TaskPoolModule {
38 using namespace Commonlibrary::Concurrent::Common;
39 using namespace Commonlibrary::Concurrent::Common::Helper;
40 using namespace Commonlibrary::Platform;
41 using MsgQueue = MessageQueue<TaskResultInfo*>;
42 
// Execution state tracked per Worker (stored in Worker::state_).
enum class WorkerState {
    IDLE,    // initial value of Worker::state_
    RUNNING, // set by Worker::NotifyTaskRunning() when a task starts
    BLOCKED  // NOTE(review): presumably a worker stuck in a task; set outside this header
};
44 
#if defined(ENABLE_TASKPOOL_FFRT)
// Maps a taskpool Priority to the FFRT QoS level used for it.
// Declared `inline` so that every translation unit including this header
// shares a single map instead of building its own private copy.
inline const std::map<Priority, int> WORKERPRIORITY_FFRTQOS_MAP = {
    {Priority::IDLE, ffrt::qos_background},
    {Priority::LOW, ffrt::qos_utility},
    {Priority::MEDIUM, ffrt::qos_default},
    {Priority::HIGH, ffrt::qos_user_initiated},
};
#endif
53 
54 class Worker {
55 public:
56     using DebuggerPostTask = std::function<void()>;
57 
58     static Worker* WorkerConstructor(napi_env env);
59 
60     void NotifyExecuteTask();
61 
Enqueue(napi_env env,TaskResultInfo * resultInfo)62     void Enqueue(napi_env env, TaskResultInfo* resultInfo)
63     {
64         std::lock_guard<std::mutex> lock(queueMutex_);
65         msgQueueMap_[env].EnQueue(resultInfo);
66     }
67 
Dequeue(napi_env env,MsgQueue * & queue)68     void Dequeue(napi_env env, MsgQueue*& queue)
69     {
70         std::lock_guard<std::mutex> lock(queueMutex_);
71         auto item = msgQueueMap_.find(env);
72         if (item != msgQueueMap_.end()) {
73             queue = &(item->second);
74         }
75     }
76 
77     void NotifyTaskBegin();
78     // the function will only be called when the task is finished or
79     // exits abnormally, so we can not put it in the scope directly
80     void NotifyTaskFinished();
81     static void NotifyTaskResult(napi_env env, Task* task, napi_value result);
82     static void NotifyHandleTaskResult(Task* task);
83 
84 #if defined(ENABLE_TASKPOOL_FFRT)
85     bool IsLoopActive();
86     uint64_t GetWaitTime();
87 #endif
88 
89 private:
Worker(napi_env env)90     explicit Worker(napi_env env) : hostEnv_(env) {};
91 
92     ~Worker() = default;
93 
94     Worker(const Worker &) = delete;
95     Worker& operator=(const Worker &) = delete;
96     Worker(Worker &&) = delete;
97     Worker& operator=(Worker &&) = delete;
98 
99     void NotifyIdle();
100     void NotifyWorkerCreated();
NotifyTaskRunning()101     void NotifyTaskRunning()
102     {
103         state_ = WorkerState::RUNNING;
104         startTime_ = ConcurrentHelper::GetMilliseconds();
105         runningCount_++;
106     }
107 
108 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
109     static void HandleDebuggerTask(const uv_async_t* req);
110     void DebuggerOnPostTask(std::function<void()>&& task);
111 #endif
112 
GetWorkerLoop()113     uv_loop_t* GetWorkerLoop() const
114     {
115         if (workerEnv_ != nullptr) {
116             return NapiHelper::GetLibUV(workerEnv_);
117         }
118         return nullptr;
119     }
120 
RunLoop()121     void RunLoop() const
122     {
123         uv_loop_t* loop = GetWorkerLoop();
124         if (loop != nullptr) {
125             uv_run(loop, UV_RUN_DEFAULT);
126         } else {
127             HILOG_ERROR("taskpool:: Worker loop is nullptr when start worker loop");
128             return;
129         }
130     }
131 
132     // we will use the scope to manage resources automatically,
133     // including the HandleScope and NotifyRunning/NotifyIdle
134     class RunningScope {
135     public:
RunningScope(Worker * worker)136         explicit RunningScope(Worker* worker) : worker_(worker)
137         {
138             napi_open_handle_scope(worker_->workerEnv_, &scope_);
139             worker_->idleState_ = false;
140             worker->isExecutingLongTask_ = false;
141             worker_->NotifyTaskRunning();
142         }
143 
144         ~RunningScope();
145 
146     private:
147         Worker* worker_ = nullptr;
148         napi_handle_scope scope_ = nullptr;
149     };
150 
151     // use PriorityScope to manage the priority setting of workers
152     // reset qos_user_initiated when exit PriorityScope
153     class PriorityScope {
154     public:
155         PriorityScope(Worker* worker, Priority taskPriority);
~PriorityScope()156         ~PriorityScope()
157         {
158             worker_->ResetWorkerPriority();
159         }
160 
161     private:
162         Worker* worker_ = nullptr;
163     };
164 
165     void StartExecuteInThread();
166     static void ExecuteInThread(const void* data);
167     bool PrepareForWorkerInstance();
168     void ReleaseWorkerThreadContent();
169     void ResetWorkerPriority();
170     bool CheckFreeConditions();
171     bool UpdateWorkerState(WorkerState expect, WorkerState desired);
172     void StoreTaskId(uint64_t taskId);
173     bool InitTaskPoolFunc(napi_env env, napi_value func, Task* task);
174     void UpdateExecutedInfo();
175     void UpdateLongTaskInfo(Task* task);
176     bool IsExecutingLongTask();
177     bool HasLongTask();
178     void TerminateTask(uint64_t taskId);
179 
180     static void HandleFunctionException(napi_env env, Task* task);
181     static void PerformTask(const uv_async_t* req);
182     static void TaskResultCallback(napi_env env, napi_value result, bool success, void* data);
183     static void ReleaseWorkerHandles(const uv_async_t* req);
184     static void TriggerGCCheck(const uv_async_t* req);
185 
186 #if defined(ENABLE_TASKPOOL_FFRT)
187     void InitFfrtInfo();
188     void InitLoopHandleNum();
189 #endif
190 
191     napi_env hostEnv_ {nullptr};
192     napi_env workerEnv_ {nullptr};
193     uv_async_t* performTaskSignal_ {nullptr};
194     uv_async_t* clearWorkerSignal_ {nullptr};
195     uv_async_t* triggerGCCheckSignal_ {nullptr};
196 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
197     uv_async_t* debuggerOnPostTaskSignal_ {nullptr};
198     std::mutex debuggerMutex_;
199     std::queue<DebuggerPostTask> debuggerQueue_ {};
200 #endif
201     std::unique_ptr<TaskRunner> runner_ {nullptr};
202 
203     std::atomic<int32_t> runningCount_ = 0;
204     std::atomic<bool> idleState_ = true; // true means the worker is idle
205     std::atomic<uint64_t> idlePoint_ = ConcurrentHelper::GetMilliseconds();
206     std::atomic<uint64_t> startTime_ = ConcurrentHelper::GetMilliseconds();
207     std::atomic<WorkerState> state_ {WorkerState::IDLE};
208     std::atomic<bool> hasExecuted_ = false; // false means this worker hasn't execute any tasks
209     Priority priority_ {Priority::DEFAULT};
210     pid_t tid_ = 0;
211     std::vector<uint64_t> currentTaskId_ {};
212     std::mutex currentTaskIdMutex_;
213     MessageQueue<TaskResultInfo*> hostMessageQueue_ {};
214     uint64_t lastCpuTime_ = 0;
215     uint32_t idleCount_ = 0;
216     std::atomic<bool> hasLongTask_ = false;
217     std::atomic<bool> isExecutingLongTask_ = false;
218     std::mutex longMutex_;
219     std::unordered_set<uint64_t> longTasksSet_ {};
220     std::mutex queueMutex_; // for sendData
221     std::unordered_map<napi_env, MsgQueue> msgQueueMap_ {};
222     friend class TaskManager;
223     friend class NativeEngineTest;
224 
225 #if defined(ENABLE_TASKPOOL_FFRT)
226     void* ffrtTaskHandle_ = nullptr;
227     uint32_t initActiveHandleNum_ = 0;
228 #endif
229 };
230 } // namespace Commonlibrary::Concurrent::TaskPoolModule
231 #endif // JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H
232