1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "delegate_tasks.h"
17
18 #include <fcntl.h>
19 #include <sys/syscall.h>
20 #include <unistd.h>
21
22 #include "error_multimodal.h"
23 #include "util.h"
24
25 #undef MMI_LOG_DOMAIN
26 #define MMI_LOG_DOMAIN MMI_LOG_SERVER
27 #undef MMI_LOG_TAG
28 #define MMI_LOG_TAG "DelegateTasks"
29
30 namespace OHOS {
31 namespace MMI {
ProcessTask()32 void DelegateTasks::Task::ProcessTask()
33 {
34 CALL_DEBUG_ENTER;
35 if (hasWaited_) {
36 MMI_HILOGE("Expired tasks will be discarded. id:%{public}d", id_);
37 return;
38 }
39 int32_t ret = fun_();
40 std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
41 MMI_HILOGD("Process taskType:%{public}s, taskId:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret);
42 if (!hasWaited_ && promise_ != nullptr) {
43 promise_->set_value(ret);
44 }
45 }
46
~DelegateTasks()47 DelegateTasks::~DelegateTasks()
48 {
49 if (fds_[0] >= 0) {
50 close(fds_[0]);
51 fds_[0] = -1;
52 }
53 if (fds_[1] >= 0) {
54 close(fds_[1]);
55 fds_[1] = -1;
56 }
57 }
58
Init()59 bool DelegateTasks::Init()
60 {
61 CALL_DEBUG_ENTER;
62 if (pipe(fds_) == -1) {
63 MMI_HILOGE("The pipe create failed, errno:%{public}d", errno);
64 return false;
65 }
66 if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) == -1) {
67 MMI_HILOGE("The fcntl read failed, errno:%{public}d", errno);
68 close(fds_[0]);
69 return false;
70 }
71 if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) == -1) {
72 MMI_HILOGE("The fcntl write failed, errno:%{public}d", errno);
73 close(fds_[1]);
74 return false;
75 }
76 return true;
77 }
78
ProcessTasks()79 void DelegateTasks::ProcessTasks()
80 {
81 CALL_DEBUG_ENTER;
82 std::vector<TaskPtr> tasks;
83 PopPendingTaskList(tasks);
84 for (const auto &it : tasks) {
85 it->ProcessTask();
86 }
87 }
88
PostSyncTask(DTaskCallback callback)89 int32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
90 {
91 CALL_DEBUG_ENTER;
92 CHKPR(callback, ERROR_NULL_POINTER);
93 if (IsCallFromWorkerThread()) {
94 return callback();
95 }
96 std::shared_ptr<Promise> promise = std::make_shared<Promise>();
97 Future future = promise->get_future();
98 auto task = PostTask(callback, promise);
99 CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
100
101 static constexpr int32_t timeout = 3000;
102 std::chrono::milliseconds span(timeout);
103 auto res = future.wait_for(span);
104 task->SetWaited();
105 if (res == std::future_status::timeout) {
106 MMI_HILOGE("Task timeout");
107 return ETASKS_WAIT_TIMEOUT;
108 } else if (res == std::future_status::deferred) {
109 MMI_HILOGE("Task deferred");
110 return ETASKS_WAIT_DEFERRED;
111 }
112 return future.get();
113 }
114
PostAsyncTask(DTaskCallback callback)115 int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
116 {
117 CHKPR(callback, ERROR_NULL_POINTER);
118 if (IsCallFromWorkerThread()) {
119 return callback();
120 }
121 CHKPR(PostTask(callback), ETASKS_POST_ASYNCTASK_FAIL);
122 return RET_OK;
123 }
124
PopPendingTaskList(std::vector<TaskPtr> & tasks)125 void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
126 {
127 std::lock_guard<std::mutex> guard(mux_);
128 static constexpr int32_t onceProcessTaskLimit = 10;
129 for (int32_t count = 0; count < onceProcessTaskLimit; count++) {
130 if (tasks_.empty()) {
131 break;
132 }
133 auto task = tasks_.front();
134 CHKPB(task);
135 RecoveryId(task->GetId());
136 tasks.push_back(task->GetSharedPtr());
137 tasks_.pop();
138 }
139 }
140
PostTask(DTaskCallback callback,std::shared_ptr<Promise> promise)141 DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, std::shared_ptr<Promise> promise)
142 {
143 if (IsCallFromWorkerThread()) {
144 MMI_HILOGE("This interface cannot be called from a worker thread");
145 return nullptr;
146 }
147 std::lock_guard<std::mutex> guard(mux_);
148 MMI_HILOGD("tasks_ size:%{public}d", static_cast<int32_t>(tasks_.size()));
149 static constexpr int32_t maxTasksLimit = 1000;
150 auto tsize = tasks_.size();
151 if (tsize > maxTasksLimit) {
152 MMI_HILOGE("The task queue is full. size:%{public}zu, maxTasksLimit:%{public}d", tsize, maxTasksLimit);
153 return nullptr;
154 }
155 int32_t id = GenerateId();
156 TaskData data = { GetThisThreadId(), id };
157 auto res = write(fds_[1], &data, sizeof(data));
158 if (res == -1) {
159 RecoveryId(id);
160 MMI_HILOGE("Pipe write failed, errno:%{public}d", errno);
161 return nullptr;
162 }
163 TaskPtr task = std::make_shared<Task>(id, callback, promise);
164 tasks_.push(task);
165 std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
166 MMI_HILOGD("Post taskType:%{public}s", taskType.c_str());
167 return task->GetSharedPtr();
168 }
169 } // namespace MMI
170 } // namespace OHOS