1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_H
18 
19 #include <list>
20 #include <map>
21 #include <mutex>
22 #include <set>
23 #include <shared_mutex>
24 #include <string>
25 #include <uv.h>
26 
27 #include "helper/concurrent_helper.h"
28 #include "napi/native_api.h"
29 #include "utils.h"
30 #include "tools/log.h"
31 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
32 #include "event_handler.h"
33 #endif
34 
35 #if defined(ENABLE_TASKPOOL_FFRT)
36 #include "c/executor_task.h"
37 #include "ffrt_inner.h"
38 #endif
39 
40 namespace Commonlibrary::Concurrent::TaskPoolModule {
41 using namespace Commonlibrary::Platform;
42 
43 enum ExecuteState { NOT_FOUND, WAITING, RUNNING, CANCELED, FINISHED, DELAYED, ENDING};
44 enum TaskType { TASK, FUNCTION_TASK, SEQRUNNER_TASK, COMMON_TASK, GROUP_COMMON_TASK, GROUP_FUNCTION_TASK };
45 
46 struct GroupInfo;
47 class Worker;
48 struct TaskInfo {
49     napi_deferred deferred = nullptr;
50     Priority priority {Priority::DEFAULT};
51     void* serializationFunction = nullptr;
52     void* serializationArguments = nullptr;
53 };
54 
55 #if defined(ENABLE_TASKPOOL_FFRT)
56 #define RECURSIVE_MUTEX ffrt::recursive_mutex
57 #else
58 #define RECURSIVE_MUTEX std::recursive_mutex
59 #endif
60 
61 struct ListenerCallBackInfo {
ListenerCallBackInfoListenerCallBackInfo62     ListenerCallBackInfo(napi_env env, napi_ref callbackRef, napi_value taskError) : env_(env),
63         callbackRef_(callbackRef), taskError_(taskError) {}
~ListenerCallBackInfoListenerCallBackInfo64     ~ListenerCallBackInfo()
65     {
66         napi_delete_reference(env_, callbackRef_);
67     }
68     napi_env env_;
69     napi_ref callbackRef_;
70     napi_value taskError_;
71 };
72 
73 class Task {
74 public:
75     Task(napi_env env, TaskType taskType, std::string name);
76     Task() = default;
77     ~Task() = default;
78 
79     static napi_value TaskConstructor(napi_env env, napi_callback_info cbinfo);
80     static napi_value LongTaskConstructor(napi_env env, napi_callback_info cbinfo);
81     static napi_value SetTransferList(napi_env env, napi_callback_info cbinfo);
82     static napi_value SetCloneList(napi_env env, napi_callback_info cbinfo);
83     static napi_value IsCanceled(napi_env env, napi_callback_info cbinfo);
84     static napi_value OnReceiveData(napi_env env, napi_callback_info cbinfo);
85     static napi_value SendData(napi_env env, napi_callback_info cbinfo);
86     static napi_value AddDependency(napi_env env, napi_callback_info cbinfo);
87     static napi_value RemoveDependency(napi_env env, napi_callback_info cbinfo);
88     static napi_value OnEnqueued(napi_env env, napi_callback_info cbinfo);
89     static napi_value OnStartExecution(napi_env env, napi_callback_info cbinfo);
90     static napi_value OnExecutionFailed(napi_env env, napi_callback_info cbinfo);
91     static napi_value OnExecutionSucceeded(napi_env env, napi_callback_info cbinfo);
92     static napi_value IsDone(napi_env env, napi_callback_info cbinfo);
93     static napi_value GetTotalDuration(napi_env env, napi_callback_info info);
94     static napi_value GetCPUDuration(napi_env env, napi_callback_info info);
95     static napi_value GetIODuration(napi_env env, napi_callback_info info);
96     static napi_value GetTaskDuration(napi_env env, napi_callback_info& info, std::string durationType);
97     static napi_value GetName(napi_env env, [[maybe_unused]] napi_callback_info info);
98 
99     static Task* GenerateTask(napi_env env, napi_value task, napi_value func,
100                               napi_value name, napi_value* args, size_t argc);
101     static Task* GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type);
102     static TaskInfo* GenerateTaskInfo(napi_env env, napi_value func, napi_value args,
103                                       napi_value transferList, napi_value cloneList, Priority priority,
104                                       bool defaultTransfer = true, bool defaultCloneSendable = false);
105     static void TaskDestructor(napi_env env, void* data, void* hint);
106 
107     static void ThrowNoDependencyError(napi_env env);
108     static void StartExecutionCallback(const uv_async_t* req);
109     static void StartExecutionTask(ListenerCallBackInfo* listenerCallBackInfo);
110     static void ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo);
111     static void CleanupHookFunc(void* arg);
112 
113     void StoreTaskId(uint64_t taskId);
114     napi_value GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType = TaskType::COMMON_TASK,
115                                   Priority priority = Priority::DEFAULT);
116     TaskInfo* GetTaskInfo(napi_env env, napi_value task, Priority priority);
117     void UpdateTaskType(TaskType taskType);
118     void UpdatePeriodicTask();
119     bool IsRepeatableTask() const;
120     bool IsGroupTask() const;
121     bool IsGroupCommonTask() const;
122     bool IsGroupFunctionTask() const;
123     bool IsCommonTask() const;
124     bool IsSeqRunnerTask() const;
125     bool IsFunctionTask() const;
126     bool IsLongTask() const;
127     bool IsPeriodicTask() const;
128     bool IsMainThreadTask() const;
129     bool IsExecuted() const;
130     void IncreaseRefCount();
131     void DecreaseRefCount();
132     bool IsReadyToHandle() const;
133     void NotifyPendingTask();
134     void CancelPendingTask(napi_env env);
135     bool UpdateTask(uint64_t startTime, void* worker);
136     napi_value DeserializeValue(napi_env env, napi_value* func, napi_value* args);
137     void StoreTaskDuration();
138     bool CanForSequenceRunner(napi_env env);
139     bool CanForTaskGroup(napi_env env);
140     bool CanExecute(napi_env env);
141     bool CanExecuteDelayed(napi_env env);
142     bool CanExecutePeriodically(napi_env env);
143     void SetHasDependency(bool hasDependency);
144     bool HasDependency() const;
145     void TryClearHasDependency();
146     void ClearDelayedTimers();
147     void IncreaseTaskRefCount();
148     void DecreaseTaskRefCount();
149     bool ShouldDeleteTask(bool needUnref = true);
150     bool VerifyAndPostResult(Priority priority);
151     bool CheckStartExecution(Priority priority);
152     bool IsValid();
153     void SetValid(bool isValid);
154 
155 private:
156     Task(const Task &) = delete;
157     Task& operator=(const Task &) = delete;
158     Task(Task &&) = delete;
159     Task& operator=(Task &&) = delete;
160 
161     void InitHandle(napi_env env);
162 
163 public:
164     napi_env env_ = nullptr;
165     TaskType taskType_ {TaskType::TASK};
166     std::string name_ {};
167     uint64_t taskId_ {};
168     std::atomic<ExecuteState> taskState_ {ExecuteState::NOT_FOUND};
169     uint64_t groupId_ {}; // 0 for task outside taskgroup
170     uint64_t seqRunnerId_ {}; // 0 for task without seqRunner
171     TaskInfo* currentTaskInfo_ {};
172     std::list<TaskInfo*> pendingTaskInfos_ {}; // for a common task executes multiple times
173     void* result_ = nullptr;
174     uv_async_t* onResultSignal_ = nullptr;
175     std::atomic<bool> success_ {true};
176     std::atomic<uint64_t> startTime_ {};
177     std::atomic<uint64_t> cpuTime_ {};
178     std::atomic<uint64_t> ioTime_ {};
179     void* worker_ {nullptr};
180     napi_ref taskRef_ {};
181     std::atomic<uint32_t> taskRefCount_ {};
182     RECURSIVE_MUTEX taskMutex_ {};
183     bool hasDependency_ {false};
184     bool isLongTask_ {false};
185     std::atomic<bool> isValid_ {true};
186     std::atomic<uint32_t> refCount_ {false}; // when refCount_ is 0, the task pointer can be deleted
187     uv_async_t* onStartExecutionSignal_ = nullptr;
188     ListenerCallBackInfo* onEnqueuedCallBackInfo_ = nullptr;
189     ListenerCallBackInfo* onStartExecutionCallBackInfo_ = nullptr;
190     ListenerCallBackInfo* onExecutionFailedCallBackInfo_ = nullptr;
191     ListenerCallBackInfo* onExecutionSucceededCallBackInfo_ = nullptr;
192 
193     // for periodic task
194     bool isPeriodicTask_ {false};
195     // periodic task first Generate TaskInfo
196     std::atomic<bool> isFirstTaskInfo_ {false};
197     uv_timer_t* timer_ {nullptr};
198     Priority periodicTaskPriority_ {Priority::DEFAULT};
199 
200     std::set<uv_timer_t*> delayedTimers_ {}; // task delayed timer
201 
202     bool isMainThreadTask_ {false};
203     std::atomic<bool> isRunning_ {false};
204 };
205 
206 struct CallbackInfo {
CallbackInfoCallbackInfo207     CallbackInfo(napi_env env, uint32_t count, napi_ref ref, Task* task)
208         : hostEnv(env), refCount(count), callbackRef(ref), task(task), onCallbackSignal(nullptr), worker(nullptr) {}
~CallbackInfoCallbackInfo209     ~CallbackInfo()
210     {
211         napi_delete_reference(hostEnv, callbackRef);
212 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
213         if (task == nullptr) {
214             return;
215         }
216         if (!task->IsMainThreadTask() && onCallbackSignal != nullptr) {
217             Common::Helper::ConcurrentHelper::UvHandleClose(onCallbackSignal);
218         }
219 #else
220         if (onCallbackSignal != nullptr) {
221             Common::Helper::ConcurrentHelper::UvHandleClose(onCallbackSignal);
222         }
223 #endif
224     }
225 
226     napi_env hostEnv;
227     uint32_t refCount;
228     napi_ref callbackRef;
229     Task* task;
230     uv_async_t* onCallbackSignal;
231     Worker* worker;
232 };
233 
234 struct TaskResultInfo {
TaskResultInfoTaskResultInfo235     TaskResultInfo(napi_env env, napi_env curEnv, uint64_t id, void* args) : hostEnv(env), workerEnv(curEnv),
236         taskId(id), serializationArgs(args) {}
237     ~TaskResultInfo() = default;
238 
239     napi_env hostEnv;
240     napi_env workerEnv;
241     uint64_t taskId;
242     void* serializationArgs;
243 };
244 } // namespace Commonlibrary::Concurrent::TaskPoolModule
245 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_H
246