1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "worker.h"
17
18 #if defined(ENABLE_TASKPOOL_FFRT)
19 #include "c/executor_task.h"
20 #include "ffrt_inner.h"
21 #endif
22 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
23 #include "helper/hitrace_helper.h"
24 #include "process_helper.h"
25 #include "task_group.h"
26 #include "task_manager.h"
27 #include "taskpool.h"
28 #include "tools/log.h"
29
30 namespace Commonlibrary::Concurrent::TaskPoolModule {
31 using namespace OHOS::JsSysModule;
32 using namespace Commonlibrary::Platform;
33
PriorityScope(Worker * worker,Priority taskPriority)34 Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker)
35 {
36 if (taskPriority != worker->priority_) {
37 HILOG_DEBUG("taskpool:: reset worker priority to match task priority");
38 if (TaskManager::GetInstance().EnableFfrt()) {
39 #if defined(ENABLE_TASKPOOL_FFRT)
40 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) {
41 SetWorkerPriority(taskPriority);
42 }
43 #endif
44 } else {
45 SetWorkerPriority(taskPriority);
46 }
47 worker->priority_ = taskPriority;
48 }
49 }
50
~RunningScope()51 Worker::RunningScope::~RunningScope()
52 {
53 HILOG_DEBUG("taskpool:: RunningScope destruction");
54 if (scope_ != nullptr) {
55 napi_close_handle_scope(worker_->workerEnv_, scope_);
56 }
57 worker_->NotifyIdle();
58 worker_->idleState_ = true;
59 }
60
WorkerConstructor(napi_env env)61 Worker* Worker::WorkerConstructor(napi_env env)
62 {
63 HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]");
64 Worker* worker = new Worker(env);
65 worker->StartExecuteInThread();
66 return worker;
67 }
68
ReleaseWorkerHandles(const uv_async_t * req)69 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
70 {
71 auto worker = static_cast<Worker*>(req->data);
72 HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_);
73 if (!worker->CheckFreeConditions()) {
74 return;
75 }
76
77 TaskManager::GetInstance().RemoveWorker(worker);
78 HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
79 HILOG_INFO("taskpool:: the thread is idle and will be released, and the total num is %{public}u now",
80 TaskManager::GetInstance().GetThreadNum());
81 // when there is no active handle, worker loop will stop automatically.
82 ConcurrentHelper::UvHandleClose(worker->performTaskSignal_);
83 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
84 ConcurrentHelper::UvHandleClose(worker->debuggerOnPostTaskSignal_);
85 #endif
86 ConcurrentHelper::UvHandleClose(worker->clearWorkerSignal_);
87 ConcurrentHelper::UvHandleClose(worker->triggerGCCheckSignal_);
88 worker->triggerGCCheckSignal_ = nullptr;
89
90 uv_loop_t* loop = worker->GetWorkerLoop();
91 if (loop != nullptr) {
92 uv_stop(loop);
93 }
94 }
95
CheckFreeConditions()96 bool Worker::CheckFreeConditions()
97 {
98 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
99 // only when all conditions are met can the worker be freed
100 if (workerEngine->HasListeningCounter()) {
101 HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit");
102 } else if (Timer::HasTimer(workerEnv_)) {
103 HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit");
104 } else if (workerEngine->HasWaitingRequest()) {
105 HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit");
106 } else if (workerEngine->HasSubEnv()) {
107 HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit");
108 } else if (workerEngine->HasPendingJob()) {
109 HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit");
110 } else if (workerEngine->IsProfiling()) {
111 HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling");
112 } else {
113 return true;
114 }
115 HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_);
116 TaskManager& taskManager = TaskManager::GetInstance();
117 taskManager.RestoreWorker(this);
118 taskManager.CountTraceForWorker();
119 return false;
120 }
121
StartExecuteInThread()122 void Worker::StartExecuteInThread()
123 {
124 if (!runner_) {
125 runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
126 }
127 if (runner_) {
128 runner_->Execute(); // start a new thread
129 } else {
130 HILOG_ERROR("taskpool:: runner_ is nullptr");
131 }
132 }
133
134 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)135 void Worker::HandleDebuggerTask(const uv_async_t* req)
136 {
137 Worker* worker = reinterpret_cast<Worker*>(req->data);
138 if (worker == nullptr) {
139 HILOG_ERROR("taskpool:: worker is null");
140 return;
141 }
142 worker->debuggerMutex_.lock();
143 auto task = std::move(worker->debuggerQueue_.front());
144 worker->debuggerQueue_.pop();
145 worker->debuggerMutex_.unlock();
146 task();
147 }
148
DebuggerOnPostTask(std::function<void ()> && task)149 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
150 {
151 if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) {
152 std::lock_guard<std::mutex> lock(debuggerMutex_);
153 debuggerQueue_.push(std::move(task));
154 uv_async_send(debuggerOnPostTaskSignal_);
155 }
156 }
157 #endif
158
159 #if defined(ENABLE_TASKPOOL_FFRT)
InitFfrtInfo()160 void Worker::InitFfrtInfo()
161 {
162 if (TaskManager::GetInstance().EnableFfrt()) {
163 static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = {
164 {ffrt::qos_background, Priority::IDLE},
165 {ffrt::qos_utility, Priority::LOW},
166 {ffrt::qos_default, Priority::DEFAULT},
167 {ffrt::qos_user_initiated, Priority::HIGH},
168 };
169 ffrt_qos_t qos = ffrt_this_task_get_qos();
170 priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos);
171 ffrtTaskHandle_ = ffrt_get_cur_task();
172 }
173 }
174
InitLoopHandleNum()175 void Worker::InitLoopHandleNum()
176 {
177 if (ffrtTaskHandle_ == nullptr) {
178 return;
179 }
180
181 uv_loop_t* loop = GetWorkerLoop();
182 if (loop != nullptr) {
183 initActiveHandleNum_ = loop->active_handles;
184 } else {
185 HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num.");
186 }
187 }
188
IsLoopActive()189 bool Worker::IsLoopActive()
190 {
191 uv_loop_t* loop = GetWorkerLoop();
192 if (loop != nullptr) {
193 return uv_loop_alive_taskpool(loop, initActiveHandleNum_);
194 } else {
195 HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive.");
196 return false;
197 }
198 }
199
GetWaitTime()200 uint64_t Worker::GetWaitTime()
201 {
202 return ffrt_epoll_get_wait_time(ffrtTaskHandle_);
203 }
204 #endif
205
ExecuteInThread(const void * data)206 void Worker::ExecuteInThread(const void* data)
207 {
208 HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
209 auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
210 {
211 napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
212 if (worker->workerEnv_ == nullptr) {
213 HILOG_ERROR("taskpool:: worker create runtime failed");
214 return;
215 }
216 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
217 // mark worker env is taskpoolThread
218 workerEngine->MarkTaskPoolThread();
219 workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback);
220 }
221 uv_loop_t* loop = worker->GetWorkerLoop();
222 if (loop == nullptr) {
223 HILOG_ERROR("taskpool:: loop is nullptr");
224 return;
225 }
226 // save the worker tid
227 worker->tid_ = GetThreadId();
228
229 // Init worker task execute signal
230 ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker);
231 ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker);
232 ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker);
233
234 HITRACE_HELPER_FINISH_TRACE;
235 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
236 // Init debugger task post signal
237 ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker);
238 #endif
239 if (worker->PrepareForWorkerInstance()) {
240 // Call after uv_async_init
241 worker->NotifyWorkerCreated();
242 #if defined(ENABLE_TASKPOOL_FFRT)
243 worker->InitFfrtInfo();
244 worker->InitLoopHandleNum();
245 #endif
246 worker->RunLoop();
247 } else {
248 HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
249 }
250 TaskManager::GetInstance().RemoveWorker(worker);
251 TaskManager::GetInstance().CountTraceForWorker();
252 worker->ReleaseWorkerThreadContent();
253 delete worker;
254 worker = nullptr;
255 }
256
PrepareForWorkerInstance()257 bool Worker::PrepareForWorkerInstance()
258 {
259 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
260 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
261 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
262 workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) {
263 this->DebuggerOnPostTask(std::move(task));
264 });
265 #endif
266 if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
267 HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
268 return false;
269 }
270 // register timer interface
271 Timer::RegisterTime(workerEnv_);
272
273 // Check exception after worker construction
274 if (NapiHelper::IsExceptionPending(workerEnv_)) {
275 HILOG_ERROR("taskpool:: Worker construction occur exception");
276 return false;
277 }
278 return true;
279 }
280
ReleaseWorkerThreadContent()281 void Worker::ReleaseWorkerThreadContent()
282 {
283 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
284 auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
285 if (workerEngine == nullptr) {
286 HILOG_ERROR("taskpool:: workerEngine is nullptr");
287 return;
288 }
289 if (hostEngine != nullptr) {
290 if (!hostEngine->DeleteWorker(workerEngine)) {
291 HILOG_ERROR("taskpool:: DeleteWorker fail");
292 }
293 }
294 if (state_ == WorkerState::BLOCKED) {
295 HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
296 } else {
297 HITRACE_HELPER_METER_NAME("Thread Exit");
298 }
299
300 Timer::ClearEnvironmentTimer(workerEnv_);
301 // 2. delete NativeEngine created in worker thread
302 if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
303 HILOG_ERROR("worker:: CallOffWorkerFunc error");
304 }
305 delete workerEngine;
306 workerEnv_ = nullptr;
307 }
308
NotifyExecuteTask()309 void Worker::NotifyExecuteTask()
310 {
311 if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) {
312 uv_async_send(performTaskSignal_);
313 }
314 }
315
NotifyIdle()316 void Worker::NotifyIdle()
317 {
318 TaskManager::GetInstance().NotifyWorkerIdle(this);
319 }
320
NotifyWorkerCreated()321 void Worker::NotifyWorkerCreated()
322 {
323 TaskManager::GetInstance().NotifyWorkerCreated(this);
324 }
325
NotifyTaskBegin()326 void Worker::NotifyTaskBegin()
327 {
328 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
329 workerEngine->NotifyTaskBegin();
330 }
331
TriggerGCCheck(const uv_async_t * req)332 void Worker::TriggerGCCheck(const uv_async_t* req)
333 {
334 if (req == nullptr || req->data == nullptr) {
335 HILOG_ERROR("taskpool:: req handle is invalid");
336 return;
337 }
338 auto worker = reinterpret_cast<Worker*>(req->data);
339 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
340 workerEngine->NotifyTaskFinished();
341 }
342
NotifyTaskFinished()343 void Worker::NotifyTaskFinished()
344 {
345 // trigger gc check by uv and return immediately if the handle is invalid
346 if (UNLIKELY(triggerGCCheckSignal_ == nullptr || uv_is_closing(
347 reinterpret_cast<uv_handle_t*>(triggerGCCheckSignal_)))) {
348 HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed");
349 return;
350 } else {
351 uv_async_send(triggerGCCheckSignal_);
352 }
353
354 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
355 if (--runningCount_ != 0 || workerEngine->HasPendingJob()) {
356 // the worker state is still RUNNING and the start time will be updated
357 startTime_ = ConcurrentHelper::GetMilliseconds();
358 } else {
359 UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE);
360 }
361 idlePoint_ = ConcurrentHelper::GetMilliseconds();
362 }
363
UpdateWorkerState(WorkerState expect,WorkerState desired)364 bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired)
365 {
366 return state_.compare_exchange_strong(expect, desired);
367 }
368
PerformTask(const uv_async_t * req)369 void Worker::PerformTask(const uv_async_t* req)
370 {
371 uint64_t startTime = ConcurrentHelper::GetMilliseconds();
372 auto worker = static_cast<Worker*>(req->data);
373 napi_env env = worker->workerEnv_;
374 TaskManager::GetInstance().NotifyWorkerRunning(worker);
375 auto taskInfo = TaskManager::GetInstance().DequeueTaskId();
376 if (taskInfo.first == 0) {
377 worker->NotifyIdle();
378 return;
379 }
380 RunningScope runningScope(worker);
381 PriorityScope priorityScope(worker, taskInfo.second);
382 Task* task = TaskManager::GetInstance().GetTask(taskInfo.first);
383 if (task == nullptr) {
384 HILOG_DEBUG("taskpool:: task has been released");
385 return;
386 } else if (!task->IsValid() && task->ShouldDeleteTask(false)) {
387 HILOG_WARN("taskpool:: task is invalid");
388 delete task;
389 return;
390 }
391 // try to record the memory data for gc
392 worker->NotifyTaskBegin();
393
394 if (!task->UpdateTask(startTime, worker)) {
395 worker->NotifyTaskFinished();
396 return;
397 }
398 if (task->IsGroupTask() && (!TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_))) {
399 return;
400 }
401 if (task->IsLongTask()) {
402 worker->UpdateLongTaskInfo(task);
403 }
404 worker->StoreTaskId(task->taskId_);
405 // tag for trace parse: Task Perform
406 std::string strTrace = "Task Perform: name : " + task->name_ + ", taskId : " + std::to_string(task->taskId_)
407 + ", priority : " + std::to_string(taskInfo.second);
408 HITRACE_HELPER_METER_NAME(strTrace);
409 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
410
411 napi_value func = nullptr;
412 napi_value args = nullptr;
413 napi_value errorInfo = task->DeserializeValue(env, &func, &args);
414 if (UNLIKELY(func == nullptr || args == nullptr)) {
415 if (errorInfo != nullptr) {
416 worker->NotifyTaskResult(env, task, errorInfo);
417 }
418 return;
419 }
420 if (!worker->InitTaskPoolFunc(env, func, task)) {
421 return;
422 }
423 worker->hasExecuted_ = true;
424 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
425 napi_value argsArray[argsNum];
426 for (size_t i = 0; i < argsNum; i++) {
427 argsArray[i] = NapiHelper::GetElement(env, args, i);
428 }
429
430 if (!task->CheckStartExecution(taskInfo.second)) {
431 if (task->ShouldDeleteTask()) {
432 delete task;
433 }
434 return;
435 }
436 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr);
437 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
438 workerEngine->ClearCurrentTaskInfo();
439 task->DecreaseRefCount();
440 task->StoreTaskDuration();
441 worker->UpdateExecutedInfo();
442 HandleFunctionException(env, task);
443 }
444
NotifyTaskResult(napi_env env,Task * task,napi_value result)445 void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result)
446 {
447 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
448 HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str());
449 void* resultData = nullptr;
450 napi_value undefined = NapiHelper::GetUndefinedValue(env);
451 bool defaultTransfer = true;
452 bool defaultCloneSendable = false;
453 napi_status status = napi_serialize_inner(env, result, undefined, undefined,
454 defaultTransfer, defaultCloneSendable, &resultData);
455 if ((status != napi_ok || resultData == nullptr) && task->success_) {
456 task->success_ = false;
457 std::string errMessage = "taskpool: failed to serialize result.";
458 HILOG_ERROR("%{public}s", errMessage.c_str());
459 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
460 NotifyTaskResult(env, task, err);
461 return;
462 }
463 task->result_ = resultData;
464 NotifyHandleTaskResult(task);
465 }
466
NotifyHandleTaskResult(Task * task)467 void Worker::NotifyHandleTaskResult(Task* task)
468 {
469 if (!task->IsReadyToHandle()) {
470 return;
471 }
472 Worker* worker = reinterpret_cast<Worker*>(task->worker_);
473 if (worker != nullptr) {
474 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
475 auto iter = std::find(worker->currentTaskId_.begin(), worker->currentTaskId_.end(), task->taskId_);
476 if (iter != worker->currentTaskId_.end()) {
477 worker->currentTaskId_.erase(iter);
478 }
479 } else {
480 HILOG_FATAL("taskpool:: worker is nullptr");
481 return;
482 }
483 if (!task->VerifyAndPostResult(worker->priority_)) {
484 if (task->ShouldDeleteTask()) {
485 delete task;
486 }
487 }
488 worker->NotifyTaskFinished();
489 }
490
TaskResultCallback(napi_env env,napi_value result,bool success,void * data)491 void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data)
492 {
493 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
494 if (env == nullptr) { // LCOV_EXCL_BR_LINE
495 HILOG_FATAL("taskpool:: TaskResultCallback engine is null");
496 return;
497 }
498 if (data == nullptr) { // LCOV_EXCL_BR_LINE
499 HILOG_FATAL("taskpool:: data is nullptr");
500 return;
501 }
502 Task* task = static_cast<Task*>(data);
503 auto taskId = reinterpret_cast<uint64_t>(task);
504 if (TaskManager::GetInstance().GetTask(taskId) == nullptr) {
505 HILOG_FATAL("taskpool:: task is nullptr");
506 return;
507 }
508 auto worker = static_cast<Worker*>(task->worker_);
509 worker->isExecutingLongTask_ = task->IsLongTask();
510 task->DecreaseRefCount();
511 task->ioTime_ = ConcurrentHelper::GetMilliseconds();
512 if (task->cpuTime_ != 0) {
513 uint64_t ioDuration = task->ioTime_ - task->startTime_;
514 uint64_t cpuDuration = task->cpuTime_ - task->startTime_;
515 TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration);
516 }
517 task->success_ = success;
518 NotifyTaskResult(env, task, result);
519 }
520
521 // reset qos_user_initiated after perform task
ResetWorkerPriority()522 void Worker::ResetWorkerPriority()
523 {
524 if (priority_ != Priority::HIGH) {
525 if (TaskManager::GetInstance().EnableFfrt()) {
526 #if defined(ENABLE_TASKPOOL_FFRT)
527 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) {
528 SetWorkerPriority(Priority::HIGH);
529 }
530 #endif
531 } else {
532 SetWorkerPriority(Priority::HIGH);
533 }
534 priority_ = Priority::HIGH;
535 }
536 }
537
StoreTaskId(uint64_t taskId)538 void Worker::StoreTaskId(uint64_t taskId)
539 {
540 std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
541 currentTaskId_.emplace_back(taskId);
542 }
543
InitTaskPoolFunc(napi_env env,napi_value func,Task * task)544 bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task)
545 {
546 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
547 bool success = workerEngine->InitTaskPoolFunc(env, func, task);
548 napi_value exception;
549 napi_get_and_clear_last_exception(env, &exception);
550 if (exception != nullptr) {
551 HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
552 task->success_ = false;
553 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
554 NotifyTaskResult(env, task, errorEvent);
555 return false;
556 }
557 if (!success) {
558 HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
559 napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
560 "taskpool:: function may not be concurrent.");
561 task->success_ = false;
562 NotifyTaskResult(env, task, err);
563 return false;
564 }
565 return true;
566 }
567
UpdateExecutedInfo()568 void Worker::UpdateExecutedInfo()
569 {
570 // if the worker is blocked, just skip
571 if (LIKELY(state_ != WorkerState::BLOCKED)) {
572 uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_;
573 TaskManager::GetInstance().UpdateExecutedInfo(duration);
574 }
575 }
576
577 // Only when the worker has no longTask can it be released.
TerminateTask(uint64_t taskId)578 void Worker::TerminateTask(uint64_t taskId)
579 {
580 HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str());
581 std::lock_guard<std::mutex> lock(longMutex_);
582 longTasksSet_.erase(taskId);
583 if (longTasksSet_.empty()) {
584 hasLongTask_ = false;
585 }
586 }
587
588 // to store longTasks' state
UpdateLongTaskInfo(Task * task)589 void Worker::UpdateLongTaskInfo(Task* task)
590 {
591 HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str());
592 TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this);
593 std::lock_guard<std::mutex> lock(longMutex_);
594 hasLongTask_ = true;
595 isExecutingLongTask_ = true;
596 longTasksSet_.emplace(task->taskId_);
597 }
598
IsExecutingLongTask()599 bool Worker::IsExecutingLongTask()
600 {
601 return isExecutingLongTask_;
602 }
603
HasLongTask()604 bool Worker::HasLongTask()
605 {
606 return hasLongTask_;
607 }
608
HandleFunctionException(napi_env env,Task * task)609 void Worker::HandleFunctionException(napi_env env, Task* task)
610 {
611 napi_value exception;
612 napi_get_and_clear_last_exception(env, &exception);
613 if (exception != nullptr) {
614 HILOG_ERROR("taskpool::PerformTask occur exception");
615 task->DecreaseRefCount();
616 task->success_ = false;
617 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
618 NotifyTaskResult(env, task, errorEvent);
619 return;
620 }
621 NotifyHandleTaskResult(task);
622 }
623 } // namespace Commonlibrary::Concurrent::TaskPoolModule
624