1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <sys/types.h>
17 #include <unistd.h>
18 #include <malloc.h>
19 #include "task_queue.h"
20 #include "media_log.h"
21 #include "media_errors.h"
22 
23 using namespace OHOS::QOS;
24 
namespace {
    // File-local HiLog label: core log type, player log domain, tag "TaskQueue".
    // NOTE(review): presumably consumed implicitly by the MEDIA_LOG* macros used below - confirm in media_log.h.
    constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_PLAYER, "TaskQueue" };
}
28 
29 namespace OHOS {
30 namespace Media {
~TaskQueue()31 TaskQueue::~TaskQueue()
32 {
33     (void)Stop();
34 }
35 
Start()36 int32_t TaskQueue::Start()
37 {
38     std::unique_lock<std::mutex> lock(mutex_);
39     CHECK_AND_RETURN_RET_LOG(thread_ == nullptr,
40         MSERR_OK, "Started already, ignore ! [%{public}s]", name_.c_str());
41     isExit_ = false;
42     thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this);
43     uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
44     MEDIA_LOGI("0x%{public}06" PRIXPTR " Instance thread started [%{public}s], curTimeUs: [%{public}" PRIu64 "]",
45         FAKE_POINTER(this), name_.c_str(), curTimeNs);
46     return MSERR_OK;
47 }
48 
Stop()49 int32_t TaskQueue::Stop() noexcept
50 {
51     std::unique_lock<std::mutex> lock(mutex_);
52     if (isExit_) {
53         MEDIA_LOGD("Stopped already, ignore ! [%{public}s]", name_.c_str());
54         return MSERR_OK;
55     }
56 
57     if (std::this_thread::get_id() == thread_->get_id()) {
58         MEDIA_LOGI("Stop at the task thread, reject");
59         return MSERR_INVALID_OPERATION;
60     }
61 
62     std::unique_ptr<std::thread> t;
63     isExit_ = true;
64     cond_.notify_all();
65     std::swap(thread_, t);
66     lock.unlock();
67 
68     if (t != nullptr && t->joinable()) {
69         t->join();
70     }
71 
72     lock.lock();
73     CancelNotExecutedTaskLocked();
74     return MSERR_OK;
75 }
76 
SetQos(const QosLevel level)77 void TaskQueue::SetQos(const QosLevel level)
78 {
79     if (tid_ == -1) {
80         MEDIA_LOGW("SetQos thread level failed, tid invalid");
81         return;
82     }
83     MEDIA_LOGI("SetQos thread [%{public}d] level [%{public}d]", static_cast<int>(tid_), static_cast<int>(level));
84     SetQosForOtherThread(level, tid_);
85 }
86 
ResetQos()87 void TaskQueue::ResetQos()
88 {
89     if (tid_ == -1) {
90         MEDIA_LOGW("ResetQos thread level failed, tid invalid");
91         return;
92     }
93     ResetQosForOtherThread(tid_);
94     MEDIA_LOGI("ResetQos thread [%{public}d] ok", static_cast<int>(tid_));
95 }
96 
97 // cancelNotExecuted = false, delayUs = 0ULL.
EnqueueTask(const std::shared_ptr<ITaskHandler> & task,bool cancelNotExecuted,uint64_t delayUs)98 __attribute__((no_sanitize("cfi"))) int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
99     bool cancelNotExecuted, uint64_t delayUs)
100 {
101     constexpr uint64_t MAX_DELAY_US = 10000000ULL; // max delay.
102 
103     CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL,
104         "Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str());
105 
106     task->Clear();
107 
108     CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL,
109         "Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64
110         "], invalid! [%{public}s]",
111         delayUs, MAX_DELAY_US, name_.c_str());
112 
113     std::unique_lock<std::mutex> lock(mutex_);
114     CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION,
115         "Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str());
116 
117     if (cancelNotExecuted) {
118         CancelNotExecutedTaskLocked();
119     }
120 
121     // 1000 is ns to us.
122     constexpr uint32_t US_TO_NS = 1000;
123     uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
124     CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION,
125         "Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str());
126 
127     uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs;
128     auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) {
129         return (item.executeTimeNs_ > executeTimeNs);
130     });
131     (void)taskList_.insert(iter, {task, executeTimeNs});
132     cond_.notify_all();
133 
134     return 0;
135 }
136 
CancelNotExecutedTaskLocked()137 __attribute__((no_sanitize("cfi"))) void TaskQueue::CancelNotExecutedTaskLocked()
138 {
139     MEDIA_LOGD("All task not executed are being cancelled..........[%{public}s]", name_.c_str());
140     while (!taskList_.empty()) {
141         std::shared_ptr<ITaskHandler> task = taskList_.front().task_;
142         taskList_.pop_front();
143         if (task != nullptr) {
144             task->Cancel();
145         }
146     }
147 }
148 
TaskProcessor()149 __attribute__((no_sanitize("cfi"))) void TaskQueue::TaskProcessor()
150 {
151     constexpr uint32_t nameSizeMax = 15;
152     tid_ = gettid();
153     MEDIA_LOGI("Enter TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
154     pthread_setname_np(pthread_self(), name_.substr(0, nameSizeMax).c_str());
155     (void)mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
156     while (true) {
157         std::unique_lock<std::mutex> lock(mutex_);
158         cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); });
159         if (isExit_) {
160             MEDIA_LOGI("Exit TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
161             return;
162         }
163         TaskHandlerItem item = taskList_.front();
164         uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
165         if (curTimeNs >= item.executeTimeNs_) {
166             taskList_.pop_front();
167         } else {
168             uint64_t diff =  item.executeTimeNs_ - curTimeNs;
169             (void)cond_.wait_for(lock, std::chrono::nanoseconds(diff));
170             continue;
171         }
172         isTaskExecuting_ = true;
173         lock.unlock();
174 
175         if (item.task_ == nullptr || item.task_->IsCanceled()) {
176             MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str());
177             lock.lock();
178             isTaskExecuting_ = false;
179             lock.unlock();
180             continue;
181         }
182 
183         item.task_->Execute();
184         lock.lock();
185         isTaskExecuting_ = false;
186         lock.unlock();
187         if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) {
188             continue;
189         }
190         int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_);
191         if (res != MSERR_OK) {
192             MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str());
193         }
194     }
195     (void)mallopt(M_FLUSH_THREAD_CACHE, 0);
196     MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str());
197 }
198 
IsTaskExecuting()199 bool TaskQueue::IsTaskExecuting()
200 {
201     std::unique_lock<std::mutex> lock(mutex_);
202     return isTaskExecuting_;
203 }
204 } // namespace Media
205 } // namespace OHOS
206