1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "Filter"
17 #define MEDIA_PIPELINE
18 
19 #include "filter/filter.h"
20 #include "osal/utils/util.h"
21 #include "common/log.h"
22 #include <algorithm>
23 
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Filter" };
26 }
27 
28 namespace OHOS {
29 namespace Media {
30 namespace Pipeline {
Filter(std::string name,FilterType type,bool isAyncMode)31 Filter::Filter(std::string name, FilterType type, bool isAyncMode)
32     : name_(std::move(name)), filterType_(type), isAsyncMode_(isAyncMode)
33 {
34 }
35 
~Filter()36 Filter::~Filter()
37 {
38     nextFiltersMap_.clear();
39 }
40 
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback)41 void Filter::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback)
42 {
43     receiver_ = receiver;
44     callback_ = callback;
45 }
46 
LinkPipeLine(const std::string & groupId)47 void Filter::LinkPipeLine(const std::string &groupId)
48 {
49     groupId_ = groupId;
50     MEDIA_LOG_I("Filter %{public}s LinkPipeLine:%{public}s, isAsyncMode_:%{public}d",
51         name_.c_str(), groupId.c_str(), isAsyncMode_);
52     if (isAsyncMode_) {
53         TaskType taskType;
54         switch (filterType_) {
55             case FilterType::FILTERTYPE_VENC:
56             case FilterType::FILTERTYPE_VDEC:
57             case FilterType::VIDEO_CAPTURE:
58                 taskType = TaskType::SINGLETON;
59                 break;
60             case FilterType::FILTERTYPE_ASINK:
61             case FilterType::AUDIO_CAPTURE:
62                 taskType = TaskType::AUDIO;
63                 break;
64             default:
65                 taskType = TaskType::SINGLETON;
66                 break;
67         }
68         filterTask_ = std::make_unique<Task>(name_, groupId_, taskType, TaskPriority::HIGH, false);
69         filterTask_->SubmitJobOnce([this] {
70             Status ret = DoInitAfterLink();
71             SetErrCode(ret);
72             ChangeState(ret == Status::OK ? FilterState::INITIALIZED : FilterState::ERROR);
73         });
74     } else {
75         Status ret = DoInitAfterLink();
76         SetErrCode(ret);
77         ChangeState(ret == Status::OK ? FilterState::INITIALIZED : FilterState::ERROR);
78     }
79 }
80 
Prepare()81 Status Filter::Prepare()
82 {
83     MEDIA_LOG_D("Prepare %{public}s, pState:%{public}d", name_.c_str(), curState_);
84     if (filterTask_) {
85         filterTask_->SubmitJobOnce([this] {
86             PrepareDone();
87         });
88     } else {
89         return PrepareDone();
90     }
91     return Status::OK;
92 }
93 
PrepareDone()94 Status Filter::PrepareDone()
95 {
96     MEDIA_LOG_I("Prepare enter %{public}s", name_.c_str());
97     if (curState_ == FilterState::ERROR) {
98         // if DoInitAfterLink error, do not prepare.
99         return Status::ERROR_INVALID_OPERATION;
100     }
101     // next filters maybe added in DoPrepare, so we must DoPrepare first
102     Status ret = DoPrepare();
103     SetErrCode(ret);
104     if (ret != Status::OK) {
105         ChangeState(FilterState::ERROR);
106         return ret;
107     }
108     for (auto iter : nextFiltersMap_) {
109         for (auto filter : iter.second) {
110             auto ret = filter->Prepare();
111             if (ret != Status::OK) {
112                 return ret;
113             }
114         }
115     }
116     ChangeState(FilterState::READY);
117     return ret;
118 }
119 
Start()120 Status Filter::Start()
121 {
122     MEDIA_LOG_D("Start %{public}s, pState:%{public}d", name_.c_str(), curState_);
123     if (filterTask_) {
124         filterTask_->SubmitJobOnce([this] {
125             StartDone();
126             filterTask_->Start();
127         });
128         for (auto iter : nextFiltersMap_) {
129             for (auto filter : iter.second) {
130                 filter->Start();
131             }
132         }
133     } else {
134         for (auto iter : nextFiltersMap_) {
135             for (auto filter : iter.second) {
136                 filter->Start();
137             }
138         }
139         return StartDone();
140     }
141     return Status::OK;
142 }
143 
StartDone()144 Status Filter::StartDone()
145 {
146     MEDIA_LOG_I("Start in %{public}s", name_.c_str());
147     Status ret = DoStart();
148     SetErrCode(ret);
149     ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR);
150     return ret;
151 }
152 
Pause()153 Status Filter::Pause()
154 {
155     MEDIA_LOG_D("Pause %{public}s, pState:%{public}d", name_.c_str(), curState_);
156     // In offload case, we need pause to interrupt audio_sink_plugin write function,  so do not use asyncmode
157     auto ret = PauseDone();
158     if (filterTask_) {
159         filterTask_->Pause();
160     }
161     for (auto iter : nextFiltersMap_) {
162         for (auto filter : iter.second) {
163             filter->Pause();
164         }
165     }
166     return ret;
167 }
168 
PauseDragging()169 Status Filter::PauseDragging()
170 {
171     MEDIA_LOG_D("PauseDragging %{public}s, pState:%{public}d", name_.c_str(), curState_);
172     auto ret = DoPauseDragging();
173     if (filterTask_) {
174         filterTask_->Pause();
175     }
176     for (auto iter : nextFiltersMap_) {
177         for (auto filter : iter.second) {
178             auto curRet = filter->PauseDragging();
179             if (curRet != Status::OK) {
180                 ret = curRet;
181             }
182         }
183     }
184     return ret;
185 }
186 
PauseAudioAlign()187 Status Filter::PauseAudioAlign()
188 {
189     MEDIA_LOG_D("PauseAudioAlign %{public}s, pState:%{public}d", name_.c_str(), curState_);
190     auto ret = DoPauseAudioAlign();
191     if (filterTask_) {
192         filterTask_->Pause();
193     }
194     for (auto iter : nextFiltersMap_) {
195         for (auto filter : iter.second) {
196             auto curRet = filter->PauseAudioAlign();
197             if (curRet != Status::OK) {
198                 ret = curRet;
199             }
200         }
201     }
202     return ret;
203 }
204 
PauseDone()205 Status Filter::PauseDone()
206 {
207     MEDIA_LOG_I("Pause in %{public}s", name_.c_str());
208     Status ret = DoPause();
209     SetErrCode(ret);
210     ChangeState(ret == Status::OK ? FilterState::PAUSED : FilterState::ERROR);
211     return ret;
212 }
213 
Resume()214 Status Filter::Resume()
215 {
216     MEDIA_LOG_D("Resume %{public}s, pState:%{public}d", name_.c_str(), curState_);
217     if (filterTask_) {
218         filterTask_->SubmitJobOnce([this]() {
219             ResumeDone();
220             filterTask_->Start();
221         });
222         for (auto iter : nextFiltersMap_) {
223             for (auto filter : iter.second) {
224                 filter->Resume();
225             }
226         }
227     } else {
228         for (auto iter : nextFiltersMap_) {
229             for (auto filter : iter.second) {
230                 filter->Resume();
231             }
232         }
233         return ResumeDone();
234     }
235     return Status::OK;
236 }
237 
ResumeDone()238 Status Filter::ResumeDone()
239 {
240     MEDIA_LOG_I("Resume in %{public}s", name_.c_str());
241     Status ret = DoResume();
242     SetErrCode(ret);
243     ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR);
244     return ret;
245 }
246 
ResumeDragging()247 Status Filter::ResumeDragging()
248 {
249     MEDIA_LOG_D("ResumeDragging %{public}s, pState:%{public}d", name_.c_str(), curState_);
250     auto ret = Status::OK;
251     ret = DoResumeDragging();
252     if (filterTask_) {
253         filterTask_->Start();
254     }
255     for (auto iter : nextFiltersMap_) {
256         for (auto filter : iter.second) {
257             auto curRet = filter->ResumeDragging();
258             if (curRet != Status::OK) {
259                 ret = curRet;
260             }
261         }
262     }
263     return ret;
264 }
265 
ResumeAudioAlign()266 Status Filter::ResumeAudioAlign()
267 {
268     MEDIA_LOG_D("ResumeAudioAlign %{public}s, pState:%{public}d", name_.c_str(), curState_);
269     auto ret = Status::OK;
270     ret = DoResumeAudioAlign();
271     if (filterTask_) {
272         filterTask_->Start();
273     }
274     for (auto iter : nextFiltersMap_) {
275         for (auto filter : iter.second) {
276             auto curRet = filter->ResumeAudioAlign();
277             if (curRet != Status::OK) {
278                 ret = curRet;
279             }
280         }
281     }
282     return ret;
283 }
284 
Stop()285 Status Filter::Stop()
286 {
287     MEDIA_LOG_D("Stop %{public}s, pState:%{public}d", name_.c_str(), curState_);
288     // In offload case, we need stop to interrupt audio_sink_plugin write function,  so do not use asyncmode
289     auto ret = StopDone();
290     if (filterTask_) {
291         filterTask_->Stop();
292     }
293     for (auto iter : nextFiltersMap_) {
294         for (auto filter : iter.second) {
295             filter->Stop();
296         }
297     }
298     return ret;
299 }
300 
StopDone()301 Status Filter::StopDone()
302 {
303     MEDIA_LOG_I("Stop in %{public}s", name_.c_str());
304     Status ret = DoStop();
305     SetErrCode(ret);
306     ChangeState(ret == Status::OK ? FilterState::STOPPED : FilterState::ERROR);
307     return ret;
308 }
309 
Flush()310 Status Filter::Flush()
311 {
312     MEDIA_LOG_D("Flush %{public}s, pState:%{public}d", name_.c_str(), curState_);
313     for (auto iter : nextFiltersMap_) {
314         for (auto filter : iter.second) {
315             filter->Flush();
316         }
317     }
318     jobIdxBase_ = jobIdx_;
319     return DoFlush();
320 }
321 
Release()322 Status Filter::Release()
323 {
324     MEDIA_LOG_D("Release %{public}s, pState:%{public}d", name_.c_str(), curState_);
325     if (filterTask_) {
326         filterTask_->SubmitJobOnce([this]() {
327             ReleaseDone();
328         });
329         for (auto iter : nextFiltersMap_) {
330             for (auto filter : iter.second) {
331                 filter->Release();
332             }
333         }
334     } else {
335         for (auto iter : nextFiltersMap_) {
336             for (auto filter : iter.second) {
337                 filter->Release();
338             }
339         }
340         return ReleaseDone();
341     }
342     return Status::OK;
343 }
344 
ReleaseDone()345 Status Filter::ReleaseDone()
346 {
347     MEDIA_LOG_I("Release in %{public}s", name_.c_str());
348     Status ret = DoRelease();
349     SetErrCode(ret);
350     ChangeState(ret == Status::OK ? FilterState::RELEASED : FilterState::ERROR);
351     return ret;
352 }
353 
Preroll()354 Status Filter::Preroll()
355 {
356     Status ret = DoPreroll();
357     if (ret != Status::OK) {
358         return ret;
359     }
360     for (auto iter : nextFiltersMap_) {
361         for (auto filter : iter.second) {
362             ret = filter->Preroll();
363             if (ret != Status::OK) {
364                 return ret;
365             }
366         }
367     }
368     return Status::OK;
369 }
370 
WaitPrerollDone(bool render)371 Status Filter::WaitPrerollDone(bool render)
372 {
373     Status ret = Status::OK;
374     for (auto iter : nextFiltersMap_) {
375         for (auto filter : iter.second) {
376             auto curRet = filter->WaitPrerollDone(render);
377             if (curRet != Status::OK) {
378                 ret = curRet;
379             }
380         }
381     }
382     auto curRet = DoWaitPrerollDone(render);
383     if (curRet != Status::OK) {
384         ret = curRet;
385     }
386     return ret;
387 }
388 
StartFilterTask()389 void Filter::StartFilterTask()
390 {
391     if (filterTask_) {
392         filterTask_->Start();
393     }
394 }
395 
PauseFilterTask()396 void Filter::PauseFilterTask()
397 {
398     if (filterTask_) {
399         filterTask_->Pause();
400     }
401 }
402 
ClearAllNextFilters()403 Status Filter::ClearAllNextFilters()
404 {
405     nextFiltersMap_.clear();
406     return Status::OK;
407 }
408 
SetPlayRange(int64_t start,int64_t end)409 Status Filter::SetPlayRange(int64_t start, int64_t end)
410 {
411     MEDIA_LOG_D("SetPlayRange %{public}ld, pState:%{public}ld", name_.c_str(), curState_);
412     for (auto iter : nextFiltersMap_) {
413         for (auto filter : iter.second) {
414             filter->SetPlayRange(start, end);
415         }
416     }
417     return DoSetPlayRange(start, end);
418 }
419 
ProcessInputBuffer(int sendArg,int64_t delayUs)420 Status Filter::ProcessInputBuffer(int sendArg, int64_t delayUs)
421 {
422     MEDIA_LOG_D("Filter::ProcessInputBuffer  %{public}s", name_.c_str());
423     if (filterTask_) {
424         jobIdx_++;
425         filterTask_->SubmitJob([this, sendArg]() {
426             processIdx_++;
427             DoProcessInputBuffer(sendArg, processIdx_ <= jobIdxBase_);  // drop frame after flush
428         }, delayUs, 0);
429     } else {
430         Task::SleepInTask(delayUs / 1000); // 1000 convert to ms
431         DoProcessInputBuffer(sendArg, false);
432     }
433     return Status::OK;
434 }
435 
ProcessOutputBuffer(int sendArg,int64_t delayUs,bool byIdx,uint32_t idx,int64_t renderTime)436 Status Filter::ProcessOutputBuffer(int sendArg, int64_t delayUs, bool byIdx, uint32_t idx, int64_t renderTime)
437 {
438     MEDIA_LOG_D("Filter::ProcessOutputBuffer  %{public}s", name_.c_str());
439     if (filterTask_) {
440         jobIdx_++;
441         int64_t processIdx = jobIdx_;
442         filterTask_->SubmitJob([this, sendArg, processIdx, byIdx, idx, renderTime]() {
443             processIdx_++;
444             // drop frame after flush
445             DoProcessOutputBuffer(sendArg, processIdx <= jobIdxBase_, byIdx, idx, renderTime);
446         }, delayUs, 0);
447     } else {
448         Task::SleepInTask(delayUs / 1000); // 1000 convert to ms
449         DoProcessOutputBuffer(sendArg, false, false, idx, renderTime);
450     }
451     return Status::OK;
452 }
453 
DoInitAfterLink()454 Status Filter::DoInitAfterLink()
455 {
456     MEDIA_LOG_I("Filter::DoInitAfterLink");
457     return Status::OK;
458 }
459 
DoPrepare()460 Status Filter::DoPrepare()
461 {
462     return Status::OK;
463 }
464 
DoStart()465 Status Filter::DoStart()
466 {
467     return Status::OK;
468 }
469 
DoPause()470 Status Filter::DoPause()
471 {
472     return Status::OK;
473 }
474 
DoPauseDragging()475 Status Filter::DoPauseDragging()
476 {
477     return Status::OK;
478 }
479 
DoPauseAudioAlign()480 Status Filter::DoPauseAudioAlign()
481 {
482     return Status::OK;
483 }
484 
DoResume()485 Status Filter::DoResume()
486 {
487     return Status::OK;
488 }
489 
DoResumeDragging()490 Status Filter::DoResumeDragging()
491 {
492     return Status::OK;
493 }
494 
DoResumeAudioAlign()495 Status Filter::DoResumeAudioAlign()
496 {
497     return Status::OK;
498 }
499 
DoStop()500 Status Filter::DoStop()
501 {
502     return Status::OK;
503 }
504 
DoFlush()505 Status Filter::DoFlush()
506 {
507     return Status::OK;
508 }
509 
DoRelease()510 Status Filter::DoRelease()
511 {
512     return Status::OK;
513 }
514 
DoPreroll()515 Status Filter::DoPreroll()
516 {
517     return Status::OK;
518 }
519 
DoWaitPrerollDone(bool render)520 Status Filter::DoWaitPrerollDone(bool render)
521 {
522     return Status::OK;
523 }
524 
DoSetPlayRange(int64_t start,int64_t end)525 Status Filter::DoSetPlayRange(int64_t start, int64_t end)
526 {
527     return Status::OK;
528 }
529 
DoProcessInputBuffer(int recvArg,bool dropFrame)530 Status Filter::DoProcessInputBuffer(int recvArg, bool dropFrame)
531 {
532     return Status::OK;
533 }
534 
DoProcessOutputBuffer(int recvArg,bool dropFrame,bool byIdx,uint32_t idx,int64_t renderTimee)535 Status Filter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTimee)
536 {
537     return Status::OK;
538 }
539 
ChangeState(FilterState state)540 void Filter::ChangeState(FilterState state)
541 {
542     MEDIA_LOG_I("%{public}s > %{public}d", name_.c_str(), state);
543     AutoLock lock(stateMutex_);
544     curState_ = state;
545     cond_.NotifyOne();
546 }
547 
WaitAllState(FilterState state)548 Status Filter::WaitAllState(FilterState state)
549 {
550     AutoLock lock(stateMutex_);
551     MEDIA_LOG_I("%{public}s wait %{public}d", name_.c_str(), state);
552     if (curState_ != state) {
553         bool result = cond_.WaitFor(lock, 30000, [this, state] { // 30000 ms timeout
554             return curState_ == state || curState_ == FilterState::ERROR;
555         });
556         if (!result) {
557             SetErrCode(Status::ERROR_TIMED_OUT);
558             return Status::ERROR_TIMED_OUT;
559         }
560         if (curState_ != state) {
561             MEDIA_LOG_E("Filter(%{public}s) wait state %{public}d fail, curState %{public}d",
562                 name_.c_str(), state, curState_);
563             return GetErrCode();
564         }
565     }
566 
567     Status res = Status::OK;
568     for (auto iter : nextFiltersMap_) {
569         for (auto filter : iter.second) {
570             if (filter->WaitAllState(state) != Status::OK) {
571                 res = filter->GetErrCode();
572             }
573         }
574     }
575     return res;
576 }
577 
SetErrCode(Status errCode)578 void Filter::SetErrCode(Status errCode)
579 {
580     errCode_ = errCode;
581 }
582 
GetErrCode()583 Status Filter::GetErrCode()
584 {
585     return errCode_;
586 }
587 
SetParameter(const std::shared_ptr<Meta> & meta)588 void Filter::SetParameter(const std::shared_ptr<Meta>& meta)
589 {
590     meta_ = meta;
591 }
592 
GetParameter(std::shared_ptr<Meta> & meta)593 void Filter::GetParameter(std::shared_ptr<Meta>& meta)
594 {
595     meta = meta_;
596 }
597 
LinkNext(const std::shared_ptr<Filter> &,StreamType)598 Status Filter::LinkNext(const std::shared_ptr<Filter>&, StreamType)
599 {
600     return Status::OK;
601 }
602 
UpdateNext(const std::shared_ptr<Filter> &,StreamType)603 Status Filter::UpdateNext(const std::shared_ptr<Filter>&, StreamType)
604 {
605     return Status::OK;
606 }
607 
UnLinkNext(const std::shared_ptr<Filter> &,StreamType)608 Status Filter::UnLinkNext(const std::shared_ptr<Filter>&, StreamType)
609 {
610     return Status::OK;
611 }
612 
GetFilterType()613 FilterType Filter::GetFilterType()
614 {
615     return filterType_;
616 };
617 
OnLinked(StreamType,const std::shared_ptr<Meta> &,const std::shared_ptr<FilterLinkCallback> &)618 Status Filter::OnLinked(StreamType, const std::shared_ptr<Meta>&, const std::shared_ptr<FilterLinkCallback>&)
619 {
620     return Status::OK;
621 };
622 
OnUpdated(StreamType,const std::shared_ptr<Meta> &,const std::shared_ptr<FilterLinkCallback> &)623 Status Filter::OnUpdated(StreamType, const std::shared_ptr<Meta>&, const std::shared_ptr<FilterLinkCallback>&)
624 {
625     return Status::OK;
626 }
627 
OnUnLinked(StreamType,const std::shared_ptr<FilterLinkCallback> &)628 Status Filter::OnUnLinked(StreamType, const std::shared_ptr<FilterLinkCallback>&)
629 {
630     return Status::OK;
631 }
632 
633 } // namespace Pipeline
634 } // namespace Media
635 } // namespace OHOS
636