1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "Filter"
17 #define MEDIA_PIPELINE
18
19 #include "filter/filter.h"
20 #include "osal/utils/util.h"
21 #include "common/log.h"
22 #include <algorithm>
23
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Filter" };
26 }
27
28 namespace OHOS {
29 namespace Media {
30 namespace Pipeline {
Filter(std::string name,FilterType type,bool isAyncMode)31 Filter::Filter(std::string name, FilterType type, bool isAyncMode)
32 : name_(std::move(name)), filterType_(type), isAsyncMode_(isAyncMode)
33 {
34 }
35
~Filter()36 Filter::~Filter()
37 {
38 nextFiltersMap_.clear();
39 }
40
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback)41 void Filter::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback)
42 {
43 receiver_ = receiver;
44 callback_ = callback;
45 }
46
LinkPipeLine(const std::string & groupId)47 void Filter::LinkPipeLine(const std::string &groupId)
48 {
49 groupId_ = groupId;
50 MEDIA_LOG_I("Filter %{public}s LinkPipeLine:%{public}s, isAsyncMode_:%{public}d",
51 name_.c_str(), groupId.c_str(), isAsyncMode_);
52 if (isAsyncMode_) {
53 TaskType taskType;
54 switch (filterType_) {
55 case FilterType::FILTERTYPE_VENC:
56 case FilterType::FILTERTYPE_VDEC:
57 case FilterType::VIDEO_CAPTURE:
58 taskType = TaskType::SINGLETON;
59 break;
60 case FilterType::FILTERTYPE_ASINK:
61 case FilterType::AUDIO_CAPTURE:
62 taskType = TaskType::AUDIO;
63 break;
64 default:
65 taskType = TaskType::SINGLETON;
66 break;
67 }
68 filterTask_ = std::make_unique<Task>(name_, groupId_, taskType, TaskPriority::HIGH, false);
69 filterTask_->SubmitJobOnce([this] {
70 Status ret = DoInitAfterLink();
71 SetErrCode(ret);
72 ChangeState(ret == Status::OK ? FilterState::INITIALIZED : FilterState::ERROR);
73 });
74 } else {
75 Status ret = DoInitAfterLink();
76 SetErrCode(ret);
77 ChangeState(ret == Status::OK ? FilterState::INITIALIZED : FilterState::ERROR);
78 }
79 }
80
Prepare()81 Status Filter::Prepare()
82 {
83 MEDIA_LOG_D("Prepare %{public}s, pState:%{public}d", name_.c_str(), curState_);
84 if (filterTask_) {
85 filterTask_->SubmitJobOnce([this] {
86 PrepareDone();
87 });
88 } else {
89 return PrepareDone();
90 }
91 return Status::OK;
92 }
93
PrepareDone()94 Status Filter::PrepareDone()
95 {
96 MEDIA_LOG_I("Prepare enter %{public}s", name_.c_str());
97 if (curState_ == FilterState::ERROR) {
98 // if DoInitAfterLink error, do not prepare.
99 return Status::ERROR_INVALID_OPERATION;
100 }
101 // next filters maybe added in DoPrepare, so we must DoPrepare first
102 Status ret = DoPrepare();
103 SetErrCode(ret);
104 if (ret != Status::OK) {
105 ChangeState(FilterState::ERROR);
106 return ret;
107 }
108 for (auto iter : nextFiltersMap_) {
109 for (auto filter : iter.second) {
110 auto ret = filter->Prepare();
111 if (ret != Status::OK) {
112 return ret;
113 }
114 }
115 }
116 ChangeState(FilterState::READY);
117 return ret;
118 }
119
Start()120 Status Filter::Start()
121 {
122 MEDIA_LOG_D("Start %{public}s, pState:%{public}d", name_.c_str(), curState_);
123 if (filterTask_) {
124 filterTask_->SubmitJobOnce([this] {
125 StartDone();
126 filterTask_->Start();
127 });
128 for (auto iter : nextFiltersMap_) {
129 for (auto filter : iter.second) {
130 filter->Start();
131 }
132 }
133 } else {
134 for (auto iter : nextFiltersMap_) {
135 for (auto filter : iter.second) {
136 filter->Start();
137 }
138 }
139 return StartDone();
140 }
141 return Status::OK;
142 }
143
StartDone()144 Status Filter::StartDone()
145 {
146 MEDIA_LOG_I("Start in %{public}s", name_.c_str());
147 Status ret = DoStart();
148 SetErrCode(ret);
149 ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR);
150 return ret;
151 }
152
Pause()153 Status Filter::Pause()
154 {
155 MEDIA_LOG_D("Pause %{public}s, pState:%{public}d", name_.c_str(), curState_);
156 // In offload case, we need pause to interrupt audio_sink_plugin write function, so do not use asyncmode
157 auto ret = PauseDone();
158 if (filterTask_) {
159 filterTask_->Pause();
160 }
161 for (auto iter : nextFiltersMap_) {
162 for (auto filter : iter.second) {
163 filter->Pause();
164 }
165 }
166 return ret;
167 }
168
PauseDragging()169 Status Filter::PauseDragging()
170 {
171 MEDIA_LOG_D("PauseDragging %{public}s, pState:%{public}d", name_.c_str(), curState_);
172 auto ret = DoPauseDragging();
173 if (filterTask_) {
174 filterTask_->Pause();
175 }
176 for (auto iter : nextFiltersMap_) {
177 for (auto filter : iter.second) {
178 auto curRet = filter->PauseDragging();
179 if (curRet != Status::OK) {
180 ret = curRet;
181 }
182 }
183 }
184 return ret;
185 }
186
PauseAudioAlign()187 Status Filter::PauseAudioAlign()
188 {
189 MEDIA_LOG_D("PauseAudioAlign %{public}s, pState:%{public}d", name_.c_str(), curState_);
190 auto ret = DoPauseAudioAlign();
191 if (filterTask_) {
192 filterTask_->Pause();
193 }
194 for (auto iter : nextFiltersMap_) {
195 for (auto filter : iter.second) {
196 auto curRet = filter->PauseAudioAlign();
197 if (curRet != Status::OK) {
198 ret = curRet;
199 }
200 }
201 }
202 return ret;
203 }
204
PauseDone()205 Status Filter::PauseDone()
206 {
207 MEDIA_LOG_I("Pause in %{public}s", name_.c_str());
208 Status ret = DoPause();
209 SetErrCode(ret);
210 ChangeState(ret == Status::OK ? FilterState::PAUSED : FilterState::ERROR);
211 return ret;
212 }
213
Resume()214 Status Filter::Resume()
215 {
216 MEDIA_LOG_D("Resume %{public}s, pState:%{public}d", name_.c_str(), curState_);
217 if (filterTask_) {
218 filterTask_->SubmitJobOnce([this]() {
219 ResumeDone();
220 filterTask_->Start();
221 });
222 for (auto iter : nextFiltersMap_) {
223 for (auto filter : iter.second) {
224 filter->Resume();
225 }
226 }
227 } else {
228 for (auto iter : nextFiltersMap_) {
229 for (auto filter : iter.second) {
230 filter->Resume();
231 }
232 }
233 return ResumeDone();
234 }
235 return Status::OK;
236 }
237
ResumeDone()238 Status Filter::ResumeDone()
239 {
240 MEDIA_LOG_I("Resume in %{public}s", name_.c_str());
241 Status ret = DoResume();
242 SetErrCode(ret);
243 ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR);
244 return ret;
245 }
246
ResumeDragging()247 Status Filter::ResumeDragging()
248 {
249 MEDIA_LOG_D("ResumeDragging %{public}s, pState:%{public}d", name_.c_str(), curState_);
250 auto ret = Status::OK;
251 ret = DoResumeDragging();
252 if (filterTask_) {
253 filterTask_->Start();
254 }
255 for (auto iter : nextFiltersMap_) {
256 for (auto filter : iter.second) {
257 auto curRet = filter->ResumeDragging();
258 if (curRet != Status::OK) {
259 ret = curRet;
260 }
261 }
262 }
263 return ret;
264 }
265
ResumeAudioAlign()266 Status Filter::ResumeAudioAlign()
267 {
268 MEDIA_LOG_D("ResumeAudioAlign %{public}s, pState:%{public}d", name_.c_str(), curState_);
269 auto ret = Status::OK;
270 ret = DoResumeAudioAlign();
271 if (filterTask_) {
272 filterTask_->Start();
273 }
274 for (auto iter : nextFiltersMap_) {
275 for (auto filter : iter.second) {
276 auto curRet = filter->ResumeAudioAlign();
277 if (curRet != Status::OK) {
278 ret = curRet;
279 }
280 }
281 }
282 return ret;
283 }
284
Stop()285 Status Filter::Stop()
286 {
287 MEDIA_LOG_D("Stop %{public}s, pState:%{public}d", name_.c_str(), curState_);
288 // In offload case, we need stop to interrupt audio_sink_plugin write function, so do not use asyncmode
289 auto ret = StopDone();
290 if (filterTask_) {
291 filterTask_->Stop();
292 }
293 for (auto iter : nextFiltersMap_) {
294 for (auto filter : iter.second) {
295 filter->Stop();
296 }
297 }
298 return ret;
299 }
300
StopDone()301 Status Filter::StopDone()
302 {
303 MEDIA_LOG_I("Stop in %{public}s", name_.c_str());
304 Status ret = DoStop();
305 SetErrCode(ret);
306 ChangeState(ret == Status::OK ? FilterState::STOPPED : FilterState::ERROR);
307 return ret;
308 }
309
Flush()310 Status Filter::Flush()
311 {
312 MEDIA_LOG_D("Flush %{public}s, pState:%{public}d", name_.c_str(), curState_);
313 for (auto iter : nextFiltersMap_) {
314 for (auto filter : iter.second) {
315 filter->Flush();
316 }
317 }
318 jobIdxBase_ = jobIdx_;
319 return DoFlush();
320 }
321
Release()322 Status Filter::Release()
323 {
324 MEDIA_LOG_D("Release %{public}s, pState:%{public}d", name_.c_str(), curState_);
325 if (filterTask_) {
326 filterTask_->SubmitJobOnce([this]() {
327 ReleaseDone();
328 });
329 for (auto iter : nextFiltersMap_) {
330 for (auto filter : iter.second) {
331 filter->Release();
332 }
333 }
334 } else {
335 for (auto iter : nextFiltersMap_) {
336 for (auto filter : iter.second) {
337 filter->Release();
338 }
339 }
340 return ReleaseDone();
341 }
342 return Status::OK;
343 }
344
ReleaseDone()345 Status Filter::ReleaseDone()
346 {
347 MEDIA_LOG_I("Release in %{public}s", name_.c_str());
348 Status ret = DoRelease();
349 SetErrCode(ret);
350 ChangeState(ret == Status::OK ? FilterState::RELEASED : FilterState::ERROR);
351 return ret;
352 }
353
Preroll()354 Status Filter::Preroll()
355 {
356 Status ret = DoPreroll();
357 if (ret != Status::OK) {
358 return ret;
359 }
360 for (auto iter : nextFiltersMap_) {
361 for (auto filter : iter.second) {
362 ret = filter->Preroll();
363 if (ret != Status::OK) {
364 return ret;
365 }
366 }
367 }
368 return Status::OK;
369 }
370
WaitPrerollDone(bool render)371 Status Filter::WaitPrerollDone(bool render)
372 {
373 Status ret = Status::OK;
374 for (auto iter : nextFiltersMap_) {
375 for (auto filter : iter.second) {
376 auto curRet = filter->WaitPrerollDone(render);
377 if (curRet != Status::OK) {
378 ret = curRet;
379 }
380 }
381 }
382 auto curRet = DoWaitPrerollDone(render);
383 if (curRet != Status::OK) {
384 ret = curRet;
385 }
386 return ret;
387 }
388
StartFilterTask()389 void Filter::StartFilterTask()
390 {
391 if (filterTask_) {
392 filterTask_->Start();
393 }
394 }
395
PauseFilterTask()396 void Filter::PauseFilterTask()
397 {
398 if (filterTask_) {
399 filterTask_->Pause();
400 }
401 }
402
ClearAllNextFilters()403 Status Filter::ClearAllNextFilters()
404 {
405 nextFiltersMap_.clear();
406 return Status::OK;
407 }
408
SetPlayRange(int64_t start,int64_t end)409 Status Filter::SetPlayRange(int64_t start, int64_t end)
410 {
411 MEDIA_LOG_D("SetPlayRange %{public}ld, pState:%{public}ld", name_.c_str(), curState_);
412 for (auto iter : nextFiltersMap_) {
413 for (auto filter : iter.second) {
414 filter->SetPlayRange(start, end);
415 }
416 }
417 return DoSetPlayRange(start, end);
418 }
419
ProcessInputBuffer(int sendArg,int64_t delayUs)420 Status Filter::ProcessInputBuffer(int sendArg, int64_t delayUs)
421 {
422 MEDIA_LOG_D("Filter::ProcessInputBuffer %{public}s", name_.c_str());
423 if (filterTask_) {
424 jobIdx_++;
425 filterTask_->SubmitJob([this, sendArg]() {
426 processIdx_++;
427 DoProcessInputBuffer(sendArg, processIdx_ <= jobIdxBase_); // drop frame after flush
428 }, delayUs, 0);
429 } else {
430 Task::SleepInTask(delayUs / 1000); // 1000 convert to ms
431 DoProcessInputBuffer(sendArg, false);
432 }
433 return Status::OK;
434 }
435
ProcessOutputBuffer(int sendArg,int64_t delayUs,bool byIdx,uint32_t idx,int64_t renderTime)436 Status Filter::ProcessOutputBuffer(int sendArg, int64_t delayUs, bool byIdx, uint32_t idx, int64_t renderTime)
437 {
438 MEDIA_LOG_D("Filter::ProcessOutputBuffer %{public}s", name_.c_str());
439 if (filterTask_) {
440 jobIdx_++;
441 int64_t processIdx = jobIdx_;
442 filterTask_->SubmitJob([this, sendArg, processIdx, byIdx, idx, renderTime]() {
443 processIdx_++;
444 // drop frame after flush
445 DoProcessOutputBuffer(sendArg, processIdx <= jobIdxBase_, byIdx, idx, renderTime);
446 }, delayUs, 0);
447 } else {
448 Task::SleepInTask(delayUs / 1000); // 1000 convert to ms
449 DoProcessOutputBuffer(sendArg, false, false, idx, renderTime);
450 }
451 return Status::OK;
452 }
453
DoInitAfterLink()454 Status Filter::DoInitAfterLink()
455 {
456 MEDIA_LOG_I("Filter::DoInitAfterLink");
457 return Status::OK;
458 }
459
DoPrepare()460 Status Filter::DoPrepare()
461 {
462 return Status::OK;
463 }
464
DoStart()465 Status Filter::DoStart()
466 {
467 return Status::OK;
468 }
469
DoPause()470 Status Filter::DoPause()
471 {
472 return Status::OK;
473 }
474
DoPauseDragging()475 Status Filter::DoPauseDragging()
476 {
477 return Status::OK;
478 }
479
DoPauseAudioAlign()480 Status Filter::DoPauseAudioAlign()
481 {
482 return Status::OK;
483 }
484
DoResume()485 Status Filter::DoResume()
486 {
487 return Status::OK;
488 }
489
DoResumeDragging()490 Status Filter::DoResumeDragging()
491 {
492 return Status::OK;
493 }
494
DoResumeAudioAlign()495 Status Filter::DoResumeAudioAlign()
496 {
497 return Status::OK;
498 }
499
DoStop()500 Status Filter::DoStop()
501 {
502 return Status::OK;
503 }
504
DoFlush()505 Status Filter::DoFlush()
506 {
507 return Status::OK;
508 }
509
DoRelease()510 Status Filter::DoRelease()
511 {
512 return Status::OK;
513 }
514
DoPreroll()515 Status Filter::DoPreroll()
516 {
517 return Status::OK;
518 }
519
DoWaitPrerollDone(bool render)520 Status Filter::DoWaitPrerollDone(bool render)
521 {
522 return Status::OK;
523 }
524
DoSetPlayRange(int64_t start,int64_t end)525 Status Filter::DoSetPlayRange(int64_t start, int64_t end)
526 {
527 return Status::OK;
528 }
529
DoProcessInputBuffer(int recvArg,bool dropFrame)530 Status Filter::DoProcessInputBuffer(int recvArg, bool dropFrame)
531 {
532 return Status::OK;
533 }
534
DoProcessOutputBuffer(int recvArg,bool dropFrame,bool byIdx,uint32_t idx,int64_t renderTimee)535 Status Filter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTimee)
536 {
537 return Status::OK;
538 }
539
ChangeState(FilterState state)540 void Filter::ChangeState(FilterState state)
541 {
542 MEDIA_LOG_I("%{public}s > %{public}d", name_.c_str(), state);
543 AutoLock lock(stateMutex_);
544 curState_ = state;
545 cond_.NotifyOne();
546 }
547
WaitAllState(FilterState state)548 Status Filter::WaitAllState(FilterState state)
549 {
550 AutoLock lock(stateMutex_);
551 MEDIA_LOG_I("%{public}s wait %{public}d", name_.c_str(), state);
552 if (curState_ != state) {
553 bool result = cond_.WaitFor(lock, 30000, [this, state] { // 30000 ms timeout
554 return curState_ == state || curState_ == FilterState::ERROR;
555 });
556 if (!result) {
557 SetErrCode(Status::ERROR_TIMED_OUT);
558 return Status::ERROR_TIMED_OUT;
559 }
560 if (curState_ != state) {
561 MEDIA_LOG_E("Filter(%{public}s) wait state %{public}d fail, curState %{public}d",
562 name_.c_str(), state, curState_);
563 return GetErrCode();
564 }
565 }
566
567 Status res = Status::OK;
568 for (auto iter : nextFiltersMap_) {
569 for (auto filter : iter.second) {
570 if (filter->WaitAllState(state) != Status::OK) {
571 res = filter->GetErrCode();
572 }
573 }
574 }
575 return res;
576 }
577
SetErrCode(Status errCode)578 void Filter::SetErrCode(Status errCode)
579 {
580 errCode_ = errCode;
581 }
582
GetErrCode()583 Status Filter::GetErrCode()
584 {
585 return errCode_;
586 }
587
SetParameter(const std::shared_ptr<Meta> & meta)588 void Filter::SetParameter(const std::shared_ptr<Meta>& meta)
589 {
590 meta_ = meta;
591 }
592
GetParameter(std::shared_ptr<Meta> & meta)593 void Filter::GetParameter(std::shared_ptr<Meta>& meta)
594 {
595 meta = meta_;
596 }
597
LinkNext(const std::shared_ptr<Filter> &,StreamType)598 Status Filter::LinkNext(const std::shared_ptr<Filter>&, StreamType)
599 {
600 return Status::OK;
601 }
602
UpdateNext(const std::shared_ptr<Filter> &,StreamType)603 Status Filter::UpdateNext(const std::shared_ptr<Filter>&, StreamType)
604 {
605 return Status::OK;
606 }
607
UnLinkNext(const std::shared_ptr<Filter> &,StreamType)608 Status Filter::UnLinkNext(const std::shared_ptr<Filter>&, StreamType)
609 {
610 return Status::OK;
611 }
612
GetFilterType()613 FilterType Filter::GetFilterType()
614 {
615 return filterType_;
616 };
617
OnLinked(StreamType,const std::shared_ptr<Meta> &,const std::shared_ptr<FilterLinkCallback> &)618 Status Filter::OnLinked(StreamType, const std::shared_ptr<Meta>&, const std::shared_ptr<FilterLinkCallback>&)
619 {
620 return Status::OK;
621 };
622
OnUpdated(StreamType,const std::shared_ptr<Meta> &,const std::shared_ptr<FilterLinkCallback> &)623 Status Filter::OnUpdated(StreamType, const std::shared_ptr<Meta>&, const std::shared_ptr<FilterLinkCallback>&)
624 {
625 return Status::OK;
626 }
627
OnUnLinked(StreamType,const std::shared_ptr<FilterLinkCallback> &)628 Status Filter::OnUnLinked(StreamType, const std::shared_ptr<FilterLinkCallback>&)
629 {
630 return Status::OK;
631 }
632
633 } // namespace Pipeline
634 } // namespace Media
635 } // namespace OHOS
636