1 /*
2  * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef HISTREAMER_FOUNDATION_OSAL_TASK_INNER_H
17 #define HISTREAMER_FOUNDATION_OSAL_TASK_INNER_H
18 
19 #include <atomic>
20 #include <functional>
21 #include <string>
22 #include <list>
23 #include <map>
24 #include "osal/task/condition_variable.h"
25 #include "osal/task/mutex.h"
26 #include "osal/task/autolock.h"
27 #include "osal/task/pipeline_threadpool.h"
28 
29 #ifdef MEDIA_FOUNDATION_FFRT
30     #include "ffrt.h"
31 #else
32     #include <map>
33 #endif
34 
35 
36 namespace OHOS {
37 namespace Media {
38 
39 class TaskInner : public std::enable_shared_from_this<TaskInner> {
40 public:
41     explicit TaskInner(const std::string& name, const std::string& groupId, TaskType type,
42         TaskPriority priority, bool singleLoop);
43 
44     virtual ~TaskInner();
45 
46     virtual void Init();
47 
48     virtual void DeInit();
49 
50     virtual void Start();
51 
52     virtual void Stop();
53 
54     virtual void StopAsync();
55 
56     virtual void Pause();
57 
58     virtual void PauseAsync();
59 
60     virtual void RegisterJob(const std::function<int64_t()>& job);
61 
62     virtual void SubmitJobOnce(const std::function<void()>& job, int64_t delay, bool wait);
63 
64     virtual void SubmitJob(const std::function<void()>& job, int64_t delay, bool wait);
65 
IsTaskRunning()66     virtual bool IsTaskRunning() { return runningState_ == RunningState::STARTED; }
67 
68     virtual void UpdateDelayTime(int64_t delayUs);
69 
SetEnableStateChangeLog(bool enable)70     void SetEnableStateChangeLog(bool enable) { isStateLogEnabled_ = enable; }
71 
72     int64_t NextJobUs();
73 
74     void HandleJob();
75 
76     static void SleepInTask(unsigned ms);
77 
78 private:
79     enum class RunningState : int {
80         STARTED,
81         PAUSING,
82         PAUSED,
83         STOPPING,
84         STOPPED,
85     };
86 
87     const std::string name_;
88     std::atomic<RunningState> runningState_{RunningState::PAUSED};
89     std::atomic<bool> jobState_{false};
90     std::function<int64_t()> job_;
91     bool singleLoop_ = false;
92     int64_t topProcessUs_ {-1};
93     bool topIsJob_ = false;
94     std::shared_ptr<PipeLineThread> pipelineThread_;
95     std::atomic<bool> isStateLogEnabled_{true};
96 #ifdef MEDIA_FOUNDATION_FFRT
97     void DoJob(const std::function<void()>& job);
98     std::shared_ptr<ffrt::queue> jobQueue_;
99     Mutex stateMutex_;
100     ConditionVariable syncCond_;
101     ffrt::recursive_mutex jobMutex_;
102 #else
103     void UpdateTop();
104 
105     int64_t InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue);
106 
107     Mutex stateMutex_{};
108     FairMutex jobMutex_{};
109     ConditionVariable syncCond_{};
110     ConditionVariable replyCond_{};
111     std::map<int64_t, std::function<void()>> msgQueue_;  // msg will be sorted by timeUs
112     std::map<int64_t, std::function<void()>> jobQueue_;  // msg will be sorted by timeUs
113 #endif
114 };
115 } // namespace Media
116 } // namespace OHOS
#endif // HISTREAMER_FOUNDATION_OSAL_TASK_INNER_H
118 
119