1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "taskpool.h"
17
18 #include <cinttypes>
19
20 #include "helper/error_helper.h"
21 #include "helper/hitrace_helper.h"
22 #include "helper/napi_helper.h"
23 #include "helper/object_helper.h"
24 #include "message_queue.h"
25 #include "task_manager.h"
26 #include "tools/log.h"
27 #include "worker.h"
28
29 namespace Commonlibrary::Concurrent::TaskPoolModule {
30 using namespace Commonlibrary::Concurrent::Common::Helper;
31
InitTaskPool(napi_env env,napi_value exports)32 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
33 {
34 HILOG_INFO("taskpool:: Import taskpool");
35 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
36 napi_value taskClass = nullptr;
37 napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor, nullptr, 0, nullptr, &taskClass);
38 napi_value longTaskClass = nullptr;
39 napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
40 nullptr, 0, nullptr, &longTaskClass);
41 napi_value genericsTaskClass = nullptr;
42 napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
43 nullptr, 0, nullptr, &genericsTaskClass);
44 napi_value isCanceledFunc = nullptr;
45 napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc);
46 napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
47 napi_set_named_property(env, longTaskClass, "isCanceled", isCanceledFunc);
48 napi_value sendDataFunc = nullptr;
49 napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc);
50 napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
51 napi_set_named_property(env, longTaskClass, "sendData", sendDataFunc);
52 napi_value taskGroupClass = nullptr;
53 napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor, nullptr, 0, nullptr,
54 &taskGroupClass);
55 napi_value seqRunnerClass = nullptr;
56 napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
57 nullptr, 0, nullptr, &seqRunnerClass);
58
59 // define priority
60 napi_value priorityObj = NapiHelper::CreateObject(env);
61 napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
62 napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
63 napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
64 napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
65 napi_property_descriptor exportPriority[] = {
66 DECLARE_NAPI_PROPERTY("HIGH", highPriority),
67 DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
68 DECLARE_NAPI_PROPERTY("LOW", lowPriority),
69 DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
70 };
71 napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
72
73 // define State
74 napi_value stateObj = NapiHelper::CreateObject(env);
75 napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
76 napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
77 napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
78 napi_property_descriptor exportState[] = {
79 DECLARE_NAPI_PROPERTY("WAITING", waitingState),
80 DECLARE_NAPI_PROPERTY("RUNNING", runningState),
81 DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
82 };
83 napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
84
85 napi_property_descriptor properties[] = {
86 DECLARE_NAPI_PROPERTY("Task", taskClass),
87 DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
88 DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
89 DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
90 DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
91 DECLARE_NAPI_PROPERTY("Priority", priorityObj),
92 DECLARE_NAPI_PROPERTY("State", stateObj),
93 DECLARE_NAPI_FUNCTION("execute", Execute),
94 DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
95 DECLARE_NAPI_FUNCTION("cancel", Cancel),
96 DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
97 DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
98 DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
99 DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
100 };
101 napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
102
103 TaskManager::GetInstance().InitTaskManager(env);
104 return exports;
105 }
106
107 // ---------------------------------- SendData ---------------------------------------
ExecuteCallback(const uv_async_t * req)108 void TaskPool::ExecuteCallback(const uv_async_t* req)
109 {
110 auto* msgQueue = TaskManager::GetInstance().GetMessageQueue(req);
111 if (msgQueue == nullptr) {
112 HILOG_WARN("taskpool:: msgQueue is nullptr");
113 return;
114 }
115 ExecuteCallbackInner(*msgQueue);
116 }
117
ExecuteCallbackTask(CallbackInfo * callbackInfo)118 void TaskPool::ExecuteCallbackTask(CallbackInfo* callbackInfo)
119 {
120 auto* msgQueue = TaskManager::GetInstance().GetMessageQueueFromCallbackInfo(callbackInfo);
121 if (msgQueue == nullptr) {
122 HILOG_WARN("taskpool:: msgQueue is nullptr");
123 return;
124 }
125 ExecuteCallbackInner(*msgQueue);
126 }
127
ExecuteCallbackInner(MsgQueue & msgQueue)128 void TaskPool::ExecuteCallbackInner(MsgQueue& msgQueue)
129 {
130 while (!msgQueue.IsEmpty()) {
131 auto resultInfo = msgQueue.DeQueue();
132 if (resultInfo == nullptr) {
133 HILOG_DEBUG("taskpool:: resultInfo is nullptr");
134 continue;
135 }
136 ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
137 napi_status status = napi_ok;
138 CallbackScope callbackScope(resultInfo->hostEnv, resultInfo->workerEnv, resultInfo->taskId, status);
139 if (status != napi_ok) {
140 HILOG_ERROR("napi_open_handle_scope failed");
141 return;
142 }
143 auto env = resultInfo->hostEnv;
144 auto callbackInfo = TaskManager::GetInstance().GetCallbackInfo(resultInfo->taskId);
145 if (callbackInfo == nullptr) {
146 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
147 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
148 continue;
149 }
150 TaskManager::GetInstance().ResetCallbackInfoWorker(callbackInfo);
151 auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
152 napi_value args;
153 napi_value result;
154 status = napi_deserialize(env, resultInfo->serializationArgs, &args);
155 napi_delete_serialization_data(env, resultInfo->serializationArgs);
156 if (status != napi_ok || args == nullptr) {
157 std::string errMessage = "taskpool:: failed to serialize function";
158 HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
159 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
160 continue;
161 }
162 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
163 napi_value argsArray[argsNum];
164 for (size_t i = 0; i < argsNum; i++) {
165 argsArray[i] = NapiHelper::GetElement(env, args, i);
166 }
167 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
168 if (NapiHelper::IsExceptionPending(env)) {
169 napi_value exception = nullptr;
170 napi_get_and_clear_last_exception(env, &exception);
171 HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
172 }
173 }
174 }
175 // ---------------------------------- SendData ---------------------------------------
176
177 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
178 {
179 napi_value result = nullptr;
180 napi_create_object(env, &result);
181 napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
182 napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
183 napi_set_named_property(env, result, "threadInfos", threadInfos);
184 napi_set_named_property(env, result, "taskInfos", taskInfos);
185 return result;
186 }
187
TerminateTask(napi_env env,napi_callback_info cbinfo)188 napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
189 {
190 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
191 size_t argc = 1; // 1: long task
192 napi_value args[1];
193 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
194 if (argc < 1) {
195 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
196 "the number of the params must be one.");
197 return nullptr;
198 }
199 if (!NapiHelper::IsObject(env, args[0])) {
200 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
201 "the type of the params must be object.");
202 return nullptr;
203 }
204 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
205 uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
206 auto task = TaskManager::GetInstance().GetTask(taskId);
207 if (task == nullptr || !task->IsLongTask()) {
208 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
209 "the type of the params must be long task.");
210 return nullptr;
211 }
212 TaskManager::GetInstance().TerminateTask(taskId);
213 return nullptr;
214 }
215
Execute(napi_env env,napi_callback_info cbinfo)216 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
217 {
218 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
219 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
220 if (argc < 1) {
221 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
222 "the number of params must be at least one.");
223 return nullptr;
224 }
225 napi_value* args = new napi_value[argc];
226 ObjectScope<napi_value> scope(args, true);
227 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
228 napi_valuetype type = napi_undefined;
229 napi_typeof(env, args[0], &type);
230 if (type == napi_object) {
231 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
232 if (argc > 1) {
233 if (!NapiHelper::IsNumber(env, args[1])) {
234 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
235 "the type of the second param must be number.");
236 return nullptr;
237 }
238 priority = NapiHelper::GetUint32Value(env, args[1]);
239 if (priority >= Priority::NUMBER) {
240 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
241 return nullptr;
242 }
243 }
244 if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
245 return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
246 }
247 Task* task = nullptr;
248 napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
249 if (task == nullptr) {
250 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
251 "the type of the first param must be task.");
252 return nullptr;
253 }
254 if (!task->CanExecute(env)) {
255 return nullptr;
256 }
257 napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
258 static_cast<Priority>(priority));
259 if (promise == nullptr) {
260 return nullptr;
261 }
262 ExecuteTask(env, task, static_cast<Priority>(priority));
263 return promise;
264 }
265 if (type != napi_function) {
266 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
267 "the type of the first param must be object or function.");
268 return nullptr;
269 }
270 Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
271 if (task == nullptr) {
272 HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
273 return nullptr;
274 }
275 TaskManager::GetInstance().StoreTask(task->taskId_, task);
276 napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
277 ExecuteTask(env, task);
278 return promise;
279 }
280
DelayTask(uv_timer_t * handle)281 void TaskPool::DelayTask(uv_timer_t* handle)
282 {
283 TaskMessage *taskMessage = static_cast<TaskMessage *>(handle->data);
284 auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
285 napi_status status = napi_ok;
286 if (task == nullptr) {
287 HILOG_DEBUG("taskpool:: task is nullptr");
288 } else if (task->taskState_ == ExecuteState::CANCELED) {
289 HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
290 HandleScope scope(task->env_, status);
291 if (status != napi_ok) {
292 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
293 return;
294 }
295 napi_value error = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
296 napi_reject_deferred(task->env_, taskMessage->deferred, error);
297 } else {
298 HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
299 HandleScope scope(task->env_, status);
300 if (status != napi_ok) {
301 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
302 return;
303 }
304 TaskManager::GetInstance().IncreaseRefCount(taskMessage->taskId);
305 task->IncreaseRefCount();
306 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
307 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
308 if (taskInfo != nullptr) {
309 taskInfo->deferred = taskMessage->deferred;
310 if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
311 task->taskState_ = ExecuteState::WAITING;
312 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
313 }
314 } else {
315 napi_value execption = nullptr;
316 napi_get_and_clear_last_exception(task->env_, &execption);
317 if (execption != nullptr) {
318 napi_reject_deferred(task->env_, taskMessage->deferred, execption);
319 }
320 }
321 }
322 if (task != nullptr) {
323 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
324 task->delayedTimers_.erase(handle);
325 }
326 uv_timer_stop(handle);
327 ConcurrentHelper::UvHandleClose(handle);
328 delete taskMessage;
329 taskMessage = nullptr;
330 }
331
ExecuteDelayed(napi_env env,napi_callback_info cbinfo)332 napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
333 {
334 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
335 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
336 int32_t delayTime = 0;
337 Task* task = nullptr;
338 if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
339 return nullptr;
340 }
341
342 if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED ||
343 task->taskState_ == ExecuteState::FINISHED) {
344 task->taskState_ = ExecuteState::DELAYED;
345 }
346 task->UpdateTaskType(TaskType::COMMON_TASK);
347
348 uv_loop_t* loop = NapiHelper::GetLibUV(env);
349 uv_update_time(loop);
350 uv_timer_t* timer = new uv_timer_t;
351 uv_timer_init(loop, timer);
352 TaskMessage *taskMessage = new TaskMessage();
353 taskMessage->priority = static_cast<Priority>(priority);
354 taskMessage->taskId = task->taskId_;
355 napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
356 timer->data = taskMessage;
357
358 std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
359 strTrace += ", priority: " + std::to_string(priority);
360 strTrace += ", delayTime " + std::to_string(delayTime);
361 HITRACE_HELPER_METER_NAME(strTrace);
362 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
363
364 uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
365 {
366 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
367 task->delayedTimers_.insert(timer);
368 }
369 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
370 if (engine->IsMainThread()) {
371 uv_async_send(&loop->wq_async);
372 } else {
373 uv_work_t *work = new uv_work_t;
374 uv_queue_work_with_qos(loop, work, [](uv_work_t *) {},
375 [](uv_work_t *work, int32_t) {delete work; }, uv_qos_user_initiated);
376 }
377 return promise;
378 }
379
ExecuteGroup(napi_env env,napi_value napiTaskGroup,Priority priority)380 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
381 {
382 napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
383 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
384 HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
385 auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
386 napi_reference_ref(env, taskGroup->groupRef_, nullptr);
387 if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
388 taskGroup->groupState_ == ExecuteState::CANCELED) {
389 taskGroup->groupState_ = ExecuteState::WAITING;
390 }
391 GroupInfo* groupInfo = new GroupInfo();
392 groupInfo->priority = priority;
393 napi_value resArr;
394 napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
395 napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
396 groupInfo->resArr = arrRef;
397 napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
398 {
399 std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
400 if (taskGroup->taskNum_ == 0) {
401 napi_resolve_deferred(env, groupInfo->deferred, resArr);
402 taskGroup->groupState_ = ExecuteState::FINISHED;
403 napi_delete_reference(env, groupInfo->resArr);
404 napi_reference_unref(env, taskGroup->groupRef_, nullptr);
405 delete groupInfo;
406 taskGroup->currentGroupInfo_ = nullptr;
407 return promise;
408 }
409 if (taskGroup->currentGroupInfo_ == nullptr) {
410 taskGroup->currentGroupInfo_ = groupInfo;
411 for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
412 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
413 Task* task = nullptr;
414 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
415 if (task == nullptr) {
416 HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
417 return nullptr;
418 }
419 napi_reference_ref(env, task->taskRef_, nullptr);
420 if (task->IsGroupCommonTask()) {
421 task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
422 }
423 ExecuteTask(env, task, static_cast<Priority>(priority));
424 }
425 } else {
426 taskGroup->pendingGroupInfos_.push_back(groupInfo);
427 }
428 }
429 return promise;
430 }
431
HandleTaskResult(const uv_async_t * req)432 void TaskPool::HandleTaskResult(const uv_async_t* req)
433 {
434 HILOG_DEBUG("taskpool:: HandleTaskResult task");
435 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
436 auto task = static_cast<Task*>(req->data);
437 if (task == nullptr) { // LCOV_EXCL_BR_LINE
438 HILOG_FATAL("taskpool:: HandleTaskResult task is null");
439 return;
440 }
441 if (!task->IsMainThreadTask()) {
442 if (task->ShouldDeleteTask(false)) {
443 delete task;
444 return;
445 }
446 if (task->IsFunctionTask()) {
447 napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
448 }
449 }
450 task->DecreaseTaskRefCount();
451 HandleTaskResultCallback(task);
452 }
453
HandleTaskResultCallback(Task * task)454 void TaskPool::HandleTaskResultCallback(Task* task)
455 {
456 napi_handle_scope scope = nullptr;
457 NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
458 napi_value napiTaskResult = nullptr;
459 napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
460 napi_delete_serialization_data(task->env_, task->result_);
461
462 // tag for trace parse: Task PerformTask End
463 std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
464 if (task->taskState_ == ExecuteState::CANCELED) {
465 strTrace += ", performResult : IsCanceled";
466 napiTaskResult = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
467 } else if (status != napi_ok) {
468 HILOG_ERROR("taskpool: failed to deserialize result");
469 strTrace += ", performResult : DeserializeFailed";
470 } else if (task->success_) {
471 strTrace += ", performResult : Successful";
472 } else {
473 strTrace += ", performResult : Unsuccessful";
474 }
475 HITRACE_HELPER_METER_NAME(strTrace);
476 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
477 if (napiTaskResult == nullptr) {
478 napi_get_undefined(task->env_, &napiTaskResult);
479 }
480 reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
481 bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
482 task->taskState_ = ExecuteState::ENDING;
483 task->isRunning_ = false;
484 if (task->IsGroupTask()) {
485 UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
486 } else if (!task->IsPeriodicTask()) {
487 if (success) {
488 napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
489 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
490 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
491 }
492 } else {
493 napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
494 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
495 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
496 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
497 }
498 }
499 }
500 NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
501 TriggerTask(task);
502 HILOG_DEBUG("taskpool:: %{public}s", strTrace.c_str());
503 }
504
TriggerTask(Task * task)505 void TaskPool::TriggerTask(Task* task)
506 {
507 HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
508 if (task->IsGroupTask()) {
509 return;
510 }
511 TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
512 task->taskState_ = ExecuteState::FINISHED;
513 // seqRunnerTask will trigger the next
514 if (task->IsSeqRunnerTask()) {
515 if (!TaskGroupManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
516 HILOG_ERROR("seqRunner:: task %{public}s trigger in seqRunner %{public}s failed",
517 std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
518 }
519 } else if (task->IsCommonTask()) {
520 task->NotifyPendingTask();
521 }
522 if (task->IsPeriodicTask()) {
523 return;
524 }
525 if (!task->IsFunctionTask()) {
526 napi_reference_unref(task->env_, task->taskRef_, nullptr);
527 return;
528 }
529 TaskManager::GetInstance().RemoveTask(task->taskId_);
530 delete task;
531 }
532
UpdateGroupInfoByResult(napi_env env,Task * task,napi_value res,bool success)533 void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
534 {
535 HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
536 TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
537 task->taskState_ = ExecuteState::FINISHED;
538 napi_reference_unref(env, task->taskRef_, nullptr);
539 if (task->IsGroupCommonTask()) {
540 delete task->currentTaskInfo_;
541 task->currentTaskInfo_ = nullptr;
542 }
543 TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
544 if (taskGroup == nullptr || taskGroup->currentGroupInfo_ == nullptr) {
545 HILOG_DEBUG("taskpool:: taskGroup may have been released or canceled");
546 return;
547 }
548 // store the result
549 uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
550 auto groupInfo = taskGroup->currentGroupInfo_;
551 napi_ref arrRef = groupInfo->resArr;
552 napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
553 napi_set_element(env, resArr, index, res);
554 groupInfo->finishedTaskNum++;
555 // store the index when the first exception occurs
556 if (!success && !groupInfo->HasException()) {
557 groupInfo->SetFailedIndex(index);
558 }
559 // we will not handle the result until all tasks are finished
560 if (groupInfo->finishedTaskNum < taskGroup->taskNum_) {
561 return;
562 }
563 // if there is no exception, just resolve
564 if (!groupInfo->HasException()) {
565 HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
566 napi_resolve_deferred(env, groupInfo->deferred, resArr);
567 for (uint64_t taskId : taskGroup->taskIds_) {
568 auto task = TaskManager::GetInstance().GetTask(taskId);
569 if (task != nullptr && task->onExecutionSucceededCallBackInfo_ != nullptr) {
570 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
571 }
572 }
573 } else {
574 napi_value res = nullptr;
575 napi_get_element(env, resArr, groupInfo->GetFailedIndex(), &res);
576 napi_reject_deferred(env, groupInfo->deferred, res);
577 auto iter = taskGroup->taskIds_.begin();
578 std::advance(iter, groupInfo->GetFailedIndex());
579 auto task = iter != taskGroup->taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr;
580 if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) {
581 task->onExecutionFailedCallBackInfo_->taskError_ = res;
582 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
583 }
584 }
585 taskGroup->groupState_ = ExecuteState::FINISHED;
586 napi_delete_reference(env, groupInfo->resArr);
587 napi_reference_unref(env, taskGroup->groupRef_, nullptr);
588 delete groupInfo;
589 taskGroup->currentGroupInfo_ = nullptr;
590 taskGroup->NotifyGroupTask(env);
591 }
592
ExecuteTask(napi_env env,Task * task,Priority priority)593 void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
594 {
595 // tag for trace parse: Task Allocation
596 std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
597 + ", priority : " + std::to_string(priority)
598 + ", executeState : " + std::to_string(ExecuteState::WAITING);
599 HITRACE_HELPER_METER_NAME(strTrace);
600 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
601 task->IncreaseRefCount();
602 TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
603 if (task->IsFunctionTask() || (task->taskState_ != ExecuteState::WAITING &&
604 task->taskState_ != ExecuteState::RUNNING && task->taskState_ != ExecuteState::ENDING)) {
605 task->taskState_ = ExecuteState::WAITING;
606 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
607 }
608 }
609
Cancel(napi_env env,napi_callback_info cbinfo)610 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
611 {
612 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
613 size_t argc = 1;
614 napi_value args[1];
615 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
616 if (argc < 1) {
617 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
618 "the number of the params must be 1.");
619 return nullptr;
620 }
621
622 if (!NapiHelper::IsObject(env, args[0])) {
623 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
624 "the type of the params must be object.");
625 return nullptr;
626 }
627
628 if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
629 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
630 if (napiTaskId == nullptr) {
631 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
632 "the type of the params must be task.");
633 return nullptr;
634 }
635 uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
636 TaskManager::GetInstance().CancelTask(env, taskId);
637 } else {
638 napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
639 if (napiGroupId == nullptr) {
640 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
641 "the type of the params must be taskGroup.");
642 return nullptr;
643 }
644 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
645 TaskGroupManager::GetInstance().CancelGroup(env, groupId);
646 }
647 return nullptr;
648 }
649
IsConcurrent(napi_env env,napi_callback_info cbinfo)650 napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
651 {
652 size_t argc = 1;
653 napi_value args[1];
654 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
655 if (argc != 1) {
656 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
657 "the number of the params must be 1.");
658 return nullptr;
659 }
660
661 if (!NapiHelper::IsFunction(env, args[0])) {
662 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
663 "the type of the first param must be function.");
664 return nullptr;
665 }
666
667 bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
668 return NapiHelper::CreateBooleanValue(env, isConcurrent);
669 }
670
PeriodicTaskCallback(uv_timer_t * handle)671 void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
672 {
673 Task* task = reinterpret_cast<Task*>(handle->data);
674 if (task == nullptr) {
675 HILOG_DEBUG("taskpool:: the task is nullptr");
676 return;
677 } else if (!task->IsPeriodicTask()) {
678 HILOG_DEBUG("taskpool:: the current task is not a periodic task");
679 return;
680 } else if (task->taskState_ == ExecuteState::CANCELED) {
681 HILOG_DEBUG("taskpool:: the periodic task has been canceled");
682 return;
683 }
684 TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
685
686 if (!task->isFirstTaskInfo_) {
687 napi_status status = napi_ok;
688 HandleScope scope(task->env_, status);
689 if (status != napi_ok) {
690 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
691 return;
692 }
693 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
694 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
695 if (taskInfo == nullptr) {
696 HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
697 return;
698 }
699 }
700 task->isFirstTaskInfo_ = false;
701
702 task->IncreaseRefCount();
703 HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
704 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) {
705 task->taskState_ = ExecuteState::WAITING;
706 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
707 }
708 }
709
ExecutePeriodically(napi_env env,napi_callback_info cbinfo)710 napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
711 {
712 int32_t period = 0;
713 uint32_t priority = Priority::DEFAULT;
714 Task* periodicTask = nullptr;
715 if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
716 return nullptr;
717 }
718
719 if (!periodicTask->CanExecutePeriodically(env)) {
720 return nullptr;
721 }
722 periodicTask->UpdatePeriodicTask();
723
724 periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
725 napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
726 TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_);
727 if (taskInfo == nullptr) {
728 return nullptr;
729 }
730
731 periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo
732
733 TriggerTimer(env, periodicTask, period);
734 return nullptr;
735 }
736
TriggerTimer(napi_env env,Task * task,int32_t period)737 void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
738 {
739 HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
740 uv_loop_t* loop = NapiHelper::GetLibUV(env);
741 task->timer_ = new uv_timer_t;
742 uv_timer_init(loop, task->timer_);
743 task->timer_->data = task;
744 uv_update_time(loop);
745 uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
746 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
747 if (engine->IsMainThread()) {
748 uv_async_send(&loop->wq_async);
749 } else {
750 uv_work_t* work = new uv_work_t;
751 uv_queue_work_with_qos(loop, work, [](uv_work_t*) {},
752 [](uv_work_t* work, int32_t) { delete work; }, uv_qos_user_initiated);
753 }
754 }
755
CheckDelayedParams(napi_env env,napi_callback_info cbinfo,uint32_t & priority,int32_t & delayTime,Task * & task)756 bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
757 Task* &task)
758 {
759 size_t argc = 3; // 3: delayTime, task and priority
760 napi_value args[3]; // 3: delayTime, task and priority
761 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
762 if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
763 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
764 "the number of params must be two or three.");
765 return false;
766 }
767
768 if (!NapiHelper::IsNumber(env, args[0])) {
769 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
770 "the type of the first param must be number.");
771 return false;
772 }
773
774 if (!NapiHelper::IsObject(env, args[1])) {
775 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
776 "the type of the second param must be object.");
777 return false;
778 }
779
780 delayTime = NapiHelper::GetInt32Value(env, args[0]);
781 if (delayTime < 0) {
782 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
783 return false;
784 }
785
786 if (argc > 2) { // 2: the params might have priority
787 if (!NapiHelper::IsNumber(env, args[2])) {
788 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
789 "the type of the third param must be number.");
790 return false;
791 }
792 priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
793 if (priority >= Priority::NUMBER) {
794 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
795 return false;
796 }
797 }
798
799 napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
800 if (task == nullptr) {
801 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
802 "the type of second param must be task");
803 return false;
804 }
805 if (!task->CanExecuteDelayed(env)) {
806 return false;
807 }
808 return true;
809 }
810
CheckPeriodicallyParams(napi_env env,napi_callback_info cbinfo,int32_t & period,uint32_t & priority,Task * & periodicTask)811 bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
812 uint32_t &priority, Task* &periodicTask)
813 {
814 size_t argc = 3; // 3 : period, task, priority
815 napi_value args[3]; // 3 : period, task, priority
816 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
817 if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
818 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
819 return false;
820 }
821 if (!NapiHelper::IsNumber(env, args[0])) {
822 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
823 return false;
824 }
825 period = NapiHelper::GetInt32Value(env, args[0]);
826 if (period < 0) {
827 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
828 return false;
829 }
830 if (!NapiHelper::IsObject(env, args[1])) {
831 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
832 return false;
833 }
834
835 if (argc >= 3) { // 3 : third param maybe priority
836 if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
837 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
838 return false;
839 }
840 priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
841 if (priority >= Priority::NUMBER) {
842 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
843 return false;
844 }
845 }
846
847 napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
848 if (periodicTask == nullptr) {
849 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
850 return false;
851 }
852
853 return true;
854 }
855 } // namespace Commonlibrary::Concurrent::TaskPoolModule
856