1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
18 
19 #include <array>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <set>
24 #include <shared_mutex>
25 #include <unordered_map>
26 #include <unordered_set>
27 #include <vector>
28 
29 #include "napi/native_api.h"
30 #include "sequence_runner.h"
31 #include "task.h"
32 #include "task_queue.h"
33 #include "task_group.h"
34 #include "worker.h"
35 
36 namespace Commonlibrary::Concurrent::TaskPoolModule {
37 using namespace Commonlibrary::Concurrent::Common;
38 
39 static constexpr char ARGUMENTS_STR[] = "arguments";
40 static constexpr char NAME[] = "name";
41 static constexpr char FUNCTION_STR[] = "function";
42 static constexpr char GROUP_ID_STR[] = "groupId";
43 static constexpr char TASKID_STR[] = "taskId";
44 static constexpr char TASKINFO_STR[] = "taskInfo";
45 static constexpr char TRANSFERLIST_STR[] = "transferList";
46 static constexpr char CLONE_LIST_STR[] = "cloneList";
47 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency";
48 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency";
49 static constexpr char TASK_CPU_TIME[] = "cpuDuration";
50 static constexpr char TASK_IO_TIME[] = "ioDuration";
51 static constexpr char TASK_TOTAL_TIME[] = "totalDuration";
52 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer";
53 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable";
54 
55 class TaskGroup;
56 
57 class TaskManager {
58 public:
59     static TaskManager& GetInstance();
60 
61     void StoreTask(uint64_t taskId, Task* task);
62     void RemoveTask(uint64_t taskId);
63     Task* GetTask(uint64_t taskId);
64     void EnqueueTaskId(uint64_t taskId, Priority priority = Priority::DEFAULT);
65     void EraseWaitingTaskId(uint64_t taskId, Priority priority);
66     std::pair<uint64_t, Priority> DequeueTaskId();
67     void CancelTask(napi_env env, uint64_t taskId);
68     void CancelSeqRunnerTask(napi_env env, Task* task);
69     void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true);
70 
71     // for worker state
72     void NotifyWorkerIdle(Worker* worker);
73     void NotifyWorkerCreated(Worker* worker);
74     void NotifyWorkerRunning(Worker* worker);
75     void RemoveWorker(Worker* worker);
76     void RestoreWorker(Worker* worker);
77 
78     // for load balance
79     void InitTaskManager(napi_env env);
80     void UpdateExecutedInfo(uint64_t duration);
81     void TryTriggerExpand();
82 
83     // for taskpool state
84     uint32_t GetTaskNum();
85     uint32_t GetIdleWorkers();
86     uint32_t GetThreadNum();
87     uint32_t GetRunningWorkers();
88     uint32_t GetTimeoutWorkers();
89     void GetIdleWorkersList(uint32_t step);
90     bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size);
91 
92     // for get thread info
93     napi_value GetThreadInfos(napi_env env);
94 
95     // for get task info
96     napi_value GetTaskInfos(napi_env env);
97 
98     // for get task name
99     std::string GetTaskName(uint64_t taskId);
100 
101     // for countTrace for worker
102     void CountTraceForWorker();
103 
104     std::shared_ptr<CallbackInfo> GetCallbackInfo(uint64_t taskId);
105     void RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo);
106     void IncreaseRefCount(uint64_t taskId);
107     void DecreaseRefCount(napi_env env, uint64_t taskId);
108     napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task);
109     MsgQueue* GetMessageQueue(const uv_async_t* req);
110     MsgQueue* GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo);
111     void ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo);
112 
113     // for task dependency
114     bool IsDependendByTaskId(uint64_t taskId);
115     bool IsDependentByTaskId(uint64_t dependentTaskId);
116     void NotifyDependencyTaskInfo(uint64_t taskId);
117     void RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId);
118     bool StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet);
119     bool RemoveTaskDependency(uint64_t taskId, uint64_t dependentId);
120     bool CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId);
121     void EnqueuePendingTaskInfo(uint64_t taskId, Priority priority);
122     std::pair<uint64_t, Priority> DequeuePendingTaskInfo(uint64_t taskId);
123     void RemovePendingTaskInfo(uint64_t taskId);
124     void StoreDependentTaskInfo(std::set<uint64_t> dependTaskIdSet, uint64_t taskId);
125     void RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId);
126     std::string GetTaskDependInfoToString(uint64_t taskId);
127 
128     bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT);
129 
130     // for duration
131     void StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration);
132     uint64_t GetTaskDuration(uint64_t taskId, std::string durationType);
133     void RemoveTaskDuration(uint64_t taskId);
134     void StoreLongTaskInfo(uint64_t taskId, Worker* worker);
135     void RemoveLongTaskInfo(uint64_t taskId);
136     void TerminateTask(uint64_t taskId);
137     Worker* GetLongTaskInfo(uint64_t taskId);
138 
139     // for callback
140     void ReleaseCallBackInfo(Task* task);
141 
142     void UpdateSystemAppFlag();
IsSystemApp()143     bool IsSystemApp() const
144     {
145         return isSystemApp_;
146     }
EnableFfrt()147     bool EnableFfrt() const
148     {
149         return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_);
150     }
151 
152     bool CheckTask(uint64_t taskId);
153 
154 private:
155     TaskManager();
156     ~TaskManager();
157     TaskManager(const TaskManager &) = delete;
158     TaskManager& operator=(const TaskManager &) = delete;
159     TaskManager(TaskManager &&) = delete;
160     TaskManager& operator=(TaskManager &&) = delete;
161 
162     void CreateWorkers(napi_env env, uint32_t num = 1);
163     void NotifyExecuteTask();
164     void NotifyWorkerAdded(Worker* worker);
165 
166     // for load balance
167     void RunTaskManager();
168     void CheckForBlockedWorkers();
169     void TryExpand();
170     void NotifyShrink(uint32_t targetNum);
171     void TriggerShrink(uint32_t step);
172     uint32_t ComputeSuitableThreadNum();
173     uint32_t ComputeSuitableIdleNum();
174     static void NotifyExpand(const uv_async_t* req);
175     static void TriggerLoadBalance(const uv_timer_t* req = nullptr);
176 
177     bool IsChooseIdle();
178     uint32_t GetNonIdleTaskNum();
179     std::pair<uint64_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority);
180     void IncreaseNumIfNoIdle(Priority priority);
181     void DecreaseNumIfNoIdle(Priority priority);
182 
183     // <taskId, Task>
184     std::unordered_map<uint64_t, Task*> tasks_ {};
185     RECURSIVE_MUTEX tasksMutex_;
186 
187     // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask
188     std::unordered_map<uint64_t, std::set<uint64_t>> dependTaskInfos_ {};
189     std::shared_mutex dependTaskInfosMutex_;
190 
191     // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask
192     std::unordered_map<uint64_t, std::set<uint64_t>> dependentTaskInfos_ {};
193     std::shared_mutex dependentTaskInfosMutex_;
194 
195     // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...>
196     std::unordered_map<uint64_t, Priority> pendingTaskInfos_ {};
197     std::shared_mutex pendingTaskInfosMutex_;
198 
199     // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...>
200     std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {};
201     std::shared_mutex taskDurationInfosMutex_;
202 
203     // record the longTasks and workers for efficiency
204     std::unordered_map<uint64_t, Worker*> longTasksMap_ {};
205     std::shared_mutex longTasksMutex_{};
206 
207     std::unordered_set<Worker*> workers_ {};
208     std::unordered_set<Worker*> idleWorkers_ {};
209     std::unordered_set<Worker*> timeoutWorkers_ {};
210     RECURSIVE_MUTEX workersMutex_;
211 
212     // for load balance
213     napi_env hostEnv_ = nullptr;
214     uv_loop_t* loop_ = nullptr;
215     uv_timer_t* timer_ = nullptr;
216     uv_async_t* expandHandle_ = nullptr;
217     std::atomic<bool> suspend_ = false;
218     std::atomic<uint32_t> retryCount_ = 0;
219     std::atomic<uint32_t> nonIdleTaskNum_ = 0;
220     std::atomic<uint32_t> totalExecCount_ = 0;
221     std::atomic<uint64_t> totalExecTime_ = 0;
222     std::atomic<bool> needChecking_ = false;
223     std::atomic<bool> isHandleInited_ = false;
224 
225     // for task priority
226     uint32_t highPrioExecuteCount_ = 0;
227     uint32_t mediumPrioExecuteCount_ = 0;
228     std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {};
229     std::mutex taskQueuesMutex_;
230 
231     std::atomic<bool> isInitialized_ = false;
232     std::atomic<bool> isSystemApp_ = false;
233     int disableFfrtFlag_ = 0; // 0 means enable ffrt
234     int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt
235 
236     std::mutex callbackMutex_;
237     std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {};
238     std::vector<Worker*> freeList_ {};
239 
240 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
241     std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {};
242 #endif
243 
244     friend class TaskGroupManager;
245     friend class NativeEngineTest;
246 };
247 
248 class TaskGroupManager {
249 public:
250     TaskGroupManager() = default;
251     ~TaskGroupManager() = default;
252 
253     static TaskGroupManager &GetInstance();
254 
255     void AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId);
256     void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup);
257     void RemoveTaskGroup(uint64_t groupId);
258     TaskGroup* GetTaskGroup(uint64_t groupId);
259     void CancelGroup(napi_env env, uint64_t groupId);
260     void CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group);
261     void ReleaseTaskGroupData(napi_env env, TaskGroup* group);
262     bool UpdateGroupState(uint64_t groupId);
263 
264     void AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task);
265     bool TriggerSeqRunner(napi_env env, Task* lastTask);
266     void DisposeCanceledTask(napi_env env, Task* task);
267     void StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner);
268     void RemoveSequenceRunner(uint64_t seqRunnerId);
269     SequenceRunner* GetSeqRunner(uint64_t seqRunnerId);
270 
271 private:
272     TaskGroupManager(const TaskGroupManager &) = delete;
273     TaskGroupManager& operator=(const TaskGroupManager &) = delete;
274     TaskGroupManager(TaskGroupManager &&) = delete;
275     TaskGroupManager& operator=(TaskGroupManager &&) = delete;
276 
277     // <groupId, TaskGroup>
278     std::unordered_map<uint64_t, TaskGroup*> taskGroups_ {};
279     std::mutex taskGroupsMutex_;
280 
281     // <seqRunnerId, SequenceRunner>
282     std::unordered_map<uint64_t, SequenceRunner*> seqRunners_ {};
283     std::mutex seqRunnersMutex_;
284     friend class NativeEngineTest;
285 };
286 
287 class SequenceRunnerManager {
288 public:
289     SequenceRunnerManager() = default;
290     ~SequenceRunnerManager() = default;
291 
292     static SequenceRunnerManager &GetInstance();
293     SequenceRunner* CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
294                                             const std::string &name, uint32_t priority);
295     uint64_t DecreaseSeqCount(SequenceRunner* seqRunner);
296     void RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner);
297     void RemoveSequenceRunner(const std::string &name);
298     bool TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner);
299     void GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner);
300     bool IncreaseGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner);
301 
302 private:
303     SequenceRunnerManager(const SequenceRunnerManager &) = delete;
304     SequenceRunnerManager& operator=(const SequenceRunnerManager &) = delete;
305     SequenceRunnerManager(SequenceRunnerManager &&) = delete;
306     SequenceRunnerManager& operator=(SequenceRunnerManager &&) = delete;
307 
308     // <<name1, seqRunner>, <name2, seqRunner>, ...>
309     std::unordered_map<std::string, SequenceRunner*> globalSeqRunner_ {};
310     std::mutex globalSeqRunnerMutex_;
311 };
312 } // namespace Commonlibrary::Concurrent::TaskPoolModule
313 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
314