/* * Copyright (c) 2023-2023 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define MEDIA_PIPELINE #define HST_LOG_TAG "Pipeline" #include #include #include "pipeline/pipeline.h" #include "osal/task/autolock.h" #include "osal/task/jobutils.h" #include "common/log.h" #include "osal/utils/hitrace_utils.h" namespace { constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Pipeline" }; } namespace OHOS { namespace Media { namespace Pipeline { static std::atomic pipeLineId = 0; int32_t Pipeline::GetNextPipelineId() { return pipeLineId++; } Pipeline::~Pipeline() { } void Pipeline::Init(const std::shared_ptr& receiver, const std::shared_ptr& callback, const std::string &groupId) { MEDIA_LOG_I("Pipeline::Init"); eventReceiver_ = receiver; filterCallback_ = callback; groupId_ = groupId; } Status Pipeline::Prepare() { MEDIA_LOG_I("Prepare enter."); Status ret = Status::OK; SubmitJobOnce([&] { AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { ret = (*it)->Prepare(); if (ret != Status::OK) { return; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { ret = (*it)->WaitAllState(FilterState::READY); if (ret != Status::OK) { return; } } }); MEDIA_LOG_I("Prepare done ret = %{public}d", ret); return ret; } Status Pipeline::Start() { MEDIA_LOG_I("Start enter."); Status ret = Status::OK; SubmitJobOnce([&] { AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { ret = (*it)->Start(); if (ret != Status::OK) { return; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { ret = (*it)->WaitAllState(FilterState::RUNNING); if (ret != Status::OK) { return; } } }); MEDIA_LOG_I("Start done ret = %{public}d", ret); return ret; } Status Pipeline::Pause() { MEDIA_LOG_I("Pause enter."); Status ret = Status::OK; SubmitJobOnce([&] { AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { auto rtv = (*it)->Pause(); if (rtv != Status::OK) { ret = rtv; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { auto rtv = (*it)->WaitAllState(FilterState::PAUSED); if (rtv != Status::OK) { ret = rtv; } } }); MEDIA_LOG_I("Pause done ret = %{public}d", ret); return ret; } Status Pipeline::Resume() { MEDIA_LOG_I("Resume enter."); Status ret = Status::OK; SubmitJobOnce([&] { AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { ret = (*it)->Resume(); if (ret != Status::OK) { return; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { ret = (*it)->WaitAllState(FilterState::RUNNING); if (ret != Status::OK) { return; } } }); MEDIA_LOG_I("Resume done ret = %{public}d", ret); return ret; } Status Pipeline::Stop() { MEDIA_LOG_I("Stop enter."); Status ret = Status::OK; SubmitJobOnce([&] { AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { if (*it == nullptr) { MEDIA_LOG_E("Pipeline error: " PUBLIC_LOG_ZU, filters_.size()); continue; } auto rtv = (*it)->Stop(); if (rtv != Status::OK) { ret = rtv; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { auto rtv = (*it)->WaitAllState(FilterState::STOPPED); if (rtv != Status::OK) { ret = rtv; } } filters_.clear(); }); MEDIA_LOG_I("Stop done ret = %{public}d", ret); return ret; } Status Pipeline::Flush() { MEDIA_LOG_I("Flush enter."); SubmitJobOnce([&] { AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { (*it)->Flush(); } }); MEDIA_LOG_I("Flush done."); return Status::OK; } Status Pipeline::Release() { MEDIA_LOG_I("Release enter."); SubmitJobOnce([&] { AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { (*it)->Release(); } for (auto it = filters_.begin(); it != filters_.end(); ++it) { (*it)->WaitAllState(FilterState::RELEASED); } filters_.clear(); }); MEDIA_LOG_I("Release done."); return Status::OK; } Status Pipeline::Preroll(bool render) { MEDIA_LOG_I("Preroll enter."); Status ret = Status::OK; AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { auto rtv = (*it)->Preroll(); if (rtv != Status::OK) { ret = rtv; MEDIA_LOG_I("Preroll done ret = %{public}d", ret); return ret; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { auto rtv = (*it)->WaitPrerollDone(render); if (rtv != Status::OK) { ret = rtv; MEDIA_LOG_I("Preroll done ret = %{public}d", ret); return ret; } } MEDIA_LOG_I("Preroll done ret = %{public}d", ret); return ret; } Status Pipeline::SetPlayRange(int64_t start, int64_t end) { MEDIA_LOG_I("SetPlayRange enter."); SubmitJobOnce([&] { AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { (*it)->SetPlayRange(start, end); } }); MEDIA_LOG_I("SetPlayRange done."); return Status::OK; } Status Pipeline::AddHeadFilters(std::vector> filtersIn) { MEDIA_LOG_I("AddHeadFilters enter."); std::vector> filtersToAdd; for (auto& filterIn : filtersIn) { bool matched = false; for (const auto& filter : filters_) { if (filterIn == filter) { matched = true; break; } } if (!matched) { filtersToAdd.push_back(filterIn); filterIn->LinkPipeLine(groupId_); } } if (filtersToAdd.empty()) { MEDIA_LOG_I("filter already exists"); return Status::OK; } SubmitJobOnce([&] { AutoLock lock(mutex_); this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end()); }); MEDIA_LOG_I("AddHeadFilters done."); return Status::OK; } Status Pipeline::RemoveHeadFilter(const std::shared_ptr& filter) { SubmitJobOnce([&] { AutoLock lock(mutex_); auto it = std::find_if(filters_.begin(), filters_.end(), [&filter](const std::shared_ptr& filterPtr) { return filterPtr == filter; }); if (it != filters_.end()) { filters_.erase(it); } filter->Release(); filter->WaitAllState(FilterState::RELEASED); filter->ClearAllNextFilters(); return Status::OK; }); return Status::OK; } Status Pipeline::LinkFilters(const std::shared_ptr &preFilter, const std::vector> &nextFilters, StreamType type) { for (auto nextFilter : nextFilters) { auto ret = preFilter->LinkNext(nextFilter, type); nextFilter->LinkPipeLine(groupId_); FALSE_RETURN_V(ret == Status::OK, ret); } return Status::OK; } Status Pipeline::UpdateFilters(const std::shared_ptr &preFilter, const std::vector> &nextFilters, StreamType type) { for (auto nextFilter : nextFilters) { preFilter->UpdateNext(nextFilter, type); } return Status::OK; } Status Pipeline::UnLinkFilters(const std::shared_ptr &preFilter, const std::vector> &nextFilters, StreamType type) { for (auto nextFilter : nextFilters) { preFilter->UnLinkNext(nextFilter, type); } return Status::OK; } void Pipeline::OnEvent(const Event& event) { } } // namespace Pipeline } // namespace Media } // namespace OHOS