1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "audio_codec_worker.h"
17 #include "avcodec_trace.h"
18 #include "avcodec_errors.h"
19 #include "avcodec_log.h"
20 #include "utils.h"
21
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_AUDIO, "AvCodec-AudioCodecWorker"};
24 constexpr uint8_t LOGD_FREQUENCY = 5;
25 } // namespace
26
27 namespace OHOS {
28 namespace MediaAVCodec {
// Argument passed to usleep() by the task handlers while the worker is not running.
constexpr short DEFAULT_TRY_DECODE_TIME{10};
// Number of buffers in each buffer manager and number of input indices queued by Begin().
constexpr short DEFAULT_BUFFER_COUNT{8};
// Upper bound, in milliseconds, of every condition-variable wait in the task handlers.
constexpr int TIMEOUT_MS{1000};
// Names handed to the input / output AudioBuffersManager instances.
constexpr std::string_view INPUT_BUFFER{"inputBuffer"};
constexpr std::string_view OUTPUT_BUFFER{"outputBuffer"};
// Names handed to the TaskThread instances that run the input and output handlers.
constexpr std::string_view ASYNC_HANDLE_INPUT{"OS_AuCodecIn"};
constexpr std::string_view ASYNC_DECODE_FRAME{"OS_AuCodecOut"};
36
AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> & codec,const std::shared_ptr<AVCodecCallback> & callback)37 AudioCodecWorker::AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> &codec,
38 const std::shared_ptr<AVCodecCallback> &callback)
39 : isFirFrame_(true),
40 isRunning(true),
41 codec_(codec),
42 inputBufferSize(codec_->GetInputBufferSize()),
43 outputBufferSize(codec_->GetOutputBufferSize()),
44 bufferCount(DEFAULT_BUFFER_COUNT),
45 name_(codec->GetCodecType()),
46 inputTask_(std::make_unique<TaskThread>(ASYNC_HANDLE_INPUT)),
47 outputTask_(std::make_unique<TaskThread>(ASYNC_DECODE_FRAME)),
48 callback_(callback),
49 inputBuffer_(std::make_shared<AudioBuffersManager>(inputBufferSize, INPUT_BUFFER, DEFAULT_BUFFER_COUNT)),
50 outputBuffer_(std::make_shared<AudioBuffersManager>(outputBufferSize, OUTPUT_BUFFER, DEFAULT_BUFFER_COUNT))
51 {
52 inputTask_->RegisterHandler([this] { ProduceInputBuffer(); });
53 outputTask_->RegisterHandler([this] { ConsumerOutputBuffer(); });
54 }
55
~AudioCodecWorker()56 AudioCodecWorker::~AudioCodecWorker()
57 {
58 AVCODEC_LOGD("release all data of %{public}s codec worker in destructor.", name_.data());
59 Dispose();
60 ResetTask();
61 ReleaseAllInBufferQueue();
62 ReleaseAllInBufferAvaQueue();
63
64 inputBuffer_->ReleaseAll();
65 outputBuffer_->ReleaseAll();
66
67 if (codec_) {
68 codec_ = nullptr;
69 }
70
71 if (callback_) {
72 callback_.reset();
73 callback_ = nullptr;
74 }
75 }
76
PushInputData(const uint32_t & index)77 bool AudioCodecWorker::PushInputData(const uint32_t &index)
78 {
79 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "%{public}s Worker PushInputData enter,index:%{public}u", name_.data(), index);
80
81 if (!isRunning) {
82 return true;
83 }
84
85 if (!callback_) {
86 AVCODEC_LOGE("push input buffer failed in worker, callback is nullptr, please check the callback.");
87 Dispose();
88 return false;
89 }
90 if (!codec_) {
91 AVCODEC_LOGE("push input buffer failed in worker, codec is nullptr, please check the codec.");
92 Dispose();
93 return false;
94 }
95
96 std::lock_guard<std::mutex> lock(stateMutex_);
97 inBufIndexQue_.push(index);
98 outputCondition_.notify_all();
99 return true;
100 }
101
Configure()102 bool AudioCodecWorker::Configure()
103 {
104 AVCODEC_LOGD("%{public}s Worker Configure enter", name_.data());
105 if (!codec_) {
106 AVCODEC_LOGE("Configure failed in worker, codec is nullptr, please check the codec.");
107 return false;
108 }
109 if (inputTask_ != nullptr) {
110 inputTask_->RegisterHandler([this] { ProduceInputBuffer(); });
111 } else {
112 AVCODEC_LOGE("Configure failed in worker, inputTask_ is nullptr, please check the inputTask_.");
113 return false;
114 }
115 if (outputTask_ != nullptr) {
116 outputTask_->RegisterHandler([this] { ConsumerOutputBuffer(); });
117 } else {
118 AVCODEC_LOGE("Configure failed in worker, outputTask_ is nullptr, please check the outputTask_.");
119 return false;
120 }
121 return true;
122 }
123
Start()124 bool AudioCodecWorker::Start()
125 {
126 AVCODEC_SYNC_TRACE;
127 AVCODEC_LOGD("Worker Start enter");
128 if (!callback_) {
129 AVCODEC_LOGE("Start failed in worker, callback is nullptr, please check the callback.");
130 return false;
131 }
132 if (!codec_) {
133 AVCODEC_LOGE("Start failed in worker, codec_ is nullptr, please check the codec_.");
134 return false;
135 }
136 bool result = Begin();
137 return result;
138 }
139
Stop()140 bool AudioCodecWorker::Stop()
141 {
142 AVCODEC_SYNC_TRACE;
143 AVCODEC_LOGD("Worker Stop enter");
144 Dispose();
145
146 if (inputTask_) {
147 inputTask_->StopAsync();
148 } else {
149 AVCODEC_LOGE("Stop failed in worker, inputTask_ is nullptr, please check the inputTask_.");
150 return false;
151 }
152 if (outputTask_) {
153 outputTask_->StopAsync();
154 } else {
155 AVCODEC_LOGE("Stop failed in worker, outputTask_ is nullptr, please check the outputTask_.");
156 return false;
157 }
158
159 ReleaseAllInBufferQueue();
160 ReleaseAllInBufferAvaQueue();
161
162 inputBuffer_->ReleaseAll();
163 outputBuffer_->ReleaseAll();
164 return true;
165 }
166
Pause()167 bool AudioCodecWorker::Pause()
168 {
169 AVCODEC_SYNC_TRACE;
170 AVCODEC_LOGD("Worker Pause enter");
171 Dispose();
172
173 if (inputTask_) {
174 inputTask_->PauseAsync();
175 } else {
176 AVCODEC_LOGE("Pause failed in worker, inputTask_ is nullptr, please check the inputTask_.");
177 return false;
178 }
179 if (outputTask_) {
180 outputTask_->PauseAsync();
181 } else {
182 AVCODEC_LOGE("Pause failed in worker, outputTask_ is nullptr, please check the outputTask_.");
183 return false;
184 }
185
186 ReleaseAllInBufferQueue();
187 ReleaseAllInBufferAvaQueue();
188
189 inputBuffer_->ReleaseAll();
190 outputBuffer_->ReleaseAll();
191 return true;
192 }
193
Resume()194 bool AudioCodecWorker::Resume()
195 {
196 AVCODEC_SYNC_TRACE;
197 AVCODEC_LOGD("Worker Resume enter");
198 if (!callback_) {
199 AVCODEC_LOGE("Resume failed in worker, callback_ is nullptr, please check the callback_.");
200 return false;
201 }
202 if (!codec_) {
203 AVCODEC_LOGE("Resume failed in worker, codec_ is nullptr, please check the codec_.");
204 return false;
205 }
206 bool result = Begin();
207 return result;
208 }
209
Release()210 bool AudioCodecWorker::Release()
211 {
212 AVCODEC_SYNC_TRACE;
213 AVCODEC_LOGD("Worker Release enter");
214 Dispose();
215 ResetTask();
216 ReleaseAllInBufferQueue();
217 ReleaseAllInBufferAvaQueue();
218
219 inputBuffer_->ReleaseAll();
220 outputBuffer_->ReleaseAll();
221 if (codec_) {
222 codec_ = nullptr;
223 }
224 if (callback_) {
225 callback_.reset();
226 callback_ = nullptr;
227 }
228 AVCODEC_LOGD("Worker Release end");
229 return true;
230 }
231
GetInputBuffer() const232 std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetInputBuffer() const noexcept
233 {
234 AVCODEC_LOGD("Worker GetInputBuffer enter");
235 return inputBuffer_;
236 }
237
GetOutputBuffer() const238 std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetOutputBuffer() const noexcept
239 {
240 AVCODEC_LOGD("Worker GetOutputBuffer enter");
241 return outputBuffer_;
242 }
243
GetOutputBufferInfo(const uint32_t & index) const244 std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetOutputBufferInfo(const uint32_t &index) const noexcept
245 {
246 return outputBuffer_->getMemory(index);
247 }
248
GetInputBufferInfo(const uint32_t & index) const249 std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetInputBufferInfo(const uint32_t &index) const noexcept
250 {
251 return inputBuffer_->getMemory(index);
252 }
253
ProduceInputBuffer()254 void AudioCodecWorker::ProduceInputBuffer()
255 {
256 AVCODEC_SYNC_TRACE;
257 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer enter");
258 if (!isRunning) {
259 usleep(DEFAULT_TRY_DECODE_TIME);
260 return;
261 }
262 std::unique_lock lock(inputMutex_);
263 while (!inBufAvaIndexQue_.empty() && isRunning) {
264 uint32_t index;
265 {
266 std::lock_guard<std::mutex> avaLock(inAvaMutex_);
267 index = inBufAvaIndexQue_.front();
268 inBufAvaIndexQue_.pop();
269 }
270 auto inputBuffer = GetInputBufferInfo(index);
271 inputBuffer->SetBufferOwned();
272 callback_->OnInputBufferAvailable(index, inputBuffer->GetBuffer());
273 }
274 inputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
275 [this] { return (!inBufAvaIndexQue_.empty() || !isRunning); });
276 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer exit");
277 }
278
HandInputBuffer(int32_t & ret)279 bool AudioCodecWorker::HandInputBuffer(int32_t &ret)
280 {
281 uint32_t inputIndex;
282 {
283 std::lock_guard<std::mutex> lock(stateMutex_);
284 inputIndex = inBufIndexQue_.front();
285 inBufIndexQue_.pop();
286 }
287 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "handle input buffer. index:%{public}u", inputIndex);
288 auto inputBuffer = GetInputBufferInfo(inputIndex);
289 bool isEos = inputBuffer->CheckIsEos();
290 ret = codec_->ProcessSendData(inputBuffer);
291 inputBuffer_->ReleaseBuffer(inputIndex);
292 {
293 std::lock_guard<std::mutex> lock(inAvaMutex_);
294 inBufAvaIndexQue_.push(inputIndex);
295 inputCondition_.notify_all();
296 }
297 if (ret == AVCodecServiceErrCode::AVCS_ERR_INVALID_DATA) {
298 callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
299 }
300 return isEos;
301 }
302
ReleaseOutputBuffer(const uint32_t & index,const int32_t & ret)303 void AudioCodecWorker::ReleaseOutputBuffer(const uint32_t &index, const int32_t &ret)
304 {
305 outputBuffer_->ReleaseBuffer(index);
306 callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
307 }
308
SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> & outBuffer,bool isEos,uint32_t index)309 void AudioCodecWorker::SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> &outBuffer, bool isEos, uint32_t index)
310 {
311 if (isEos) {
312 AVCODEC_LOGD("set buffer EOS. index:%{public}u", index);
313 outBuffer->SetEos(isEos);
314 }
315 if (isFirFrame_) {
316 outBuffer->SetFirstFrame();
317 isFirFrame_ = false;
318 }
319 }
320
ConsumerOutputBuffer()321 void AudioCodecWorker::ConsumerOutputBuffer()
322 {
323 AVCODEC_SYNC_TRACE;
324 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker consumerOutputBuffer enter");
325 if (!isRunning) {
326 usleep(DEFAULT_TRY_DECODE_TIME);
327 return;
328 }
329 std::unique_lock lock(outputMutex_);
330 while (!inBufIndexQue_.empty() && isRunning) {
331 int32_t ret;
332 bool isEos = HandInputBuffer(ret);
333 if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
334 AVCODEC_LOGW("current input buffer is not enough,skip this frame");
335 continue;
336 }
337 if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
338 AVCODEC_LOGE("input error!");
339 return;
340 }
341 uint32_t index;
342 if (outputBuffer_->RequestAvailableIndex(index)) {
343 auto outBuffer = GetOutputBufferInfo(index);
344 SetFirstAndEosStatus(outBuffer, isEos, index);
345 ret = codec_->ProcessRecieveData(outBuffer);
346 if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
347 AVCODEC_LOGD("current ouput buffer is not enough,skip this frame. index:%{public}u", index);
348 outputBuffer_->ReleaseBuffer(index);
349 continue;
350 }
351 if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
352 AVCODEC_LOGE("process output buffer error! index:%{public}u", index);
353 ReleaseOutputBuffer(index, ret);
354 return;
355 }
356 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work %{public}s consumerOutputBuffer callback_ index:%{public}u",
357 name_.data(), index);
358 callback_->OnOutputBufferAvailable(index, outBuffer->GetBufferAttr(), outBuffer->GetFlag(),
359 outBuffer->GetBuffer());
360 }
361 }
362 outputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
363 [this] { return (inBufIndexQue_.size() > 0 || !isRunning); });
364 AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work consumerOutputBuffer exit");
365 }
366
Dispose()367 void AudioCodecWorker::Dispose()
368 {
369 AVCODEC_LOGD("Worker dispose enter");
370 isRunning = false;
371 outputBuffer_->DisableRunning();
372 {
373 std::unique_lock lock(inputMutex_);
374 inputCondition_.notify_all();
375 }
376 {
377 std::unique_lock lock(outputMutex_);
378 outputCondition_.notify_all();
379 }
380 }
381
Begin()382 bool AudioCodecWorker::Begin()
383 {
384 AVCODEC_LOGD("Worker begin enter");
385 for (uint32_t i = 0; i < static_cast<uint32_t>(bufferCount); i++) {
386 inBufAvaIndexQue_.push(i);
387 }
388 isRunning = true;
389
390 inputBuffer_->SetRunning();
391 outputBuffer_->SetRunning();
392
393 if (inputTask_) {
394 inputTask_->Start();
395 } else {
396 return false;
397 }
398 if (outputTask_) {
399 outputTask_->Start();
400 } else {
401 return false;
402 }
403 inputCondition_.notify_all();
404 outputCondition_.notify_all();
405 return true;
406 }
407
ReleaseAllInBufferQueue()408 void AudioCodecWorker::ReleaseAllInBufferQueue()
409 {
410 std::lock_guard<std::mutex> lock(stateMutex_);
411 while (!inBufIndexQue_.empty()) {
412 inBufIndexQue_.pop();
413 }
414 }
415
ReleaseAllInBufferAvaQueue()416 void AudioCodecWorker::ReleaseAllInBufferAvaQueue()
417 {
418 std::lock_guard<std::mutex> lock(inAvaMutex_);
419 while (!inBufAvaIndexQue_.empty()) {
420 inBufAvaIndexQue_.pop();
421 }
422 }
423
ResetTask()424 void AudioCodecWorker::ResetTask()
425 {
426 if (inputTask_) {
427 inputTask_->Stop();
428 inputTask_.reset();
429 inputTask_ = nullptr;
430 }
431 if (outputTask_) {
432 outputTask_->Stop();
433 outputTask_.reset();
434 outputTask_ = nullptr;
435 }
436 }
437 } // namespace MediaAVCodec
438 } // namespace OHOS