1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define MEDIA_PIPELINE
17 #define HST_LOG_TAG "Pipeline"
18
19 #include <queue>
20 #include <stack>
21 #include "pipeline/pipeline.h"
22 #include "osal/task/autolock.h"
23 #include "osal/task/jobutils.h"
24 #include "common/log.h"
25 #include "osal/utils/hitrace_utils.h"
26
27 namespace {
28 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Pipeline" };
29 }
30
31 namespace OHOS {
32 namespace Media {
33 namespace Pipeline {
// File-scope counter read and incremented by Pipeline::GetNextPipelineId().
// It is 16 bits wide, so it wraps back to 0 after 65535.
static std::atomic<uint16_t> pipeLineId { 0 };
35
GetNextPipelineId()36 int32_t Pipeline::GetNextPipelineId()
37 {
38 return pipeLineId++;
39 }
40
~Pipeline()41 Pipeline::~Pipeline()
42 {
43 }
44
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback,const std::string & groupId)45 void Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback,
46 const std::string &groupId)
47 {
48 MEDIA_LOG_I("Pipeline::Init");
49 eventReceiver_ = receiver;
50 filterCallback_ = callback;
51 groupId_ = groupId;
52 }
53
Prepare()54 Status Pipeline::Prepare()
55 {
56 MEDIA_LOG_I("Prepare enter.");
57 Status ret = Status::OK;
58 SubmitJobOnce([&] {
59 AutoLock lock(mutex_);
60 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
61 ret = (*it)->Prepare();
62 if (ret != Status::OK) {
63 return;
64 }
65 }
66 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
67 ret = (*it)->WaitAllState(FilterState::READY);
68 if (ret != Status::OK) {
69 return;
70 }
71 }
72 });
73 MEDIA_LOG_I("Prepare done ret = %{public}d", ret);
74 return ret;
75 }
76
Start()77 Status Pipeline::Start()
78 {
79 MEDIA_LOG_I("Start enter.");
80 Status ret = Status::OK;
81 SubmitJobOnce([&] {
82 AutoLock lock(mutex_);
83 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
84 ret = (*it)->Start();
85 if (ret != Status::OK) {
86 return;
87 }
88 }
89 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
90 ret = (*it)->WaitAllState(FilterState::RUNNING);
91 if (ret != Status::OK) {
92 return;
93 }
94 }
95 });
96 MEDIA_LOG_I("Start done ret = %{public}d", ret);
97 return ret;
98 }
99
Pause()100 Status Pipeline::Pause()
101 {
102 MEDIA_LOG_I("Pause enter.");
103 Status ret = Status::OK;
104 SubmitJobOnce([&] {
105 AutoLock lock(mutex_);
106 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
107 auto rtv = (*it)->Pause();
108 if (rtv != Status::OK) {
109 ret = rtv;
110 }
111 }
112 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
113 auto rtv = (*it)->WaitAllState(FilterState::PAUSED);
114 if (rtv != Status::OK) {
115 ret = rtv;
116 }
117 }
118 });
119 MEDIA_LOG_I("Pause done ret = %{public}d", ret);
120 return ret;
121 }
122
Resume()123 Status Pipeline::Resume()
124 {
125 MEDIA_LOG_I("Resume enter.");
126 Status ret = Status::OK;
127 SubmitJobOnce([&] {
128 AutoLock lock(mutex_);
129 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
130 ret = (*it)->Resume();
131 if (ret != Status::OK) {
132 return;
133 }
134 }
135 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
136 ret = (*it)->WaitAllState(FilterState::RUNNING);
137 if (ret != Status::OK) {
138 return;
139 }
140 }
141 });
142 MEDIA_LOG_I("Resume done ret = %{public}d", ret);
143 return ret;
144 }
145
Stop()146 Status Pipeline::Stop()
147 {
148 MEDIA_LOG_I("Stop enter.");
149 Status ret = Status::OK;
150 SubmitJobOnce([&] {
151 AutoLock lock(mutex_);
152 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
153 if (*it == nullptr) {
154 MEDIA_LOG_E("Pipeline error: " PUBLIC_LOG_ZU, filters_.size());
155 continue;
156 }
157 auto rtv = (*it)->Stop();
158 if (rtv != Status::OK) {
159 ret = rtv;
160 }
161 }
162 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
163 auto rtv = (*it)->WaitAllState(FilterState::STOPPED);
164 if (rtv != Status::OK) {
165 ret = rtv;
166 }
167 }
168 filters_.clear();
169 });
170 MEDIA_LOG_I("Stop done ret = %{public}d", ret);
171 return ret;
172 }
173
Flush()174 Status Pipeline::Flush()
175 {
176 MEDIA_LOG_I("Flush enter.");
177 SubmitJobOnce([&] {
178 AutoLock lock(mutex_);
179 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
180 (*it)->Flush();
181 }
182 });
183 MEDIA_LOG_I("Flush done.");
184 return Status::OK;
185 }
186
Release()187 Status Pipeline::Release()
188 {
189 MEDIA_LOG_I("Release enter.");
190 SubmitJobOnce([&] {
191 AutoLock lock(mutex_);
192 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
193 (*it)->Release();
194 }
195 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
196 (*it)->WaitAllState(FilterState::RELEASED);
197 }
198 filters_.clear();
199 });
200 MEDIA_LOG_I("Release done.");
201 return Status::OK;
202 }
203
Preroll(bool render)204 Status Pipeline::Preroll(bool render)
205 {
206 MEDIA_LOG_I("Preroll enter.");
207 Status ret = Status::OK;
208 AutoLock lock(mutex_);
209 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
210 auto rtv = (*it)->Preroll();
211 if (rtv != Status::OK) {
212 ret = rtv;
213 MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
214 return ret;
215 }
216 }
217 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
218 auto rtv = (*it)->WaitPrerollDone(render);
219 if (rtv != Status::OK) {
220 ret = rtv;
221 MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
222 return ret;
223 }
224 }
225 MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
226 return ret;
227 }
228
SetPlayRange(int64_t start,int64_t end)229 Status Pipeline::SetPlayRange(int64_t start, int64_t end)
230 {
231 MEDIA_LOG_I("SetPlayRange enter.");
232 SubmitJobOnce([&] {
233 AutoLock lock(mutex_);
234 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
235 (*it)->SetPlayRange(start, end);
236 }
237 });
238 MEDIA_LOG_I("SetPlayRange done.");
239 return Status::OK;
240 }
241
AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)242 Status Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)
243 {
244 MEDIA_LOG_I("AddHeadFilters enter.");
245 std::vector<std::shared_ptr<Filter>> filtersToAdd;
246 for (auto& filterIn : filtersIn) {
247 bool matched = false;
248 for (const auto& filter : filters_) {
249 if (filterIn == filter) {
250 matched = true;
251 break;
252 }
253 }
254 if (!matched) {
255 filtersToAdd.push_back(filterIn);
256 filterIn->LinkPipeLine(groupId_);
257 }
258 }
259 if (filtersToAdd.empty()) {
260 MEDIA_LOG_I("filter already exists");
261 return Status::OK;
262 }
263 SubmitJobOnce([&] {
264 AutoLock lock(mutex_);
265 this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
266 });
267 MEDIA_LOG_I("AddHeadFilters done.");
268 return Status::OK;
269 }
270
RemoveHeadFilter(const std::shared_ptr<Filter> & filter)271 Status Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter)
272 {
273 SubmitJobOnce([&] {
274 AutoLock lock(mutex_);
275 auto it = std::find_if(filters_.begin(), filters_.end(),
276 [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; });
277 if (it != filters_.end()) {
278 filters_.erase(it);
279 }
280 filter->Release();
281 filter->WaitAllState(FilterState::RELEASED);
282 filter->ClearAllNextFilters();
283 return Status::OK;
284 });
285 return Status::OK;
286 }
287
LinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)288 Status Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter,
289 const std::vector<std::shared_ptr<Filter>> &nextFilters,
290 StreamType type)
291 {
292 for (auto nextFilter : nextFilters) {
293 auto ret = preFilter->LinkNext(nextFilter, type);
294 nextFilter->LinkPipeLine(groupId_);
295 FALSE_RETURN_V(ret == Status::OK, ret);
296 }
297 return Status::OK;
298 }
299
UpdateFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)300 Status Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter,
301 const std::vector<std::shared_ptr<Filter>> &nextFilters,
302 StreamType type)
303 {
304 for (auto nextFilter : nextFilters) {
305 preFilter->UpdateNext(nextFilter, type);
306 }
307 return Status::OK;
308 }
309
UnLinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)310 Status Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter,
311 const std::vector<std::shared_ptr<Filter>> &nextFilters,
312 StreamType type)
313 {
314 for (auto nextFilter : nextFilters) {
315 preFilter->UnLinkNext(nextFilter, type);
316 }
317 return Status::OK;
318 }
319
OnEvent(const Event & event)320 void Pipeline::OnEvent(const Event& event)
321 {
322 }
323
324 } // namespace Pipeline
325 } // namespace Media
326 } // namespace OHOS
327