1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "datashare_uv_queue.h"
17 #include <thread>
18 #include <chrono>
19 #include "datashare_log.h"
20 
21 namespace OHOS {
22 namespace DataShare {
using namespace std::chrono;
// Upper bound, in seconds, that SyncCall blocks waiting for the queued function to complete.
constexpr int WAIT_TIME = 3;
// Pause, in milliseconds, between two consecutive polls in CheckFuncAndExec.
constexpr int SLEEP_TIME = 1;
// Number of sleep-and-retry rounds CheckFuncAndExec performs before it reports a timeout.
constexpr int TRY_TIMES = 2000;
DataShareUvQueue(napi_env env)27 DataShareUvQueue::DataShareUvQueue(napi_env env)
28     : env_(env)
29 {
30     napi_get_uv_event_loop(env, &loop_);
31 }
32 
LambdaForWork(uv_work_t * work,int uvstatus)33 void DataShareUvQueue::LambdaForWork(uv_work_t *work, int uvstatus)
34 {
35     if (UV_ECANCELED == uvstatus || work == nullptr || work->data == nullptr) {
36         LOG_ERROR("invalid work or work->data.");
37         DataShareUvQueue::Purge(work);
38         return;
39     }
40     auto *entry = static_cast<UvEntry*>(work->data);
41     {
42         std::unique_lock<std::mutex> lock(entry->mutex);
43         if (entry->func) {
44             entry->func();
45         }
46         entry->done = true;
47         entry->condition.notify_all();
48     }
49     DataShareUvQueue::Purge(work);
50 }
51 
SyncCall(NapiVoidFunc func,NapiBoolFunc retFunc)52 void DataShareUvQueue::SyncCall(NapiVoidFunc func, NapiBoolFunc retFunc)
53 {
54     uv_work_t* work = new (std::nothrow) uv_work_t;
55     if (work == nullptr) {
56         LOG_ERROR("invalid work.");
57         return;
58     }
59     work->data = new UvEntry {env_, std::move(func), false, {}, {}, std::atomic<int>(1)};
60     if (work->data == nullptr) {
61         delete work;
62         LOG_ERROR("invalid uvEntry.");
63         return;
64     }
65 
66     auto *uvEntry = static_cast<UvEntry*>(work->data);
67     {
68         std::unique_lock<std::mutex> lock(uvEntry->mutex);
69         uvEntry->count.fetch_add(1);
70         auto status = uv_queue_work(
71             loop_, work, [](uv_work_t *work) {}, LambdaForWork);
72         if (status != napi_ok) {
73             LOG_ERROR("queue work failed");
74             DataShareUvQueue::Purge(work);
75             return;
76         }
77         if (uvEntry->condition.wait_for(lock, std::chrono::seconds(WAIT_TIME), [uvEntry] { return uvEntry->done; })) {
78             auto time = static_cast<uint64_t>(duration_cast<milliseconds>(
79                 system_clock::now().time_since_epoch()).count());
80             LOG_INFO("function ended successfully. times %{public}" PRIu64 ".", time);
81         }
82         if (!uvEntry->done && uv_cancel((uv_req_t*)work) != napi_ok) {
83             LOG_ERROR("uv_cancel failed.");
84         }
85     }
86 
87     CheckFuncAndExec(retFunc);
88     DataShareUvQueue::Purge(work);
89 }
90 
Purge(uv_work_t * work)91 void DataShareUvQueue::Purge(uv_work_t* work)
92 {
93     if (work == nullptr) {
94         LOG_ERROR("invalid work");
95         return;
96     }
97     if (work->data == nullptr) {
98         LOG_ERROR("invalid work->data");
99         delete work;
100         return;
101     }
102 
103     auto *entry = static_cast<UvEntry*>(work->data);
104     auto count = entry->count.fetch_sub(1);
105     if (count != 1) {
106         return;
107     }
108 
109     delete entry;
110     entry = nullptr;
111 
112     delete work;
113     work = nullptr;
114 }
115 
CheckFuncAndExec(NapiBoolFunc retFunc)116 void DataShareUvQueue::CheckFuncAndExec(NapiBoolFunc retFunc)
117 {
118     if (retFunc) {
119         int tryTimes = TRY_TIMES;
120         while (retFunc() != true && tryTimes > 0) {
121             std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_TIME));
122             tryTimes--;
123         }
124         if (tryTimes <= 0) {
125             LOG_ERROR("function execute timeout.");
126         }
127     }
128 }
129 } // namespace DataShare
130 } // namespace OHOS