1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "taskpool.h"

#include <cinttypes>
#include <vector>

#include "helper/error_helper.h"
#include "helper/hitrace_helper.h"
#include "helper/napi_helper.h"
#include "helper/object_helper.h"
#include "message_queue.h"
#include "task_manager.h"
#include "tools/log.h"
#include "worker.h"
28 
29 namespace Commonlibrary::Concurrent::TaskPoolModule {
30 using namespace Commonlibrary::Concurrent::Common::Helper;
31 
InitTaskPool(napi_env env,napi_value exports)32 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
33 {
34     HILOG_INFO("taskpool:: Import taskpool");
35     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
36     napi_value taskClass = nullptr;
37     napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor, nullptr, 0, nullptr, &taskClass);
38     napi_value longTaskClass = nullptr;
39     napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
40                       nullptr, 0, nullptr, &longTaskClass);
41     napi_value genericsTaskClass = nullptr;
42     napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
43                       nullptr, 0, nullptr, &genericsTaskClass);
44     napi_value isCanceledFunc = nullptr;
45     napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc);
46     napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
47     napi_set_named_property(env, longTaskClass, "isCanceled", isCanceledFunc);
48     napi_value sendDataFunc = nullptr;
49     napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc);
50     napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
51     napi_set_named_property(env, longTaskClass, "sendData", sendDataFunc);
52     napi_value taskGroupClass = nullptr;
53     napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor, nullptr, 0, nullptr,
54                       &taskGroupClass);
55     napi_value seqRunnerClass = nullptr;
56     napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
57                       nullptr, 0, nullptr, &seqRunnerClass);
58 
59     // define priority
60     napi_value priorityObj = NapiHelper::CreateObject(env);
61     napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
62     napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
63     napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
64     napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
65     napi_property_descriptor exportPriority[] = {
66         DECLARE_NAPI_PROPERTY("HIGH", highPriority),
67         DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
68         DECLARE_NAPI_PROPERTY("LOW", lowPriority),
69         DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
70     };
71     napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
72 
73     // define State
74     napi_value stateObj = NapiHelper::CreateObject(env);
75     napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
76     napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
77     napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
78     napi_property_descriptor exportState[] = {
79         DECLARE_NAPI_PROPERTY("WAITING", waitingState),
80         DECLARE_NAPI_PROPERTY("RUNNING", runningState),
81         DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
82     };
83     napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
84 
85     napi_property_descriptor properties[] = {
86         DECLARE_NAPI_PROPERTY("Task", taskClass),
87         DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
88         DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
89         DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
90         DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
91         DECLARE_NAPI_PROPERTY("Priority", priorityObj),
92         DECLARE_NAPI_PROPERTY("State", stateObj),
93         DECLARE_NAPI_FUNCTION("execute", Execute),
94         DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
95         DECLARE_NAPI_FUNCTION("cancel", Cancel),
96         DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
97         DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
98         DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
99         DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
100     };
101     napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
102 
103     TaskManager::GetInstance().InitTaskManager(env);
104     return exports;
105 }
106 
107 // ---------------------------------- SendData ---------------------------------------
ExecuteCallback(const uv_async_t * req)108 void TaskPool::ExecuteCallback(const uv_async_t* req)
109 {
110     auto* msgQueue = TaskManager::GetInstance().GetMessageQueue(req);
111     if (msgQueue == nullptr) {
112         HILOG_WARN("taskpool:: msgQueue is nullptr");
113         return;
114     }
115     ExecuteCallbackInner(*msgQueue);
116 }
117 
ExecuteCallbackTask(CallbackInfo * callbackInfo)118 void TaskPool::ExecuteCallbackTask(CallbackInfo* callbackInfo)
119 {
120     auto* msgQueue = TaskManager::GetInstance().GetMessageQueueFromCallbackInfo(callbackInfo);
121     if (msgQueue == nullptr) {
122         HILOG_WARN("taskpool:: msgQueue is nullptr");
123         return;
124     }
125     ExecuteCallbackInner(*msgQueue);
126 }
127 
ExecuteCallbackInner(MsgQueue & msgQueue)128 void TaskPool::ExecuteCallbackInner(MsgQueue& msgQueue)
129 {
130     while (!msgQueue.IsEmpty()) {
131         auto resultInfo = msgQueue.DeQueue();
132         if (resultInfo == nullptr) {
133             HILOG_DEBUG("taskpool:: resultInfo is nullptr");
134             continue;
135         }
136         ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
137         napi_status status = napi_ok;
138         CallbackScope callbackScope(resultInfo->hostEnv, resultInfo->workerEnv, resultInfo->taskId, status);
139         if (status != napi_ok) {
140             HILOG_ERROR("napi_open_handle_scope failed");
141             return;
142         }
143         auto env = resultInfo->hostEnv;
144         auto callbackInfo = TaskManager::GetInstance().GetCallbackInfo(resultInfo->taskId);
145         if (callbackInfo == nullptr) {
146             HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
147             ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
148             continue;
149         }
150         TaskManager::GetInstance().ResetCallbackInfoWorker(callbackInfo);
151         auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
152         napi_value args;
153         napi_value result;
154         status = napi_deserialize(env, resultInfo->serializationArgs, &args);
155         napi_delete_serialization_data(env, resultInfo->serializationArgs);
156         if (status != napi_ok || args == nullptr) {
157             std::string errMessage = "taskpool:: failed to serialize function";
158             HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
159             ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
160             continue;
161         }
162         uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
163         napi_value argsArray[argsNum];
164         for (size_t i = 0; i < argsNum; i++) {
165             argsArray[i] = NapiHelper::GetElement(env, args, i);
166         }
167         napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
168         if (NapiHelper::IsExceptionPending(env)) {
169             napi_value exception = nullptr;
170             napi_get_and_clear_last_exception(env, &exception);
171             HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
172         }
173     }
174 }
175 // ---------------------------------- SendData ---------------------------------------
176 
177 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
178 {
179     napi_value result = nullptr;
180     napi_create_object(env, &result);
181     napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
182     napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
183     napi_set_named_property(env, result, "threadInfos", threadInfos);
184     napi_set_named_property(env, result, "taskInfos", taskInfos);
185     return result;
186 }
187 
TerminateTask(napi_env env,napi_callback_info cbinfo)188 napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
189 {
190     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
191     size_t argc = 1; // 1: long task
192     napi_value args[1];
193     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
194     if (argc < 1) {
195         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
196             "the number of the params must be one.");
197         return nullptr;
198     }
199     if (!NapiHelper::IsObject(env, args[0])) {
200         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
201             "the type of the params must be object.");
202         return nullptr;
203     }
204     napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
205     uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
206     auto task = TaskManager::GetInstance().GetTask(taskId);
207     if (task == nullptr || !task->IsLongTask()) {
208         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
209             "the type of the params must be long task.");
210         return nullptr;
211     }
212     TaskManager::GetInstance().TerminateTask(taskId);
213     return nullptr;
214 }
215 
Execute(napi_env env,napi_callback_info cbinfo)216 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
217 {
218     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
219     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
220     if (argc < 1) {
221         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
222             "the number of params must be at least one.");
223         return nullptr;
224     }
225     napi_value* args = new napi_value[argc];
226     ObjectScope<napi_value> scope(args, true);
227     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
228     napi_valuetype type = napi_undefined;
229     napi_typeof(env, args[0], &type);
230     if (type == napi_object) {
231         uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
232         if (argc > 1) {
233             if (!NapiHelper::IsNumber(env, args[1])) {
234                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
235                     "the type of the second param must be number.");
236                 return nullptr;
237             }
238             priority = NapiHelper::GetUint32Value(env, args[1]);
239             if (priority >= Priority::NUMBER) {
240                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
241                 return nullptr;
242             }
243         }
244         if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
245             return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
246         }
247         Task* task = nullptr;
248         napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
249         if (task == nullptr) {
250             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
251                 "the type of the first param must be task.");
252             return nullptr;
253         }
254         if (!task->CanExecute(env)) {
255             return nullptr;
256         }
257         napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
258                                                       static_cast<Priority>(priority));
259         if (promise == nullptr) {
260             return nullptr;
261         }
262         ExecuteTask(env, task, static_cast<Priority>(priority));
263         return promise;
264     }
265     if (type != napi_function) {
266         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
267             "the type of the first param must be object or function.");
268         return nullptr;
269     }
270     Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
271     if (task == nullptr) {
272         HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
273         return nullptr;
274     }
275     TaskManager::GetInstance().StoreTask(task->taskId_, task);
276     napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
277     ExecuteTask(env, task);
278     return promise;
279 }
280 
DelayTask(uv_timer_t * handle)281 void TaskPool::DelayTask(uv_timer_t* handle)
282 {
283     TaskMessage *taskMessage = static_cast<TaskMessage *>(handle->data);
284     auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
285     napi_status status = napi_ok;
286     if (task == nullptr) {
287         HILOG_DEBUG("taskpool:: task is nullptr");
288     } else if (task->taskState_ == ExecuteState::CANCELED) {
289         HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
290         HandleScope scope(task->env_, status);
291         if (status != napi_ok) {
292             HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
293             return;
294         }
295         napi_value error = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
296         napi_reject_deferred(task->env_, taskMessage->deferred, error);
297     } else {
298         HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
299         HandleScope scope(task->env_, status);
300         if (status != napi_ok) {
301             HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
302             return;
303         }
304         TaskManager::GetInstance().IncreaseRefCount(taskMessage->taskId);
305         task->IncreaseRefCount();
306         napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
307         TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
308         if (taskInfo != nullptr) {
309             taskInfo->deferred = taskMessage->deferred;
310             if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
311                 task->taskState_ = ExecuteState::WAITING;
312                 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
313             }
314         } else {
315             napi_value execption = nullptr;
316             napi_get_and_clear_last_exception(task->env_, &execption);
317             if (execption != nullptr) {
318                 napi_reject_deferred(task->env_, taskMessage->deferred, execption);
319             }
320         }
321     }
322     if (task != nullptr) {
323         std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
324         task->delayedTimers_.erase(handle);
325     }
326     uv_timer_stop(handle);
327     ConcurrentHelper::UvHandleClose(handle);
328     delete taskMessage;
329     taskMessage = nullptr;
330 }
331 
ExecuteDelayed(napi_env env,napi_callback_info cbinfo)332 napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
333 {
334     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
335     uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
336     int32_t delayTime = 0;
337     Task* task = nullptr;
338     if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
339         return nullptr;
340     }
341 
342     if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED ||
343         task->taskState_ == ExecuteState::FINISHED) {
344         task->taskState_ = ExecuteState::DELAYED;
345     }
346     task->UpdateTaskType(TaskType::COMMON_TASK);
347 
348     uv_loop_t* loop = NapiHelper::GetLibUV(env);
349     uv_update_time(loop);
350     uv_timer_t* timer = new uv_timer_t;
351     uv_timer_init(loop, timer);
352     TaskMessage *taskMessage = new TaskMessage();
353     taskMessage->priority = static_cast<Priority>(priority);
354     taskMessage->taskId = task->taskId_;
355     napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
356     timer->data = taskMessage;
357 
358     std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
359     strTrace += ", priority: " + std::to_string(priority);
360     strTrace += ", delayTime " + std::to_string(delayTime);
361     HITRACE_HELPER_METER_NAME(strTrace);
362     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
363 
364     uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
365     {
366         std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
367         task->delayedTimers_.insert(timer);
368     }
369     NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
370     if (engine->IsMainThread()) {
371         uv_async_send(&loop->wq_async);
372     } else {
373         uv_work_t *work = new uv_work_t;
374         uv_queue_work_with_qos(loop, work, [](uv_work_t *) {},
375                                [](uv_work_t *work, int32_t) {delete work; }, uv_qos_user_initiated);
376     }
377     return promise;
378 }
379 
ExecuteGroup(napi_env env,napi_value napiTaskGroup,Priority priority)380 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
381 {
382     napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
383     uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
384     HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
385     auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
386     napi_reference_ref(env, taskGroup->groupRef_, nullptr);
387     if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
388         taskGroup->groupState_ == ExecuteState::CANCELED) {
389         taskGroup->groupState_ = ExecuteState::WAITING;
390     }
391     GroupInfo* groupInfo = new GroupInfo();
392     groupInfo->priority = priority;
393     napi_value resArr;
394     napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
395     napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
396     groupInfo->resArr = arrRef;
397     napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
398     {
399         std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
400         if (taskGroup->taskNum_ == 0) {
401             napi_resolve_deferred(env, groupInfo->deferred, resArr);
402             taskGroup->groupState_ = ExecuteState::FINISHED;
403             napi_delete_reference(env, groupInfo->resArr);
404             napi_reference_unref(env, taskGroup->groupRef_, nullptr);
405             delete groupInfo;
406             taskGroup->currentGroupInfo_ = nullptr;
407             return promise;
408         }
409         if (taskGroup->currentGroupInfo_ == nullptr) {
410             taskGroup->currentGroupInfo_ = groupInfo;
411             for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
412                 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
413                 Task* task = nullptr;
414                 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
415                 if (task == nullptr) {
416                     HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
417                     return nullptr;
418                 }
419                 napi_reference_ref(env, task->taskRef_, nullptr);
420                 if (task->IsGroupCommonTask()) {
421                     task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
422                 }
423                 ExecuteTask(env, task, static_cast<Priority>(priority));
424             }
425         } else {
426             taskGroup->pendingGroupInfos_.push_back(groupInfo);
427         }
428     }
429     return promise;
430 }
431 
HandleTaskResult(const uv_async_t * req)432 void TaskPool::HandleTaskResult(const uv_async_t* req)
433 {
434     HILOG_DEBUG("taskpool:: HandleTaskResult task");
435     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
436     auto task = static_cast<Task*>(req->data);
437     if (task == nullptr) { // LCOV_EXCL_BR_LINE
438         HILOG_FATAL("taskpool:: HandleTaskResult task is null");
439         return;
440     }
441     if (!task->IsMainThreadTask()) {
442         if (task->ShouldDeleteTask(false)) {
443             delete task;
444             return;
445         }
446         if (task->IsFunctionTask()) {
447             napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
448         }
449     }
450     task->DecreaseTaskRefCount();
451     HandleTaskResultCallback(task);
452 }
453 
HandleTaskResultCallback(Task * task)454 void TaskPool::HandleTaskResultCallback(Task* task)
455 {
456     napi_handle_scope scope = nullptr;
457     NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
458     napi_value napiTaskResult = nullptr;
459     napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
460     napi_delete_serialization_data(task->env_, task->result_);
461 
462     // tag for trace parse: Task PerformTask End
463     std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
464     if (task->taskState_ == ExecuteState::CANCELED) {
465         strTrace += ", performResult : IsCanceled";
466         napiTaskResult = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
467     } else if (status != napi_ok) {
468         HILOG_ERROR("taskpool: failed to deserialize result");
469         strTrace += ", performResult : DeserializeFailed";
470     } else if (task->success_) {
471         strTrace += ", performResult : Successful";
472     } else {
473         strTrace += ", performResult : Unsuccessful";
474     }
475     HITRACE_HELPER_METER_NAME(strTrace);
476     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
477     if (napiTaskResult == nullptr) {
478         napi_get_undefined(task->env_, &napiTaskResult);
479     }
480     reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
481     bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
482     task->taskState_ = ExecuteState::ENDING;
483     task->isRunning_ = false;
484     if (task->IsGroupTask()) {
485         UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
486     } else if (!task->IsPeriodicTask()) {
487         if (success) {
488             napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
489             if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
490                 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
491             }
492         } else {
493             napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
494             if (task->onExecutionFailedCallBackInfo_ != nullptr) {
495                 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
496                 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
497             }
498         }
499     }
500     NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
501     TriggerTask(task);
502     HILOG_DEBUG("taskpool:: %{public}s", strTrace.c_str());
503 }
504 
TriggerTask(Task * task)505 void TaskPool::TriggerTask(Task* task)
506 {
507     HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
508     if (task->IsGroupTask()) {
509         return;
510     }
511     TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
512     task->taskState_ = ExecuteState::FINISHED;
513     // seqRunnerTask will trigger the next
514     if (task->IsSeqRunnerTask()) {
515         if (!TaskGroupManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
516             HILOG_ERROR("seqRunner:: task %{public}s trigger in seqRunner %{public}s failed",
517                         std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
518         }
519     } else if (task->IsCommonTask()) {
520         task->NotifyPendingTask();
521     }
522     if (task->IsPeriodicTask()) {
523         return;
524     }
525     if (!task->IsFunctionTask()) {
526         napi_reference_unref(task->env_, task->taskRef_, nullptr);
527         return;
528     }
529     TaskManager::GetInstance().RemoveTask(task->taskId_);
530     delete task;
531 }
532 
UpdateGroupInfoByResult(napi_env env,Task * task,napi_value res,bool success)533 void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
534 {
535     HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
536     TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
537     task->taskState_ = ExecuteState::FINISHED;
538     napi_reference_unref(env, task->taskRef_, nullptr);
539     if (task->IsGroupCommonTask()) {
540         delete task->currentTaskInfo_;
541         task->currentTaskInfo_ = nullptr;
542     }
543     TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
544     if (taskGroup == nullptr || taskGroup->currentGroupInfo_ == nullptr) {
545         HILOG_DEBUG("taskpool:: taskGroup may have been released or canceled");
546         return;
547     }
548     // store the result
549     uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
550     auto groupInfo = taskGroup->currentGroupInfo_;
551     napi_ref arrRef = groupInfo->resArr;
552     napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
553     napi_set_element(env, resArr, index, res);
554     groupInfo->finishedTaskNum++;
555     // store the index when the first exception occurs
556     if (!success && !groupInfo->HasException()) {
557         groupInfo->SetFailedIndex(index);
558     }
559     // we will not handle the result until all tasks are finished
560     if (groupInfo->finishedTaskNum < taskGroup->taskNum_) {
561         return;
562     }
563     // if there is no exception, just resolve
564     if (!groupInfo->HasException()) {
565         HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
566         napi_resolve_deferred(env, groupInfo->deferred, resArr);
567         for (uint64_t taskId : taskGroup->taskIds_) {
568             auto task = TaskManager::GetInstance().GetTask(taskId);
569             if (task != nullptr && task->onExecutionSucceededCallBackInfo_ != nullptr) {
570                 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
571             }
572         }
573     } else {
574         napi_value res = nullptr;
575         napi_get_element(env, resArr, groupInfo->GetFailedIndex(), &res);
576         napi_reject_deferred(env, groupInfo->deferred, res);
577         auto iter = taskGroup->taskIds_.begin();
578         std::advance(iter, groupInfo->GetFailedIndex());
579         auto task = iter != taskGroup->taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr;
580         if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) {
581             task->onExecutionFailedCallBackInfo_->taskError_ = res;
582             task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
583         }
584     }
585     taskGroup->groupState_ = ExecuteState::FINISHED;
586     napi_delete_reference(env, groupInfo->resArr);
587     napi_reference_unref(env, taskGroup->groupRef_, nullptr);
588     delete groupInfo;
589     taskGroup->currentGroupInfo_ = nullptr;
590     taskGroup->NotifyGroupTask(env);
591 }
592 
ExecuteTask(napi_env env,Task * task,Priority priority)593 void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
594 {
595     // tag for trace parse: Task Allocation
596     std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
597         + ", priority : " + std::to_string(priority)
598         + ", executeState : " + std::to_string(ExecuteState::WAITING);
599     HITRACE_HELPER_METER_NAME(strTrace);
600     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
601     task->IncreaseRefCount();
602     TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
603     if (task->IsFunctionTask() || (task->taskState_ != ExecuteState::WAITING &&
604         task->taskState_ != ExecuteState::RUNNING && task->taskState_ != ExecuteState::ENDING)) {
605         task->taskState_ = ExecuteState::WAITING;
606         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
607     }
608 }
609 
Cancel(napi_env env,napi_callback_info cbinfo)610 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
611 {
612     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
613     size_t argc = 1;
614     napi_value args[1];
615     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
616     if (argc < 1) {
617         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
618             "the number of the params must be 1.");
619         return nullptr;
620     }
621 
622     if (!NapiHelper::IsObject(env, args[0])) {
623         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
624             "the type of the params must be object.");
625         return nullptr;
626     }
627 
628     if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
629         napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
630         if (napiTaskId == nullptr) {
631             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
632                 "the type of the params must be task.");
633             return nullptr;
634         }
635         uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
636         TaskManager::GetInstance().CancelTask(env, taskId);
637     } else {
638         napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
639         if (napiGroupId == nullptr) {
640             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
641                 "the type of the params must be taskGroup.");
642             return nullptr;
643         }
644         uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
645         TaskGroupManager::GetInstance().CancelGroup(env, groupId);
646     }
647     return nullptr;
648 }
649 
IsConcurrent(napi_env env,napi_callback_info cbinfo)650 napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
651 {
652     size_t argc = 1;
653     napi_value args[1];
654     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
655     if (argc != 1) {
656         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
657             "the number of the params must be 1.");
658         return nullptr;
659     }
660 
661     if (!NapiHelper::IsFunction(env, args[0])) {
662         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
663             "the type of the first param must be function.");
664         return nullptr;
665     }
666 
667     bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
668     return NapiHelper::CreateBooleanValue(env, isConcurrent);
669 }
670 
PeriodicTaskCallback(uv_timer_t * handle)671 void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
672 {
673     Task* task = reinterpret_cast<Task*>(handle->data);
674     if (task == nullptr) {
675         HILOG_DEBUG("taskpool:: the task is nullptr");
676         return;
677     } else if (!task->IsPeriodicTask()) {
678         HILOG_DEBUG("taskpool:: the current task is not a periodic task");
679         return;
680     } else if (task->taskState_ == ExecuteState::CANCELED) {
681         HILOG_DEBUG("taskpool:: the periodic task has been canceled");
682         return;
683     }
684     TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
685 
686     if (!task->isFirstTaskInfo_) {
687         napi_status status = napi_ok;
688         HandleScope scope(task->env_, status);
689         if (status != napi_ok) {
690             HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
691             return;
692         }
693         napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
694         TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
695         if (taskInfo == nullptr) {
696             HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
697             return;
698         }
699     }
700     task->isFirstTaskInfo_ = false;
701 
702     task->IncreaseRefCount();
703     HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
704     if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) {
705         task->taskState_ = ExecuteState::WAITING;
706         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
707     }
708 }
709 
ExecutePeriodically(napi_env env,napi_callback_info cbinfo)710 napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
711 {
712     int32_t period = 0;
713     uint32_t priority = Priority::DEFAULT;
714     Task* periodicTask = nullptr;
715     if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
716         return nullptr;
717     }
718 
719     if (!periodicTask->CanExecutePeriodically(env)) {
720         return nullptr;
721     }
722     periodicTask->UpdatePeriodicTask();
723 
724     periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
725     napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
726     TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_);
727     if (taskInfo == nullptr) {
728         return nullptr;
729     }
730 
731     periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo
732 
733     TriggerTimer(env, periodicTask, period);
734     return nullptr;
735 }
736 
TriggerTimer(napi_env env,Task * task,int32_t period)737 void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
738 {
739     HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
740     uv_loop_t* loop = NapiHelper::GetLibUV(env);
741     task->timer_ = new uv_timer_t;
742     uv_timer_init(loop, task->timer_);
743     task->timer_->data = task;
744     uv_update_time(loop);
745     uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
746     NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
747     if (engine->IsMainThread()) {
748         uv_async_send(&loop->wq_async);
749     } else {
750         uv_work_t* work = new uv_work_t;
751         uv_queue_work_with_qos(loop, work, [](uv_work_t*) {},
752                                [](uv_work_t* work, int32_t) { delete work; }, uv_qos_user_initiated);
753     }
754 }
755 
CheckDelayedParams(napi_env env,napi_callback_info cbinfo,uint32_t & priority,int32_t & delayTime,Task * & task)756 bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
757                                   Task* &task)
758 {
759     size_t argc = 3; // 3: delayTime, task and priority
760     napi_value args[3]; // 3: delayTime, task and priority
761     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
762     if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
763         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
764             "the number of params must be two or three.");
765         return false;
766     }
767 
768     if (!NapiHelper::IsNumber(env, args[0])) {
769         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
770             "the type of the first param must be number.");
771         return false;
772     }
773 
774     if (!NapiHelper::IsObject(env, args[1])) {
775         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
776             "the type of the second param must be object.");
777         return false;
778     }
779 
780     delayTime = NapiHelper::GetInt32Value(env, args[0]);
781     if (delayTime < 0) {
782         ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
783         return false;
784     }
785 
786     if (argc > 2) { // 2: the params might have priority
787         if (!NapiHelper::IsNumber(env, args[2])) {
788             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
789                 "the type of the third param must be number.");
790             return false;
791         }
792         priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
793         if (priority >= Priority::NUMBER) {
794             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
795             return false;
796         }
797     }
798 
799     napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
800     if (task == nullptr) {
801         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
802             "the type of second param must be task");
803         return false;
804     }
805     if (!task->CanExecuteDelayed(env)) {
806         return false;
807     }
808     return true;
809 }
810 
CheckPeriodicallyParams(napi_env env,napi_callback_info cbinfo,int32_t & period,uint32_t & priority,Task * & periodicTask)811 bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
812                                        uint32_t &priority, Task* &periodicTask)
813 {
814     size_t argc = 3; // 3 : period, task, priority
815     napi_value args[3]; // 3 : period, task, priority
816     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
817     if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
818         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
819         return false;
820     }
821     if (!NapiHelper::IsNumber(env, args[0])) {
822         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
823         return false;
824     }
825     period = NapiHelper::GetInt32Value(env, args[0]);
826     if (period < 0) {
827         ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
828         return false;
829     }
830     if (!NapiHelper::IsObject(env, args[1])) {
831         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
832         return false;
833     }
834 
835     if (argc >= 3) { // 3 : third param maybe priority
836         if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
837             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
838             return false;
839         }
840         priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
841         if (priority >= Priority::NUMBER) {
842             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
843             return false;
844         }
845     }
846 
847     napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
848     if (periodicTask == nullptr) {
849         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
850         return false;
851     }
852 
853     return true;
854 }
855 } // namespace Commonlibrary::Concurrent::TaskPoolModule
856