1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define MEDIA_TASK_THREAD
16 #define HST_LOG_TAG "Task"
17 #include "osal/task/task.h"
18 #include "osal/task/taskInner.h"
19 #include "osal/task/thread.h"
20 #include "osal/task/pipeline_threadpool.h"
21 #include "osal/utils/util.h"
22 #include "cpp_ext/memory_ext.h"
23 #include "common/log.h"
24
25 #include <mutex>
26
27 namespace {
28 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "TaskInner" };
29 }
30
31 namespace OHOS {
32 namespace Media {
namespace {
// Delay, in microseconds (10 s), that HandleJob() uses for a single-loop task with no registered job.
constexpr int64_t INVALID_DELAY_TIME_US = 10 * 1000 * 1000;

// Counter whose pre-incremented value the constructor appends to the name of every SINGLETON task.
// It is a uint16_t, so it wraps around after 65535 increments.
std::atomic<uint16_t> singletonTaskId { 0 };
} // namespace
37
SleepInTask(unsigned ms)38 void TaskInner::SleepInTask(unsigned ms)
39 {
40 OSAL::SleepFor(ms);
41 }
42
// Returns the current std::chrono::steady_clock reading, expressed as microseconds since that
// clock's epoch. All schedule timestamps in this file are produced by this helper.
static int64_t GetNowUs()
{
    using std::chrono::microseconds;
    const auto sinceEpoch = std::chrono::steady_clock::now().time_since_epoch();
    return std::chrono::duration_cast<microseconds>(sinceEpoch).count();
}
48
TaskInner(const std::string & name,const std::string & groupId,TaskType type,TaskPriority priority,bool singleLoop)49 TaskInner::TaskInner(const std::string& name, const std::string& groupId, TaskType type, TaskPriority priority,
50 bool singleLoop)
51 : name_(std::move(name)), runningState_(RunningState::PAUSED), singleLoop_(singleLoop)
52 {
53 MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " groupId:" PUBLIC_LOG_S " type:%{public}d ctor",
54 name_.c_str(), groupId.c_str(), type);
55 if (type == TaskType::SINGLETON) {
56 std::string newName = name_ + std::to_string(++singletonTaskId);
57 pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(newName, type, priority);
58 } else {
59 pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(groupId, type, priority);
60 }
61 }
62
Init()63 void TaskInner::Init()
64 {
65 MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " Init", name_.c_str());
66 pipelineThread_->AddTask(shared_from_this());
67 }
68
DeInit()69 void TaskInner::DeInit()
70 {
71 MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit", name_.c_str());
72 pipelineThread_->RemoveTask(shared_from_this());
73 {
74 AutoLock lock1(jobMutex_);
75 AutoLock lock2(stateMutex_);
76 runningState_ = RunningState::STOPPED;
77 topProcessUs_ = -1;
78 }
79 MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit done", name_.c_str());
80 }
81
~TaskInner()82 TaskInner::~TaskInner()
83 {
84 MEDIA_LOG_D_T(PUBLIC_LOG_S " dtor", name_.c_str());
85 }
86
UpdateDelayTime(int64_t delayUs)87 void TaskInner::UpdateDelayTime(int64_t delayUs)
88 {
89 if (!singleLoop_) {
90 MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime do nothing", name_.c_str());
91 return;
92 }
93 MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime enter topProcessUs:" PUBLIC_LOG_D64
94 ", delayUs:" PUBLIC_LOG_D64, name_.c_str(), topProcessUs_, delayUs);
95 pipelineThread_->LockJobState();
96 AutoLock lock(stateMutex_);
97 if (runningState_ != RunningState::STARTED) {
98 pipelineThread_->UnLockJobState(false);
99 return;
100 }
101 topProcessUs_ = GetNowUs() + delayUs;
102 pipelineThread_->UnLockJobState(true);
103 MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime exit topProcessUs:" PUBLIC_LOG_D64,
104 name_.c_str(), topProcessUs_);
105 }
106
Start()107 void TaskInner::Start()
108 {
109 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Start", name_.c_str());
110 pipelineThread_->LockJobState();
111 AutoLock lock(stateMutex_);
112 runningState_ = RunningState::STARTED;
113 if (singleLoop_) {
114 if (!job_) {
115 MEDIA_LOG_D_T("task " PUBLIC_LOG_S " Start, job invalid", name_.c_str());
116 }
117 topProcessUs_ = GetNowUs();
118 } else {
119 UpdateTop();
120 }
121 pipelineThread_->UnLockJobState(true);
122 MEDIA_LOG_I_FALSE_D(isStateLogEnabled_.load(), "task " PUBLIC_LOG_S " Start done, topProcessUs:%{public}" PRId64,
123 name_.c_str(), topProcessUs_);
124 }
125
Stop()126 void TaskInner::Stop()
127 {
128 if (pipelineThread_->IsRunningInSelf()) {
129 MEDIA_LOG_W_T(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
130 runningState_ = RunningState::STOPPED;
131 topProcessUs_ = -1;
132 return;
133 }
134 MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " Stop", name_.c_str());
135 AutoLock lock1(jobMutex_);
136 pipelineThread_->LockJobState();
137 AutoLock lock2(stateMutex_);
138 if (runningState_.load() == RunningState::STOPPED) {
139 pipelineThread_->UnLockJobState(false);
140 return;
141 }
142 runningState_ = RunningState::STOPPED;
143 topProcessUs_ = -1;
144 pipelineThread_->UnLockJobState(true);
145 MEDIA_LOG_I_T(PUBLIC_LOG_S " Stop <<", name_.c_str());
146 }
147
StopAsync()148 void TaskInner::StopAsync()
149 {
150 if (pipelineThread_->IsRunningInSelf()) {
151 MEDIA_LOG_W_T(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
152 runningState_ = RunningState::STOPPED;
153 topProcessUs_ = -1;
154 return;
155 }
156 MEDIA_LOG_I_T(PUBLIC_LOG_S " StopAsync", name_.c_str());
157 pipelineThread_->LockJobState();
158 AutoLock lock(stateMutex_);
159 bool stateChanged = false;
160 if (runningState_.load() != RunningState::STOPPED) {
161 runningState_ = RunningState::STOPPED;
162 topProcessUs_ = -1;
163 stateChanged = true;
164 }
165 pipelineThread_->UnLockJobState(stateChanged);
166 }
167
Pause()168 void TaskInner::Pause()
169 {
170 if (pipelineThread_->IsRunningInSelf()) {
171 RunningState state = runningState_.load();
172 if (state == RunningState::STARTED) {
173 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
174 PUBLIC_LOG_S " Pause done in self task", name_.c_str());
175 runningState_ = RunningState::PAUSED;
176 topProcessUs_ = -1;
177 return;
178 } else {
179 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
180 PUBLIC_LOG_S " Pause skip in self task, curret State: " PUBLIC_LOG_D32, name_.c_str(), state);
181 return;
182 }
183 }
184 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause", name_.c_str());
185 AutoLock lock1(jobMutex_);
186 pipelineThread_->LockJobState();
187 AutoLock lock2(stateMutex_);
188 RunningState state = runningState_.load();
189 if (state != RunningState::STARTED) {
190 pipelineThread_->UnLockJobState(false);
191 return;
192 }
193 runningState_ = RunningState::PAUSED;
194 topProcessUs_ = -1;
195 pipelineThread_->UnLockJobState(true);
196 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause done.", name_.c_str());
197 }
198
PauseAsync()199 void TaskInner::PauseAsync()
200 {
201 if (pipelineThread_->IsRunningInSelf()) {
202 RunningState state = runningState_.load();
203 if (state == RunningState::STARTED) {
204 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
205 PUBLIC_LOG_S " PauseAsync done in self task", name_.c_str());
206 runningState_ = RunningState::PAUSED;
207 topProcessUs_ = -1;
208 return;
209 } else {
210 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
211 PUBLIC_LOG_S " PauseAsync skip in self task, curretState:%{public}d", name_.c_str(), state);
212 return;
213 }
214 }
215 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " PauseAsync", name_.c_str());
216 pipelineThread_->LockJobState();
217 AutoLock lock(stateMutex_);
218 bool stateChanged = false;
219 if (runningState_.load() == RunningState::STARTED) {
220 runningState_ = RunningState::PAUSED;
221 topProcessUs_ = -1;
222 stateChanged = true;
223 }
224 pipelineThread_->UnLockJobState(stateChanged);
225 }
226
RegisterJob(const std::function<int64_t ()> & job)227 void TaskInner::RegisterJob(const std::function<int64_t()>& job)
228 {
229 MEDIA_LOG_I_T(PUBLIC_LOG_S " RegisterHandler", name_.c_str());
230 job_ = std::move(job);
231 }
232
SubmitJobOnce(const std::function<void ()> & job,int64_t delayUs,bool wait)233 void TaskInner::SubmitJobOnce(const std::function<void()>& job, int64_t delayUs, bool wait)
234 {
235 MEDIA_LOG_D_T(PUBLIC_LOG_S " SubmitJobOnce", name_.c_str());
236 int64_t time = InsertJob(job, delayUs, false);
237 if (wait) {
238 AutoLock lock(stateMutex_);
239 replyCond_.Wait(lock, [this, time] { return msgQueue_.find(time) == msgQueue_.end(); });
240 }
241 }
242
SubmitJob(const std::function<void ()> & job,int64_t delayUs,bool wait)243 void TaskInner::SubmitJob(const std::function<void()>& job, int64_t delayUs, bool wait)
244 {
245 MEDIA_LOG_D_T(PUBLIC_LOG_S " SubmitJob delayUs:%{public}" PRId64, name_.c_str(), delayUs);
246 int64_t time = InsertJob(job, delayUs, true);
247 if (wait) {
248 AutoLock lock(stateMutex_);
249 replyCond_.Wait(lock, [this, time] { return jobQueue_.find(time) == jobQueue_.end(); });
250 }
251 }
252
UpdateTop()253 void TaskInner::UpdateTop()
254 {
255 // jobQueue_ is only handled in STARTED state, msgQueue_ always got handled.
256 if (msgQueue_.empty() && ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty())) {
257 topProcessUs_ = -1;
258 return;
259 }
260 if (msgQueue_.empty()) {
261 topProcessUs_ = jobQueue_.begin()->first;
262 topIsJob_ = true;
263 } else if ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty()) {
264 topProcessUs_ = msgQueue_.begin()->first;
265 topIsJob_ = false;
266 } else {
267 int64_t msgProcessTime = msgQueue_.begin()->first;
268 int64_t jobProcessTime = jobQueue_.begin()->first;
269 int64_t nowUs = GetNowUs();
270 if (msgProcessTime <= nowUs || msgProcessTime <= jobProcessTime) {
271 topProcessUs_ = msgProcessTime;
272 topIsJob_ = false;
273 } else {
274 topProcessUs_ = jobProcessTime;
275 topIsJob_ = true;
276 }
277 }
278 }
279
NextJobUs()280 int64_t TaskInner::NextJobUs()
281 {
282 AutoLock lock(stateMutex_);
283 return topProcessUs_;
284 }
285
HandleJob()286 void TaskInner::HandleJob()
287 {
288 AutoLock lock(jobMutex_);
289 if (singleLoop_) {
290 stateMutex_.lock();
291 int64_t currentTopProcessUs = topProcessUs_;
292 if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) {
293 topProcessUs_ = -1;
294 stateMutex_.unlock();
295 return;
296 }
297 // unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
298 stateMutex_.unlock();
299 int64_t nextDelay = (!job_) ? INVALID_DELAY_TIME_US : job_();
300
301 AutoLock lock(stateMutex_);
302 // if topProcessUs_ is -1, we already pause/stop in job_()
303 // if topProcessUs_ is changed, we should ignore the returned delay time.
304 if (topProcessUs_ != -1 && currentTopProcessUs == topProcessUs_) {
305 topProcessUs_ = GetNowUs() + nextDelay;
306 }
307 } else {
308 std::function<void()> nextJob;
309 stateMutex_.lock();
310 if (topIsJob_) {
311 nextJob = std::move(jobQueue_.begin()->second);
312 jobQueue_.erase(jobQueue_.begin());
313 } else {
314 nextJob = std::move(msgQueue_.begin()->second);
315 msgQueue_.erase(msgQueue_.begin());
316 }
317 {
318 // unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
319 stateMutex_.unlock();
320 nextJob();
321 replyCond_.NotifyAll();
322 }
323 AutoLock lock(stateMutex_);
324 UpdateTop();
325 }
326 }
327
InsertJob(const std::function<void ()> & job,int64_t delayUs,bool inJobQueue)328 int64_t TaskInner::InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue)
329 {
330 pipelineThread_->LockJobState();
331 AutoLock lock(stateMutex_);
332 int64_t nowUs = GetNowUs();
333 if (delayUs < 0) {
334 delayUs = 0;
335 }
336 int64_t processTime = nowUs + delayUs;
337 if (inJobQueue) {
338 while (jobQueue_.find(processTime) != jobQueue_.end()) { // To prevent dropping job unexpectedly
339 MEDIA_LOG_W_T("DUPLICATIVE jobQueue_ TIMESTAMP!!!");
340 processTime++;
341 }
342 jobQueue_[processTime] = std::move(job);
343 } else {
344 while (msgQueue_.find(processTime) != msgQueue_.end()) { // To prevent dropping job unexpectedly
345 MEDIA_LOG_W_T("DUPLICATIVE msgQueue_ TIMESTAMP!!!");
346 processTime++;
347 }
348 msgQueue_[processTime] = std::move(job);
349 }
350 int64_t lastProcessUs = topProcessUs_;
351 // update top if only new job is more emgercy or jobqueue is empty
352 if (processTime <= topProcessUs_ || topProcessUs_ == -1) {
353 UpdateTop();
354 }
355 // if top is updated we should wake pipeline thread
356 pipelineThread_->UnLockJobState(lastProcessUs != topProcessUs_);
357 return processTime;
358 }
359 } // namespace Media
360 } // namespace OHOS
361