1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 18 19 #include <array> 20 #include <list> 21 #include <memory> 22 #include <mutex> 23 #include <set> 24 #include <shared_mutex> 25 #include <unordered_map> 26 #include <unordered_set> 27 #include <vector> 28 29 #include "napi/native_api.h" 30 #include "sequence_runner.h" 31 #include "task.h" 32 #include "task_queue.h" 33 #include "task_group.h" 34 #include "worker.h" 35 36 namespace Commonlibrary::Concurrent::TaskPoolModule { 37 using namespace Commonlibrary::Concurrent::Common; 38 39 static constexpr char ARGUMENTS_STR[] = "arguments"; 40 static constexpr char NAME[] = "name"; 41 static constexpr char FUNCTION_STR[] = "function"; 42 static constexpr char GROUP_ID_STR[] = "groupId"; 43 static constexpr char TASKID_STR[] = "taskId"; 44 static constexpr char TASKINFO_STR[] = "taskInfo"; 45 static constexpr char TRANSFERLIST_STR[] = "transferList"; 46 static constexpr char CLONE_LIST_STR[] = "cloneList"; 47 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency"; 48 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency"; 49 static constexpr char TASK_CPU_TIME[] = "cpuDuration"; 50 static constexpr char TASK_IO_TIME[] = "ioDuration"; 51 static constexpr char TASK_TOTAL_TIME[] = "totalDuration"; 52 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer"; 53 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable"; 54 55 class TaskGroup; 56 57 class TaskManager { 58 public: 59 static TaskManager& GetInstance(); 60 61 void StoreTask(uint64_t taskId, Task* task); 62 void RemoveTask(uint64_t taskId); 63 Task* GetTask(uint64_t taskId); 64 void EnqueueTaskId(uint64_t taskId, Priority priority = Priority::DEFAULT); 65 void EraseWaitingTaskId(uint64_t taskId, Priority priority); 66 std::pair<uint64_t, Priority> DequeueTaskId(); 67 void CancelTask(napi_env env, uint64_t taskId); 68 void CancelSeqRunnerTask(napi_env env, Task* task); 69 void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true); 70 71 // for worker state 72 void NotifyWorkerIdle(Worker* worker); 73 void NotifyWorkerCreated(Worker* worker); 74 void NotifyWorkerRunning(Worker* worker); 75 void RemoveWorker(Worker* worker); 76 void RestoreWorker(Worker* worker); 77 78 // for load balance 79 void InitTaskManager(napi_env env); 80 void UpdateExecutedInfo(uint64_t duration); 81 void TryTriggerExpand(); 82 83 // for taskpool state 84 uint32_t GetTaskNum(); 85 uint32_t GetIdleWorkers(); 86 uint32_t GetThreadNum(); 87 uint32_t GetRunningWorkers(); 88 uint32_t GetTimeoutWorkers(); 89 void GetIdleWorkersList(uint32_t step); 90 bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size); 91 92 // for get thread info 93 napi_value GetThreadInfos(napi_env env); 94 95 // for get task info 96 napi_value GetTaskInfos(napi_env env); 97 98 // for get task name 99 std::string GetTaskName(uint64_t taskId); 100 101 // for countTrace for worker 102 void CountTraceForWorker(); 103 104 std::shared_ptr<CallbackInfo> GetCallbackInfo(uint64_t taskId); 105 void RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo); 106 void IncreaseRefCount(uint64_t taskId); 107 void DecreaseRefCount(napi_env env, uint64_t taskId); 108 napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task); 109 MsgQueue* GetMessageQueue(const uv_async_t* req); 110 MsgQueue* GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo); 111 void ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo); 112 113 // for task dependency 114 bool IsDependendByTaskId(uint64_t taskId); 115 bool IsDependentByTaskId(uint64_t dependentTaskId); 116 void NotifyDependencyTaskInfo(uint64_t taskId); 117 void RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId); 118 bool StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet); 119 bool RemoveTaskDependency(uint64_t taskId, uint64_t dependentId); 120 bool CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId); 121 void EnqueuePendingTaskInfo(uint64_t taskId, Priority priority); 122 std::pair<uint64_t, Priority> DequeuePendingTaskInfo(uint64_t taskId); 123 void RemovePendingTaskInfo(uint64_t taskId); 124 void StoreDependentTaskInfo(std::set<uint64_t> dependTaskIdSet, uint64_t taskId); 125 void RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId); 126 std::string GetTaskDependInfoToString(uint64_t taskId); 127 128 bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT); 129 130 // for duration 131 void StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration); 132 uint64_t GetTaskDuration(uint64_t taskId, std::string durationType); 133 void RemoveTaskDuration(uint64_t taskId); 134 void StoreLongTaskInfo(uint64_t taskId, Worker* worker); 135 void RemoveLongTaskInfo(uint64_t taskId); 136 void TerminateTask(uint64_t taskId); 137 Worker* GetLongTaskInfo(uint64_t taskId); 138 139 // for callback 140 void ReleaseCallBackInfo(Task* task); 141 142 void UpdateSystemAppFlag(); IsSystemApp()143 bool IsSystemApp() const 144 { 145 return isSystemApp_; 146 } EnableFfrt()147 bool EnableFfrt() const 148 { 149 return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_); 150 } 151 152 bool CheckTask(uint64_t taskId); 153 154 private: 155 TaskManager(); 156 ~TaskManager(); 157 TaskManager(const TaskManager &) = delete; 158 TaskManager& operator=(const TaskManager &) = delete; 159 TaskManager(TaskManager &&) = delete; 160 TaskManager& operator=(TaskManager &&) = delete; 161 162 void CreateWorkers(napi_env env, uint32_t num = 1); 163 void NotifyExecuteTask(); 164 void NotifyWorkerAdded(Worker* worker); 165 166 // for load balance 167 void RunTaskManager(); 168 void CheckForBlockedWorkers(); 169 void TryExpand(); 170 void NotifyShrink(uint32_t targetNum); 171 void TriggerShrink(uint32_t step); 172 uint32_t ComputeSuitableThreadNum(); 173 uint32_t ComputeSuitableIdleNum(); 174 static void NotifyExpand(const uv_async_t* req); 175 static void TriggerLoadBalance(const uv_timer_t* req = nullptr); 176 177 bool IsChooseIdle(); 178 uint32_t GetNonIdleTaskNum(); 179 std::pair<uint64_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority); 180 void IncreaseNumIfNoIdle(Priority priority); 181 void DecreaseNumIfNoIdle(Priority priority); 182 183 // <taskId, Task> 184 std::unordered_map<uint64_t, Task*> tasks_ {}; 185 RECURSIVE_MUTEX tasksMutex_; 186 187 // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask 188 std::unordered_map<uint64_t, std::set<uint64_t>> dependTaskInfos_ {}; 189 std::shared_mutex dependTaskInfosMutex_; 190 191 // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask 192 std::unordered_map<uint64_t, std::set<uint64_t>> dependentTaskInfos_ {}; 193 std::shared_mutex dependentTaskInfosMutex_; 194 195 // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...> 196 std::unordered_map<uint64_t, Priority> pendingTaskInfos_ {}; 197 std::shared_mutex pendingTaskInfosMutex_; 198 199 // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...> 200 std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {}; 201 std::shared_mutex taskDurationInfosMutex_; 202 203 // record the longTasks and workers for efficiency 204 std::unordered_map<uint64_t, Worker*> longTasksMap_ {}; 205 std::shared_mutex longTasksMutex_{}; 206 207 std::unordered_set<Worker*> workers_ {}; 208 std::unordered_set<Worker*> idleWorkers_ {}; 209 std::unordered_set<Worker*> timeoutWorkers_ {}; 210 RECURSIVE_MUTEX workersMutex_; 211 212 // for load balance 213 napi_env hostEnv_ = nullptr; 214 uv_loop_t* loop_ = nullptr; 215 uv_timer_t* timer_ = nullptr; 216 uv_async_t* expandHandle_ = nullptr; 217 std::atomic<bool> suspend_ = false; 218 std::atomic<uint32_t> retryCount_ = 0; 219 std::atomic<uint32_t> nonIdleTaskNum_ = 0; 220 std::atomic<uint32_t> totalExecCount_ = 0; 221 std::atomic<uint64_t> totalExecTime_ = 0; 222 std::atomic<bool> needChecking_ = false; 223 std::atomic<bool> isHandleInited_ = false; 224 225 // for task priority 226 uint32_t highPrioExecuteCount_ = 0; 227 uint32_t mediumPrioExecuteCount_ = 0; 228 std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {}; 229 std::mutex taskQueuesMutex_; 230 231 std::atomic<bool> isInitialized_ = false; 232 std::atomic<bool> isSystemApp_ = false; 233 int disableFfrtFlag_ = 0; // 0 means enable ffrt 234 int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt 235 236 std::mutex callbackMutex_; 237 std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {}; 238 std::vector<Worker*> freeList_ {}; 239 240 #if defined(ENABLE_TASKPOOL_EVENTHANDLER) 241 std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {}; 242 #endif 243 244 friend class TaskGroupManager; 245 friend class NativeEngineTest; 246 }; 247 248 class TaskGroupManager { 249 public: 250 TaskGroupManager() = default; 251 ~TaskGroupManager() = default; 252 253 static TaskGroupManager &GetInstance(); 254 255 void AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId); 256 void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup); 257 void RemoveTaskGroup(uint64_t groupId); 258 TaskGroup* GetTaskGroup(uint64_t groupId); 259 void CancelGroup(napi_env env, uint64_t groupId); 260 void CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group); 261 void ReleaseTaskGroupData(napi_env env, TaskGroup* group); 262 bool UpdateGroupState(uint64_t groupId); 263 264 void AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task); 265 bool TriggerSeqRunner(napi_env env, Task* lastTask); 266 void DisposeCanceledTask(napi_env env, Task* task); 267 void StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner); 268 void RemoveSequenceRunner(uint64_t seqRunnerId); 269 SequenceRunner* GetSeqRunner(uint64_t seqRunnerId); 270 271 private: 272 TaskGroupManager(const TaskGroupManager &) = delete; 273 TaskGroupManager& operator=(const TaskGroupManager &) = delete; 274 TaskGroupManager(TaskGroupManager &&) = delete; 275 TaskGroupManager& operator=(TaskGroupManager &&) = delete; 276 277 // <groupId, TaskGroup> 278 std::unordered_map<uint64_t, TaskGroup*> taskGroups_ {}; 279 std::mutex taskGroupsMutex_; 280 281 // <seqRunnerId, SequenceRunner> 282 std::unordered_map<uint64_t, SequenceRunner*> seqRunners_ {}; 283 std::mutex seqRunnersMutex_; 284 friend class NativeEngineTest; 285 }; 286 287 class SequenceRunnerManager { 288 public: 289 SequenceRunnerManager() = default; 290 ~SequenceRunnerManager() = default; 291 292 static SequenceRunnerManager &GetInstance(); 293 SequenceRunner* CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc, 294 const std::string &name, uint32_t priority); 295 uint64_t DecreaseSeqCount(SequenceRunner* seqRunner); 296 void RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner); 297 void RemoveSequenceRunner(const std::string &name); 298 bool TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner); 299 void GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner); 300 bool IncreaseGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner); 301 302 private: 303 SequenceRunnerManager(const SequenceRunnerManager &) = delete; 304 SequenceRunnerManager& operator=(const SequenceRunnerManager &) = delete; 305 SequenceRunnerManager(SequenceRunnerManager &&) = delete; 306 SequenceRunnerManager& operator=(SequenceRunnerManager &&) = delete; 307 308 // <<name1, seqRunner>, <name2, seqRunner>, ...> 309 std::unordered_map<std::string, SequenceRunner*> globalSeqRunner_ {}; 310 std::mutex globalSeqRunnerMutex_; 311 }; 312 } // namespace Commonlibrary::Concurrent::TaskPoolModule 313 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 314