1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_group.h"
17 
18 #include "helper/error_helper.h"
19 #include "helper/napi_helper.h"
20 #include "helper/object_helper.h"
21 #include "napi/native_api.h"
22 #include "tools/log.h"
23 
24 namespace Commonlibrary::Concurrent::TaskPoolModule {
25 using namespace Commonlibrary::Concurrent::Common::Helper;
26 
TaskGroupConstructor(napi_env env,napi_callback_info cbinfo)27 napi_value TaskGroup::TaskGroupConstructor(napi_env env, napi_callback_info cbinfo)
28 {
29     size_t argc = 1;
30     napi_value args[1];
31     napi_value thisVar;
32     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
33     if (argc > 1) {
34         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
35             "the number of params must be zero or one.");
36         return nullptr;
37     }
38     napi_value name;
39     if (argc == 1) {
40         // check 1st param is taskGroupName
41         if (!NapiHelper::IsString(env, args[0])) {
42             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
43                 "the first param must be string.");
44             return nullptr;
45         }
46         name = args[0];
47     } else {
48         name = NapiHelper::CreateEmptyString(env);
49     }
50     TaskGroup* group = new TaskGroup();
51     uint64_t groupId = reinterpret_cast<uint64_t>(group);
52     group->groupId_ = groupId;
53     TaskGroupManager::GetInstance().StoreTaskGroup(groupId, group);
54     napi_value napiGroupId = NapiHelper::CreateUint64(env, groupId);
55     napi_property_descriptor properties[] = {
56         DECLARE_NAPI_PROPERTY(GROUP_ID_STR, napiGroupId),
57         DECLARE_NAPI_FUNCTION_WITH_DATA("addTask", AddTask, thisVar),
58     };
59     napi_set_named_property(env, thisVar, NAME, name);
60     napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties);
61     napi_status status = napi_wrap(env, thisVar, group, TaskGroupDestructor, nullptr, nullptr);
62     if (status != napi_ok) {
63         HILOG_ERROR("taskpool::TaskGroupConstructor napi_wrap return value is %{public}d", status);
64         TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
65         delete group;
66         group = nullptr;
67         return nullptr;
68     }
69     napi_create_reference(env, thisVar, 0, &group->groupRef_);
70     napi_add_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group);
71     return thisVar;
72 }
73 
74 void TaskGroup::TaskGroupDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
75 {
76     HILOG_DEBUG("taskpool::TaskGroupDestructor");
77     TaskGroup* group = static_cast<TaskGroup*>(data);
78     napi_remove_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group);
79     TaskGroupManager::GetInstance().ReleaseTaskGroupData(env, group);
80     napi_delete_reference(env, group->groupRef_);
81     delete group;
82 }
83 
AddTask(napi_env env,napi_callback_info cbinfo)84 napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo)
85 {
86     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
87     std::string errMessage = "";
88     if (argc < 1) {
89         errMessage = "taskGroup:: the number of params must be at least one";
90         HILOG_ERROR("%{public}s", errMessage.c_str());
91         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
92             "the number of params must be at least one.");
93         return nullptr;
94     }
95     napi_value* args = new napi_value[argc];
96     ObjectScope<napi_value> scope(args, true);
97     napi_value thisVar;
98     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
99     napi_value napiGroupId = NapiHelper::GetNameProperty(env, thisVar, GROUP_ID_STR);
100     uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
101     TaskGroup* group = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
102     if (group->groupState_ != ExecuteState::NOT_FOUND) {
103         errMessage = "taskpool:: executed taskGroup cannot addTask";
104         HILOG_ERROR("%{public}s", errMessage.c_str());
105         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
106         return nullptr;
107     }
108     napi_valuetype type = napi_undefined;
109     napi_typeof(env, args[0], &type);
110     if (type == napi_object) {
111         Task* task = nullptr;
112         napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
113         if (task == nullptr) {
114             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
115                 "the type of the params must be task.");
116             return nullptr;
117         }
118         if (!task->CanForTaskGroup(env)) {
119             return nullptr;
120         }
121         task->taskType_ = TaskType::GROUP_COMMON_TASK;
122         task->groupId_ = groupId;
123         napi_reference_ref(env, task->taskRef_, nullptr);
124         TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_);
125         return nullptr;
126     } else if (type == napi_function) {
127         napi_value napiTask = NapiHelper::CreateObject(env);
128         Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::GROUP_FUNCTION_TASK);
129         if (task == nullptr) {
130             return nullptr;
131         }
132         task->groupId_ = groupId;
133         napi_status status = napi_wrap(env, napiTask, task, Task::TaskDestructor, nullptr, nullptr);
134         if (status != napi_ok) {
135             HILOG_ERROR("taskpool::AddTask napi_wrap return value is %{public}d", status);
136             delete task;
137             task = nullptr;
138             return nullptr;
139         }
140         TaskManager::GetInstance().StoreTask(task->taskId_, task);
141         napi_create_reference(env, napiTask, 1, &task->taskRef_);
142         TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_);
143         return nullptr;
144     }
145     ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
146         "the type of the first param must be object or function.");
147     return nullptr;
148 }
149 
HostEnvCleanupHook(void * data)150 void TaskGroup::HostEnvCleanupHook(void* data)
151 {
152     if (data == nullptr) {
153         HILOG_ERROR("taskpool:: taskGroup cleanupHook arg is nullptr");
154         return;
155     }
156     TaskGroup* group = static_cast<TaskGroup*>(data);
157     group->isValid_ = false;
158 }
159 
GetTaskIndex(uint32_t taskId)160 uint32_t TaskGroup::GetTaskIndex(uint32_t taskId)
161 {
162     uint32_t index = 0;
163     for (uint32_t id : taskIds_) {
164         if (taskId == id) {
165             break;
166         }
167         index++;
168     }
169     return index;
170 }
171 
NotifyGroupTask(napi_env env)172 void TaskGroup::NotifyGroupTask(napi_env env)
173 {
174     HILOG_DEBUG("taskpool:: NotifyGroupTask");
175     std::lock_guard<RECURSIVE_MUTEX> lock(taskGroupMutex_);
176     if (pendingGroupInfos_.empty()) {
177         return;
178     }
179     groupState_ = ExecuteState::WAITING;
180     currentGroupInfo_ = pendingGroupInfos_.front();
181     pendingGroupInfos_.pop_front();
182     for (auto iter = taskRefs_.begin(); iter != taskRefs_.end(); iter++) {
183         napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
184         Task* task = nullptr;
185         napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
186         if (task == nullptr) {
187             HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
188             return;
189         }
190         napi_reference_ref(env, task->taskRef_, nullptr);
191         Priority priority = currentGroupInfo_->priority;
192         if (task->IsGroupCommonTask()) {
193             task->GetTaskInfo(env, napiTask, priority);
194         } else {
195             reinterpret_cast<NativeEngine*>(env)->IncreaseSubEnvCounter();
196         }
197         task->IncreaseRefCount();
198         TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
199         task->taskState_ = ExecuteState::WAITING;
200         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
201     }
202 }
203 
CancelPendingGroup(napi_env env)204 void TaskGroup::CancelPendingGroup(napi_env env)
205 {
206     HILOG_DEBUG("taskpool:: CancelPendingGroup");
207     if (pendingGroupInfos_.empty()) {
208         return;
209     }
210     napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
211     auto pendingIter = pendingGroupInfos_.begin();
212     auto engine = reinterpret_cast<NativeEngine*>(env);
213     for (; pendingIter != pendingGroupInfos_.end(); ++pendingIter) {
214         for (size_t i = 0; i < taskIds_.size(); i++) {
215             engine->DecreaseSubEnvCounter();
216         }
217         GroupInfo* info = *pendingIter;
218         napi_reject_deferred(env, info->deferred, error);
219         napi_reference_unref(env, groupRef_, nullptr);
220         delete info;
221     }
222     pendingIter = pendingGroupInfos_.begin();
223     pendingGroupInfos_.erase(pendingIter, pendingGroupInfos_.end());
224 }
225 
CancelGroupTask(napi_env env,uint64_t taskId)226 void TaskGroup::CancelGroupTask(napi_env env, uint64_t taskId)
227 {
228     TaskGroupManager::GetInstance().CancelGroupTask(env, taskId, this);
229     if (currentGroupInfo_ != nullptr && currentGroupInfo_->finishedTaskNum == taskNum_) {
230         napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
231         RejectResult(env, error);
232     }
233 }
234 
RejectResult(napi_env env,napi_value res)235 void TaskGroup::RejectResult(napi_env env, napi_value res)
236 {
237     napi_reject_deferred(env, currentGroupInfo_->deferred, res);
238     napi_delete_reference(env, currentGroupInfo_->resArr);
239     napi_reference_unref(env, groupRef_, nullptr);
240     delete currentGroupInfo_;
241     currentGroupInfo_ = nullptr;
242 }
243 } // namespace Commonlibrary::Concurrent::TaskPoolModule