1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define MEDIA_TASK_THREAD
16 #define HST_LOG_TAG "Task"
17 #include "osal/task/task.h"
18 #include "osal/task/taskInner.h"
19 #include "osal/task/thread.h"
20 #include "osal/task/pipeline_threadpool.h"
21 #include "osal/utils/util.h"
22 #include "cpp_ext/memory_ext.h"
23 #include "common/log.h"
24 
25 #include <mutex>
26 
namespace {
// HiLog label for this translation unit, tagged "TaskInner".
// NOTE(review): LABEL is not referenced by name in this file; presumably the MEDIA_LOG_* macros pick it up — confirm.
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "TaskInner" };
}
30 
31 namespace OHOS {
32 namespace Media {
namespace {
    // Delay (in microseconds) that HandleJob() schedules for a single-loop task whose job_ callback is not set.
    constexpr int64_t INVALID_DELAY_TIME_US = 10000000; // 10s
}
// Counter pre-incremented for each SINGLETON task; its value is appended to the task name to form the key
// passed to FindThread(). It is 16 bits wide, so the suffix wraps around after 65535 increments.
static std::atomic<uint16_t> singletonTaskId = 0;
37 
// Blocks the calling thread by forwarding `ms` to OSAL::SleepFor().
// NOTE(review): the parameter name suggests milliseconds; confirm against OSAL::SleepFor.
void TaskInner::SleepInTask(unsigned ms)
{
    OSAL::SleepFor(ms);
}
42 
// Returns the current std::chrono::steady_clock reading, expressed as microseconds since that clock's epoch.
static int64_t GetNowUs()
{
    using Clock = std::chrono::steady_clock;
    const Clock::duration sinceEpoch = Clock::now().time_since_epoch();
    const auto asMicros = std::chrono::duration_cast<std::chrono::microseconds>(sinceEpoch);
    return asMicros.count();
}
48 
TaskInner(const std::string & name,const std::string & groupId,TaskType type,TaskPriority priority,bool singleLoop)49 TaskInner::TaskInner(const std::string& name, const std::string& groupId, TaskType type, TaskPriority priority,
50     bool singleLoop)
51     : name_(std::move(name)), runningState_(RunningState::PAUSED), singleLoop_(singleLoop)
52 {
53     MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " groupId:" PUBLIC_LOG_S " type:%{public}d ctor",
54         name_.c_str(), groupId.c_str(), type);
55     if (type == TaskType::SINGLETON) {
56         std::string newName = name_ + std::to_string(++singletonTaskId);
57         pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(newName, type, priority);
58     } else {
59         pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(groupId, type, priority);
60     }
61 }
62 
// Registers this task with its pipeline thread.
// Uses shared_from_this(), so the object must already be owned by a std::shared_ptr when this is called.
void TaskInner::Init()
{
    MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " Init", name_.c_str());
    pipelineThread_->AddTask(shared_from_this());
}
68 
DeInit()69 void TaskInner::DeInit()
70 {
71     MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit", name_.c_str());
72     pipelineThread_->RemoveTask(shared_from_this());
73     {
74         AutoLock lock1(jobMutex_);
75         AutoLock lock2(stateMutex_);
76         runningState_ = RunningState::STOPPED;
77         topProcessUs_ = -1;
78     }
79     MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit done", name_.c_str());
80 }
81 
// Destructor. The body only emits a debug log; state teardown is done in DeInit().
TaskInner::~TaskInner()
{
    MEDIA_LOG_D_T(PUBLIC_LOG_S " dtor", name_.c_str());
}
86 
UpdateDelayTime(int64_t delayUs)87 void TaskInner::UpdateDelayTime(int64_t delayUs)
88 {
89     if (!singleLoop_) {
90         MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime do nothing", name_.c_str());
91         return;
92     }
93     MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime enter topProcessUs:" PUBLIC_LOG_D64
94         ", delayUs:" PUBLIC_LOG_D64, name_.c_str(), topProcessUs_, delayUs);
95     pipelineThread_->LockJobState();
96     AutoLock lock(stateMutex_);
97     if (runningState_ != RunningState::STARTED) {
98         pipelineThread_->UnLockJobState(false);
99         return;
100     }
101     topProcessUs_ = GetNowUs() + delayUs;
102     pipelineThread_->UnLockJobState(true);
103     MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime exit topProcessUs:" PUBLIC_LOG_D64,
104         name_.c_str(), topProcessUs_);
105 }
106 
Start()107 void TaskInner::Start()
108 {
109     MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Start", name_.c_str());
110     pipelineThread_->LockJobState();
111     AutoLock lock(stateMutex_);
112     runningState_ = RunningState::STARTED;
113     if (singleLoop_) {
114         if (!job_) {
115             MEDIA_LOG_D_T("task " PUBLIC_LOG_S " Start, job invalid", name_.c_str());
116         }
117         topProcessUs_ = GetNowUs();
118     } else {
119         UpdateTop();
120     }
121     pipelineThread_->UnLockJobState(true);
122     MEDIA_LOG_I_FALSE_D(isStateLogEnabled_.load(), "task " PUBLIC_LOG_S " Start done, topProcessUs:%{public}" PRId64,
123         name_.c_str(), topProcessUs_);
124 }
125 
Stop()126 void TaskInner::Stop()
127 {
128     if (pipelineThread_->IsRunningInSelf()) {
129         MEDIA_LOG_W_T(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
130         runningState_ = RunningState::STOPPED;
131         topProcessUs_ = -1;
132         return;
133     }
134     MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " Stop", name_.c_str());
135     AutoLock lock1(jobMutex_);
136     pipelineThread_->LockJobState();
137     AutoLock lock2(stateMutex_);
138     if (runningState_.load() == RunningState::STOPPED) {
139         pipelineThread_->UnLockJobState(false);
140         return;
141     }
142     runningState_ = RunningState::STOPPED;
143     topProcessUs_ = -1;
144     pipelineThread_->UnLockJobState(true);
145     MEDIA_LOG_I_T(PUBLIC_LOG_S " Stop <<", name_.c_str());
146 }
147 
StopAsync()148 void TaskInner::StopAsync()
149 {
150     if (pipelineThread_->IsRunningInSelf()) {
151         MEDIA_LOG_W_T(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
152         runningState_ = RunningState::STOPPED;
153         topProcessUs_ = -1;
154         return;
155     }
156     MEDIA_LOG_I_T(PUBLIC_LOG_S " StopAsync", name_.c_str());
157     pipelineThread_->LockJobState();
158     AutoLock lock(stateMutex_);
159     bool stateChanged = false;
160     if (runningState_.load() != RunningState::STOPPED) {
161         runningState_ = RunningState::STOPPED;
162         topProcessUs_ = -1;
163         stateChanged = true;
164     }
165     pipelineThread_->UnLockJobState(stateChanged);
166 }
167 
Pause()168 void TaskInner::Pause()
169 {
170     if (pipelineThread_->IsRunningInSelf()) {
171         RunningState state = runningState_.load();
172         if (state == RunningState::STARTED) {
173             MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
174                 PUBLIC_LOG_S " Pause done in self task", name_.c_str());
175             runningState_ = RunningState::PAUSED;
176             topProcessUs_ = -1;
177             return;
178         } else {
179             MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
180                 PUBLIC_LOG_S " Pause skip in self task, curret State: " PUBLIC_LOG_D32, name_.c_str(), state);
181             return;
182         }
183     }
184     MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause", name_.c_str());
185     AutoLock lock1(jobMutex_);
186     pipelineThread_->LockJobState();
187     AutoLock lock2(stateMutex_);
188     RunningState state = runningState_.load();
189     if (state != RunningState::STARTED) {
190         pipelineThread_->UnLockJobState(false);
191         return;
192     }
193     runningState_ = RunningState::PAUSED;
194     topProcessUs_ = -1;
195     pipelineThread_->UnLockJobState(true);
196     MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause done.", name_.c_str());
197 }
198 
PauseAsync()199 void TaskInner::PauseAsync()
200 {
201     if (pipelineThread_->IsRunningInSelf()) {
202         RunningState state = runningState_.load();
203         if (state == RunningState::STARTED) {
204             MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
205                 PUBLIC_LOG_S " PauseAsync done in self task", name_.c_str());
206             runningState_ = RunningState::PAUSED;
207             topProcessUs_ = -1;
208             return;
209         } else {
210             MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
211                 PUBLIC_LOG_S " PauseAsync skip in self task, curretState:%{public}d", name_.c_str(), state);
212             return;
213         }
214     }
215     MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " PauseAsync", name_.c_str());
216     pipelineThread_->LockJobState();
217     AutoLock lock(stateMutex_);
218     bool stateChanged = false;
219     if (runningState_.load() == RunningState::STARTED) {
220         runningState_ = RunningState::PAUSED;
221         topProcessUs_ = -1;
222         stateChanged = true;
223     }
224     pipelineThread_->UnLockJobState(stateChanged);
225 }
226 
RegisterJob(const std::function<int64_t ()> & job)227 void TaskInner::RegisterJob(const std::function<int64_t()>& job)
228 {
229     MEDIA_LOG_I_T(PUBLIC_LOG_S " RegisterHandler", name_.c_str());
230     job_ = std::move(job);
231 }
232 
SubmitJobOnce(const std::function<void ()> & job,int64_t delayUs,bool wait)233 void TaskInner::SubmitJobOnce(const std::function<void()>& job, int64_t delayUs, bool wait)
234 {
235     MEDIA_LOG_D_T(PUBLIC_LOG_S " SubmitJobOnce", name_.c_str());
236     int64_t time = InsertJob(job, delayUs, false);
237     if (wait) {
238         AutoLock lock(stateMutex_);
239         replyCond_.Wait(lock, [this, time] { return msgQueue_.find(time) == msgQueue_.end(); });
240     }
241 }
242 
SubmitJob(const std::function<void ()> & job,int64_t delayUs,bool wait)243 void TaskInner::SubmitJob(const std::function<void()>& job, int64_t delayUs, bool wait)
244 {
245     MEDIA_LOG_D_T(PUBLIC_LOG_S " SubmitJob delayUs:%{public}" PRId64, name_.c_str(), delayUs);
246     int64_t time = InsertJob(job, delayUs, true);
247     if (wait) {
248         AutoLock lock(stateMutex_);
249         replyCond_.Wait(lock, [this, time] { return jobQueue_.find(time) == jobQueue_.end(); });
250     }
251 }
252 
UpdateTop()253 void TaskInner::UpdateTop()
254 {
255     // jobQueue_ is only handled in STARTED state, msgQueue_ always got handled.
256     if (msgQueue_.empty() && ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty())) {
257         topProcessUs_ = -1;
258         return;
259     }
260     if (msgQueue_.empty()) {
261         topProcessUs_ = jobQueue_.begin()->first;
262         topIsJob_ = true;
263     } else if ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty()) {
264         topProcessUs_ = msgQueue_.begin()->first;
265         topIsJob_ = false;
266     } else {
267         int64_t msgProcessTime = msgQueue_.begin()->first;
268         int64_t jobProcessTime = jobQueue_.begin()->first;
269         int64_t nowUs =  GetNowUs();
270         if (msgProcessTime <= nowUs || msgProcessTime <= jobProcessTime) {
271             topProcessUs_ = msgProcessTime;
272             topIsJob_ = false;
273         } else  {
274             topProcessUs_ = jobProcessTime;
275             topIsJob_ = true;
276         }
277     }
278 }
279 
NextJobUs()280 int64_t TaskInner::NextJobUs()
281 {
282     AutoLock lock(stateMutex_);
283     return topProcessUs_;
284 }
285 
HandleJob()286 void TaskInner::HandleJob()
287 {
288     AutoLock lock(jobMutex_);
289     if (singleLoop_) {
290         stateMutex_.lock();
291         int64_t currentTopProcessUs = topProcessUs_;
292         if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) {
293             topProcessUs_ = -1;
294             stateMutex_.unlock();
295             return;
296         }
297         // unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
298         stateMutex_.unlock();
299         int64_t nextDelay = (!job_) ? INVALID_DELAY_TIME_US : job_();
300 
301         AutoLock lock(stateMutex_);
302         // if topProcessUs_ is -1, we already pause/stop in job_()
303         // if topProcessUs_ is changed, we should ignore the returned delay time.
304         if (topProcessUs_ != -1 && currentTopProcessUs == topProcessUs_) {
305             topProcessUs_ = GetNowUs() + nextDelay;
306         }
307     } else {
308         std::function<void()> nextJob;
309         stateMutex_.lock();
310         if (topIsJob_) {
311             nextJob = std::move(jobQueue_.begin()->second);
312             jobQueue_.erase(jobQueue_.begin());
313         } else {
314             nextJob = std::move(msgQueue_.begin()->second);
315             msgQueue_.erase(msgQueue_.begin());
316         }
317         {
318 			// unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
319             stateMutex_.unlock();
320             nextJob();
321             replyCond_.NotifyAll();
322         }
323         AutoLock lock(stateMutex_);
324         UpdateTop();
325     }
326 }
327 
InsertJob(const std::function<void ()> & job,int64_t delayUs,bool inJobQueue)328 int64_t TaskInner::InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue)
329 {
330     pipelineThread_->LockJobState();
331     AutoLock lock(stateMutex_);
332     int64_t nowUs = GetNowUs();
333     if (delayUs < 0) {
334         delayUs = 0;
335     }
336     int64_t processTime = nowUs + delayUs;
337     if (inJobQueue) {
338         while (jobQueue_.find(processTime) != jobQueue_.end()) { // To prevent dropping job unexpectedly
339             MEDIA_LOG_W_T("DUPLICATIVE jobQueue_ TIMESTAMP!!!");
340             processTime++;
341         }
342         jobQueue_[processTime] = std::move(job);
343     } else {
344         while (msgQueue_.find(processTime) != msgQueue_.end()) { // To prevent dropping job unexpectedly
345             MEDIA_LOG_W_T("DUPLICATIVE msgQueue_ TIMESTAMP!!!");
346             processTime++;
347         }
348         msgQueue_[processTime] = std::move(job);
349     }
350     int64_t lastProcessUs = topProcessUs_;
351     // update top if only new job is more emgercy or jobqueue is empty
352     if (processTime <= topProcessUs_ || topProcessUs_ == -1) {
353         UpdateTop();
354     }
355     // if top is updated we should wake pipeline thread
356     pipelineThread_->UnLockJobState(lastProcessUs != topProcessUs_);
357     return processTime;
358 }
359 } // namespace Media
360 } // namespace OHOS
361