1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_scheduler.h"
17 
18 #include <fcntl.h>
19 #include <sys/syscall.h>
20 #include <unistd.h>
21 
22 #include "devicestatus_define.h"
23 
24 #undef LOG_TAG
25 #define LOG_TAG "TaskScheduler"
26 
27 namespace OHOS {
28 namespace Msdp {
29 namespace DeviceStatus {
30 
ProcessTask()31 void TaskScheduler::Task::ProcessTask()
32 {
33     CALL_DEBUG_ENTER;
34     if (hasWaited_) {
35         FI_HILOGE("Expired tasks will be discarded, id:%{public}d", id_);
36         return;
37     }
38     int32_t ret = fun_();
39     std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
40     FI_HILOGD("process:%{public}s, task id:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret);
41     if (!hasWaited_ && promise_ != nullptr) {
42         promise_->set_value(ret);
43     }
44 }
45 
~TaskScheduler()46 TaskScheduler::~TaskScheduler()
47 {
48     if (fds_[0] >= 0) {
49         if (close(fds_[0]) < 0) {
50             FI_HILOGE("Close fds_[0] failed, err:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]);
51         }
52         fds_[0] = -1;
53     }
54     if (fds_[1] >= 0) {
55         if (close(fds_[1]) < 0) {
56             FI_HILOGE("Close fds_[1] failed, err:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]);
57         }
58         fds_[1] = -1;
59     }
60 }
61 
Init()62 bool TaskScheduler::Init()
63 {
64     CALL_DEBUG_ENTER;
65     if (::pipe2(fds_, O_CLOEXEC | O_NONBLOCK) != 0) {
66         FI_HILOGE("pipe2 failed, errno:%{public}s", ::strerror(errno));
67         return false;
68     }
69     return true;
70 }
71 
ProcessTasks()72 void TaskScheduler::ProcessTasks()
73 {
74     CALL_DEBUG_ENTER;
75     std::vector<TaskPtr> tasks;
76     PopPendingTaskList(tasks);
77     for (const auto &it : tasks) {
78         it->ProcessTask();
79     }
80 }
81 
PostSyncTask(DTaskCallback cb)82 int32_t TaskScheduler::PostSyncTask(DTaskCallback cb)
83 {
84     CALL_DEBUG_ENTER;
85     CHKPR(cb, ERROR_NULL_POINTER);
86     if (IsCallFromWorkerThread()) {
87         return cb();
88     }
89     Promise promise;
90     Future future = promise.get_future();
91     auto task = PostTask(cb, &promise);
92     CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
93 
94     static constexpr int32_t timeout = 3000;
95     std::chrono::milliseconds span(timeout);
96     auto res = future.wait_for(span);
97     task->SetWaited();
98     if (res == std::future_status::timeout) {
99         FI_HILOGE("Task timeout");
100         return ETASKS_WAIT_TIMEOUT;
101     } else if (res == std::future_status::deferred) {
102         FI_HILOGE("Task deferred");
103         return ETASKS_WAIT_DEFERRED;
104     }
105     return future.get();
106 }
107 
PostAsyncTask(DTaskCallback callback)108 int32_t TaskScheduler::PostAsyncTask(DTaskCallback callback)
109 {
110     CHKPR(callback, ERROR_NULL_POINTER);
111     auto task = PostTask(callback);
112     CHKPR(task, ETASKS_POST_ASYNCTASK_FAIL);
113     return RET_OK;
114 }
115 
PopPendingTaskList(std::vector<TaskPtr> & tasks)116 void TaskScheduler::PopPendingTaskList(std::vector<TaskPtr> &tasks)
117 {
118     static constexpr int32_t onceProcessTaskLimit = 10;
119     std::lock_guard<std::mutex> guard(mux_);
120     for (int32_t i = 0; i < onceProcessTaskLimit; i++) {
121         if (tasks_.empty()) {
122             break;
123         }
124         auto firstTask = tasks_.front();
125         CHKPB(firstTask);
126         RecoveryId(firstTask->GetId());
127         tasks.push_back(firstTask->GetSharedPtr());
128         tasks_.pop();
129     }
130 }
131 
PostTask(DTaskCallback callback,Promise * promise)132 TaskScheduler::TaskPtr TaskScheduler::PostTask(DTaskCallback callback, Promise *promise)
133 {
134     FI_HILOGD("tasks_ size:%{public}zu", tasks_.size());
135     static constexpr int32_t maxTasksLimit = 1000;
136     std::lock_guard<std::mutex> guard(mux_);
137     size_t tsize = tasks_.size();
138     if (tsize > maxTasksLimit) {
139         FI_HILOGE("The task queue is full, size:%{public}zu/%{public}d", tsize, maxTasksLimit);
140         return nullptr;
141     }
142     int32_t id = GenerateId();
143     TaskData data = { GetThisThreadId(), id };
144     ssize_t res = write(fds_[1], &data, sizeof(data));
145     if (res == -1) {
146         RecoveryId(id);
147         FI_HILOGE("Pipeline writes failed, errno:%{public}d", errno);
148         return nullptr;
149     }
150     TaskPtr task = std::make_shared<Task>(id, callback, promise);
151     tasks_.push(task);
152     std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
153     FI_HILOGD("Post %{public}s", taskType.c_str());
154     return task->GetSharedPtr();
155 }
156 } // namespace DeviceStatus
157 } // namespace Msdp
158 } // namespace OHOS
159