1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <cinttypes>
17 
18 #include "native_safe_async_work.h"
19 
20 #include "ecmascript/napi/include/jsnapi.h"
21 #include "napi/native_api.h"
22 #include "native_api_internal.h"
23 #include "native_async_work.h"
24 #include "native_engine.h"
25 #include "native_value.h"
26 #include "securec.h"
27 #include "utils/log.h"
28 
29 #ifdef ENABLE_CONTAINER_SCOPE
30 #include "core/common/container_scope.h"
31 #endif
32 
33 #ifdef ENABLE_CONTAINER_SCOPE
34 using OHOS::Ace::ContainerScope;
35 #endif
36 
37 #if defined(ENABLE_EVENT_HANDLER)
38 #include "event_handler.h"
39 using namespace OHOS::AppExecFwk;
40 #endif
41 
42 // static methods start
AsyncCallback(uv_async_t * asyncHandler)43 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
44 {
45     HILOG_DEBUG("NativeSafeAsyncWork::AsyncCallback called");
46     NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
47     if (that == nullptr) {
48         HILOG_ERROR("NativeSafeAsyncWork:: DereferenceOf failed!");
49         return;
50     }
51     that->ProcessAsyncHandle();
52 }
53 
CallJs(NativeEngine * engine,napi_value js_call_func,void * context,void * data)54 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, napi_value js_call_func, void* context, void* data)
55 {
56     if (engine == nullptr || js_call_func == nullptr) {
57         HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
58         return;
59     }
60     napi_value value = nullptr;
61     napi_get_undefined(reinterpret_cast<napi_env>(engine), &value);
62     if (value == nullptr) {
63         HILOG_ERROR("CreateUndefined failed");
64         return;
65     }
66 
67     auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
68     if (resultValue == nullptr) {
69         HILOG_ERROR("CallFunction failed");
70     }
71 }
72 
NativeSafeAsyncWork(NativeEngine * engine,napi_value func,napi_value asyncResource,napi_value asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)73 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
74                                          napi_value func,
75                                          napi_value asyncResource,
76                                          napi_value asyncResourceName,
77                                          size_t maxQueueSize,
78                                          size_t threadCount,
79                                          void* finalizeData,
80                                          NativeFinalize finalizeCallback,
81                                          void* context,
82                                          NativeThreadSafeFunctionCallJs callJsCallback)
83     :engine_(engine), engineId_(engine->GetId()), maxQueueSize_(maxQueueSize),
84     threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
85     context_(context), callJsCallback_(callJsCallback)
86 {
87     asyncContext_.napiAsyncResource = asyncResource;
88     asyncContext_.napiAsyncResourceName = asyncResourceName;
89 
90     errno_t err = EOK;
91     err = memset_s(&asyncHandler_, sizeof(asyncHandler_), 0, sizeof(asyncHandler_));
92     if (err != EOK) {
93         HILOG_ERROR("faild to init asyncHandler_");
94         return;
95     }
96 
97     if (func != nullptr) {
98         uint32_t initialRefcount = 1;
99         ref_ = engine->CreateReference(func, initialRefcount);
100     }
101 
102 #ifdef ENABLE_CONTAINER_SCOPE
103     containerScopeId_ = ContainerScope::CurrentId();
104 #endif
105 
106 #if defined(ENABLE_EVENT_HANDLER)
107     std::shared_ptr<EventRunner> runner = EventRunner::Current();
108     if (runner != nullptr) {
109         eventHandler_ = std::make_shared<EventHandler>(runner);
110     }
111 #endif
112 }
113 
~NativeSafeAsyncWork()114 NativeSafeAsyncWork::~NativeSafeAsyncWork()
115 {
116     if (ref_ != nullptr) {
117         delete ref_;
118         ref_ = nullptr;
119     }
120 }
121 
Init()122 bool NativeSafeAsyncWork::Init()
123 {
124     HILOG_DEBUG("NativeSafeAsyncWork::Init called");
125 
126     uv_loop_t* loop = engine_->GetUVLoop();
127     if (loop == nullptr) {
128         HILOG_ERROR("Get loop failed");
129         return false;
130     }
131 
132     int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
133     if (ret != 0) {
134         HILOG_ERROR("uv async send failed in Init ret = %{public}d", ret);
135         return false;
136     }
137 
138     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
139     return true;
140 }
141 
IsMaxQueueSize()142 bool NativeSafeAsyncWork::IsMaxQueueSize()
143 {
144     return (queue_.size() > maxQueueSize_ &&
145            maxQueueSize_ > 0 &&
146            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
147            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
148 }
149 
ValidEngineCheck()150 SafeAsyncCode NativeSafeAsyncWork::ValidEngineCheck()
151 {
152     if (!NativeEngine::IsAlive(engine_)) {
153         HILOG_ERROR("napi_env has been destoryed");
154         return SafeAsyncCode::SAFE_ASYNC_FAILED;
155     } else if (engineId_ != engine_->GetId()) {
156         LOG_IF_SPECIAL(UNLIKELY(engine_->IsCrossThreadCheckEnabled()),
157                        "current tsfn was created by dead env, "
158                        "owner id: %{public}" PRIu64 ", current id: %{public}" PRIu64,
159                        engineId_, engine_->GetId());
160         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
161     }
162     return SafeAsyncCode::SAFE_ASYNC_OK;
163 }
164 
// Queues |data| for a later JS callback on the loop thread.
// In NATIVE_TSFUNC_BLOCKING mode the caller sleeps while the bounded queue
// is over its limit; otherwise SAFE_ASYNC_QUEUE_FULL is returned at once.
// Thread-safe: may be called from any thread.
SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
{
    std::unique_lock<std::mutex> lock(mutex_);
    if (IsMaxQueueSize()) {
        HILOG_INFO("queue size bigger than max queue size");
        if (mode == NATIVE_TSFUNC_BLOCKING) {
            // Wait for ProcessAsyncHandle()/Release() to notify after
            // draining; IsMaxQueueSize() also turns false once closing.
            while (IsMaxQueueSize()) {
                condition_.wait(lock);
            }
        } else {
            return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
        }
    }

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
        status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        // Shutting down: consume this caller's acquire slot instead of
        // queueing new work.
        if (threadCount_ == 0) {
            return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
        } else {
            threadCount_--;
            return SafeAsyncCode::SAFE_ASYNC_CLOSED;
        }
    } else {
        SafeAsyncCode checkRet = ValidEngineCheck();
        if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
            return checkRet;
        }
        queue_.emplace(data);
        // Wake the loop thread so ProcessAsyncHandle() drains the queue.
        auto ret = uv_async_send(&asyncHandler_);
        if (ret != 0) {
            HILOG_ERROR("uv async send failed in Send ret = %{public}d", ret);
            return SafeAsyncCode::SAFE_ASYNC_FAILED;
        }
    }

    return SafeAsyncCode::SAFE_ASYNC_OK;
}
202 
Acquire()203 SafeAsyncCode NativeSafeAsyncWork::Acquire()
204 {
205     HILOG_DEBUG("NativeSafeAsyncWork::Acquire called");
206 
207     std::unique_lock<std::mutex> lock(mutex_);
208 
209     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
210         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
211         HILOG_WARN("Do not acquire, thread is closed!");
212         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
213     }
214 
215     // increase thread count
216     threadCount_++;
217 
218     return SafeAsyncCode::SAFE_ASYNC_OK;
219 }
220 
// Drops one producer thread's reference. With NATIVE_TSFUNC_ABORT the tsfn
// transitions to CLOSING immediately; otherwise close begins only when the
// last thread releases. Thread-safe.
SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
{
    HILOG_DEBUG("NativeSafeAsyncWork::Release called");

    std::unique_lock<std::mutex> lock(mutex_);

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
        status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        HILOG_WARN("Do not release, thread is closed!");
        return SafeAsyncCode::SAFE_ASYNC_CLOSED;
    }

    if (threadCount_ == 0) {
        HILOG_ERROR("Do not release, thread count is zero.");
        return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
    }

    // decrease thread count
    threadCount_--;

    if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
        status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
        if (maxQueueSize_ > 0) {
            // Unblock a Send() caller waiting on a full queue; it will then
            // observe the CLOSING status.
            condition_.notify_one();
        }
    }

    if (threadCount_ == 0 ||
        mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
        SafeAsyncCode checkRet = ValidEngineCheck();
        if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
            return checkRet;
        }
        // trigger async handle so the loop thread drains remaining items
        // and eventually closes the handle
        auto ret = uv_async_send(&asyncHandler_);
        if (ret != 0) {
            HILOG_ERROR("uv async send failed in Release ret = %{public}d", ret);
            return SafeAsyncCode::SAFE_ASYNC_FAILED;
        }
    }

    return SafeAsyncCode::SAFE_ASYNC_OK;
}
264 
Ref()265 bool NativeSafeAsyncWork::Ref()
266 {
267     if (!IsSameTid()) {
268         HILOG_ERROR("tid not same");
269         return false;
270     }
271 
272     uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
273 
274     return true;
275 }
276 
Unref()277 bool NativeSafeAsyncWork::Unref()
278 {
279     if (!IsSameTid()) {
280         HILOG_ERROR("tid not same");
281         return false;
282     }
283 
284     uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
285 
286     return true;
287 }
288 
// Returns the user-supplied context pointer passed at construction time.
void* NativeSafeAsyncWork::GetContext()
{
    return context_;
}
293 
// Runs on the loop thread (from AsyncCallback) and drains the call queue,
// invoking the JS callback once per queued item. The mutex is released
// around each JS call so producers can keep enqueueing; the snapshot
// |size| bounds this pass, and a fresh uv_async_send re-arms processing
// for items added meanwhile.
void NativeSafeAsyncWork::ProcessAsyncHandle()
{
    std::unique_lock<std::mutex> lock(mutex_);
    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
        HILOG_ERROR("Process failed, thread is closed!");
        return;
    }

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        HILOG_ERROR("thread is closing!");
        CloseHandles();
        return;
    }

    // Snapshot: only process items present when this pass started.
    size_t size = queue_.size();
    void* data = nullptr;

    auto vm = engine_->GetEcmaVm();
    panda::LocalScope scope(vm);
#ifdef ENABLE_CONTAINER_SCOPE
    // Restore the container scope captured at construction time.
    ContainerScope containerScope(containerScopeId_);
#endif
    TryCatch tryCatch(reinterpret_cast<napi_env>(engine_));
    while (size > 0) {
        data = queue_.front();

        // when queue is full, notify send.
        if (size == maxQueueSize_ && maxQueueSize_ > 0) {
            condition_.notify_one();
        }
        napi_value func_ = (ref_ == nullptr) ? nullptr : ref_->Get(engine_);
        // Drop the lock while running JS: the callback may call Send()
        // or take arbitrary time.
        lock.unlock();
        if (callJsCallback_ != nullptr) {
            callJsCallback_(engine_, func_, context_, data);
        } else {
            CallJs(engine_, func_, context_, data);
        }
        lock.lock();

        if (tryCatch.HasCaught()) {
            engine_->HandleUncaughtException();
        }
        // Popped only after the callback ran; front() stays valid across
        // the unlocked region because only this thread pops.
        queue_.pop();
        size--;
    }

    if (!queue_.empty()) {
        // Items arrived while we were processing: schedule another pass.
        auto ret = uv_async_send(&asyncHandler_);
        if (ret != 0) {
            HILOG_ERROR("uv async send failed in ProcessAsyncHandle ret = %{public}d", ret);
        }
    }

    if (queue_.empty() && threadCount_ == 0) {
        // No producers and no pending work: tear down the handle.
        CloseHandles();
    }
}
351 
// Marks the tsfn CLOSED and asynchronously closes the uv handle; the close
// callback then runs CleanUp(), which deletes this object. Caller must hold
// mutex_ (called from ProcessAsyncHandle on the loop thread).
SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
{
    HILOG_DEBUG("NativeSafeAsyncWork::CloseHandles called");

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
        HILOG_INFO("Close failed, thread is closed!");
        return SafeAsyncCode::SAFE_ASYNC_CLOSED;
    }

    status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;

    // close async handler; CleanUp() (and the delete of this object) runs
    // later, when uv invokes the close callback on the loop thread.
    // NOTE(review): unlike AsyncCallback, |that| is not null-checked here —
    // presumably safe since the handle is embedded in this object; confirm.
    uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
        NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
            reinterpret_cast<uv_async_t*>(handle));
        that->CleanUp();
    });

    return SafeAsyncCode::SAFE_ASYNC_OK;
}
372 
// Final teardown, invoked from the uv close callback: runs the user
// finalizer, flushes any leftover queue items with a null engine/function
// (so callers can free their |data|), then destroys this object.
// After this returns, the object no longer exists.
void NativeSafeAsyncWork::CleanUp()
{
    HILOG_DEBUG("NativeSafeAsyncWork::CleanUp called");

    if (finalizeCallback_ != nullptr) {
        finalizeCallback_(engine_, finalizeData_, context_);
    }

    // clean data: engine/func are passed as nullptr to signal "cleanup only";
    // the default CallJs just logs and returns in that case.
    while (!queue_.empty()) {
        if (callJsCallback_ != nullptr) {
            callJsCallback_(nullptr, nullptr, context_, queue_.front());
        } else {
            CallJs(nullptr, nullptr, context_, queue_.front());
        }
        queue_.pop();
    }
    delete this;
}
392 
IsSameTid()393 bool NativeSafeAsyncWork::IsSameTid()
394 {
395     auto tid = pthread_self();
396     return (tid == engine_->GetTid()) ? true : false;
397 }
398 
// Dispatches one JS callback invocation for |data| through the creating
// thread's EventHandler (bypassing the uv queue). |priority| maps onto
// EventQueue::Priority; |isTail| selects tail vs head-of-queue insertion.
// Returns napi_generic_failure when EventHandler support is compiled out
// or posting fails.
napi_status NativeSafeAsyncWork::PostTask(void *data, int32_t priority, bool isTail)
{
#if defined(ENABLE_EVENT_HANDLER)
    HILOG_DEBUG("NativeSafeAsyncWork::PostTask called");
    std::unique_lock<std::mutex> lock(eventHandlerMutex_);
    if (engine_ == nullptr || eventHandler_ == nullptr) {
        HILOG_ERROR("post task failed due to nullptr engine or eventHandler");
        return napi_status::napi_generic_failure;
    }
    // the task will be execute at main thread or worker thread
    // NOTE(review): the lambda captures `this` raw — it assumes this tsfn
    // outlives the posted task; confirm against teardown ordering.
    auto task = [this, data]() {
        HILOG_DEBUG("The task is executing in main thread or worker thread");
        panda::LocalScope scope(this->engine_->GetEcmaVm());
        napi_value func_ = (this->ref_ == nullptr) ? nullptr : this->ref_->Get(engine_);
        if (this->callJsCallback_ != nullptr) {
            this->callJsCallback_(engine_, func_, context_, data);
        } else {
            CallJs(engine_, func_, context_, data);
        }
    };

    bool res = false;
    if (isTail) {
        HILOG_DEBUG("The task is posted from tail");
        res = eventHandler_->PostTask(task, static_cast<EventQueue::Priority>(priority));
    } else {
        HILOG_DEBUG("The task is posted from head");
        res = eventHandler_->PostTaskAtFront(task, std::string(), static_cast<EventQueue::Priority>(priority));
    }

    return res ? napi_status::napi_ok : napi_status::napi_generic_failure;
#else
    HILOG_WARN("EventHandler feature is not supported");
    return napi_status::napi_generic_failure;
#endif
}
435 
// Schedules |cb| to run on the engine's thread. Prefers the EventHandler
// path when available; otherwise wraps |cb| and routes it through Send(),
// mapping SafeAsyncCode results onto napi_status. On failure the wrapper
// is freed here so nothing leaks.
napi_status NativeSafeAsyncWork::SendEvent(const std::function<void()> &cb, napi_event_priority priority)
{
#ifdef ENABLE_EVENT_HANDLER
    // NOTE(review): eventHandler_ is read here without eventHandlerMutex_,
    // unlike PostTask — verify whether it can change after construction.
    if (eventHandler_) {
        auto task = [eng = engine_, cb]() {
            auto vm = eng->GetEcmaVm();
            panda::LocalScope scope(vm);
            cb();
        };
        if (eventHandler_->PostTask(task, static_cast<EventQueue::Priority>(priority)))
            return napi_status::napi_ok;
        else
            return napi_status::napi_generic_failure;
    }
#endif
    // Fallback: push the callback through the uv-based tsfn queue.
    CallbackWrapper *cbw = new (std::nothrow) CallbackWrapper();
    if (!cbw) {
        HILOG_ERROR("malloc failed!");
        return napi_status::napi_generic_failure;
    }
    cbw->cb = cb;
    auto code = Send(reinterpret_cast<void *>(cbw), NATIVE_TSFUNC_NONBLOCKING);

    // Translate SafeAsyncCode into the napi_status callers expect.
    napi_status status = napi_status::napi_ok;
    switch (code) {
        case SafeAsyncCode::SAFE_ASYNC_OK:
            status = napi_status::napi_ok;
            break;
        case SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL:
            status = napi_status::napi_queue_full;
            break;
        case SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS:
            status = napi_status::napi_invalid_arg;
            break;
        case SafeAsyncCode::SAFE_ASYNC_CLOSED:
            status = napi_status::napi_closing;
            break;
        case SafeAsyncCode::SAFE_ASYNC_FAILED:
            status = napi_status::napi_generic_failure;
            break;
        default:
            HILOG_FATAL("this branch is unreachable, code is %{public}d", code);
            status = napi_status::napi_generic_failure;
            break;
    }
    if (status != napi_status::napi_ok) {
        // The wrapper was never enqueued (or will never be consumed): free it.
        HILOG_ERROR("send event failed(%{public}d)", status);
        delete cbw;
        cbw = nullptr;
    }
    return status;
}