1 /*
2  * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define MEDIA_PIPELINE
17 #define HST_LOG_TAG "Pipeline"
18 
#include <algorithm>
#include <queue>
#include <stack>
#include "pipeline/pipeline.h"
#include "osal/task/autolock.h"
#include "osal/task/jobutils.h"
#include "common/log.h"
#include "osal/utils/hitrace_utils.h"
26 
27 namespace {
28 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Pipeline" };
29 }
30 
31 namespace OHOS {
32 namespace Media {
33 namespace Pipeline {
// File-local counter that backs Pipeline::GetNextPipelineId(); starts at 0.
// Being a 16-bit unsigned atomic, it wraps back to 0 after 65535.
static std::atomic<uint16_t> pipeLineId{0};
35 
GetNextPipelineId()36 int32_t Pipeline::GetNextPipelineId()
37 {
38     return pipeLineId++;
39 }
40 
~Pipeline()41 Pipeline::~Pipeline()
42 {
43 }
44 
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback,const std::string & groupId)45 void Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback,
46     const std::string &groupId)
47 {
48     MEDIA_LOG_I("Pipeline::Init");
49     eventReceiver_ = receiver;
50     filterCallback_ = callback;
51     groupId_ = groupId;
52 }
53 
Prepare()54 Status Pipeline::Prepare()
55 {
56     MEDIA_LOG_I("Prepare enter.");
57     Status ret = Status::OK;
58     SubmitJobOnce([&] {
59         AutoLock lock(mutex_);
60         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
61             ret = (*it)->Prepare();
62             if (ret != Status::OK) {
63                 return;
64             }
65         }
66         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
67             ret = (*it)->WaitAllState(FilterState::READY);
68             if (ret != Status::OK) {
69                 return;
70             }
71         }
72     });
73     MEDIA_LOG_I("Prepare done ret = %{public}d", ret);
74     return ret;
75 }
76 
Start()77 Status Pipeline::Start()
78 {
79     MEDIA_LOG_I("Start enter.");
80     Status ret = Status::OK;
81     SubmitJobOnce([&] {
82         AutoLock lock(mutex_);
83         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
84             ret = (*it)->Start();
85             if (ret != Status::OK) {
86                 return;
87             }
88         }
89         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
90             ret = (*it)->WaitAllState(FilterState::RUNNING);
91             if (ret != Status::OK) {
92                 return;
93             }
94         }
95     });
96     MEDIA_LOG_I("Start done ret = %{public}d", ret);
97     return ret;
98 }
99 
Pause()100 Status Pipeline::Pause()
101 {
102     MEDIA_LOG_I("Pause enter.");
103     Status ret = Status::OK;
104     SubmitJobOnce([&] {
105         AutoLock lock(mutex_);
106         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
107             auto rtv = (*it)->Pause();
108             if (rtv != Status::OK) {
109                 ret = rtv;
110             }
111         }
112         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
113             auto rtv = (*it)->WaitAllState(FilterState::PAUSED);
114             if (rtv != Status::OK) {
115                 ret = rtv;
116             }
117         }
118     });
119     MEDIA_LOG_I("Pause done ret = %{public}d", ret);
120     return ret;
121 }
122 
Resume()123 Status Pipeline::Resume()
124 {
125     MEDIA_LOG_I("Resume enter.");
126     Status ret = Status::OK;
127     SubmitJobOnce([&] {
128         AutoLock lock(mutex_);
129         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
130             ret = (*it)->Resume();
131             if (ret != Status::OK) {
132                 return;
133             }
134         }
135         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
136             ret = (*it)->WaitAllState(FilterState::RUNNING);
137             if (ret != Status::OK) {
138                 return;
139             }
140         }
141     });
142     MEDIA_LOG_I("Resume done ret = %{public}d", ret);
143     return ret;
144 }
145 
Stop()146 Status Pipeline::Stop()
147 {
148     MEDIA_LOG_I("Stop enter.");
149     Status ret = Status::OK;
150     SubmitJobOnce([&] {
151         AutoLock lock(mutex_);
152         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
153             if (*it == nullptr) {
154                 MEDIA_LOG_E("Pipeline error: " PUBLIC_LOG_ZU, filters_.size());
155                 continue;
156             }
157             auto rtv = (*it)->Stop();
158             if (rtv != Status::OK) {
159                 ret = rtv;
160             }
161         }
162         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
163             auto rtv = (*it)->WaitAllState(FilterState::STOPPED);
164             if (rtv != Status::OK) {
165                 ret = rtv;
166             }
167         }
168         filters_.clear();
169     });
170     MEDIA_LOG_I("Stop done ret = %{public}d", ret);
171     return ret;
172 }
173 
Flush()174 Status Pipeline::Flush()
175 {
176     MEDIA_LOG_I("Flush enter.");
177     SubmitJobOnce([&] {
178         AutoLock lock(mutex_);
179         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
180             (*it)->Flush();
181         }
182     });
183     MEDIA_LOG_I("Flush done.");
184     return Status::OK;
185 }
186 
Release()187 Status Pipeline::Release()
188 {
189     MEDIA_LOG_I("Release enter.");
190     SubmitJobOnce([&] {
191         AutoLock lock(mutex_);
192         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
193             (*it)->Release();
194         }
195         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
196             (*it)->WaitAllState(FilterState::RELEASED);
197         }
198         filters_.clear();
199     });
200     MEDIA_LOG_I("Release done.");
201     return Status::OK;
202 }
203 
Preroll(bool render)204 Status Pipeline::Preroll(bool render)
205 {
206     MEDIA_LOG_I("Preroll enter.");
207     Status ret = Status::OK;
208     AutoLock lock(mutex_);
209     for (auto it = filters_.begin(); it != filters_.end(); ++it) {
210         auto rtv = (*it)->Preroll();
211         if (rtv != Status::OK) {
212             ret = rtv;
213             MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
214             return ret;
215         }
216     }
217     for (auto it = filters_.begin(); it != filters_.end(); ++it) {
218         auto rtv = (*it)->WaitPrerollDone(render);
219         if (rtv != Status::OK) {
220             ret = rtv;
221             MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
222             return ret;
223         }
224     }
225     MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
226     return ret;
227 }
228 
SetPlayRange(int64_t start,int64_t end)229 Status Pipeline::SetPlayRange(int64_t start, int64_t end)
230 {
231     MEDIA_LOG_I("SetPlayRange enter.");
232     SubmitJobOnce([&] {
233         AutoLock lock(mutex_);
234         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
235             (*it)->SetPlayRange(start, end);
236         }
237     });
238     MEDIA_LOG_I("SetPlayRange done.");
239     return Status::OK;
240 }
241 
AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)242 Status Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)
243 {
244     MEDIA_LOG_I("AddHeadFilters enter.");
245     std::vector<std::shared_ptr<Filter>> filtersToAdd;
246     for (auto& filterIn : filtersIn) {
247         bool matched = false;
248         for (const auto& filter : filters_) {
249             if (filterIn == filter) {
250                 matched = true;
251                 break;
252             }
253         }
254         if (!matched) {
255             filtersToAdd.push_back(filterIn);
256             filterIn->LinkPipeLine(groupId_);
257         }
258     }
259     if (filtersToAdd.empty()) {
260         MEDIA_LOG_I("filter already exists");
261         return Status::OK;
262     }
263     SubmitJobOnce([&] {
264         AutoLock lock(mutex_);
265         this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
266     });
267     MEDIA_LOG_I("AddHeadFilters done.");
268     return Status::OK;
269 }
270 
RemoveHeadFilter(const std::shared_ptr<Filter> & filter)271 Status Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter)
272 {
273     SubmitJobOnce([&] {
274         AutoLock lock(mutex_);
275         auto it = std::find_if(filters_.begin(), filters_.end(),
276                                [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; });
277         if (it != filters_.end()) {
278             filters_.erase(it);
279         }
280         filter->Release();
281         filter->WaitAllState(FilterState::RELEASED);
282         filter->ClearAllNextFilters();
283         return Status::OK;
284     });
285     return Status::OK;
286 }
287 
LinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)288 Status Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter,
289                              const std::vector<std::shared_ptr<Filter>> &nextFilters,
290                              StreamType type)
291 {
292     for (auto nextFilter : nextFilters) {
293         auto ret = preFilter->LinkNext(nextFilter, type);
294         nextFilter->LinkPipeLine(groupId_);
295         FALSE_RETURN_V(ret == Status::OK, ret);
296     }
297     return Status::OK;
298 }
299 
UpdateFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)300 Status Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter,
301                                const std::vector<std::shared_ptr<Filter>> &nextFilters,
302                                StreamType type)
303 {
304     for (auto nextFilter : nextFilters) {
305         preFilter->UpdateNext(nextFilter, type);
306     }
307     return Status::OK;
308 }
309 
UnLinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)310 Status Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter,
311                                const std::vector<std::shared_ptr<Filter>> &nextFilters,
312                                StreamType type)
313 {
314     for (auto nextFilter : nextFilters) {
315         preFilter->UnLinkNext(nextFilter, type);
316     }
317     return Status::OK;
318 }
319 
OnEvent(const Event & event)320 void Pipeline::OnEvent(const Event& event)
321 {
322 }
323 
324 } // namespace Pipeline
325 } // namespace Media
326 } // namespace OHOS
327