1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "audio_codec_worker.h"
17 #include "avcodec_trace.h"
18 #include "avcodec_errors.h"
19 #include "avcodec_log.h"
20 #include "utils.h"
21 
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_AUDIO, "AvCodec-AudioCodecWorker"};
24 constexpr uint8_t LOGD_FREQUENCY = 5;
25 } // namespace
26 
27 namespace OHOS {
28 namespace MediaAVCodec {
29 constexpr short DEFAULT_TRY_DECODE_TIME = 10;
30 constexpr short DEFAULT_BUFFER_COUNT = 8;
31 constexpr int TIMEOUT_MS = 1000;
32 const std::string_view INPUT_BUFFER = "inputBuffer";
33 const std::string_view OUTPUT_BUFFER = "outputBuffer";
34 const std::string_view ASYNC_HANDLE_INPUT = "OS_AuCodecIn";
35 const std::string_view ASYNC_DECODE_FRAME = "OS_AuCodecOut";
36 
AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> & codec,const std::shared_ptr<AVCodecCallback> & callback)37 AudioCodecWorker::AudioCodecWorker(const std::shared_ptr<AudioBaseCodec> &codec,
38                                    const std::shared_ptr<AVCodecCallback> &callback)
39     : isFirFrame_(true),
40       isRunning(true),
41       codec_(codec),
42       inputBufferSize(codec_->GetInputBufferSize()),
43       outputBufferSize(codec_->GetOutputBufferSize()),
44       bufferCount(DEFAULT_BUFFER_COUNT),
45       name_(codec->GetCodecType()),
46       inputTask_(std::make_unique<TaskThread>(ASYNC_HANDLE_INPUT)),
47       outputTask_(std::make_unique<TaskThread>(ASYNC_DECODE_FRAME)),
48       callback_(callback),
49       inputBuffer_(std::make_shared<AudioBuffersManager>(inputBufferSize, INPUT_BUFFER, DEFAULT_BUFFER_COUNT)),
50       outputBuffer_(std::make_shared<AudioBuffersManager>(outputBufferSize, OUTPUT_BUFFER, DEFAULT_BUFFER_COUNT))
51 {
52     inputTask_->RegisterHandler([this] { ProduceInputBuffer(); });
53     outputTask_->RegisterHandler([this] { ConsumerOutputBuffer(); });
54 }
55 
~AudioCodecWorker()56 AudioCodecWorker::~AudioCodecWorker()
57 {
58     AVCODEC_LOGD("release all data of %{public}s codec worker in destructor.", name_.data());
59     Dispose();
60     ResetTask();
61     ReleaseAllInBufferQueue();
62     ReleaseAllInBufferAvaQueue();
63 
64     inputBuffer_->ReleaseAll();
65     outputBuffer_->ReleaseAll();
66 
67     if (codec_) {
68         codec_ = nullptr;
69     }
70 
71     if (callback_) {
72         callback_.reset();
73         callback_ = nullptr;
74     }
75 }
76 
PushInputData(const uint32_t & index)77 bool AudioCodecWorker::PushInputData(const uint32_t &index)
78 {
79     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "%{public}s Worker PushInputData enter,index:%{public}u", name_.data(), index);
80 
81     if (!isRunning) {
82         return true;
83     }
84 
85     if (!callback_) {
86         AVCODEC_LOGE("push input buffer failed in worker, callback is nullptr, please check the callback.");
87         Dispose();
88         return false;
89     }
90     if (!codec_) {
91         AVCODEC_LOGE("push input buffer failed in worker, codec is nullptr, please check the codec.");
92         Dispose();
93         return false;
94     }
95 
96     std::lock_guard<std::mutex> lock(stateMutex_);
97     inBufIndexQue_.push(index);
98     outputCondition_.notify_all();
99     return true;
100 }
101 
Configure()102 bool AudioCodecWorker::Configure()
103 {
104     AVCODEC_LOGD("%{public}s Worker Configure enter", name_.data());
105     if (!codec_) {
106         AVCODEC_LOGE("Configure failed in worker, codec is nullptr, please check the codec.");
107         return false;
108     }
109     if (inputTask_ != nullptr) {
110         inputTask_->RegisterHandler([this] { ProduceInputBuffer(); });
111     } else {
112         AVCODEC_LOGE("Configure failed in worker, inputTask_ is nullptr, please check the inputTask_.");
113         return false;
114     }
115     if (outputTask_ != nullptr) {
116         outputTask_->RegisterHandler([this] { ConsumerOutputBuffer(); });
117     } else {
118         AVCODEC_LOGE("Configure failed in worker, outputTask_ is nullptr, please check the outputTask_.");
119         return false;
120     }
121     return true;
122 }
123 
Start()124 bool AudioCodecWorker::Start()
125 {
126     AVCODEC_SYNC_TRACE;
127     AVCODEC_LOGD("Worker Start enter");
128     if (!callback_) {
129         AVCODEC_LOGE("Start failed in worker, callback is nullptr, please check the callback.");
130         return false;
131     }
132     if (!codec_) {
133         AVCODEC_LOGE("Start failed in worker, codec_ is nullptr, please check the codec_.");
134         return false;
135     }
136     bool result = Begin();
137     return result;
138 }
139 
Stop()140 bool AudioCodecWorker::Stop()
141 {
142     AVCODEC_SYNC_TRACE;
143     AVCODEC_LOGD("Worker Stop enter");
144     Dispose();
145 
146     if (inputTask_) {
147         inputTask_->StopAsync();
148     } else {
149         AVCODEC_LOGE("Stop failed in worker, inputTask_ is nullptr, please check the inputTask_.");
150         return false;
151     }
152     if (outputTask_) {
153         outputTask_->StopAsync();
154     } else {
155         AVCODEC_LOGE("Stop failed in worker, outputTask_ is nullptr, please check the outputTask_.");
156         return false;
157     }
158 
159     ReleaseAllInBufferQueue();
160     ReleaseAllInBufferAvaQueue();
161 
162     inputBuffer_->ReleaseAll();
163     outputBuffer_->ReleaseAll();
164     return true;
165 }
166 
Pause()167 bool AudioCodecWorker::Pause()
168 {
169     AVCODEC_SYNC_TRACE;
170     AVCODEC_LOGD("Worker Pause enter");
171     Dispose();
172 
173     if (inputTask_) {
174         inputTask_->PauseAsync();
175     } else {
176         AVCODEC_LOGE("Pause failed in worker, inputTask_ is nullptr, please check the inputTask_.");
177         return false;
178     }
179     if (outputTask_) {
180         outputTask_->PauseAsync();
181     } else {
182         AVCODEC_LOGE("Pause failed in worker, outputTask_ is nullptr, please check the outputTask_.");
183         return false;
184     }
185 
186     ReleaseAllInBufferQueue();
187     ReleaseAllInBufferAvaQueue();
188 
189     inputBuffer_->ReleaseAll();
190     outputBuffer_->ReleaseAll();
191     return true;
192 }
193 
Resume()194 bool AudioCodecWorker::Resume()
195 {
196     AVCODEC_SYNC_TRACE;
197     AVCODEC_LOGD("Worker Resume enter");
198     if (!callback_) {
199         AVCODEC_LOGE("Resume failed in worker, callback_ is nullptr, please check the callback_.");
200         return false;
201     }
202     if (!codec_) {
203         AVCODEC_LOGE("Resume failed in worker, codec_ is nullptr, please check the codec_.");
204         return false;
205     }
206     bool result = Begin();
207     return result;
208 }
209 
Release()210 bool AudioCodecWorker::Release()
211 {
212     AVCODEC_SYNC_TRACE;
213     AVCODEC_LOGD("Worker Release enter");
214     Dispose();
215     ResetTask();
216     ReleaseAllInBufferQueue();
217     ReleaseAllInBufferAvaQueue();
218 
219     inputBuffer_->ReleaseAll();
220     outputBuffer_->ReleaseAll();
221     if (codec_) {
222         codec_ = nullptr;
223     }
224     if (callback_) {
225         callback_.reset();
226         callback_ = nullptr;
227     }
228     AVCODEC_LOGD("Worker Release end");
229     return true;
230 }
231 
GetInputBuffer() const232 std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetInputBuffer() const noexcept
233 {
234     AVCODEC_LOGD("Worker GetInputBuffer enter");
235     return inputBuffer_;
236 }
237 
GetOutputBuffer() const238 std::shared_ptr<AudioBuffersManager> AudioCodecWorker::GetOutputBuffer() const noexcept
239 {
240     AVCODEC_LOGD("Worker GetOutputBuffer enter");
241     return outputBuffer_;
242 }
243 
GetOutputBufferInfo(const uint32_t & index) const244 std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetOutputBufferInfo(const uint32_t &index) const noexcept
245 {
246     return outputBuffer_->getMemory(index);
247 }
248 
GetInputBufferInfo(const uint32_t & index) const249 std::shared_ptr<AudioBufferInfo> AudioCodecWorker::GetInputBufferInfo(const uint32_t &index) const noexcept
250 {
251     return inputBuffer_->getMemory(index);
252 }
253 
ProduceInputBuffer()254 void AudioCodecWorker::ProduceInputBuffer()
255 {
256     AVCODEC_SYNC_TRACE;
257     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer enter");
258     if (!isRunning) {
259         usleep(DEFAULT_TRY_DECODE_TIME);
260         return;
261     }
262     std::unique_lock lock(inputMutex_);
263     while (!inBufAvaIndexQue_.empty() && isRunning) {
264         uint32_t index;
265         {
266             std::lock_guard<std::mutex> avaLock(inAvaMutex_);
267             index = inBufAvaIndexQue_.front();
268             inBufAvaIndexQue_.pop();
269         }
270         auto inputBuffer = GetInputBufferInfo(index);
271         inputBuffer->SetBufferOwned();
272         callback_->OnInputBufferAvailable(index, inputBuffer->GetBuffer());
273     }
274     inputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
275                              [this] { return (!inBufAvaIndexQue_.empty() || !isRunning); });
276     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker produceInputBuffer exit");
277 }
278 
HandInputBuffer(int32_t & ret)279 bool AudioCodecWorker::HandInputBuffer(int32_t &ret)
280 {
281     uint32_t inputIndex;
282     {
283         std::lock_guard<std::mutex> lock(stateMutex_);
284         inputIndex = inBufIndexQue_.front();
285         inBufIndexQue_.pop();
286     }
287     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "handle input buffer. index:%{public}u", inputIndex);
288     auto inputBuffer = GetInputBufferInfo(inputIndex);
289     bool isEos = inputBuffer->CheckIsEos();
290     ret = codec_->ProcessSendData(inputBuffer);
291     inputBuffer_->ReleaseBuffer(inputIndex);
292     {
293         std::lock_guard<std::mutex> lock(inAvaMutex_);
294         inBufAvaIndexQue_.push(inputIndex);
295         inputCondition_.notify_all();
296     }
297     if (ret == AVCodecServiceErrCode::AVCS_ERR_INVALID_DATA) {
298         callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
299     }
300     return isEos;
301 }
302 
ReleaseOutputBuffer(const uint32_t & index,const int32_t & ret)303 void AudioCodecWorker::ReleaseOutputBuffer(const uint32_t &index, const int32_t &ret)
304 {
305     outputBuffer_->ReleaseBuffer(index);
306     callback_->OnError(AVCodecErrorType::AVCODEC_ERROR_INTERNAL, ret);
307 }
308 
SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> & outBuffer,bool isEos,uint32_t index)309 void AudioCodecWorker::SetFirstAndEosStatus(std::shared_ptr<AudioBufferInfo> &outBuffer, bool isEos, uint32_t index)
310 {
311     if (isEos) {
312         AVCODEC_LOGD("set buffer EOS. index:%{public}u", index);
313         outBuffer->SetEos(isEos);
314     }
315     if (isFirFrame_) {
316         outBuffer->SetFirstFrame();
317         isFirFrame_ = false;
318     }
319 }
320 
ConsumerOutputBuffer()321 void AudioCodecWorker::ConsumerOutputBuffer()
322 {
323     AVCODEC_SYNC_TRACE;
324     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Worker consumerOutputBuffer enter");
325     if (!isRunning) {
326         usleep(DEFAULT_TRY_DECODE_TIME);
327         return;
328     }
329     std::unique_lock lock(outputMutex_);
330     while (!inBufIndexQue_.empty() && isRunning) {
331         int32_t ret;
332         bool isEos = HandInputBuffer(ret);
333         if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
334             AVCODEC_LOGW("current input buffer is not enough,skip this frame");
335             continue;
336         }
337         if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
338             AVCODEC_LOGE("input error!");
339             return;
340         }
341         uint32_t index;
342         if (outputBuffer_->RequestAvailableIndex(index)) {
343             auto outBuffer = GetOutputBufferInfo(index);
344             SetFirstAndEosStatus(outBuffer, isEos, index);
345             ret = codec_->ProcessRecieveData(outBuffer);
346             if (ret == AVCodecServiceErrCode::AVCS_ERR_NOT_ENOUGH_DATA) {
347                 AVCODEC_LOGD("current ouput buffer is not enough,skip this frame. index:%{public}u", index);
348                 outputBuffer_->ReleaseBuffer(index);
349                 continue;
350             }
351             if (ret != AVCodecServiceErrCode::AVCS_ERR_OK && ret != AVCodecServiceErrCode::AVCS_ERR_END_OF_STREAM) {
352                 AVCODEC_LOGE("process output buffer error! index:%{public}u", index);
353                 ReleaseOutputBuffer(index, ret);
354                 return;
355             }
356             AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work %{public}s consumerOutputBuffer callback_ index:%{public}u",
357                                name_.data(), index);
358             callback_->OnOutputBufferAvailable(index, outBuffer->GetBufferAttr(), outBuffer->GetFlag(),
359                                                outBuffer->GetBuffer());
360         }
361     }
362     outputCondition_.wait_for(lock, std::chrono::milliseconds(TIMEOUT_MS),
363                               [this] { return (inBufIndexQue_.size() > 0 || !isRunning); });
364     AVCODEC_LOGD_LIMIT(LOGD_FREQUENCY, "Work consumerOutputBuffer exit");
365 }
366 
Dispose()367 void AudioCodecWorker::Dispose()
368 {
369     AVCODEC_LOGD("Worker dispose enter");
370     isRunning = false;
371     outputBuffer_->DisableRunning();
372     {
373         std::unique_lock lock(inputMutex_);
374         inputCondition_.notify_all();
375     }
376     {
377         std::unique_lock lock(outputMutex_);
378         outputCondition_.notify_all();
379     }
380 }
381 
Begin()382 bool AudioCodecWorker::Begin()
383 {
384     AVCODEC_LOGD("Worker begin enter");
385     for (uint32_t i = 0; i < static_cast<uint32_t>(bufferCount); i++) {
386         inBufAvaIndexQue_.push(i);
387     }
388     isRunning = true;
389 
390     inputBuffer_->SetRunning();
391     outputBuffer_->SetRunning();
392 
393     if (inputTask_) {
394         inputTask_->Start();
395     } else {
396         return false;
397     }
398     if (outputTask_) {
399         outputTask_->Start();
400     } else {
401         return false;
402     }
403     inputCondition_.notify_all();
404     outputCondition_.notify_all();
405     return true;
406 }
407 
ReleaseAllInBufferQueue()408 void AudioCodecWorker::ReleaseAllInBufferQueue()
409 {
410     std::lock_guard<std::mutex> lock(stateMutex_);
411     while (!inBufIndexQue_.empty()) {
412         inBufIndexQue_.pop();
413     }
414 }
415 
ReleaseAllInBufferAvaQueue()416 void AudioCodecWorker::ReleaseAllInBufferAvaQueue()
417 {
418     std::lock_guard<std::mutex> lock(inAvaMutex_);
419     while (!inBufAvaIndexQue_.empty()) {
420         inBufAvaIndexQue_.pop();
421     }
422 }
423 
ResetTask()424 void AudioCodecWorker::ResetTask()
425 {
426     if (inputTask_) {
427         inputTask_->Stop();
428         inputTask_.reset();
429         inputTask_ = nullptr;
430     }
431     if (outputTask_) {
432         outputTask_->Stop();
433         outputTask_.reset();
434         outputTask_ = nullptr;
435     }
436 }
437 } // namespace MediaAVCodec
438 } // namespace OHOS