1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_manager.h"
17
18 #include <cinttypes>
19 #include <securec.h>
20 #include <thread>
21
22 #if defined(ENABLE_TASKPOOL_FFRT)
23 #include "bundle_info.h"
24 #include "bundle_mgr_interface.h"
25 #include "bundle_mgr_proxy.h"
26 #include "iservice_registry.h"
27 #include "parameters.h"
28 #include "status_receiver_interface.h"
29 #include "system_ability_definition.h"
30 #include "c/executor_task.h"
31 #include "ffrt_inner.h"
32 #endif
33 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
34 #include "helper/concurrent_helper.h"
35 #include "helper/error_helper.h"
36 #include "helper/hitrace_helper.h"
37 #include "taskpool.h"
38 #include "tools/log.h"
39 #include "worker.h"
40
41 namespace Commonlibrary::Concurrent::TaskPoolModule {
42 using namespace OHOS::JsSysModule;
43
44 static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
45 static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
46 static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
47 static constexpr uint32_t STEP_SIZE = 2;
48 static constexpr uint32_t DEFAULT_THREADS = 3;
49 static constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle
50 static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
51 static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
52 static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
53 static constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s
54 static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time
55 [[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread
56
57 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
58 static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
59 {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
60 {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
61 {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
62 {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
63 };
64 #endif
65
66 // ----------------------------------- TaskManager ----------------------------------------
GetInstance()67 TaskManager& TaskManager::GetInstance()
68 {
69 static TaskManager manager;
70 return manager;
71 }
72
TaskManager()73 TaskManager::TaskManager()
74 {
75 for (size_t i = 0; i < taskQueues_.size(); i++) {
76 std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
77 taskQueues_[i] = std::move(taskQueue);
78 }
79 }
80
~TaskManager()81 TaskManager::~TaskManager()
82 {
83 HILOG_INFO("taskpool:: ~TaskManager");
84 if (timer_ == nullptr) {
85 HILOG_ERROR("taskpool:: timer_ is nullptr");
86 } else {
87 uv_timer_stop(timer_);
88 ConcurrentHelper::UvHandleClose(timer_);
89 ConcurrentHelper::UvHandleClose(expandHandle_);
90 }
91
92 if (loop_ != nullptr) {
93 uv_stop(loop_);
94 }
95
96 {
97 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
98 for (auto& worker : workers_) {
99 delete worker;
100 }
101 workers_.clear();
102 }
103
104 {
105 std::lock_guard<std::mutex> lock(callbackMutex_);
106 for (auto& [_, callbackPtr] : callbackTable_) {
107 if (callbackPtr == nullptr) {
108 continue;
109 }
110 callbackPtr.reset();
111 }
112 callbackTable_.clear();
113 }
114
115 {
116 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
117 for (auto& [_, task] : tasks_) {
118 delete task;
119 task = nullptr;
120 }
121 tasks_.clear();
122 }
123 CountTraceForWorker();
124 }
125
CountTraceForWorker()126 void TaskManager::CountTraceForWorker()
127 {
128 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
129 int64_t threadNum = static_cast<int64_t>(workers_.size());
130 int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
131 int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
132 HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
133 HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
134 HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
135 HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
136 }
137
GetThreadInfos(napi_env env)138 napi_value TaskManager::GetThreadInfos(napi_env env)
139 {
140 napi_value threadInfos = nullptr;
141 napi_create_array(env, &threadInfos);
142 {
143 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
144 int32_t i = 0;
145 for (auto& worker : workers_) {
146 if (worker->workerEnv_ == nullptr) {
147 continue;
148 }
149 napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
150 napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));
151
152 napi_value taskId = nullptr;
153 napi_create_array(env, &taskId);
154 int32_t j = 0;
155 {
156 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
157 for (auto& currentId : worker->currentTaskId_) {
158 napi_value id = NapiHelper::CreateUint32(env, currentId);
159 napi_set_element(env, taskId, j, id);
160 j++;
161 }
162 }
163 napi_value threadInfo = nullptr;
164 napi_create_object(env, &threadInfo);
165 napi_set_named_property(env, threadInfo, "tid", tid);
166 napi_set_named_property(env, threadInfo, "priority", priority);
167 napi_set_named_property(env, threadInfo, "taskIds", taskId);
168 napi_set_element(env, threadInfos, i, threadInfo);
169 i++;
170 }
171 }
172 return threadInfos;
173 }
174
GetTaskInfos(napi_env env)175 napi_value TaskManager::GetTaskInfos(napi_env env)
176 {
177 napi_value taskInfos = nullptr;
178 napi_create_array(env, &taskInfos);
179 {
180 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
181 int32_t i = 0;
182 for (const auto& [_, task] : tasks_) {
183 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
184 task->taskState_ == ExecuteState::FINISHED) {
185 continue;
186 }
187 napi_value taskInfoValue = NapiHelper::CreateObject(env);
188 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
189 napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
190 napi_value name = nullptr;
191 napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
192 napi_set_named_property(env, taskInfoValue, "name", name);
193 ExecuteState state = task->taskState_;
194 uint64_t duration = 0;
195 if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
196 duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
197 }
198 napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
199 napi_set_named_property(env, taskInfoValue, "taskId", taskId);
200 napi_set_named_property(env, taskInfoValue, "state", stateValue);
201 napi_value durationValue = NapiHelper::CreateUint32(env, duration);
202 napi_set_named_property(env, taskInfoValue, "duration", durationValue);
203 napi_set_element(env, taskInfos, i, taskInfoValue);
204 i++;
205 }
206 }
207 return taskInfos;
208 }
209
UpdateExecutedInfo(uint64_t duration)210 void TaskManager::UpdateExecutedInfo(uint64_t duration)
211 {
212 totalExecTime_ += duration;
213 totalExecCount_++;
214 }
215
ComputeSuitableThreadNum()216 uint32_t TaskManager::ComputeSuitableThreadNum()
217 {
218 uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
219 return targetNum;
220 }
221
ComputeSuitableIdleNum()222 uint32_t TaskManager::ComputeSuitableIdleNum()
223 {
224 uint32_t targetNum = 0;
225 if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
226 // this branch is used for avoiding time-consuming tasks that may block the taskpool
227 targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
228 } else if (totalExecCount_ != 0) {
229 auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
230 uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
231 targetNum = std::min(result, GetNonIdleTaskNum());
232 }
233 return targetNum;
234 }
235
CheckForBlockedWorkers()236 void TaskManager::CheckForBlockedWorkers()
237 {
238 // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
239 // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
240 // else we will choose the longer one
241 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
242 bool needChecking = false;
243 bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
244 uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
245 for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
246 auto worker = *iter;
247 // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
248 // if the worker is executing the longTask, we will not do the check
249 if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
250 (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
251 !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
252 continue;
253 }
254 // When executing the promise task, the worker state may not be updated and will be
255 // marked as 'BLOCKED', so we should exclude this situation.
256 // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
257 // the task like I/O in uv threads, we should also exclude this situation.
258 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
259 if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
260 if (!workerEngine->HasWaitingRequest()) {
261 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
262 } else {
263 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
264 worker->startTime_ = ConcurrentHelper::GetMilliseconds();
265 }
266 continue;
267 }
268
269 HILOG_INFO("taskpool:: The worker has been marked as timeout.");
270 // If the current worker has a longTask and is not executing, we will only interrupt it.
271 if (worker->HasLongTask()) {
272 continue;
273 }
274 needChecking = true;
275 idleWorkers_.erase(worker);
276 timeoutWorkers_.insert(worker);
277 }
278 // should trigger the check when we have marked and removed workers
279 if (UNLIKELY(needChecking)) {
280 TryExpand();
281 }
282 }
283
TryTriggerExpand()284 void TaskManager::TryTriggerExpand()
285 {
286 // post the signal to notify the monitor thread to expand
287 if (UNLIKELY(!isHandleInited_)) {
288 NotifyExecuteTask();
289 needChecking_ = true;
290 HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr");
291 return;
292 }
293 uv_async_send(expandHandle_);
294 }
295
296 #if defined(OHOS_PLATFORM)
297 // read /proc/[pid]/task/[tid]/stat to get the number of idle threads.
ReadThreadInfo(pid_t tid,char * buf,uint32_t size)298 bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
299 {
300 char path[128]; // 128: buffer for path
301 pid_t pid = getpid();
302 ssize_t bytesLen = -1;
303 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
304 if (ret < 0) {
305 HILOG_ERROR("snprintf_s failed");
306 return false;
307 }
308 int fd = open(path, O_RDONLY | O_NONBLOCK);
309 if (UNLIKELY(fd == -1)) {
310 return false;
311 }
312 bytesLen = read(fd, buf, size - 1);
313 close(fd);
314 if (bytesLen <= 0) {
315 HILOG_ERROR("taskpool:: failed to read %{public}s", path);
316 return false;
317 }
318 buf[bytesLen] = '\0';
319 return true;
320 }
321
GetIdleWorkers()322 uint32_t TaskManager::GetIdleWorkers()
323 {
324 char buf[4096]; // 4096: buffer for thread info
325 uint32_t idleCount = 0;
326 std::unordered_set<pid_t> tids {};
327 {
328 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
329 for (auto& worker : idleWorkers_) {
330 #if defined(ENABLE_TASKPOOL_FFRT)
331 if (worker->ffrtTaskHandle_ != nullptr) {
332 if (worker->GetWaitTime() > 0) {
333 idleCount++;
334 }
335 continue;
336 }
337 #endif
338 tids.emplace(worker->tid_);
339 }
340 }
341 // The ffrt thread does not read thread info
342 for (auto tid : tids) {
343 if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
344 continue;
345 }
346 char state;
347 if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
348 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state);
349 return 0;
350 }
351 if (state == 'S') {
352 idleCount++;
353 }
354 }
355 return idleCount;
356 }
357
GetIdleWorkersList(uint32_t step)358 void TaskManager::GetIdleWorkersList(uint32_t step)
359 {
360 char buf[4096]; // 4096: buffer for thread info
361 for (auto& worker : idleWorkers_) {
362 #if defined(ENABLE_TASKPOOL_FFRT)
363 if (worker->ffrtTaskHandle_ != nullptr) {
364 uint64_t workerWaitTime = worker->GetWaitTime();
365 bool isWorkerLoopActive = worker->IsLoopActive();
366 if (workerWaitTime == 0) {
367 continue;
368 }
369 uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
370 std::chrono::steady_clock::now().time_since_epoch()).count());
371 if (!isWorkerLoopActive) {
372 freeList_.emplace_back(worker);
373 } else if ((currTime - workerWaitTime) > IDLE_THRESHOLD * TRIGGER_INTERVAL) {
374 freeList_.emplace_back(worker);
375 HILOG_INFO("taskpool:: worker in ffrt epoll wait more than 2 intervals, force to free.");
376 } else {
377 HILOG_INFO("taskpool:: worker uv alive, and will be free in 2 intervals if not wake.");
378 }
379 continue;
380 }
381 #endif
382 if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
383 continue;
384 }
385 char state;
386 uint64_t utime;
387 if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
388 &state, sizeof(state), &utime) != 2) { // 2: state and utime
389 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
390 return;
391 }
392 if (state != 'S' || utime != worker->lastCpuTime_) {
393 worker->idleCount_ = 0;
394 worker->lastCpuTime_ = utime;
395 continue;
396 }
397 if (++worker->idleCount_ >= IDLE_THRESHOLD) {
398 freeList_.emplace_back(worker);
399 }
400 }
401 }
402
TriggerShrink(uint32_t step)403 void TaskManager::TriggerShrink(uint32_t step)
404 {
405 GetIdleWorkersList(step);
406 step = std::min(step, static_cast<uint32_t>(freeList_.size()));
407 uint32_t count = 0;
408 for (size_t i = 0; i < freeList_.size(); i++) {
409 auto worker = freeList_[i];
410 if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
411 continue;
412 }
413 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
414 if (idleTime < MAX_IDLE_TIME || worker->runningCount_ != 0) {
415 continue;
416 }
417 idleWorkers_.erase(worker);
418 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
419 uv_async_send(worker->clearWorkerSignal_);
420 if (++count == step) {
421 break;
422 }
423 }
424 freeList_.clear();
425 }
426 #else
GetIdleWorkers()427 uint32_t TaskManager::GetIdleWorkers()
428 {
429 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
430 return idleWorkers_.size();
431 }
432
TriggerShrink(uint32_t step)433 void TaskManager::TriggerShrink(uint32_t step)
434 {
435 for (uint32_t i = 0; i < step; i++) {
436 // try to free the worker that idle time meets the requirement
437 auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
438 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
439 return idleTime > MAX_IDLE_TIME && worker->runningCount_ == 0 && !worker->HasLongTask();
440 });
441 // remove it from all sets
442 if (iter != idleWorkers_.end()) {
443 auto worker = *iter;
444 idleWorkers_.erase(worker);
445 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
446 uv_async_send(worker->clearWorkerSignal_);
447 }
448 }
449 }
450 #endif
451
NotifyShrink(uint32_t targetNum)452 void TaskManager::NotifyShrink(uint32_t targetNum)
453 {
454 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
455 uint32_t workerCount = workers_.size();
456 uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
457 if (minThread == 0) {
458 HILOG_INFO("taskpool:: the system now is under low memory");
459 }
460 if (workerCount > minThread && workerCount > targetNum) {
461 targetNum = std::max(minThread, targetNum);
462 uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
463 TriggerShrink(step);
464 }
465 // remove all timeout workers
466 for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
467 if (workers_.find(*iter) == workers_.end()) {
468 HILOG_WARN("taskpool:: current worker maybe release");
469 iter = timeoutWorkers_.erase(iter);
470 } else if ((*iter)->runningCount_ == 0) {
471 HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", (*iter)->tid_);
472 uv_async_send((*iter)->clearWorkerSignal_);
473 timeoutWorkers_.erase(iter++);
474 return;
475 } else {
476 iter++;
477 }
478 }
479 uint32_t idleNum = idleWorkers_.size();
480 // System memory state is moderate and the worker has exeuted tasks, we will try to release it
481 if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
482 auto worker = *(idleWorkers_.begin());
483 if (worker == nullptr || worker->clearWorkerSignal_ == nullptr) {
484 return;
485 }
486 if (worker->HasLongTask()) { // worker that has longTask should not be released
487 return;
488 }
489 if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released
490 TriggerShrink(DEFAULT_MIN_THREADS);
491 return;
492 }
493 }
494
495 // Create a worker for performance
496 if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
497 CreateWorkers(hostEnv_);
498 }
499 // stop the timer
500 if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
501 suspend_ = true;
502 uv_timer_stop(timer_);
503 HILOG_DEBUG("taskpool:: timer will be suspended");
504 }
505 }
506
TriggerLoadBalance(const uv_timer_t * req)507 void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
508 {
509 TaskManager& taskManager = TaskManager::GetInstance();
510 taskManager.CheckForBlockedWorkers();
511 uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
512 taskManager.NotifyShrink(targetNum);
513 taskManager.CountTraceForWorker();
514 }
515
TryExpand()516 void TaskManager::TryExpand()
517 {
518 // dispatch task in the TaskPoolManager thread
519 NotifyExecuteTask();
520 // do not trigger when there are more idleWorkers than tasks
521 uint32_t idleNum = GetIdleWorkers();
522 if (idleNum > GetNonIdleTaskNum()) {
523 return;
524 }
525 needChecking_ = false; // do not need to check
526 uint32_t targetNum = ComputeSuitableIdleNum();
527 uint32_t workerCount = 0;
528 uint32_t idleCount = 0;
529 uint32_t timeoutWorkers = 0;
530 {
531 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
532 idleCount = idleWorkers_.size();
533 workerCount = workers_.size();
534 timeoutWorkers = timeoutWorkers_.size();
535 }
536 uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS);
537 maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
538 if (workerCount < maxThreads && idleCount < targetNum) {
539 uint32_t step = std::min(maxThreads, targetNum) - idleCount;
540 // Prevent the total number of expanded threads from exceeding maxThreads
541 if (step + workerCount > maxThreads) {
542 step = maxThreads - workerCount;
543 }
544 CreateWorkers(hostEnv_, step);
545 HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
546 maxThreads, step, GetThreadNum());
547 }
548 if (UNLIKELY(suspend_)) {
549 suspend_ = false;
550 uv_timer_again(timer_);
551 }
552 }
553
NotifyExpand(const uv_async_t * req)554 void TaskManager::NotifyExpand(const uv_async_t* req)
555 {
556 TaskManager& taskManager = TaskManager::GetInstance();
557 taskManager.TryExpand();
558 }
559
RunTaskManager()560 void TaskManager::RunTaskManager()
561 {
562 loop_ = uv_loop_new();
563 if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
564 HILOG_FATAL("taskpool:: new loop failed.");
565 return;
566 }
567 ConcurrentHelper::UvHandleInit(loop_, expandHandle_, TaskManager::NotifyExpand);
568 timer_ = new uv_timer_t;
569 uv_timer_init(loop_, timer_);
570 uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
571 isHandleInited_ = true;
572 #if defined IOS_PLATFORM || defined MAC_PLATFORM
573 pthread_setname_np("OS_TaskManager");
574 #else
575 pthread_setname_np(pthread_self(), "OS_TaskManager");
576 #endif
577 if (UNLIKELY(needChecking_)) {
578 needChecking_ = false;
579 uv_async_send(expandHandle_);
580 }
581 uv_run(loop_, UV_RUN_DEFAULT);
582 if (loop_ != nullptr) {
583 uv_loop_delete(loop_);
584 }
585 }
586
CancelTask(napi_env env,uint64_t taskId)587 void TaskManager::CancelTask(napi_env env, uint64_t taskId)
588 {
589 // 1. Cannot find taskInfo by executeId, throw error
590 // 2. Find executing taskInfo, skip it
591 // 3. Find waiting taskInfo, cancel it
592 // 4. Find canceled taskInfo, skip it
593 std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
594 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
595 HITRACE_HELPER_METER_NAME(strTrace);
596 Task* task = GetTask(taskId);
597 if (task == nullptr) {
598 std::string errMsg = "taskpool:: the task may not exist";
599 HILOG_ERROR("%{public}s", errMsg.c_str());
600 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
601 return;
602 }
603 if (task->taskState_ == ExecuteState::CANCELED) {
604 HILOG_DEBUG("taskpool:: task has been canceled");
605 return;
606 }
607 if (task->IsGroupCommonTask()) {
608 // when task is a group common task, still check the state
609 if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND ||
610 task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
611 std::string errMsg = "taskpool:: task is not executed or has been executed";
612 HILOG_ERROR("%{public}s", errMsg.c_str());
613 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
614 return;
615 }
616 TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
617 if (taskGroup == nullptr) {
618 return;
619 }
620 return taskGroup->CancelGroupTask(env, task->taskId_);
621 }
622
623 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
624 if (task->IsPeriodicTask()) {
625 napi_reference_unref(env, task->taskRef_, nullptr);
626 task->CancelPendingTask(env);
627 uv_timer_stop(task->timer_);
628 ConcurrentHelper::UvHandleClose(task->timer_);
629 return;
630 } else if (task->IsSeqRunnerTask()) {
631 CancelSeqRunnerTask(env, task);
632 return;
633 }
634 if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
635 task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
636 task->taskState_ == ExecuteState::ENDING) {
637 std::string errMsg = "taskpool:: task is not executed or has been executed";
638 HILOG_ERROR("%{public}s", errMsg.c_str());
639 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
640 return;
641 }
642
643 task->ClearDelayedTimers();
644 ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED);
645 task->CancelPendingTask(env);
646 if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
647 reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
648 task->DecreaseTaskRefCount();
649 DecreaseRefCount(env, task->taskId_);
650 EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
651 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled");
652 napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
653 napi_reference_unref(env, task->taskRef_, nullptr);
654 delete task->currentTaskInfo_;
655 task->currentTaskInfo_ = nullptr;
656 }
657 }
658
CancelSeqRunnerTask(napi_env env,Task * task)659 void TaskManager::CancelSeqRunnerTask(napi_env env, Task *task)
660 {
661 if (task->taskState_ == ExecuteState::FINISHED) {
662 std::string errMsg = "taskpool:: sequenceRunner task has been executed";
663 HILOG_ERROR("%{public}s", errMsg.c_str());
664 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
665 } else {
666 task->taskState_ = ExecuteState::CANCELED;
667 }
668 }
669
NotifyWorkerIdle(Worker * worker)670 void TaskManager::NotifyWorkerIdle(Worker* worker)
671 {
672 {
673 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
674 if (worker->state_ == WorkerState::BLOCKED) {
675 return;
676 }
677 idleWorkers_.insert(worker);
678 }
679 if (GetTaskNum() != 0) {
680 NotifyExecuteTask();
681 }
682 CountTraceForWorker();
683 }
684
NotifyWorkerCreated(Worker * worker)685 void TaskManager::NotifyWorkerCreated(Worker* worker)
686 {
687 NotifyWorkerIdle(worker);
688 }
689
NotifyWorkerAdded(Worker * worker)690 void TaskManager::NotifyWorkerAdded(Worker* worker)
691 {
692 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
693 workers_.insert(worker);
694 HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
695 }
696
NotifyWorkerRunning(Worker * worker)697 void TaskManager::NotifyWorkerRunning(Worker* worker)
698 {
699 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
700 idleWorkers_.erase(worker);
701 CountTraceForWorker();
702 }
703
GetRunningWorkers()704 uint32_t TaskManager::GetRunningWorkers()
705 {
706 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
707 return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
708 return worker->runningCount_ != 0;
709 });
710 }
711
GetTimeoutWorkers()712 uint32_t TaskManager::GetTimeoutWorkers()
713 {
714 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
715 return timeoutWorkers_.size();
716 }
717
GetTaskNum()718 uint32_t TaskManager::GetTaskNum()
719 {
720 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
721 uint32_t sum = 0;
722 for (const auto& elements : taskQueues_) {
723 sum += elements->GetTaskNum();
724 }
725 return sum;
726 }
727
GetNonIdleTaskNum()728 uint32_t TaskManager::GetNonIdleTaskNum()
729 {
730 return nonIdleTaskNum_;
731 }
732
IncreaseNumIfNoIdle(Priority priority)733 void TaskManager::IncreaseNumIfNoIdle(Priority priority)
734 {
735 if (priority != Priority::IDLE) {
736 ++nonIdleTaskNum_;
737 }
738 }
739
DecreaseNumIfNoIdle(Priority priority)740 void TaskManager::DecreaseNumIfNoIdle(Priority priority)
741 {
742 if (priority != Priority::IDLE) {
743 --nonIdleTaskNum_;
744 }
745 }
746
GetThreadNum()747 uint32_t TaskManager::GetThreadNum()
748 {
749 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
750 return workers_.size();
751 }
752
EnqueueTaskId(uint64_t taskId,Priority priority)753 void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
754 {
755 {
756 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
757 IncreaseNumIfNoIdle(priority);
758 taskQueues_[priority]->EnqueueTaskId(taskId);
759 }
760 TryTriggerExpand();
761 Task* task = GetTask(taskId);
762 if (task == nullptr) {
763 HILOG_FATAL("taskpool:: task is nullptr");
764 return;
765 }
766 task->IncreaseTaskRefCount();
767 if (task->onEnqueuedCallBackInfo_ != nullptr) {
768 task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_);
769 }
770 }
771
EraseWaitingTaskId(uint64_t taskId,Priority priority)772 void TaskManager::EraseWaitingTaskId(uint64_t taskId, Priority priority)
773 {
774 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
775 if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
776 HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
777 }
778 }
779
DequeueTaskId()780 std::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
781 {
782 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
783 auto& highTaskQueue = taskQueues_[Priority::HIGH];
784 if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
785 highPrioExecuteCount_++;
786 return GetTaskByPriority(highTaskQueue, Priority::HIGH);
787 }
788 highPrioExecuteCount_ = 0;
789
790 auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
791 if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
792 mediumPrioExecuteCount_++;
793 return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
794 }
795 mediumPrioExecuteCount_ = 0;
796
797 auto& lowTaskQueue = taskQueues_[Priority::LOW];
798 if (!lowTaskQueue->IsEmpty()) {
799 return GetTaskByPriority(lowTaskQueue, Priority::LOW);
800 }
801
802 auto& idleTaskQueue = taskQueues_[Priority::IDLE];
803 if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
804 return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
805 }
806 return std::make_pair(0, Priority::LOW);
807 }
808
IsChooseIdle()809 bool TaskManager::IsChooseIdle()
810 {
811 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
812 for (auto& worker : workers_) {
813 if (worker->state_ == WorkerState::IDLE) {
814 // If worker->state_ is WorkerState::IDLE, it means that the worker is free
815 continue;
816 }
817 // If there is a worker running a task, do not take the idle task.
818 return false;
819 }
820 // Only when all workers are free, will idle task be taken.
821 return true;
822 }
823
GetTaskByPriority(const std::unique_ptr<ExecuteQueue> & taskQueue,Priority priority)824 std::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
825 Priority priority)
826 {
827 uint64_t taskId = taskQueue->DequeueTaskId();
828 if (IsDependendByTaskId(taskId)) {
829 EnqueuePendingTaskInfo(taskId, priority);
830 return std::make_pair(0, priority);
831 }
832 DecreaseNumIfNoIdle(priority);
833 return std::make_pair(taskId, priority);
834 }
835
NotifyExecuteTask()836 void TaskManager::NotifyExecuteTask()
837 {
838 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
839 if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
840 // When there are only idle tasks and workers executing them, it is not triggered
841 return;
842 }
843
844 for (auto& worker : idleWorkers_) {
845 worker->NotifyExecuteTask();
846 }
847 }
848
InitTaskManager(napi_env env)849 void TaskManager::InitTaskManager(napi_env env)
850 {
851 HITRACE_HELPER_METER_NAME("InitTaskManager");
852 if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
853 #if defined(ENABLE_TASKPOOL_FFRT)
854 globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
855 if (!globalEnableFfrtFlag_) {
856 UpdateSystemAppFlag();
857 if (IsSystemApp()) {
858 disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
859 }
860 }
861 if (EnableFfrt()) {
862 HILOG_INFO("taskpool:: apps use ffrt");
863 } else {
864 HILOG_INFO("taskpool:: apps do not use ffrt");
865 }
866 #endif
867 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
868 mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
869 OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
870 #endif
871 auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
872 if (mainThreadEngine == nullptr) {
873 HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
874 return;
875 }
876 hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
877 // Add a reserved thread for taskpool
878 CreateWorkers(hostEnv_);
879 // Create a timer to manage worker threads
880 std::thread workerManager([this] {this->RunTaskManager();});
881 workerManager.detach();
882 }
883 }
884
CreateWorkers(napi_env env,uint32_t num)885 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
886 {
887 HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
888 for (uint32_t i = 0; i < num; i++) {
889 auto worker = Worker::WorkerConstructor(env);
890 NotifyWorkerAdded(worker);
891 }
892 CountTraceForWorker();
893 }
894
RemoveWorker(Worker * worker)895 void TaskManager::RemoveWorker(Worker* worker)
896 {
897 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
898 idleWorkers_.erase(worker);
899 timeoutWorkers_.erase(worker);
900 workers_.erase(worker);
901 }
902
RestoreWorker(Worker * worker)903 void TaskManager::RestoreWorker(Worker* worker)
904 {
905 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
906 if (UNLIKELY(suspend_)) {
907 suspend_ = false;
908 uv_timer_again(timer_);
909 }
910 if (worker->state_ == WorkerState::BLOCKED) {
911 // since the worker is blocked, we should add it to the timeout set
912 timeoutWorkers_.insert(worker);
913 return;
914 }
915 // Since the worker may be executing some tasks in IO thread, we should add it to the
916 // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread.
917 HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
918 idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
919 if (GetTaskNum() != 0) {
920 NotifyExecuteTask();
921 }
922 }
923
924 // ---------------------------------- SendData ---------------------------------------
RegisterCallback(napi_env env,uint64_t taskId,std::shared_ptr<CallbackInfo> callbackInfo)925 void TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
926 {
927 std::lock_guard<std::mutex> lock(callbackMutex_);
928 callbackTable_[taskId] = callbackInfo;
929 }
930
GetCallbackInfo(uint64_t taskId)931 std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId)
932 {
933 std::lock_guard<std::mutex> lock(callbackMutex_);
934 auto iter = callbackTable_.find(taskId);
935 if (iter == callbackTable_.end() || iter->second == nullptr) {
936 HILOG_ERROR("taskpool:: the callback does not exist");
937 return nullptr;
938 }
939 return iter->second;
940 }
941
IncreaseRefCount(uint64_t taskId)942 void TaskManager::IncreaseRefCount(uint64_t taskId)
943 {
944 if (taskId == 0) { // do not support func
945 return;
946 }
947 std::lock_guard<std::mutex> lock(callbackMutex_);
948 auto iter = callbackTable_.find(taskId);
949 if (iter == callbackTable_.end() || iter->second == nullptr) {
950 return;
951 }
952 iter->second->refCount++;
953 }
954
DecreaseRefCount(napi_env env,uint64_t taskId)955 void TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId)
956 {
957 if (taskId == 0) { // do not support func
958 return;
959 }
960 std::lock_guard<std::mutex> lock(callbackMutex_);
961 auto iter = callbackTable_.find(taskId);
962 if (iter == callbackTable_.end() || iter->second == nullptr) {
963 return;
964 }
965
966 auto task = reinterpret_cast<Task*>(taskId);
967 if (!task->IsValid()) {
968 callbackTable_.erase(iter);
969 return;
970 }
971
972 iter->second->refCount--;
973 if (iter->second->refCount == 0) {
974 callbackTable_.erase(iter);
975 }
976 }
977
ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo> & callbackInfo)978 void TaskManager::ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo)
979 {
980 std::lock_guard<std::mutex> lock(callbackMutex_);
981 callbackInfo->worker = nullptr;
982 }
983
NotifyCallbackExecute(napi_env env,TaskResultInfo * resultInfo,Task * task)984 napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
985 {
986 HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str());
987 std::lock_guard<std::mutex> lock(callbackMutex_);
988 auto iter = callbackTable_.find(task->taskId_);
989 if (iter == callbackTable_.end() || iter->second == nullptr) {
990 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
991 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
992 delete resultInfo;
993 return nullptr;
994 }
995 Worker* worker = static_cast<Worker*>(task->worker_);
996 worker->Enqueue(task->env_, resultInfo);
997 auto callbackInfo = iter->second;
998 callbackInfo->refCount++;
999 callbackInfo->worker = worker;
1000 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
1001 workerEngine->IncreaseListeningCounter();
1002 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1003 if (task->IsMainThreadTask()) {
1004 HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask");
1005 auto onCallbackTask = [callbackInfo]() {
1006 TaskPool::ExecuteCallbackTask(callbackInfo.get());
1007 };
1008 TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
1009 } else {
1010 callbackInfo->onCallbackSignal->data = callbackInfo.get();
1011 uv_async_send(callbackInfo->onCallbackSignal);
1012 }
1013 #else
1014 callbackInfo->onCallbackSignal->data = callbackInfo.get();
1015 uv_async_send(callbackInfo->onCallbackSignal);
1016 #endif
1017 return nullptr;
1018 }
1019
GetMessageQueue(const uv_async_t * req)1020 MsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req)
1021 {
1022 std::lock_guard<std::mutex> lock(callbackMutex_);
1023 auto info = static_cast<CallbackInfo*>(req->data);
1024 if (info == nullptr || info->worker == nullptr) {
1025 HILOG_WARN("taskpool:: info or worker is nullptr");
1026 return nullptr;
1027 }
1028 auto worker = info->worker;
1029 MsgQueue* queue = nullptr;
1030 worker->Dequeue(info->hostEnv, queue);
1031 return queue;
1032 }
1033
GetMessageQueueFromCallbackInfo(CallbackInfo * callbackInfo)1034 MsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo)
1035 {
1036 std::lock_guard<std::mutex> lock(callbackMutex_);
1037 if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
1038 HILOG_WARN("taskpool:: callbackInfo or worker is nullptr");
1039 return nullptr;
1040 }
1041 auto worker = callbackInfo->worker;
1042 MsgQueue* queue = nullptr;
1043 worker->Dequeue(callbackInfo->hostEnv, queue);
1044 return queue;
1045 }
1046 // ---------------------------------- SendData ---------------------------------------
1047
NotifyDependencyTaskInfo(uint64_t taskId)1048 void TaskManager::NotifyDependencyTaskInfo(uint64_t taskId)
1049 {
1050 HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
1051 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
1052 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1053 auto iter = dependentTaskInfos_.find(taskId);
1054 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1055 HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1056 return;
1057 }
1058 for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1059 auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1060 RemoveDependencyById(taskId, *taskIdIter);
1061 taskIdIter = iter->second.erase(taskIdIter);
1062 if (taskInfo.first != 0) {
1063 EnqueueTaskId(taskInfo.first, taskInfo.second);
1064 }
1065 }
1066 }
1067
RemoveDependencyById(uint64_t dependentTaskId,uint64_t taskId)1068 void TaskManager::RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId)
1069 {
1070 HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
1071 // remove dependency after task execute
1072 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1073 auto dependTaskIter = dependTaskInfos_.find(taskId);
1074 if (dependTaskIter != dependTaskInfos_.end()) {
1075 auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
1076 if (dependTaskInnerIter != dependTaskIter->second.end()) {
1077 dependTaskIter->second.erase(dependTaskInnerIter);
1078 }
1079 }
1080 }
1081
IsDependendByTaskId(uint64_t taskId)1082 bool TaskManager::IsDependendByTaskId(uint64_t taskId)
1083 {
1084 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1085 auto iter = dependTaskInfos_.find(taskId);
1086 if (iter == dependTaskInfos_.end() || iter->second.empty()) {
1087 return false;
1088 }
1089 return true;
1090 }
1091
IsDependentByTaskId(uint64_t dependentTaskId)1092 bool TaskManager::IsDependentByTaskId(uint64_t dependentTaskId)
1093 {
1094 std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1095 auto iter = dependentTaskInfos_.find(dependentTaskId);
1096 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1097 return false;
1098 }
1099 return true;
1100 }
1101
StoreTaskDependency(uint64_t taskId,std::set<uint64_t> taskIdSet)1102 bool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet)
1103 {
1104 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
1105 StoreDependentTaskInfo(taskIdSet, taskId);
1106 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1107 auto iter = dependTaskInfos_.find(taskId);
1108 if (iter == dependTaskInfos_.end()) {
1109 for (const auto& dependentId : taskIdSet) {
1110 auto idIter = dependTaskInfos_.find(dependentId);
1111 if (idIter == dependTaskInfos_.end()) {
1112 continue;
1113 }
1114 if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
1115 return false;
1116 }
1117 }
1118 dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
1119 return true;
1120 }
1121
1122 for (const auto& dependentId : iter->second) {
1123 auto idIter = dependTaskInfos_.find(dependentId);
1124 if (idIter == dependTaskInfos_.end()) {
1125 continue;
1126 }
1127 if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
1128 return false;
1129 }
1130 }
1131 iter->second.insert(taskIdSet.begin(), taskIdSet.end());
1132 return true;
1133 }
1134
CheckCircularDependency(std::set<uint64_t> dependentIdSet,std::set<uint64_t> idSet,uint64_t taskId)1135 bool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId)
1136 {
1137 for (const auto& id : idSet) {
1138 if (id == taskId) {
1139 return false;
1140 }
1141 auto iter = dependentIdSet.find(id);
1142 if (iter != dependentIdSet.end()) {
1143 continue;
1144 }
1145 auto dIter = dependTaskInfos_.find(id);
1146 if (dIter == dependTaskInfos_.end()) {
1147 continue;
1148 }
1149 if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
1150 return false;
1151 }
1152 }
1153 return true;
1154 }
1155
RemoveTaskDependency(uint64_t taskId,uint64_t dependentId)1156 bool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId)
1157 {
1158 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
1159 RemoveDependentTaskInfo(dependentId, taskId);
1160 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1161 auto iter = dependTaskInfos_.find(taskId);
1162 if (iter == dependTaskInfos_.end()) {
1163 return false;
1164 }
1165 auto dependIter = iter->second.find(dependentId);
1166 if (dependIter == iter->second.end()) {
1167 return false;
1168 }
1169 iter->second.erase(dependIter);
1170 return true;
1171 }
1172
EnqueuePendingTaskInfo(uint64_t taskId,Priority priority)1173 void TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority)
1174 {
1175 if (taskId == 0) {
1176 return;
1177 }
1178 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1179 pendingTaskInfos_.emplace(taskId, priority);
1180 }
1181
DequeuePendingTaskInfo(uint64_t taskId)1182 std::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId)
1183 {
1184 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1185 if (pendingTaskInfos_.empty()) {
1186 return std::make_pair(0, Priority::DEFAULT);
1187 }
1188 std::pair<uint64_t, Priority> result;
1189 for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
1190 if (it->first == taskId) {
1191 result = std::make_pair(it->first, it->second);
1192 it = pendingTaskInfos_.erase(it);
1193 break;
1194 }
1195 }
1196 return result;
1197 }
1198
RemovePendingTaskInfo(uint64_t taskId)1199 void TaskManager::RemovePendingTaskInfo(uint64_t taskId)
1200 {
1201 HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
1202 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1203 pendingTaskInfos_.erase(taskId);
1204 }
1205
StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet,uint64_t taskId)1206 void TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId)
1207 {
1208 HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
1209 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1210 for (const auto& id : dependentTaskIdSet) {
1211 auto iter = dependentTaskInfos_.find(id);
1212 if (iter == dependentTaskInfos_.end()) {
1213 std::set<uint64_t> set{taskId};
1214 dependentTaskInfos_.emplace(id, std::move(set));
1215 } else {
1216 iter->second.emplace(taskId);
1217 }
1218 }
1219 }
1220
RemoveDependentTaskInfo(uint64_t dependentTaskId,uint64_t taskId)1221 void TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId)
1222 {
1223 HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
1224 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1225 auto iter = dependentTaskInfos_.find(dependentTaskId);
1226 if (iter == dependentTaskInfos_.end()) {
1227 return;
1228 }
1229 auto taskIter = iter->second.find(taskId);
1230 if (taskIter == iter->second.end()) {
1231 return;
1232 }
1233 iter->second.erase(taskIter);
1234 }
1235
GetTaskDependInfoToString(uint64_t taskId)1236 std::string TaskManager::GetTaskDependInfoToString(uint64_t taskId)
1237 {
1238 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1239 std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
1240 auto iter = dependTaskInfos_.find(taskId);
1241 if (iter != dependTaskInfos_.end()) {
1242 for (const auto& id : iter->second) {
1243 str += " " + std::to_string(id);
1244 }
1245 }
1246 return str;
1247 }
1248
StoreTaskDuration(uint64_t taskId,uint64_t totalDuration,uint64_t cpuDuration)1249 void TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
1250 {
1251 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
1252 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1253 auto iter = taskDurationInfos_.find(taskId);
1254 if (iter == taskDurationInfos_.end()) {
1255 std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
1256 taskDurationInfos_.emplace(taskId, std::move(durationData));
1257 } else {
1258 if (totalDuration != 0) {
1259 iter->second.first = totalDuration;
1260 }
1261 if (cpuDuration != 0) {
1262 iter->second.second = cpuDuration;
1263 }
1264 }
1265 }
1266
GetTaskDuration(uint64_t taskId,std::string durationType)1267 uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType)
1268 {
1269 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1270 auto iter = taskDurationInfos_.find(taskId);
1271 if (iter == taskDurationInfos_.end()) {
1272 return 0;
1273 }
1274 if (durationType == TASK_TOTAL_TIME) {
1275 return iter->second.first;
1276 } else if (durationType == TASK_CPU_TIME) {
1277 return iter->second.second;
1278 } else if (iter->second.first == 0) {
1279 return 0;
1280 }
1281 return iter->second.first - iter->second.second;
1282 }
1283
GetTaskName(uint64_t taskId)1284 std::string TaskManager::GetTaskName(uint64_t taskId)
1285 {
1286 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1287 auto iter = tasks_.find(taskId);
1288 if (iter == tasks_.end()) {
1289 return "";
1290 }
1291 return iter->second->name_;
1292 }
1293
RemoveTaskDuration(uint64_t taskId)1294 void TaskManager::RemoveTaskDuration(uint64_t taskId)
1295 {
1296 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
1297 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1298 auto iter = taskDurationInfos_.find(taskId);
1299 if (iter != taskDurationInfos_.end()) {
1300 taskDurationInfos_.erase(iter);
1301 }
1302 }
1303
StoreLongTaskInfo(uint64_t taskId,Worker * worker)1304 void TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker)
1305 {
1306 std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1307 longTasksMap_.emplace(taskId, worker);
1308 }
1309
RemoveLongTaskInfo(uint64_t taskId)1310 void TaskManager::RemoveLongTaskInfo(uint64_t taskId)
1311 {
1312 std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1313 longTasksMap_.erase(taskId);
1314 }
1315
GetLongTaskInfo(uint64_t taskId)1316 Worker* TaskManager::GetLongTaskInfo(uint64_t taskId)
1317 {
1318 std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
1319 auto iter = longTasksMap_.find(taskId);
1320 return iter != longTasksMap_.end() ? iter->second : nullptr;
1321 }
1322
TerminateTask(uint64_t taskId)1323 void TaskManager::TerminateTask(uint64_t taskId)
1324 {
1325 HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
1326 auto worker = GetLongTaskInfo(taskId);
1327 if (UNLIKELY(worker == nullptr)) {
1328 return;
1329 }
1330 worker->TerminateTask(taskId);
1331 RemoveLongTaskInfo(taskId);
1332 }
1333
ReleaseTaskData(napi_env env,Task * task,bool shouldDeleteTask)1334 void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
1335 {
1336 uint64_t taskId = task->taskId_;
1337 if (shouldDeleteTask) {
1338 RemoveTask(taskId);
1339 }
1340 if (task->onResultSignal_ != nullptr) {
1341 if (!uv_is_closing((uv_handle_t*)task->onResultSignal_)) {
1342 ConcurrentHelper::UvHandleClose(task->onResultSignal_);
1343 } else {
1344 delete task->onResultSignal_;
1345 }
1346 task->onResultSignal_ = nullptr;
1347 }
1348
1349 if (task->currentTaskInfo_ != nullptr) {
1350 delete task->currentTaskInfo_;
1351 task->currentTaskInfo_ = nullptr;
1352 }
1353
1354 task->CancelPendingTask(env);
1355
1356 task->ClearDelayedTimers();
1357
1358 if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
1359 return;
1360 }
1361 DecreaseRefCount(env, taskId);
1362 RemoveTaskDuration(taskId);
1363 RemovePendingTaskInfo(taskId);
1364 ReleaseCallBackInfo(task);
1365 {
1366 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1367 for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
1368 if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
1369 dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
1370 } else {
1371 ++dependentTaskIter;
1372 }
1373 }
1374 }
1375 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1376 auto dependTaskIter = dependTaskInfos_.find(taskId);
1377 if (dependTaskIter != dependTaskInfos_.end()) {
1378 dependTaskInfos_.erase(dependTaskIter);
1379 }
1380 }
1381
ReleaseCallBackInfo(Task * task)1382 void TaskManager::ReleaseCallBackInfo(Task* task)
1383 {
1384 HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
1385 if (task->onEnqueuedCallBackInfo_ != nullptr) {
1386 delete task->onEnqueuedCallBackInfo_;
1387 task->onEnqueuedCallBackInfo_ = nullptr;
1388 }
1389
1390 if (task->onStartExecutionCallBackInfo_ != nullptr) {
1391 delete task->onStartExecutionCallBackInfo_;
1392 task->onStartExecutionCallBackInfo_ = nullptr;
1393 }
1394
1395 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
1396 delete task->onExecutionFailedCallBackInfo_;
1397 task->onExecutionFailedCallBackInfo_ = nullptr;
1398 }
1399
1400 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
1401 delete task->onExecutionSucceededCallBackInfo_;
1402 task->onExecutionSucceededCallBackInfo_ = nullptr;
1403 }
1404
1405 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1406 if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
1407 if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
1408 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1409 } else {
1410 delete task->onStartExecutionSignal_;
1411 }
1412 task->onStartExecutionSignal_ = nullptr;
1413 }
1414 #else
1415 if (task->onStartExecutionSignal_ != nullptr) {
1416 if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
1417 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1418 } else {
1419 delete task->onStartExecutionSignal_;
1420 }
1421 task->onStartExecutionSignal_ = nullptr;
1422 }
1423 #endif
1424 }
1425
StoreTask(uint64_t taskId,Task * task)1426 void TaskManager::StoreTask(uint64_t taskId, Task* task)
1427 {
1428 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1429 tasks_.emplace(taskId, task);
1430 }
1431
RemoveTask(uint64_t taskId)1432 void TaskManager::RemoveTask(uint64_t taskId)
1433 {
1434 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1435 tasks_.erase(taskId);
1436 }
1437
GetTask(uint64_t taskId)1438 Task* TaskManager::GetTask(uint64_t taskId)
1439 {
1440 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1441 auto iter = tasks_.find(taskId);
1442 if (iter == tasks_.end()) {
1443 return nullptr;
1444 }
1445 return iter->second;
1446 }
1447
1448 #if defined(ENABLE_TASKPOOL_FFRT)
UpdateSystemAppFlag()1449 void TaskManager::UpdateSystemAppFlag()
1450 {
1451 auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
1452 if (abilityManager == nullptr) {
1453 HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr.");
1454 return;
1455 }
1456 auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
1457 if (bundleObj == nullptr) {
1458 HILOG_ERROR("taskpool:: fail to get bundle manager service.");
1459 return;
1460 }
1461 auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj);
1462 if (bundleMgr == nullptr) {
1463 HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
1464 return;
1465 }
1466 OHOS::AppExecFwk::BundleInfo bundleInfo;
1467 if (bundleMgr->GetBundleInfoForSelf(
1468 static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo)
1469 != OHOS::ERR_OK) {
1470 HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
1471 return;
1472 }
1473 isSystemApp_ = bundleInfo.applicationInfo.isSystemApp;
1474 }
1475 #endif
1476
1477 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
PostTask(std::function<void ()> task,const char * taskName,Priority priority)1478 bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
1479 {
1480 return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
1481 }
1482 #endif
1483
CheckTask(uint64_t taskId)1484 bool TaskManager::CheckTask(uint64_t taskId)
1485 {
1486 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1487 auto item = tasks_.find(taskId);
1488 return item != tasks_.end();
1489 }
1490
1491 // ----------------------------------- TaskGroupManager ----------------------------------------
GetInstance()1492 TaskGroupManager& TaskGroupManager::GetInstance()
1493 {
1494 static TaskGroupManager groupManager;
1495 return groupManager;
1496 }
1497
AddTask(uint64_t groupId,napi_ref taskRef,uint64_t taskId)1498 void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId)
1499 {
1500 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1501 auto groupIter = taskGroups_.find(groupId);
1502 if (groupIter == taskGroups_.end()) {
1503 HILOG_DEBUG("taskpool:: taskGroup has been released");
1504 return;
1505 }
1506 auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
1507 if (taskGroup == nullptr) {
1508 HILOG_ERROR("taskpool:: taskGroup is null");
1509 return;
1510 }
1511 taskGroup->taskRefs_.push_back(taskRef);
1512 taskGroup->taskNum_++;
1513 taskGroup->taskIds_.push_back(taskId);
1514 }
1515
ReleaseTaskGroupData(napi_env env,TaskGroup * group)1516 void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
1517 {
1518 HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
1519 TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
1520 {
1521 std::lock_guard<RECURSIVE_MUTEX> lock(group->taskGroupMutex_);
1522 if (group->isValid_) {
1523 for (uint64_t taskId : group->taskIds_) {
1524 Task* task = TaskManager::GetInstance().GetTask(taskId);
1525 if (task == nullptr || !task->IsValid()) {
1526 continue;
1527 }
1528 napi_reference_unref(task->env_, task->taskRef_, nullptr);
1529 }
1530 }
1531
1532 if (group->currentGroupInfo_ != nullptr) {
1533 delete group->currentGroupInfo_;
1534 }
1535 }
1536 group->CancelPendingGroup(env);
1537 }
1538
CancelGroup(napi_env env,uint64_t groupId)1539 void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
1540 {
1541 std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId);
1542 HITRACE_HELPER_METER_NAME(strTrace);
1543 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
1544 TaskGroup* taskGroup = GetTaskGroup(groupId);
1545 if (taskGroup == nullptr) {
1546 HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
1547 return;
1548 }
1549 if (taskGroup->groupState_ == ExecuteState::CANCELED) {
1550 return;
1551 }
1552 std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
1553 if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
1554 taskGroup->groupState_ == ExecuteState::FINISHED) {
1555 std::string errMsg = "taskpool:: taskGroup is not executed or has been executed";
1556 HILOG_ERROR("%{public}s", errMsg.c_str());
1557 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
1558 return;
1559 }
1560 ExecuteState groupState = taskGroup->groupState_;
1561 taskGroup->groupState_ = ExecuteState::CANCELED;
1562 taskGroup->CancelPendingGroup(env);
1563 if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) {
1564 for (uint64_t taskId : taskGroup->taskIds_) {
1565 CancelGroupTask(env, taskId, taskGroup);
1566 }
1567 if (taskGroup->currentGroupInfo_->finishedTaskNum == taskGroup->taskNum_) {
1568 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
1569 taskGroup->RejectResult(env, error);
1570 return;
1571 }
1572 }
1573 if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) {
1574 auto engine = reinterpret_cast<NativeEngine*>(env);
1575 for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) {
1576 engine->DecreaseSubEnvCounter();
1577 }
1578 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
1579 taskGroup->RejectResult(env, error);
1580 }
1581 }
1582
CancelGroupTask(napi_env env,uint64_t taskId,TaskGroup * group)1583 void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group)
1584 {
1585 HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str());
1586 auto task = TaskManager::GetInstance().GetTask(taskId);
1587 if (task == nullptr) {
1588 HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
1589 return;
1590 }
1591 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
1592 if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
1593 reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
1594 task->DecreaseTaskRefCount();
1595 TaskManager::GetInstance().DecreaseRefCount(env, taskId);
1596 TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
1597 delete task->currentTaskInfo_;
1598 task->currentTaskInfo_ = nullptr;
1599 if (group->currentGroupInfo_ != nullptr) {
1600 group->currentGroupInfo_->finishedTaskNum++;
1601 }
1602 }
1603 task->taskState_ = ExecuteState::CANCELED;
1604 }
1605
StoreSequenceRunner(uint64_t seqRunnerId,SequenceRunner * seqRunner)1606 void TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
1607 {
1608 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
1609 seqRunners_.emplace(seqRunnerId, seqRunner);
1610 }
1611
RemoveSequenceRunner(uint64_t seqRunnerId)1612 void TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId)
1613 {
1614 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
1615 seqRunners_.erase(seqRunnerId);
1616 }
1617
GetSeqRunner(uint64_t seqRunnerId)1618 SequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId)
1619 {
1620 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
1621 auto iter = seqRunners_.find(seqRunnerId);
1622 if (iter != seqRunners_.end()) {
1623 return iter->second;
1624 }
1625 HILOG_DEBUG("taskpool:: sequenceRunner has been released.");
1626 return nullptr;
1627 }
1628
AddTaskToSeqRunner(uint64_t seqRunnerId,Task * task)1629 void TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
1630 {
1631 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
1632 auto iter = seqRunners_.find(seqRunnerId);
1633 if (iter == seqRunners_.end()) {
1634 HILOG_ERROR("seqRunner:: seqRunner not found.");
1635 return;
1636 } else {
1637 std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_);
1638 iter->second->seqRunnerTasks_.push(task);
1639 }
1640 }
1641
TriggerSeqRunner(napi_env env,Task * lastTask)1642 bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
1643 {
1644 uint64_t seqRunnerId = lastTask->seqRunnerId_;
1645 SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
1646 if (seqRunner == nullptr) {
1647 HILOG_ERROR("seqRunner:: trigger seqRunner not exist.");
1648 return false;
1649 }
1650 if (!SequenceRunnerManager::GetInstance().TriggerGlobalSeqRunner(env, seqRunner)) {
1651 HILOG_ERROR("seqRunner:: trigger globalSeqRunner not exist.");
1652 return false;
1653 }
1654 if (seqRunner->currentTaskId_ != lastTask->taskId_) {
1655 HILOG_ERROR("seqRunner:: only front task can trigger seqRunner.");
1656 return false;
1657 }
1658 {
1659 std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
1660 if (seqRunner->seqRunnerTasks_.empty()) {
1661 HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str());
1662 seqRunner->currentTaskId_ = 0;
1663 return true;
1664 }
1665 Task* task = seqRunner->seqRunnerTasks_.front();
1666 seqRunner->seqRunnerTasks_.pop();
1667 while (task->taskState_ == ExecuteState::CANCELED) {
1668 DisposeCanceledTask(env, task);
1669 if (seqRunner->seqRunnerTasks_.empty()) {
1670 HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
1671 std::to_string(seqRunnerId).c_str());
1672 seqRunner->currentTaskId_ = 0;
1673 return true;
1674 }
1675 task = seqRunner->seqRunnerTasks_.front();
1676 seqRunner->seqRunnerTasks_.pop();
1677 }
1678 seqRunner->currentTaskId_ = task->taskId_;
1679 task->IncreaseRefCount();
1680 task->taskState_ = ExecuteState::WAITING;
1681 HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
1682 std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
1683 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
1684 }
1685 return true;
1686 }
1687
DisposeCanceledTask(napi_env env,Task * task)1688 void TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task)
1689 {
1690 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled");
1691 napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
1692 reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
1693 napi_reference_unref(env, task->taskRef_, nullptr);
1694 delete task->currentTaskInfo_;
1695 task->currentTaskInfo_ = nullptr;
1696 }
1697
StoreTaskGroup(uint64_t groupId,TaskGroup * taskGroup)1698 void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
1699 {
1700 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1701 taskGroups_.emplace(groupId, taskGroup);
1702 }
1703
RemoveTaskGroup(uint64_t groupId)1704 void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
1705 {
1706 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1707 taskGroups_.erase(groupId);
1708 }
1709
GetTaskGroup(uint64_t groupId)1710 TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
1711 {
1712 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1713 auto groupIter = taskGroups_.find(groupId);
1714 if (groupIter == taskGroups_.end()) {
1715 return nullptr;
1716 }
1717 return reinterpret_cast<TaskGroup*>(groupIter->second);
1718 }
1719
UpdateGroupState(uint64_t groupId)1720 bool TaskGroupManager::UpdateGroupState(uint64_t groupId)
1721 {
1722 HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str());
1723 // During the modification process of the group, prevent other sub threads from performing other
1724 // operations on the group pointer, which may cause the modification to fail.
1725 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1726 auto groupIter = taskGroups_.find(groupId);
1727 if (groupIter == taskGroups_.end()) {
1728 return false;
1729 }
1730 TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second);
1731 if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) {
1732 HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled");
1733 return false;
1734 }
1735 group->groupState_ = ExecuteState::RUNNING;
1736 return true;
1737 }
1738
1739 // ----------------------------------- SequenceRunnerManager ----------------------------------------
GetInstance()1740 SequenceRunnerManager& SequenceRunnerManager::GetInstance()
1741 {
1742 static SequenceRunnerManager sequenceRunnerManager;
1743 return sequenceRunnerManager;
1744 }
1745
CreateOrGetGlobalRunner(napi_env env,napi_value thisVar,size_t argc,const std::string & name,uint32_t priority)1746 SequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
1747 const std::string &name, uint32_t priority)
1748 {
1749 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
1750 SequenceRunner *seqRunner = nullptr;
1751 auto iter = globalSeqRunner_.find(name);
1752 if (iter == globalSeqRunner_.end()) {
1753 seqRunner = new SequenceRunner();
1754 // refresh priority default values on first creation
1755 if (argc == 2) { // 2: The number of parameters is 2.
1756 seqRunner->priority_ = static_cast<Priority>(priority);
1757 }
1758 seqRunner->isGlobalRunner_ = true;
1759 seqRunner->seqName_ = name;
1760 globalSeqRunner_.emplace(name, seqRunner);
1761 } else {
1762 seqRunner = iter->second;
1763 if (priority != seqRunner->priority_) {
1764 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority can not changed.");
1765 return nullptr;
1766 }
1767 }
1768 seqRunner->count_++;
1769 auto tmpIter = seqRunner->globalSeqRunnerRef_.find(env);
1770 if (tmpIter == seqRunner->globalSeqRunnerRef_.end()) {
1771 napi_ref gloableSeqRunnerRef = nullptr;
1772 napi_create_reference(env, thisVar, 0, &gloableSeqRunnerRef);
1773 seqRunner->globalSeqRunnerRef_.emplace(env, gloableSeqRunnerRef);
1774 }
1775
1776 return seqRunner;
1777 }
1778
TriggerGlobalSeqRunner(napi_env env,SequenceRunner * seqRunner)1779 bool SequenceRunnerManager::TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
1780 {
1781 if (seqRunner->isGlobalRunner_) {
1782 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
1783 auto iter = seqRunner->globalSeqRunnerRef_.find(env);
1784 if (iter == seqRunner->globalSeqRunnerRef_.end()) {
1785 return false;
1786 }
1787 napi_reference_unref(env, iter->second, nullptr);
1788 } else {
1789 napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr);
1790 }
1791 return true;
1792 }
1793
DecreaseSeqCount(SequenceRunner * seqRunner)1794 uint64_t SequenceRunnerManager::DecreaseSeqCount(SequenceRunner* seqRunner)
1795 {
1796 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
1797 return --(seqRunner->count_);
1798 }
1799
RemoveGlobalSeqRunnerRef(napi_env env,SequenceRunner * seqRunner)1800 void SequenceRunnerManager::RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner)
1801 {
1802 std::lock_guard<std::mutex> lock(globalSeqRunnerMutex_);
1803 auto iter = seqRunner->globalSeqRunnerRef_.find(env);
1804 if (iter != seqRunner->globalSeqRunnerRef_.end()) {
1805 napi_delete_reference(env, iter->second);
1806 seqRunner->globalSeqRunnerRef_.erase(iter);
1807 }
1808 }
1809
RemoveSequenceRunner(const std::string & name)1810 void SequenceRunnerManager::RemoveSequenceRunner(const std::string &name)
1811 {
1812 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
1813 auto iter = globalSeqRunner_.find(name.c_str());
1814 if (iter != globalSeqRunner_.end()) {
1815 globalSeqRunner_.erase(iter->first);
1816 }
1817 }
1818
GlobalSequenceRunnerDestructor(napi_env env,SequenceRunner * seqRunner)1819 void SequenceRunnerManager::GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner)
1820 {
1821 RemoveGlobalSeqRunnerRef(env, seqRunner);
1822 if (DecreaseSeqCount(seqRunner) == 0) {
1823 RemoveSequenceRunner(seqRunner->seqName_);
1824 TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_);
1825 delete seqRunner;
1826 }
1827 }
1828
IncreaseGlobalSeqRunner(napi_env env,SequenceRunner * seqRunner)1829 bool SequenceRunnerManager::IncreaseGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
1830 {
1831 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
1832 if (seqRunner->isGlobalRunner_) {
1833 auto iter = seqRunner->globalSeqRunnerRef_.find(env);
1834 if (iter == seqRunner->globalSeqRunnerRef_.end()) {
1835 return false;
1836 }
1837 napi_reference_ref(env, iter->second, nullptr);
1838 } else {
1839 napi_reference_ref(env, seqRunner->seqRunnerRef_, nullptr);
1840 }
1841 return true;
1842 }
1843 } // namespace Commonlibrary::Concurrent::TaskPoolModule
1844