1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "sequence_runner.h"
16 
17 #include <cinttypes>
18 
19 #include "helper/error_helper.h"
20 #include "helper/napi_helper.h"
21 #include "helper/object_helper.h"
22 #include "task_manager.h"
23 #include "tools/log.h"
24 
25 namespace Commonlibrary::Concurrent::TaskPoolModule {
26 using namespace Commonlibrary::Concurrent::Common::Helper;
// Property names installed on every JS SequenceRunner object by SeqRunnerConstructorInner.
namespace {
constexpr char EXECUTE_STR[] = "execute";           // name of the JS method bound to SequenceRunner::Execute
constexpr char SEQ_RUNNER_ID_STR[] = "seqRunnerId"; // name of the JS property holding the native runner id
} // namespace
29 
SeqRunnerConstructorInner(napi_env env,napi_value & thisVar,SequenceRunner * seqRunner)30 bool SequenceRunner::SeqRunnerConstructorInner(napi_env env, napi_value &thisVar, SequenceRunner *seqRunner)
31 {
32     // update seqRunner.seqRunnerId
33     uint64_t seqRunnerId = reinterpret_cast<uint64_t>(seqRunner);
34     napi_value napiSeqRunnerId = NapiHelper::CreateUint64(env, seqRunnerId);
35     TaskGroupManager::GetInstance().StoreSequenceRunner(seqRunnerId, seqRunner);
36     napi_property_descriptor properties[] = {
37         DECLARE_NAPI_PROPERTY(SEQ_RUNNER_ID_STR, napiSeqRunnerId),
38         DECLARE_NAPI_FUNCTION(EXECUTE_STR, Execute),
39     };
40     napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties);
41     HILOG_INFO("taskpool:: construct seqRunner name is %{public}s, seqRunnerid %{public}s.",
42                seqRunner->seqName_.c_str(), std::to_string(seqRunnerId).c_str());
43 
44     seqRunner->seqRunnerId_ = seqRunnerId;
45     napi_status status = napi_wrap(env, thisVar, seqRunner, SequenceRunnerDestructor, nullptr, nullptr);
46     if (status != napi_ok) {
47         HILOG_ERROR("taskpool::SeqRunnerConstructorInner napi_wrap return value is %{public}d", status);
48         SequenceRunnerDestructor(env, seqRunner, nullptr);
49         return false;
50     }
51     return true;
52 }
53 
SeqRunnerConstructor(napi_env env,napi_callback_info cbinfo)54 napi_value SequenceRunner::SeqRunnerConstructor(napi_env env, napi_callback_info cbinfo)
55 {
56     // get input args out of env and cbinfo
57     size_t argc = 2; // 2: The maximum number of parameters is 2
58     napi_value args[2]; // 2: The maximum number of parameters is 2
59     napi_value thisVar;
60     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
61 
62     uint32_t priority = Priority::DEFAULT;
63     std::string name = "";
64     if (argc == 2) { // 2: The number of parameters is 2, if the first is seqRunner name, the second must be priority
65         if (NapiHelper::IsString(env, args[0]) && NapiHelper::IsNumber(env, args[1])) {
66             name = NapiHelper::GetString(env, args[0]);
67             priority = NapiHelper::GetUint32Value(env, args[1]);
68             if (priority >= Priority::NUMBER) {
69                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value unvalied.");
70                 return nullptr;
71             }
72         } else {
73             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
74                 "the type of first param must be string and the type of second param must be string.");
75             return nullptr;
76         }
77     } else if (argc == 1) {
78         if (NapiHelper::IsString(env, args[0])) {
79             name = NapiHelper::GetString(env, args[0]);
80         } else if (NapiHelper::IsNumber(env, args[0])) {
81             priority = NapiHelper::GetUint32Value(env, args[0]);
82             if (priority >= Priority::NUMBER) {
83                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value unvalied.");
84                 return nullptr;
85             }
86         } else {
87             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
88                 "the type of first param must be string or number.");
89             return nullptr;
90         }
91     }
92 
93     SequenceRunner* seqRunner = nullptr;
94     if (name != "") {
95         seqRunner = SequenceRunnerManager::GetInstance().CreateOrGetGlobalRunner(env, thisVar, argc, name, priority);
96         if (seqRunner == nullptr) {
97             HILOG_ERROR("taskpool:: create or get globalRunner failed");
98             return nullptr;
99         }
100     } else {
101         seqRunner = new SequenceRunner();
102         seqRunner->priority_ = static_cast<Priority>(priority);
103         napi_create_reference(env, thisVar, 0, &seqRunner->seqRunnerRef_);
104     }
105 
106     if (!SeqRunnerConstructorInner(env, thisVar, seqRunner)) {
107         HILOG_ERROR("taskpool:: SeqRunnerConstructorInner failed");
108         return nullptr;
109     }
110     return thisVar;
111 }
112 
Execute(napi_env env,napi_callback_info cbinfo)113 napi_value SequenceRunner::Execute(napi_env env, napi_callback_info cbinfo)
114 {
115     size_t argc = 1;
116     napi_value args[1];
117     napi_value thisVar;
118     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
119     std::string errMessage = "";
120     if (argc < 1) {
121         errMessage = "seqRunner:: number of params at least one";
122         HILOG_ERROR("taskpool::%{public}s", errMessage.c_str());
123         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of param at least one.");
124         return nullptr;
125     }
126     if (!NapiHelper::IsObject(env, args[0]) || !NapiHelper::HasNameProperty(env, args[0], TASKID_STR)) {
127         errMessage = "seqRunner:: first param must be task.";
128         HILOG_ERROR("taskpool::%{public}s", errMessage.c_str());
129         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
130         return nullptr;
131     }
132     napi_value napiSeqRunnerId = NapiHelper::GetNameProperty(env, thisVar, SEQ_RUNNER_ID_STR);
133     uint64_t seqRunnerId = NapiHelper::GetUint64Value(env, napiSeqRunnerId);
134     SequenceRunner* seqRunner = TaskGroupManager::GetInstance().GetSeqRunner(seqRunnerId);
135     if (seqRunner == nullptr) {
136         return nullptr;
137     }
138     Task* task = nullptr;
139     napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
140     if (task == nullptr) {
141         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of param must be task.");
142         return nullptr;
143     }
144     if (!task->CanForSequenceRunner(env)) {
145         return nullptr;
146     }
147     task->seqRunnerId_ = seqRunnerId;
148     napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::SEQRUNNER_TASK, seqRunner->priority_);
149     if (promise == nullptr) {
150         return nullptr;
151     }
152     if (!SequenceRunnerManager::GetInstance().IncreaseGlobalSeqRunner(env, seqRunner)) {
153         return nullptr;
154     }
155     if (seqRunner->currentTaskId_ == 0) {
156         HILOG_INFO("taskpool:: taskId %{public}s in seqRunner %{public}s immediately.",
157                    std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
158         seqRunner->currentTaskId_ = task->taskId_;
159         task->IncreaseRefCount();
160         task->taskState_ = ExecuteState::WAITING;
161         ExecuteTaskImmediately(task->taskId_, seqRunner->priority_);
162     } else {
163         HILOG_INFO("taskpool:: add taskId: %{public}s to seqRunner %{public}s.",
164                    std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
165         TaskGroupManager::GetInstance().AddTaskToSeqRunner(seqRunnerId, task);
166     }
167     return promise;
168 }
169 
ExecuteTaskImmediately(uint64_t taskId,Priority priority)170 void SequenceRunner::ExecuteTaskImmediately(uint64_t taskId, Priority priority)
171 {
172     TaskManager::GetInstance().EnqueueTaskId(taskId, priority);
173 }
174 
175 void SequenceRunner::SequenceRunnerDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
176 {
177     SequenceRunner* seqRunner = static_cast<SequenceRunner*>(data);
178     if (seqRunner->isGlobalRunner_) {
179         SequenceRunnerManager::GetInstance().GlobalSequenceRunnerDestructor(env, seqRunner);
180     } else {
181         TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_);
182         napi_delete_reference(env, seqRunner->seqRunnerRef_);
183         delete seqRunner;
184     }
185 }
186 } // namespace Commonlibrary::Concurrent::TaskPoolModule