/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "task_manager.h"

#include <algorithm>
#include <cinttypes>
#include <cmath>
#include <securec.h>
#include <thread>

#if defined(ENABLE_TASKPOOL_FFRT)
#include "bundle_info.h"
#include "bundle_mgr_interface.h"
#include "bundle_mgr_proxy.h"
#include "iservice_registry.h"
#include "parameters.h"
#include "status_receiver_interface.h"
#include "system_ability_definition.h"
#include "c/executor_task.h"
#include "ffrt_inner.h"
#endif
#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
#include "helper/concurrent_helper.h"
#include "helper/error_helper.h"
#include "helper/hitrace_helper.h"
#include "taskpool.h"
#include "tools/log.h"
#include "worker.h"

namespace Commonlibrary::Concurrent::TaskPoolModule {
using namespace OHOS::JsSysModule;

static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
static constexpr uint32_t STEP_SIZE = 2;
static constexpr uint32_t DEFAULT_THREADS = 3;
static constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle
static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
static constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s
static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time
[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread

#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
    {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
    {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
    {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
    {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
};
#endif

// ----------------------------------- TaskManager ----------------------------------------
TaskManager& TaskManager::GetInstance()
{
    static TaskManager manager;
    return manager;
}

TaskManager::TaskManager()
{
    for (size_t i = 0; i < taskQueues_.size(); i++) {
        std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
        taskQueues_[i] = std::move(taskQueue);
    }
}

TaskManager::~TaskManager()
{
    HILOG_INFO("taskpool:: ~TaskManager");
    if (timer_ == nullptr) {
        HILOG_ERROR("taskpool:: timer_ is nullptr");
    } else {
        uv_timer_stop(timer_);
        ConcurrentHelper::UvHandleClose(timer_);
        ConcurrentHelper::UvHandleClose(expandHandle_);
    }

    if (loop_ != nullptr) {
        uv_stop(loop_);
    }

    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        for (auto& worker : workers_) {
            delete worker;
        }
        workers_.clear();
    }

    {
        std::lock_guard<std::mutex> lock(callbackMutex_);
        for (auto& [_, callbackPtr] : callbackTable_) {
            if (callbackPtr == nullptr) {
                continue;
            }
            callbackPtr.reset();
        }
        callbackTable_.clear();
    }

    {
        std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
        for (auto& [_, task] : tasks_) {
            delete task;
            task = nullptr;
        }
        tasks_.clear();
    }
    CountTraceForWorker();
}

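// Report the current worker statistics (total, running, idle and timeout threads) as hitrace counters.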
void TaskManager::CountTraceForWorker()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    int64_t threadNum = static_cast<int64_t>(workers_.size());
    int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
    int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
    HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
    HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
    HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
    HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
}

napi_value TaskManager::GetThreadInfos(napi_env env)
{
    napi_value threadInfos = nullptr;
    napi_create_array(env, &threadInfos);
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        int32_t i = 0;
        for (auto& worker : workers_) {
            if (worker->workerEnv_ == nullptr) {
                continue;
            }
            napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
            napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));

            napi_value taskId = nullptr;
            napi_create_array(env, &taskId);
            int32_t j = 0;
            {
                std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
                for (auto& currentId : worker->currentTaskId_) {
                    napi_value id = NapiHelper::CreateUint32(env, currentId);
                    napi_set_element(env, taskId, j, id);
                    j++;
                }
            }
            napi_value threadInfo = nullptr;
            napi_create_object(env, &threadInfo);
            napi_set_named_property(env, threadInfo, "tid", tid);
            napi_set_named_property(env, threadInfo, "priority", priority);
            napi_set_named_property(env, threadInfo, "taskIds", taskId);
            napi_set_element(env, threadInfos, i, threadInfo);
            i++;
        }
    }
    return threadInfos;
}

napi_value TaskManager::GetTaskInfos(napi_env env)
{
    napi_value taskInfos = nullptr;
    napi_create_array(env, &taskInfos);
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
        int32_t i = 0;
        for (const auto& [_, task] : tasks_) {
            if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
                task->taskState_ == ExecuteState::FINISHED) {
                continue;
            }
            napi_value taskInfoValue = NapiHelper::CreateObject(env);
            std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
            napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
            napi_value name = nullptr;
            napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
            napi_set_named_property(env, taskInfoValue, "name", name);
            ExecuteState state = task->taskState_;
            uint64_t duration = 0;
            if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
                duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
            }
            napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
            napi_set_named_property(env, taskInfoValue, "taskId", taskId);
            napi_set_named_property(env, taskInfoValue, "state", stateValue);
            napi_value durationValue = NapiHelper::CreateUint32(env, duration);
            napi_set_named_property(env, taskInfoValue, "duration", durationValue);
            napi_set_element(env, taskInfos, i, taskInfoValue);
            i++;
        }
    }
    return taskInfos;
}

void TaskManager::UpdateExecutedInfo(uint64_t duration)
{
    totalExecTime_ += duration;
    totalExecCount_++;
}

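// Estimate how many worker threads the pool needs: the suitable idle count derived from the
// average task duration, plus the number of workers that are currently running tasks.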
uint32_t TaskManager::ComputeSuitableThreadNum()
{
    uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
    return targetNum;
}

uint32_t TaskManager::ComputeSuitableIdleNum()
{
    uint32_t targetNum = 0;
    if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
        // this branch is used for avoiding time-consuming tasks that may block the taskpool
        targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
    } else if (totalExecCount_ != 0) {
        auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
        uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
        targetNum = std::min(result, GetNonIdleTaskNum());
    }
    return targetNum;
}

void TaskManager::CheckForBlockedWorkers()
{
    // The timeout threshold is chosen dynamically to provide more flexibility in detecting exceptions:
    // if the thread num has reached the limit and no idle worker is available, the shorter timeout is
    // used; otherwise we choose the longer one.
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    bool needChecking = false;
    bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
    uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
    for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
        auto worker = *iter;
        // if the worker thread is idle, just skip it; only a worker in running state can be marked as timeout
        // if the worker is executing a longTask, we do not do the check
        if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
            (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
            !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
            continue;
        }
        // When executing the promise task, the worker state may not be updated and will be
        // marked as 'BLOCKED', so we should exclude this situation.
        // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
        // the task like I/O in uv threads, we should also exclude this situation.
        auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
        if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
            if (!workerEngine->HasWaitingRequest()) {
                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
            } else {
                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
                worker->startTime_ = ConcurrentHelper::GetMilliseconds();
            }
            continue;
        }

        HILOG_INFO("taskpool:: The worker has been marked as timeout.");
        // If the current worker has a longTask and is not executing, we will only interrupt it.
        if (worker->HasLongTask()) {
            continue;
        }
        needChecking = true;
        idleWorkers_.erase(worker);
        timeoutWorkers_.insert(worker);
    }
    // should trigger the check when we have marked and removed workers
    if (UNLIKELY(needChecking)) {
        TryExpand();
    }
}

void TaskManager::TryTriggerExpand()
{
    // post the signal to notify the monitor thread to expand
    if (UNLIKELY(!isHandleInited_)) {
        NotifyExecuteTask();
        needChecking_ = true;
        HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr");
        return;
    }
    uv_async_send(expandHandle_);
}

#if defined(OHOS_PLATFORM)
// Read /proc/[pid]/task/[tid]/stat to get the state of a worker thread.
bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
{
    char path[128]; // 128: buffer for path
    pid_t pid = getpid();
    ssize_t bytesLen = -1;
    int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
    if (ret < 0) {
        HILOG_ERROR("snprintf_s failed");
        return false;
    }
    int fd = open(path, O_RDONLY | O_NONBLOCK);
    if (UNLIKELY(fd == -1)) {
        return false;
    }
    bytesLen = read(fd, buf, size - 1);
    close(fd);
    if (bytesLen <= 0) {
        HILOG_ERROR("taskpool:: failed to read %{public}s", path);
        return false;
    }
    buf[bytesLen] = '\0';
    return true;
}

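// Count idle workers. An ffrt worker counts as idle when it is parked in epoll wait (wait time > 0);
// a native worker counts as idle when its kernel thread state in /proc is 'S' (sleeping).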
uint32_t TaskManager::GetIdleWorkers()
{
    char buf[4096]; // 4096: buffer for thread info
    uint32_t idleCount = 0;
    std::unordered_set<pid_t> tids {};
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        for (auto& worker : idleWorkers_) {
#if defined(ENABLE_TASKPOOL_FFRT)
            if (worker->ffrtTaskHandle_ != nullptr) {
                if (worker->GetWaitTime() > 0) {
                    idleCount++;
                }
                continue;
            }
#endif
            tids.emplace(worker->tid_);
        }
    }
    // The ffrt thread does not read thread info
    for (auto tid : tids) {
        if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
            continue;
        }
        char state;
        if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
            HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state);
            return 0;
        }
        if (state == 'S') {
            idleCount++;
        }
    }
    return idleCount;
}

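// Collect shrink candidates into freeList_: ffrt workers whose loop is inactive or that have been
// waiting longer than IDLE_THRESHOLD intervals, and native workers that stayed sleeping with an
// unchanged user-mode CPU time (utime) for IDLE_THRESHOLD consecutive checks.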
void TaskManager::GetIdleWorkersList(uint32_t step)
{
    char buf[4096]; // 4096: buffer for thread info
    for (auto& worker : idleWorkers_) {
#if defined(ENABLE_TASKPOOL_FFRT)
        if (worker->ffrtTaskHandle_ != nullptr) {
            uint64_t workerWaitTime = worker->GetWaitTime();
            bool isWorkerLoopActive = worker->IsLoopActive();
            if (workerWaitTime == 0) {
                continue;
            }
            uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
                std::chrono::steady_clock::now().time_since_epoch()).count());
            if (!isWorkerLoopActive) {
                freeList_.emplace_back(worker);
            } else if ((currTime - workerWaitTime) > IDLE_THRESHOLD * TRIGGER_INTERVAL) {
                freeList_.emplace_back(worker);
                HILOG_INFO("taskpool:: worker in ffrt epoll wait more than 2 intervals, force to free.");
            } else {
                HILOG_INFO("taskpool:: worker uv alive, and will be free in 2 intervals if not wake.");
            }
            continue;
        }
#endif
        if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
            continue;
        }
        char state;
        uint64_t utime;
        if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
            &state, sizeof(state), &utime) != 2) { // 2: state and utime
            HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
            return;
        }
        if (state != 'S' || utime != worker->lastCpuTime_) {
            worker->idleCount_ = 0;
            worker->lastCpuTime_ = utime;
            continue;
        }
        if (++worker->idleCount_ >= IDLE_THRESHOLD) {
            freeList_.emplace_back(worker);
        }
    }
}

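// Release up to 'step' workers from the shrink candidates that are still idle, hold no long task,
// and have been idle longer than MAX_IDLE_TIME.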
void TaskManager::TriggerShrink(uint32_t step)
{
    GetIdleWorkersList(step);
    step = std::min(step, static_cast<uint32_t>(freeList_.size()));
    uint32_t count = 0;
    for (size_t i = 0; i < freeList_.size(); i++) {
        auto worker = freeList_[i];
        if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
            continue;
        }
        auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
        if (idleTime < MAX_IDLE_TIME || worker->runningCount_ != 0) {
            continue;
        }
        idleWorkers_.erase(worker);
        HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
        uv_async_send(worker->clearWorkerSignal_);
        if (++count == step) {
            break;
        }
    }
    freeList_.clear();
}
#else
uint32_t TaskManager::GetIdleWorkers()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    return idleWorkers_.size();
}

void TaskManager::TriggerShrink(uint32_t step)
{
    for (uint32_t i = 0; i < step; i++) {
        // try to free the worker that idle time meets the requirement
        auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
            auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
            return idleTime > MAX_IDLE_TIME && worker->runningCount_ == 0 && !worker->HasLongTask();
        });
        // remove it from all sets
        if (iter != idleWorkers_.end()) {
            auto worker = *iter;
            idleWorkers_.erase(worker);
            HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
            uv_async_send(worker->clearWorkerSignal_);
        }
    }
}
#endif

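// Shrink the pool towards targetNum, clean up timeout workers, and suspend the load-balance timer
// once only the minimum number of idle workers remains.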
void TaskManager::NotifyShrink(uint32_t targetNum)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    uint32_t workerCount = workers_.size();
    uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
    if (minThread == 0) {
        HILOG_INFO("taskpool:: the system now is under low memory");
    }
    if (workerCount > minThread && workerCount > targetNum) {
        targetNum = std::max(minThread, targetNum);
        uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
        TriggerShrink(step);
    }
    // clean up the timeout workers, releasing at most one per check
    for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
        if (workers_.find(*iter) == workers_.end()) {
            HILOG_WARN("taskpool:: current worker may have been released");
            iter = timeoutWorkers_.erase(iter);
        } else if ((*iter)->runningCount_ == 0) {
            HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", (*iter)->tid_);
            uv_async_send((*iter)->clearWorkerSignal_);
            timeoutWorkers_.erase(iter++);
            return;
        } else {
            iter++;
        }
    }
    uint32_t idleNum = idleWorkers_.size();
    // System memory state is moderate and the worker has executed tasks, so we try to release it
    if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
        auto worker = *(idleWorkers_.begin());
        if (worker == nullptr || worker->clearWorkerSignal_ == nullptr) {
            return;
        }
        if (worker->HasLongTask()) { // worker that has a longTask should not be released
            return;
        }
        if (worker->hasExecuted_) { // worker that hasn't executed any tasks should not be released
            TriggerShrink(DEFAULT_MIN_THREADS);
            return;
        }
    }

    // Create a worker for performance
    if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
        CreateWorkers(hostEnv_);
    }
    // stop the timer
    if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
        suspend_ = true;
        uv_timer_stop(timer_);
        HILOG_DEBUG("taskpool:: timer will be suspended");
    }
}

void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
{
    TaskManager& taskManager = TaskManager::GetInstance();
    taskManager.CheckForBlockedWorkers();
    uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
    taskManager.NotifyShrink(targetNum);
    taskManager.CountTraceForWorker();
}

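// Expand the pool when the suitable idle count exceeds the current idle workers, without letting
// the total thread number exceed maxThreads (plus a small reserve when timeout workers exist).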
void TaskManager::TryExpand()
{
    // dispatch task in the TaskPoolManager thread
    NotifyExecuteTask();
    // do not trigger when there are more idleWorkers than tasks
    uint32_t idleNum = GetIdleWorkers();
    if (idleNum > GetNonIdleTaskNum()) {
        return;
    }
    needChecking_ = false; // do not need to check
    uint32_t targetNum = ComputeSuitableIdleNum();
    uint32_t workerCount = 0;
    uint32_t idleCount = 0;
    uint32_t timeoutWorkers = 0;
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        idleCount = idleWorkers_.size();
        workerCount = workers_.size();
        timeoutWorkers = timeoutWorkers_.size();
    }
    uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS);
    maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
    if (workerCount < maxThreads && idleCount < targetNum) {
        uint32_t step = std::min(maxThreads, targetNum) - idleCount;
        // Prevent the total number of expanded threads from exceeding maxThreads
        if (step + workerCount > maxThreads) {
            step = maxThreads - workerCount;
        }
        CreateWorkers(hostEnv_, step);
        HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
            maxThreads, step, GetThreadNum());
    }
    if (UNLIKELY(suspend_)) {
        suspend_ = false;
        uv_timer_again(timer_);
    }
}

void TaskManager::NotifyExpand(const uv_async_t* req)
{
    TaskManager& taskManager = TaskManager::GetInstance();
    taskManager.TryExpand();
}

void TaskManager::RunTaskManager()
{
    loop_ = uv_loop_new();
    if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
        HILOG_FATAL("taskpool:: new loop failed.");
        return;
    }
    ConcurrentHelper::UvHandleInit(loop_, expandHandle_, TaskManager::NotifyExpand);
    timer_ = new uv_timer_t;
    uv_timer_init(loop_, timer_);
    uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
    isHandleInited_ = true;
#if defined IOS_PLATFORM || defined MAC_PLATFORM
    pthread_setname_np("OS_TaskManager");
#else
    pthread_setname_np(pthread_self(), "OS_TaskManager");
#endif
    if (UNLIKELY(needChecking_)) {
        needChecking_ = false;
        uv_async_send(expandHandle_);
    }
    uv_run(loop_, UV_RUN_DEFAULT);
    if (loop_ != nullptr) {
        uv_loop_delete(loop_);
    }
}

void TaskManager::CancelTask(napi_env env, uint64_t taskId)
{
    // 1. Cannot find the task by taskId, throw error
    // 2. Find executing task, skip it
    // 3. Find waiting task, cancel it
    // 4. Find canceled task, skip it
    std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
    HITRACE_HELPER_METER_NAME(strTrace);
    Task* task = GetTask(taskId);
    if (task == nullptr) {
        std::string errMsg = "taskpool:: the task may not exist";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
        return;
    }
    if (task->taskState_ == ExecuteState::CANCELED) {
        HILOG_DEBUG("taskpool:: task has been canceled");
        return;
    }
    if (task->IsGroupCommonTask()) {
        // when task is a group common task, still check the state
        if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND ||
            task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
            std::string errMsg = "taskpool:: task is not executed or has been executed";
            HILOG_ERROR("%{public}s", errMsg.c_str());
            ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
            return;
        }
        TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
        if (taskGroup == nullptr) {
            return;
        }
        return taskGroup->CancelGroupTask(env, task->taskId_);
    }

    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
    if (task->IsPeriodicTask()) {
        napi_reference_unref(env, task->taskRef_, nullptr);
        task->CancelPendingTask(env);
        uv_timer_stop(task->timer_);
        ConcurrentHelper::UvHandleClose(task->timer_);
        return;
    } else if (task->IsSeqRunnerTask()) {
        CancelSeqRunnerTask(env, task);
        return;
    }
    if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
        task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
        task->taskState_ == ExecuteState::ENDING) {
        std::string errMsg = "taskpool:: task is not executed or has been executed";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
        return;
    }

    task->ClearDelayedTimers();
    ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED);
    task->CancelPendingTask(env);
    if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
        task->DecreaseTaskRefCount();
        DecreaseRefCount(env, task->taskId_);
        EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled");
        napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
        napi_reference_unref(env, task->taskRef_, nullptr);
        delete task->currentTaskInfo_;
        task->currentTaskInfo_ = nullptr;
    }
}

void TaskManager::CancelSeqRunnerTask(napi_env env, Task *task)
{
    if (task->taskState_ == ExecuteState::FINISHED) {
        std::string errMsg = "taskpool:: sequenceRunner task has been executed";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
    } else {
        task->taskState_ = ExecuteState::CANCELED;
    }
}

void TaskManager::NotifyWorkerIdle(Worker* worker)
{
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        if (worker->state_ == WorkerState::BLOCKED) {
            return;
        }
        idleWorkers_.insert(worker);
    }
    if (GetTaskNum() != 0) {
        NotifyExecuteTask();
    }
    CountTraceForWorker();
}

void TaskManager::NotifyWorkerCreated(Worker* worker)
{
    NotifyWorkerIdle(worker);
}

void TaskManager::NotifyWorkerAdded(Worker* worker)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    workers_.insert(worker);
    HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
}

void TaskManager::NotifyWorkerRunning(Worker* worker)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    idleWorkers_.erase(worker);
    CountTraceForWorker();
}

uint32_t TaskManager::GetRunningWorkers()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
        return worker->runningCount_ != 0;
    });
}

uint32_t TaskManager::GetTimeoutWorkers()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    return timeoutWorkers_.size();
}

uint32_t TaskManager::GetTaskNum()
{
    std::lock_guard<std::mutex> lock(taskQueuesMutex_);
    uint32_t sum = 0;
    for (const auto& elements : taskQueues_) {
        sum += elements->GetTaskNum();
    }
    return sum;
}

uint32_t TaskManager::GetNonIdleTaskNum()
{
    return nonIdleTaskNum_;
}

void TaskManager::IncreaseNumIfNoIdle(Priority priority)
{
    if (priority != Priority::IDLE) {
        ++nonIdleTaskNum_;
    }
}

void TaskManager::DecreaseNumIfNoIdle(Priority priority)
{
    if (priority != Priority::IDLE) {
        --nonIdleTaskNum_;
    }
}

uint32_t TaskManager::GetThreadNum()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    return workers_.size();
}

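// Push a task id into the queue of its priority, try to trigger an expansion check,
// and fire the task's onEnqueued listener if one is registered.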
void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
{
    {
        std::lock_guard<std::mutex> lock(taskQueuesMutex_);
        IncreaseNumIfNoIdle(priority);
        taskQueues_[priority]->EnqueueTaskId(taskId);
    }
    TryTriggerExpand();
    Task* task = GetTask(taskId);
    if (task == nullptr) {
        HILOG_FATAL("taskpool:: task is nullptr");
        return;
    }
    task->IncreaseTaskRefCount();
    if (task->onEnqueuedCallBackInfo_ != nullptr) {
        task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_);
    }
}

void TaskManager::EraseWaitingTaskId(uint64_t taskId, Priority priority)
{
    std::lock_guard<std::mutex> lock(taskQueuesMutex_);
    if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
        HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
    }
}

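// Dequeue with anti-starvation scheduling: up to HIGH_PRIORITY_TASK_COUNT high-priority tasks run
// back to back, then up to MEDIUM_PRIORITY_TASK_COUNT medium ones, then one low-priority task;
// idle tasks are taken only when every worker is free.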
std::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
{
    std::lock_guard<std::mutex> lock(taskQueuesMutex_);
    auto& highTaskQueue = taskQueues_[Priority::HIGH];
    if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
        highPrioExecuteCount_++;
        return GetTaskByPriority(highTaskQueue, Priority::HIGH);
    }
    highPrioExecuteCount_ = 0;

    auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
    if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
        mediumPrioExecuteCount_++;
        return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
    }
    mediumPrioExecuteCount_ = 0;

    auto& lowTaskQueue = taskQueues_[Priority::LOW];
    if (!lowTaskQueue->IsEmpty()) {
        return GetTaskByPriority(lowTaskQueue, Priority::LOW);
    }

    auto& idleTaskQueue = taskQueues_[Priority::IDLE];
    if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
        return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
    }
    return std::make_pair(0, Priority::LOW);
}

bool TaskManager::IsChooseIdle()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    for (auto& worker : workers_) {
        if (worker->state_ == WorkerState::IDLE) {
            // If worker->state_ is WorkerState::IDLE, it means that the worker is free
            continue;
        }
        // If there is a worker running a task, do not take the idle task.
        return false;
    }
    // Only when all workers are free will an idle task be taken.
    return true;
}

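// Pop a task id from the given queue; a task that still has unfinished dependencies is parked in
// the pending list instead of being dispatched.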
std::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
    Priority priority)
{
    uint64_t taskId = taskQueue->DequeueTaskId();
    if (IsDependendByTaskId(taskId)) {
        EnqueuePendingTaskInfo(taskId, priority);
        return std::make_pair(0, priority);
    }
    DecreaseNumIfNoIdle(priority);
    return std::make_pair(taskId, priority);
}

void TaskManager::NotifyExecuteTask()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
        // Only idle tasks remain and some workers are still busy; idle tasks
        // must wait until every worker is free, so do not notify.
        return;
    }

    for (auto& worker : idleWorkers_) {
        worker->NotifyExecuteTask();
    }
}

void TaskManager::InitTaskManager(napi_env env)
{
    HITRACE_HELPER_METER_NAME("InitTaskManager");
    if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
#if defined(ENABLE_TASKPOOL_FFRT)
        globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
        if (!globalEnableFfrtFlag_) {
            UpdateSystemAppFlag();
            if (IsSystemApp()) {
                disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
            }
        }
        if (EnableFfrt()) {
            HILOG_INFO("taskpool:: apps use ffrt");
        } else {
            HILOG_INFO("taskpool:: apps do not use ffrt");
        }
#endif
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
        mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
            OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
#endif
        auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
        if (mainThreadEngine == nullptr) {
            HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
            return;
        }
        hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
        // Add a reserved thread for taskpool
        CreateWorkers(hostEnv_);
        // Create a timer to manage worker threads
        std::thread workerManager([this] { this->RunTaskManager(); });
        workerManager.detach();
    }
}

void TaskManager::CreateWorkers(napi_env env, uint32_t num)
{
    HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
    for (uint32_t i = 0; i < num; i++) {
        auto worker = Worker::WorkerConstructor(env);
        NotifyWorkerAdded(worker);
    }
    CountTraceForWorker();
}

void TaskManager::RemoveWorker(Worker* worker)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    idleWorkers_.erase(worker);
    timeoutWorkers_.erase(worker);
    workers_.erase(worker);
}

void TaskManager::RestoreWorker(Worker* worker)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    if (UNLIKELY(suspend_)) {
        suspend_ = false;
        uv_timer_again(timer_);
    }
    if (worker->state_ == WorkerState::BLOCKED) {
        // since the worker is blocked, we should add it to the timeout set
        timeoutWorkers_.insert(worker);
        return;
    }
    // Since the worker may be executing some tasks in the IO thread, we should add it back to the
    // worker sets and call 'NotifyWorkerIdle', so it can still execute some tasks in its own thread.
    HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
    idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
    if (GetTaskNum() != 0) {
        NotifyExecuteTask();
    }
}

// ---------------------------------- SendData ---------------------------------------
void TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    callbackTable_[taskId] = callbackInfo;
}

std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        HILOG_ERROR("taskpool:: the callback does not exist");
        return nullptr;
    }
    return iter->second;
}

void TaskManager::IncreaseRefCount(uint64_t taskId)
{
    if (taskId == 0) { // do not support func
        return;
    }
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        return;
    }
    iter->second->refCount++;
}

void TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId)
{
    if (taskId == 0) { // do not support func
        return;
    }
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        return;
    }

    auto task = reinterpret_cast<Task*>(taskId);
    if (!task->IsValid()) {
        callbackTable_.erase(iter);
        return;
    }

    iter->second->refCount--;
    if (iter->second->refCount == 0) {
        callbackTable_.erase(iter);
    }
}

void TaskManager::ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    callbackInfo->worker = nullptr;
}

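// Deliver a SendData result to the host side: enqueue the result on the worker and wake the host
// either through the main-thread event handler or through the uv async signal.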
napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
{
    HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str());
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(task->taskId_);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
        delete resultInfo;
        return nullptr;
    }
    Worker* worker = static_cast<Worker*>(task->worker_);
    worker->Enqueue(task->env_, resultInfo);
    auto callbackInfo = iter->second;
    callbackInfo->refCount++;
    callbackInfo->worker = worker;
    auto workerEngine = reinterpret_cast<NativeEngine*>(env);
    workerEngine->IncreaseListeningCounter();
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
    if (task->IsMainThreadTask()) {
        HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask");
        auto onCallbackTask = [callbackInfo]() {
            TaskPool::ExecuteCallbackTask(callbackInfo.get());
        };
        TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
    } else {
        callbackInfo->onCallbackSignal->data = callbackInfo.get();
        uv_async_send(callbackInfo->onCallbackSignal);
    }
#else
    callbackInfo->onCallbackSignal->data = callbackInfo.get();
    uv_async_send(callbackInfo->onCallbackSignal);
#endif
    return nullptr;
}

MsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto info = static_cast<CallbackInfo*>(req->data);
    if (info == nullptr || info->worker == nullptr) {
        HILOG_WARN("taskpool:: info or worker is nullptr");
        return nullptr;
    }
    auto worker = info->worker;
    MsgQueue* queue = nullptr;
    worker->Dequeue(info->hostEnv, queue);
    return queue;
}

MsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
        HILOG_WARN("taskpool:: callbackInfo or worker is nullptr");
        return nullptr;
    }
    auto worker = callbackInfo->worker;
    MsgQueue* queue = nullptr;
    worker->Dequeue(callbackInfo->hostEnv, queue);
    return queue;
}
// ---------------------------------- SendData ---------------------------------------

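// Called when a task finishes: re-enqueue every pending task that was waiting on it and drop the
// corresponding dependency records.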
void TaskManager::NotifyDependencyTaskInfo(uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
    auto iter = dependentTaskInfos_.find(taskId);
    if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
        HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
        return;
    }
    for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
        auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
        RemoveDependencyById(taskId, *taskIdIter);
        taskIdIter = iter->second.erase(taskIdIter);
        if (taskInfo.first != 0) {
            EnqueueTaskId(taskInfo.first, taskInfo.second);
        }
    }
}

void TaskManager::RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
    // remove the dependency after the task executes
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto dependTaskIter = dependTaskInfos_.find(taskId);
    if (dependTaskIter != dependTaskInfos_.end()) {
        auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
        if (dependTaskInnerIter != dependTaskIter->second.end()) {
            dependTaskIter->second.erase(dependTaskInnerIter);
        }
    }
}

bool TaskManager::IsDependendByTaskId(uint64_t taskId)
{
    std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end() || iter->second.empty()) {
        return false;
    }
    return true;
}

bool TaskManager::IsDependentByTaskId(uint64_t dependentTaskId)
{
    std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
    auto iter = dependentTaskInfos_.find(dependentTaskId);
    if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
        return false;
    }
    return true;
}

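// Record that 'taskId' depends on every id in taskIdSet; the update is rejected when it would
// introduce a circular dependency.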
bool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet)
{
    HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
    StoreDependentTaskInfo(taskIdSet, taskId);
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end()) {
        for (const auto& dependentId : taskIdSet) {
            auto idIter = dependTaskInfos_.find(dependentId);
            if (idIter == dependTaskInfos_.end()) {
                continue;
            }
            if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
                return false;
            }
        }
        dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
        return true;
    }

    for (const auto& dependentId : iter->second) {
        auto idIter = dependTaskInfos_.find(dependentId);
        if (idIter == dependTaskInfos_.end()) {
            continue;
        }
        if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
            return false;
        }
    }
    iter->second.insert(taskIdSet.begin(), taskIdSet.end());
    return true;
}

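// Depth-first search through the dependency graph; returns false as soon as 'taskId' is reachable
// from its own dependencies, i.e. a cycle exists.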
bool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId)
{
    for (const auto& id : idSet) {
        if (id == taskId) {
            return false;
        }
        auto iter = dependentIdSet.find(id);
        if (iter != dependentIdSet.end()) {
            continue;
        }
        auto dIter = dependTaskInfos_.find(id);
        if (dIter == dependTaskInfos_.end()) {
            continue;
        }
        if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
            return false;
        }
    }
    return true;
}

bool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
    RemoveDependentTaskInfo(dependentId, taskId);
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end()) {
        return false;
    }
    auto dependIter = iter->second.find(dependentId);
    if (dependIter == iter->second.end()) {
        return false;
    }
    iter->second.erase(dependIter);
    return true;
}

void TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority)
{
    if (taskId == 0) {
        return;
    }
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    pendingTaskInfos_.emplace(taskId, priority);
}

std::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    if (pendingTaskInfos_.empty()) {
        return std::make_pair(0, Priority::DEFAULT);
    }
    std::pair<uint64_t, Priority> result;
    for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
        if (it->first == taskId) {
            result = std::make_pair(it->first, it->second);
            it = pendingTaskInfos_.erase(it);
            break;
        }
    }
    return result;
}

void TaskManager::RemovePendingTaskInfo(uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    pendingTaskInfos_.erase(taskId);
}

void TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
    for (const auto& id : dependentTaskIdSet) {
        auto iter = dependentTaskInfos_.find(id);
        if (iter == dependentTaskInfos_.end()) {
            std::set<uint64_t> set{taskId};
            dependentTaskInfos_.emplace(id, std::move(set));
        } else {
            iter->second.emplace(taskId);
        }
    }
}

void TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
    auto iter = dependentTaskInfos_.find(dependentTaskId);
    if (iter == dependentTaskInfos_.end()) {
        return;
    }
    auto taskIter = iter->second.find(taskId);
    if (taskIter == iter->second.end()) {
        return;
    }
    iter->second.erase(taskIter);
}

std::string TaskManager::GetTaskDependInfoToString(uint64_t taskId)
{
    std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
    auto iter = dependTaskInfos_.find(taskId);
    if (iter != dependTaskInfos_.end()) {
        for (const auto& id : iter->second) {
            str += " " + std::to_string(id);
        }
    }
    return str;
}

void TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
{
    HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
    auto iter = taskDurationInfos_.find(taskId);
    if (iter == taskDurationInfos_.end()) {
        std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
        taskDurationInfos_.emplace(taskId, std::move(durationData));
    } else {
        if (totalDuration != 0) {
            iter->second.first = totalDuration;
        }
        if (cpuDuration != 0) {
            iter->second.second = cpuDuration;
        }
    }
}

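// Return the recorded duration of a task: total wall time, CPU time, or (total - CPU) as the
// implied I/O time when another duration type is requested.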
uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType)
{
    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
    auto iter = taskDurationInfos_.find(taskId);
    if (iter == taskDurationInfos_.end()) {
        return 0;
    }
    if (durationType == TASK_TOTAL_TIME) {
        return iter->second.first;
    } else if (durationType == TASK_CPU_TIME) {
        return iter->second.second;
    } else if (iter->second.first == 0) {
        return 0;
    }
    return iter->second.first - iter->second.second;
}

std::string TaskManager::GetTaskName(uint64_t taskId)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    auto iter = tasks_.find(taskId);
    if (iter == tasks_.end()) {
        return "";
    }
    return iter->second->name_;
}

void TaskManager::RemoveTaskDuration(uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
    auto iter = taskDurationInfos_.find(taskId);
    if (iter != taskDurationInfos_.end()) {
        taskDurationInfos_.erase(iter);
    }
}

void TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker)
{
    std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
    longTasksMap_.emplace(taskId, worker);
}

void TaskManager::RemoveLongTaskInfo(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
    longTasksMap_.erase(taskId);
}

Worker* TaskManager::GetLongTaskInfo(uint64_t taskId)
{
    std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
    auto iter = longTasksMap_.find(taskId);
    return iter != longTasksMap_.end() ? iter->second : nullptr;
}

void TaskManager::TerminateTask(uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
    auto worker = GetLongTaskInfo(taskId);
    if (UNLIKELY(worker == nullptr)) {
        return;
    }
    worker->TerminateTask(taskId);
    RemoveLongTaskInfo(taskId);
}

void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
{
    uint64_t taskId = task->taskId_;
    if (shouldDeleteTask) {
        RemoveTask(taskId);
    }
    if (task->onResultSignal_ != nullptr) {
        if (!uv_is_closing((uv_handle_t*)task->onResultSignal_)) {
            ConcurrentHelper::UvHandleClose(task->onResultSignal_);
        } else {
            delete task->onResultSignal_;
        }
        task->onResultSignal_ = nullptr;
    }

    if (task->currentTaskInfo_ != nullptr) {
        delete task->currentTaskInfo_;
        task->currentTaskInfo_ = nullptr;
    }

    task->CancelPendingTask(env);

    task->ClearDelayedTimers();

    if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
        return;
    }
    DecreaseRefCount(env, taskId);
    RemoveTaskDuration(taskId);
    RemovePendingTaskInfo(taskId);
    ReleaseCallBackInfo(task);
    {
        std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
        for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
            if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
                dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
            } else {
                ++dependentTaskIter;
            }
        }
    }
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto dependTaskIter = dependTaskInfos_.find(taskId);
    if (dependTaskIter != dependTaskInfos_.end()) {
        dependTaskInfos_.erase(dependTaskIter);
    }
}

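// Frees the per-task lifecycle callback records (onEnqueued, onStartExecution,
// onExecutionFailed, onExecutionSucceeded) and, where one exists, the
// onStartExecution signal handle.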
void TaskManager::ReleaseCallBackInfo(Task* task)
{
    HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
    if (task->onEnqueuedCallBackInfo_ != nullptr) {
        delete task->onEnqueuedCallBackInfo_;
        task->onEnqueuedCallBackInfo_ = nullptr;
    }

    if (task->onStartExecutionCallBackInfo_ != nullptr) {
        delete task->onStartExecutionCallBackInfo_;
        task->onStartExecutionCallBackInfo_ = nullptr;
    }

    if (task->onExecutionFailedCallBackInfo_ != nullptr) {
        delete task->onExecutionFailedCallBackInfo_;
        task->onExecutionFailedCallBackInfo_ = nullptr;
    }

    if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
        delete task->onExecutionSucceededCallBackInfo_;
        task->onExecutionSucceededCallBackInfo_ = nullptr;
    }

#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
    // Main-thread tasks skip the signal cleanup below.
    if (task->IsMainThreadTask()) {
        return;
    }
#endif
    if (task->onStartExecutionSignal_ != nullptr) {
        if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
            ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
        } else {
            delete task->onStartExecutionSignal_;
        }
        task->onStartExecutionSignal_ = nullptr;
    }
}

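// Task registry: tasks_ maps a task id to its Task object; every access goes
// through the recursive tasksMutex_.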
void TaskManager::StoreTask(uint64_t taskId, Task* task)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    tasks_.emplace(taskId, task);
}

void TaskManager::RemoveTask(uint64_t taskId)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    tasks_.erase(taskId);
}

Task* TaskManager::GetTask(uint64_t taskId)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    auto iter = tasks_.find(taskId);
    if (iter == tasks_.end()) {
        return nullptr;
    }
    return iter->second;
}

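// With FFRT enabled, query the bundle manager system ability to determine
// whether the current application is a system app; the result is cached in
// isSystemApp_.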
#if defined(ENABLE_TASKPOOL_FFRT)
void TaskManager::UpdateSystemAppFlag()
{
    auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
    if (abilityManager == nullptr) {
        HILOG_ERROR("taskpool:: fail to GetSystemAbility, abilityManager is nullptr.");
        return;
    }
    auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
    if (bundleObj == nullptr) {
        HILOG_ERROR("taskpool:: fail to get bundle manager service.");
        return;
    }
    auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj);
    if (bundleMgr == nullptr) {
        HILOG_ERROR("taskpool:: bundle manager is nullptr.");
        return;
    }
    OHOS::AppExecFwk::BundleInfo bundleInfo;
    if (bundleMgr->GetBundleInfoForSelf(
        static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo)
        != OHOS::ERR_OK) {
        HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
        return;
    }
    isSystemApp_ = bundleInfo.applicationInfo.isSystemApp;
}
#endif

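// With the event handler backend enabled, tasks can be posted to the main
// thread handler; taskpool priorities are translated to event queue
// priorities through TASK_EVENTHANDLER_PRIORITY_MAP.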
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
{
    return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
}
#endif

bool TaskManager::CheckTask(uint64_t taskId)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    auto item = tasks_.find(taskId);
    return item != tasks_.end();
}

// ----------------------------------- TaskGroupManager ----------------------------------------
TaskGroupManager& TaskGroupManager::GetInstance()
{
    static TaskGroupManager groupManager;
    return groupManager;
}

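// Registers a task with an existing group: stores the task's napi_ref and
// records its id so the whole group can be executed or canceled together.
// A missing group simply means it has already been released, which is not an
// error.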
void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        HILOG_DEBUG("taskpool:: taskGroup has been released");
        return;
    }
    auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
    if (taskGroup == nullptr) {
        HILOG_ERROR("taskpool:: taskGroup is null");
        return;
    }
    taskGroup->taskRefs_.push_back(taskRef);
    taskGroup->taskNum_++;
    taskGroup->taskIds_.push_back(taskId);
}

void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
{
    HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
    TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(group->taskGroupMutex_);
        if (group->isValid_) {
            for (uint64_t taskId : group->taskIds_) {
                Task* task = TaskManager::GetInstance().GetTask(taskId);
                if (task == nullptr || !task->IsValid()) {
                    continue;
                }
                napi_reference_unref(task->env_, task->taskRef_, nullptr);
            }
        }

        if (group->currentGroupInfo_ != nullptr) {
            delete group->currentGroupInfo_;
            // Reset the pointer so later accesses cannot dereference or
            // double-delete it.
            group->currentGroupInfo_ = nullptr;
        }
    }
    group->CancelPendingGroup(env);
}

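// Cancels a whole group. Only a group that is currently executing (it has a
// currentGroupInfo_ and is neither NOT_FOUND nor FINISHED) may be canceled;
// otherwise ERR_CANCEL_NONEXIST_TASK_GROUP is thrown. Waiting member tasks
// are disposed of individually via CancelGroupTask(), and the group's result
// promise is rejected.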
void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
{
    std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId);
    HITRACE_HELPER_METER_NAME(strTrace);
    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
    TaskGroup* taskGroup = GetTaskGroup(groupId);
    if (taskGroup == nullptr) {
        HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
        return;
    }
    // Check the state under the group mutex so a concurrent cancel or state
    // change cannot be missed.
    std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
    if (taskGroup->groupState_ == ExecuteState::CANCELED) {
        return;
    }
    if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
        taskGroup->groupState_ == ExecuteState::FINISHED) {
        std::string errMsg = "taskpool:: taskGroup is not executed or has been executed";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
        return;
    }
    ExecuteState groupState = taskGroup->groupState_;
    taskGroup->groupState_ = ExecuteState::CANCELED;
    taskGroup->CancelPendingGroup(env);
    if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) {
        for (uint64_t taskId : taskGroup->taskIds_) {
            CancelGroupTask(env, taskId, taskGroup);
        }
        if (taskGroup->currentGroupInfo_->finishedTaskNum == taskGroup->taskNum_) {
            napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
            taskGroup->RejectResult(env, error);
            return;
        }
    }
    if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) {
        auto engine = reinterpret_cast<NativeEngine*>(env);
        for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) {
            engine->DecreaseSubEnvCounter();
        }
        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
        taskGroup->RejectResult(env, error);
    }
}

void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group)
{
    HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str());
    auto task = TaskManager::GetInstance().GetTask(taskId);
    if (task == nullptr) {
        HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
        return;
    }
    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
    if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
        task->DecreaseTaskRefCount();
        TaskManager::GetInstance().DecreaseRefCount(env, taskId);
        TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
        delete task->currentTaskInfo_;
        task->currentTaskInfo_ = nullptr;
        if (group->currentGroupInfo_ != nullptr) {
            group->currentGroupInfo_->finishedTaskNum++;
        }
    }
    task->taskState_ = ExecuteState::CANCELED;
}

void TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.emplace(seqRunnerId, seqRunner);
}

void TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.erase(seqRunnerId);
}

SequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter != seqRunners_.end()) {
        return iter->second;
    }
    HILOG_DEBUG("taskpool:: sequenceRunner has been released.");
    return nullptr;
}

void TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter == seqRunners_.end()) {
        HILOG_ERROR("seqRunner:: seqRunner not found.");
        return;
    }
    std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_);
    iter->second->seqRunnerTasks_.push(task);
}

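// Called when the task at the front of a sequence runner finishes: pops the
// next runnable task from the queue (disposing of canceled ones on the way)
// and enqueues it at the runner's priority. Only the task recorded in
// currentTaskId_ may trigger the next one, which is what keeps execution
// sequential.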
bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
{
    uint64_t seqRunnerId = lastTask->seqRunnerId_;
    SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
    if (seqRunner == nullptr) {
        HILOG_ERROR("seqRunner:: seqRunner to trigger does not exist.");
        return false;
    }
    if (!SequenceRunnerManager::GetInstance().TriggerGlobalSeqRunner(env, seqRunner)) {
        HILOG_ERROR("seqRunner:: globalSeqRunner to trigger does not exist.");
        return false;
    }
    if (seqRunner->currentTaskId_ != lastTask->taskId_) {
        HILOG_ERROR("seqRunner:: only the front task can trigger the seqRunner.");
        return false;
    }
    {
        std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
        if (seqRunner->seqRunnerTasks_.empty()) {
            HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str());
            seqRunner->currentTaskId_ = 0;
            return true;
        }
        Task* task = seqRunner->seqRunnerTasks_.front();
        seqRunner->seqRunnerTasks_.pop();
        while (task->taskState_ == ExecuteState::CANCELED) {
            DisposeCanceledTask(env, task);
            if (seqRunner->seqRunnerTasks_.empty()) {
                HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
                            std::to_string(seqRunnerId).c_str());
                seqRunner->currentTaskId_ = 0;
                return true;
            }
            task = seqRunner->seqRunnerTasks_.front();
            seqRunner->seqRunnerTasks_.pop();
        }
        seqRunner->currentTaskId_ = task->taskId_;
        task->IncreaseRefCount();
        task->taskState_ = ExecuteState::WAITING;
        HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
                    std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
    }
    return true;
}

void TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task)
{
    // Guard against a task whose info was already released elsewhere.
    if (task->currentTaskInfo_ == nullptr) {
        return;
    }
    napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled");
    napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
    reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
    napi_reference_unref(env, task->taskRef_, nullptr);
    delete task->currentTaskInfo_;
    task->currentTaskInfo_ = nullptr;
}

void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.emplace(groupId, taskGroup);
}

void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.erase(groupId);
}

TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return nullptr;
    }
    return reinterpret_cast<TaskGroup*>(groupIter->second);
}

bool TaskGroupManager::UpdateGroupState(uint64_t groupId)
{
    HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str());
    // Hold taskGroupsMutex_ while mutating the group so other threads cannot
    // release or operate on the group pointer mid-update.
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return false;
    }
    TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second);
    if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) {
        HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled");
        return false;
    }
    group->groupState_ = ExecuteState::RUNNING;
    return true;
}

// ----------------------------------- SequenceRunnerManager ----------------------------------------
SequenceRunnerManager& SequenceRunnerManager::GetInstance()
{
    static SequenceRunnerManager sequenceRunnerManager;
    return sequenceRunnerManager;
}

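// Global (named) sequence runners are shared: the first creation under a given
// name fixes the runner's priority, and later lookups must request the same
// priority or a TypeError is thrown. Each env gets its own napi_ref on the
// runner object, and count_ tracks how many users currently hold the runner.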
SequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
                                                               const std::string& name, uint32_t priority)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    SequenceRunner* seqRunner = nullptr;
    auto iter = globalSeqRunner_.find(name);
    if (iter == globalSeqRunner_.end()) {
        seqRunner = new SequenceRunner();
        // apply the user-specified priority on first creation
        if (argc == 2) { // 2: the number of parameters is 2
            seqRunner->priority_ = static_cast<Priority>(priority);
        }
        seqRunner->isGlobalRunner_ = true;
        seqRunner->seqName_ = name;
        globalSeqRunner_.emplace(name, seqRunner);
    } else {
        seqRunner = iter->second;
        if (priority != seqRunner->priority_) {
            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority cannot be changed.");
            return nullptr;
        }
    }
    seqRunner->count_++;
    auto refIter = seqRunner->globalSeqRunnerRef_.find(env);
    if (refIter == seqRunner->globalSeqRunnerRef_.end()) {
        napi_ref globalSeqRunnerRef = nullptr;
        napi_create_reference(env, thisVar, 0, &globalSeqRunnerRef);
        seqRunner->globalSeqRunnerRef_.emplace(env, globalSeqRunnerRef);
    }

    return seqRunner;
}

bool SequenceRunnerManager::TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
{
    if (seqRunner->isGlobalRunner_) {
        std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
        auto iter = seqRunner->globalSeqRunnerRef_.find(env);
        if (iter == seqRunner->globalSeqRunnerRef_.end()) {
            return false;
        }
        napi_reference_unref(env, iter->second, nullptr);
    } else {
        napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr);
    }
    return true;
}

uint64_t SequenceRunnerManager::DecreaseSeqCount(SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    return --(seqRunner->count_);
}

void SequenceRunnerManager::RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner)
{
    std::lock_guard<std::mutex> lock(globalSeqRunnerMutex_);
    auto iter = seqRunner->globalSeqRunnerRef_.find(env);
    if (iter != seqRunner->globalSeqRunnerRef_.end()) {
        napi_delete_reference(env, iter->second);
        seqRunner->globalSeqRunnerRef_.erase(iter);
    }
}

void SequenceRunnerManager::RemoveSequenceRunner(const std::string& name)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    auto iter = globalSeqRunner_.find(name);
    if (iter != globalSeqRunner_.end()) {
        globalSeqRunner_.erase(iter);
    }
}

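// Tear-down path for a global sequence runner: drop this env's reference, and
// once the shared use count reaches zero remove the runner from both managers
// and free it.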
void SequenceRunnerManager::GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner* seqRunner)
{
    RemoveGlobalSeqRunnerRef(env, seqRunner);
    if (DecreaseSeqCount(seqRunner) == 0) {
        RemoveSequenceRunner(seqRunner->seqName_);
        TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_);
        delete seqRunner;
    }
}

bool SequenceRunnerManager::IncreaseGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    if (seqRunner->isGlobalRunner_) {
        auto iter = seqRunner->globalSeqRunnerRef_.find(env);
        if (iter == seqRunner->globalSeqRunnerRef_.end()) {
            return false;
        }
        napi_reference_ref(env, iter->second, nullptr);
    } else {
        napi_reference_ref(env, seqRunner->seqRunnerRef_, nullptr);
    }
    return true;
}
} // namespace Commonlibrary::Concurrent::TaskPoolModule