1 /*
2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "avbuffer_queue_consumer_impl.h"
17 #include "avbuffer_queue_impl.h"
18 #include "avbuffer_queue_producer_impl.h"
19 #include "common/log.h"
20 #include "meta/media_types.h"
21
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "AVBufferQueue" };
24 }
25
26 namespace OHOS {
27 namespace Media {
28
Create(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)29 std::shared_ptr<AVBufferQueue> AVBufferQueue::Create(
30 uint32_t size, MemoryType type, const std::string& name, bool disableAlloc)
31 {
32 MEDIA_LOG_D("AVBufferQueue::Create size = %u, type = %u, name = %s",
33 size, static_cast<uint32_t>(type), name.c_str());
34 return std::make_shared<AVBufferQueueImpl>(size, type, name, disableAlloc);
35 }
36
GetLocalProducer()37 std::shared_ptr<AVBufferQueueProducer> AVBufferQueueImpl::GetLocalProducer()
38 {
39 std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
40 std::shared_ptr<AVBufferQueueProducerImpl> producer = nullptr;
41 if (localProducer_.expired()) {
42 auto shared_this = shared_from_this();
43 FALSE_RETURN_V(shared_this != nullptr, nullptr);
44 producer = std::make_shared<AVBufferQueueProducerImpl>(shared_this);
45 localProducer_ = producer;
46 }
47
48 return localProducer_.lock();
49 }
50
GetLocalConsumer()51 std::shared_ptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetLocalConsumer()
52 {
53 std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
54 std::shared_ptr<AVBufferQueueConsumerImpl> consumer = nullptr;
55 if (localConsumer_.expired()) {
56 auto shared_this = shared_from_this();
57 FALSE_RETURN_V(shared_this != nullptr, nullptr);
58 consumer = std::make_shared<AVBufferQueueConsumerImpl>(shared_this);
59 localConsumer_ = consumer;
60 }
61 return localConsumer_.lock();
62 }
63
GetProducer()64 sptr<AVBufferQueueProducer> AVBufferQueueImpl::GetProducer()
65 {
66 std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
67 sptr<AVBufferQueueProducerImpl> producer = nullptr;
68 if (producer_ == nullptr || producer_->GetSptrRefCount() <= 0) {
69 auto shared_this = shared_from_this();
70 FALSE_RETURN_V(shared_this != nullptr, nullptr);
71 producer = new AVBufferQueueProducerImpl(shared_this);
72 producer_ = producer;
73 }
74
75 return producer_.promote();
76 }
77
GetConsumer()78 sptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetConsumer()
79 {
80 std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
81 sptr<AVBufferQueueConsumerImpl> consumer = nullptr;
82 if (consumer_ == nullptr || consumer_->GetSptrRefCount() <= 0) {
83 auto shared_this = shared_from_this();
84 FALSE_RETURN_V(shared_this != nullptr, nullptr);
85 consumer = new AVBufferQueueConsumerImpl(shared_this);
86 consumer_ = consumer;
87 }
88
89 return consumer_.promote();
90 }
91
AVBufferQueueImpl(const std::string & name)92 AVBufferQueueImpl::AVBufferQueueImpl(const std::string &name)
93 : AVBufferQueue(), name_(name), size_(0), memoryType_(MemoryType::UNKNOWN_MEMORY), disableAlloc_(false) {}
94
AVBufferQueueImpl(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)95 AVBufferQueueImpl::AVBufferQueueImpl(uint32_t size, MemoryType type, const std::string &name, bool disableAlloc)
96 : AVBufferQueue(), name_(name), size_(size), memoryType_(type), disableAlloc_(disableAlloc)
97 {
98 if (size_ > AVBUFFER_QUEUE_MAX_QUEUE_SIZE) {
99 size_ = AVBUFFER_QUEUE_MAX_QUEUE_SIZE;
100 }
101 }
102
GetQueueSize()103 uint32_t AVBufferQueueImpl::GetQueueSize()
104 {
105 return size_;
106 }
107
SetQueueSize(uint32_t size)108 Status AVBufferQueueImpl::SetQueueSize(uint32_t size)
109 {
110 FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_,
111 Status::ERROR_INVALID_BUFFER_SIZE);
112
113 if (size > size_) {
114 size_ = size;
115 if (!disableAlloc_) {
116 requestCondition.notify_all();
117 }
118 } else {
119 std::lock_guard<std::mutex> lockGuard(queueMutex_);
120 DeleteBuffers(size_ - size);
121 size_ = size;
122 }
123
124 return Status::OK;
125 }
126
IsBufferInQueue(const std::shared_ptr<AVBuffer> & buffer)127 bool AVBufferQueueImpl::IsBufferInQueue(const std::shared_ptr<AVBuffer>& buffer)
128 {
129 FALSE_RETURN_V(buffer != nullptr, false);
130 auto uniqueId = buffer->GetUniqueId();
131 return cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end();
132 }
133
GetCachedBufferCount() const134 uint32_t AVBufferQueueImpl::GetCachedBufferCount() const
135 {
136 // 确保cachedBufferMap_.size()不会超过MAX_UINT32
137 return static_cast<uint32_t>(cachedBufferMap_.size());
138 }
139
PopFromFreeBufferList(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)140 Status AVBufferQueueImpl::PopFromFreeBufferList(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
141 {
142 for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
143 if (config <= cachedBufferMap_[*it].config) {
144 buffer = cachedBufferMap_[*it].buffer;
145 freeBufferList_.erase(it);
146 return Status::OK;
147 }
148 }
149
150 if (freeBufferList_.empty()) {
151 buffer = nullptr;
152 // 没有可以重用的freeBuffer
153 return Status::ERROR_NO_FREE_BUFFER;
154 }
155
156 buffer = cachedBufferMap_[freeBufferList_.front()].buffer;
157 freeBufferList_.pop_front();
158
159 return Status::OK;
160 }
161
PopFromDirtyBufferList(std::shared_ptr<AVBuffer> & buffer)162 Status AVBufferQueueImpl::PopFromDirtyBufferList(std::shared_ptr<AVBuffer>& buffer)
163 {
164 FALSE_RETURN_V(!dirtyBufferList_.empty(), Status::ERROR_NO_DIRTY_BUFFER);
165
166 buffer = cachedBufferMap_[dirtyBufferList_.front()].buffer;
167 dirtyBufferList_.pop_front();
168 return Status::OK;
169 }
170
AllocBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)171 Status AVBufferQueueImpl::AllocBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
172 {
173 auto bufferImpl = AVBuffer::CreateAVBuffer(config);
174 FALSE_RETURN_V(bufferImpl != nullptr, Status::ERROR_CREATE_BUFFER);
175
176 auto uniqueId = bufferImpl->GetUniqueId();
177 AVBufferElement ele = {
178 .config = bufferImpl->GetConfig(),
179 .state = AVBUFFER_STATE_RELEASED,
180 .isDeleting = false,
181 .buffer = bufferImpl,
182 };
183 cachedBufferMap_[uniqueId] = ele;
184 buffer = bufferImpl;
185
186 return Status::OK;
187 }
188
RequestReuseBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)189 Status AVBufferQueueImpl::RequestReuseBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
190 {
191 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
192
193 auto uniqueId = buffer->GetUniqueId();
194 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_CREATE_BUFFER);
195
196 if (config <= cachedBufferMap_[uniqueId].config) {
197 // 不需要重新分配,直接更新buffer大小
198 cachedBufferMap_[uniqueId].config.size = config.size;
199 } else {
200 // 重新分配
201 DeleteCachedBufferById(uniqueId);
202 NOK_RETURN(AllocBuffer(buffer, config));
203 }
204
205 // 注意这里的uniqueId可能因为重新分配buffer而更新,所以需要再次获取
206 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
207 return Status::OK;
208 }
209
DeleteBuffers(uint32_t count)210 void AVBufferQueueImpl::DeleteBuffers(uint32_t count)
211 {
212 FALSE_RETURN(count > 0);
213
214 while (!freeBufferList_.empty()) {
215 DeleteCachedBufferById(freeBufferList_.front());
216 freeBufferList_.pop_front();
217 count--;
218 if (count <= 0) {
219 return;
220 }
221 }
222
223 while (!dirtyBufferList_.empty()) {
224 DeleteCachedBufferById(dirtyBufferList_.front());
225 dirtyBufferList_.pop_front();
226 count--;
227 if (count <= 0) {
228 return;
229 }
230 }
231
232 for (auto&& ele : cachedBufferMap_) {
233 ele.second.isDeleting = true;
234 // we don't have to do anything
235 count--;
236 if (count <= 0) {
237 break;
238 }
239 }
240 }
241
DeleteCachedBufferById(uint64_t uniqueId)242 void AVBufferQueueImpl::DeleteCachedBufferById(uint64_t uniqueId)
243 {
244 auto it = cachedBufferMap_.find(uniqueId);
245 if (it != cachedBufferMap_.end()) {
246 cachedBufferMap_.erase(it);
247 }
248 }
249
CheckConfig(const AVBufferConfig & config)250 Status AVBufferQueueImpl::CheckConfig(const AVBufferConfig& config)
251 {
252 if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
253 MEDIA_LOG_D("config.memoryType != MemoryType::UNKNOWN_MEMORY");
254 return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
255 }
256 // memoryType_初始化之后将无法改变。
257 if (memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_) {
258 MEDIA_LOG_D("memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_");
259 return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
260 }
261 memoryType_ = config.memoryType;
262 return Status::OK;
263 }
264
wait_for(std::unique_lock<std::mutex> & lock,int32_t timeoutMs)265 bool AVBufferQueueImpl::wait_for(std::unique_lock<std::mutex>& lock, int32_t timeoutMs)
266 {
267 MEDIA_LOG_D("wait for free buffer, timeout = %d", timeoutMs);
268 if (timeoutMs > 0) {
269 return requestCondition.wait_for(
270 lock, std::chrono::milliseconds(timeoutMs), [this]() {
271 return !freeBufferList_.empty() || (GetCachedBufferCount() < GetQueueSize());
272 });
273 } else if (timeoutMs < 0) {
274 requestCondition.wait(lock);
275 }
276 return true;
277 }
278
RequestBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int32_t timeoutMs)279 Status AVBufferQueueImpl::RequestBuffer(
280 std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int32_t timeoutMs)
281 {
282 auto configCopy = config;
283 if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
284 MEDIA_LOG_D("AVBufferQueueImpl::RequestBuffer config.memoryType unknown, "
285 "memoryType_ = %u", static_cast<uint32_t>(memoryType_));
286 configCopy.memoryType = memoryType_;
287 }
288
289 // check param
290 std::unique_lock<std::mutex> lock(queueMutex_);
291 auto res = CheckConfig(configCopy);
292 FALSE_RETURN_V_MSG(res == Status::OK,
293 res, "CheckConfig not OK, code %{public}d", static_cast<int32_t>(res));
294 // dequeue from free list
295 auto ret = PopFromFreeBufferList(buffer, configCopy);
296 if (ret == Status::OK) {
297 return RequestReuseBuffer(buffer, configCopy);
298 }
299
300 // check queue size
301 if (GetCachedBufferCount() >= GetQueueSize()) {
302 if (!wait_for(lock, timeoutMs)) {
303 MEDIA_LOG_D("FALSE_RETURN_V wait_for(lock, timeoutMs)");
304 return Status::ERROR_WAIT_TIMEOUT;
305 }
306 // 被条件唤醒后,再次尝试从freeBufferList中取buffer
307 ret = PopFromFreeBufferList(buffer, configCopy);
308 if (ret == Status::OK) {
309 return RequestReuseBuffer(buffer, configCopy);
310 }
311 if (GetCachedBufferCount() >= GetQueueSize()) {
312 return Status::ERROR_NO_FREE_BUFFER;
313 }
314 }
315
316 NOK_RETURN(AllocBuffer(buffer, configCopy));
317 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
318
319 return Status::OK;
320 }
321
InsertFreeBufferInOrder(uint64_t uniqueId)322 void AVBufferQueueImpl::InsertFreeBufferInOrder(uint64_t uniqueId)
323 {
324 for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
325 if ((*it != uniqueId) &&
326 (cachedBufferMap_[*it].config.capacity >= cachedBufferMap_[uniqueId].config.capacity)) {
327 freeBufferList_.insert(it, uniqueId);
328 return;
329 }
330 }
331 freeBufferList_.emplace_back(uniqueId);
332 }
333
CancelBuffer(uint64_t uniqueId)334 Status AVBufferQueueImpl::CancelBuffer(uint64_t uniqueId)
335 {
336 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
337
338 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_REQUESTED ||
339 cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
340 Status::ERROR_INVALID_BUFFER_STATE);
341
342 InsertFreeBufferInOrder(uniqueId);
343
344 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
345
346 requestCondition.notify_all();
347
348 MEDIA_LOG_D("cancel buffer id = %llu", uniqueId);
349
350 return Status::OK;
351 }
352
PushBuffer(uint64_t uniqueId,bool available)353 Status AVBufferQueueImpl::PushBuffer(uint64_t uniqueId, bool available)
354 {
355 std::shared_ptr<AVBuffer> buffer = nullptr;
356 {
357 std::lock_guard<std::mutex> lockGuard(queueMutex_);
358 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
359 Status::ERROR_INVALID_BUFFER_ID);
360
361 auto& ele = cachedBufferMap_[uniqueId];
362 if (ele.isDeleting) {
363 DeleteCachedBufferById(uniqueId);
364 MEDIA_LOG_D("delete push buffer uniqueId(%llu)", uniqueId);
365 return Status::OK;
366 }
367
368 if (available) {
369 FALSE_RETURN_V(ele.buffer->GetConfig().size >= 0, Status::ERROR_INVALID_BUFFER_SIZE);
370 }
371
372 FALSE_RETURN_V(ele.state == AVBUFFER_STATE_REQUESTED || ele.state == AVBUFFER_STATE_ATTACHED,
373 Status::ERROR_INVALID_BUFFER_STATE);
374
375 ele.state = AVBUFFER_STATE_PUSHED;
376 buffer = cachedBufferMap_[uniqueId].buffer;
377 }
378
379 if (available) {
380 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
381 if (!brokerListeners_.empty() && brokerListeners_.back() != nullptr) {
382 brokerListeners_.back()->OnBufferFilled(buffer);
383 return Status::OK;
384 }
385 }
386
387 return ReturnBuffer(uniqueId, available);
388 }
389
PushBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)390 Status AVBufferQueueImpl::PushBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
391 {
392 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
393
394 return PushBuffer(buffer->GetUniqueId(), available);
395 }
396
ReturnBuffer(uint64_t uniqueId,bool available)397 Status __attribute__((no_sanitize("cfi"))) AVBufferQueueImpl::ReturnBuffer(uint64_t uniqueId, bool available)
398 {
399 {
400 std::lock_guard<std::mutex> lockGuard(queueMutex_);
401 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
402 Status::ERROR_INVALID_BUFFER_ID);
403
404 if (cachedBufferMap_[uniqueId].isDeleting) {
405 DeleteCachedBufferById(uniqueId);
406 MEDIA_LOG_D("delete return buffer uniqueId(%llu)", uniqueId);
407 return Status::OK;
408 }
409
410 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
411 Status::ERROR_INVALID_BUFFER_STATE);
412
413 if (!available) {
414 NOK_RETURN(CancelBuffer(uniqueId));
415 } else {
416 auto& config = cachedBufferMap_[uniqueId].buffer->GetConfig();
417 bool isEosBuffer = cachedBufferMap_[uniqueId].buffer->flag_ & (uint32_t)(Plugins::AVBufferFlag::EOS);
418 if (!isEosBuffer) {
419 FALSE_RETURN_V(config.size > 0, Status::ERROR_INVALID_BUFFER_SIZE);
420 }
421 cachedBufferMap_[uniqueId].config = config;
422 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RETURNED;
423 dirtyBufferList_.push_back(uniqueId);
424 }
425 }
426
427 if (!available) {
428 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
429 if (producerListener_ != nullptr) {
430 producerListener_->OnBufferAvailable();
431 }
432 return Status::OK;
433 }
434
435 std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
436 FALSE_RETURN_V(consumerListener_ != nullptr, Status::ERROR_NO_CONSUMER_LISTENER);
437 consumerListener_->OnBufferAvailable();
438
439 return Status::OK;
440 }
441
ReturnBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)442 Status AVBufferQueueImpl::ReturnBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
443 {
444 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
445
446 return ReturnBuffer(buffer->GetUniqueId(), available);
447 }
448
SetQueueSizeAndAttachBuffer(uint32_t size,std::shared_ptr<AVBuffer> & buffer,bool isFilled)449 Status AVBufferQueueImpl::SetQueueSizeAndAttachBuffer(uint32_t size,
450 std::shared_ptr<AVBuffer>& buffer, bool isFilled)
451 {
452 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
453 auto config = buffer->GetConfig();
454 auto uniqueId = buffer->GetUniqueId();
455 {
456 std::lock_guard<std::mutex> lockGuard(queueMutex_);
457 if (size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_) {
458 SetQueueSizeBeforeAttachBufferLocked(size);
459 }
460 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
461 Status::ERROR_INVALID_BUFFER_ID);
462 NOK_RETURN(CheckConfig(config));
463 Status result = AttachAvailableBufferLocked(buffer);
464 FALSE_RETURN_V(result == Status::OK, result);
465 }
466 if (isFilled) {
467 return PushBufferOnFilled(uniqueId, isFilled);
468 }
469 return ReleaseBuffer(uniqueId);
470 }
471
AttachAvailableBufferLocked(std::shared_ptr<AVBuffer> & buffer)472 Status AVBufferQueueImpl::AttachAvailableBufferLocked(std::shared_ptr<AVBuffer>& buffer)
473 {
474 auto config = buffer->GetConfig();
475 auto uniqueId = buffer->GetUniqueId();
476 AVBufferElement ele = {
477 .config = config,
478 .state = AVBUFFER_STATE_ATTACHED,
479 .isDeleting = false,
480 .buffer = buffer
481 };
482
483 auto cachedCount = GetCachedBufferCount();
484 auto queueSize = GetQueueSize();
485 if (cachedCount >= queueSize) {
486 auto validCount = static_cast<uint32_t>(dirtyBufferList_.size() + freeBufferList_.size());
487 auto toBeDeleteCount = cachedCount - queueSize;
488 // 这里表示有可以删除的buffer,或者
489 if (validCount > toBeDeleteCount) {
490 // 在什么场景下需要在此处删除buffer?
491 DeleteBuffers(toBeDeleteCount + 1); // 多删除一个,用于attach当前buffer
492 cachedBufferMap_[uniqueId] = ele;
493 MEDIA_LOG_D("uniqueId(%llu) attached with delete", uniqueId);
494 } else {
495 MEDIA_LOG_E("attach failed, out of range");
496 return Status::ERROR_OUT_OF_RANGE;
497 }
498 } else {
499 cachedBufferMap_[uniqueId] = ele;
500 MEDIA_LOG_D("uniqueId(%llu) attached without delete", uniqueId);
501 }
502 return Status::OK;
503 }
504
PushBufferOnFilled(uint64_t uniqueId,bool isFilled)505 Status AVBufferQueueImpl::PushBufferOnFilled(uint64_t uniqueId, bool isFilled)
506 {
507 auto ret = PushBuffer(uniqueId, isFilled);
508 if (ret != Status::OK) {
509 // PushBuffer失败,强制Detach
510 DetachBuffer(uniqueId, true);
511 }
512 return ret;
513 }
514
SetQueueSizeBeforeAttachBufferLocked(uint32_t size)515 void AVBufferQueueImpl::SetQueueSizeBeforeAttachBufferLocked(uint32_t size)
516 {
517 if (size > size_) {
518 size_ = size;
519 if (!disableAlloc_) {
520 requestCondition.notify_all();
521 }
522 } else {
523 DeleteBuffers(size_ - size);
524 size_ = size;
525 }
526 }
527
AttachBuffer(std::shared_ptr<AVBuffer> & buffer,bool isFilled)528 Status AVBufferQueueImpl::AttachBuffer(std::shared_ptr<AVBuffer>& buffer, bool isFilled)
529 {
530 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
531
532 auto config = buffer->GetConfig();
533 auto uniqueId = buffer->GetUniqueId();
534 {
535 std::lock_guard<std::mutex> lockGuard(queueMutex_);
536 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
537 Status::ERROR_INVALID_BUFFER_ID);
538
539 NOK_RETURN(CheckConfig(config));
540
541 Status result = AttachAvailableBufferLocked(buffer);
542 FALSE_RETURN_V(result == Status::OK, result);
543 }
544
545 if (isFilled) {
546 return PushBufferOnFilled(uniqueId, isFilled);
547 }
548
549 return ReleaseBuffer(uniqueId);
550 }
551
DetachBuffer(uint64_t uniqueId,bool force)552 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId, bool force)
553 {
554 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
555
556 const auto& ele = cachedBufferMap_[uniqueId];
557
558 if (!force) {
559 // 只有生产者或消费者在获取到buffer后才能detach
560 if (ele.state == AVBUFFER_STATE_REQUESTED) {
561 MEDIA_LOG_D("detach buffer(%llu) on state requested", uniqueId);
562 } else if (ele.state == AVBUFFER_STATE_ACQUIRED) {
563 MEDIA_LOG_D("detach buffer(%llu) on state acquired", uniqueId);
564 } else {
565 MEDIA_LOG_W("detach buffer(%llu) on state %d forbidden", uniqueId, ele.state);
566 return Status::ERROR_INVALID_BUFFER_STATE;
567 }
568 }
569
570 cachedBufferMap_.erase(uniqueId);
571
572 return Status::OK;
573 }
574
DetachBuffer(uint64_t uniqueId)575 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId)
576 {
577 std::lock_guard<std::mutex> lockGuard(queueMutex_);
578 return DetachBuffer(uniqueId, false);
579 }
580
DetachBuffer(const std::shared_ptr<AVBuffer> & buffer)581 Status AVBufferQueueImpl::DetachBuffer(const std::shared_ptr<AVBuffer>& buffer)
582 {
583 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
584
585 return DetachBuffer(buffer->GetUniqueId());
586 }
587
AcquireBuffer(std::shared_ptr<AVBuffer> & buffer)588 Status AVBufferQueueImpl::AcquireBuffer(std::shared_ptr<AVBuffer>& buffer)
589 {
590 std::lock_guard<std::mutex> lockGuard(queueMutex_);
591 auto ret = PopFromDirtyBufferList(buffer);
592 if (ret != Status::OK) {
593 MEDIA_LOG_E("acquire buffer failed");
594 return ret;
595 }
596
597 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_ACQUIRED;
598
599 return Status::OK;
600 }
601
ReleaseBuffer(uint64_t uniqueId)602 Status AVBufferQueueImpl::ReleaseBuffer(uint64_t uniqueId)
603 {
604 {
605 std::lock_guard<std::mutex> lockGuard(queueMutex_);
606 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
607
608 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ACQUIRED ||
609 cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ATTACHED, Status::ERROR_INVALID_BUFFER_STATE);
610
611 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
612 if (cachedBufferMap_[uniqueId].isDeleting) {
613 DeleteCachedBufferById(uniqueId);
614 return Status::OK;
615 }
616
617 InsertFreeBufferInOrder(uniqueId);
618
619 requestCondition.notify_all();
620 }
621
622 // 注意:此时通知生产者有buffer可用,但实际有可能已经被request wait的生产者获取
623 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
624 if (producerListener_ != nullptr) {
625 producerListener_->OnBufferAvailable();
626 }
627
628 return Status::OK;
629 }
630
ReleaseBuffer(const std::shared_ptr<AVBuffer> & buffer)631 Status AVBufferQueueImpl::ReleaseBuffer(const std::shared_ptr<AVBuffer>& buffer)
632 {
633 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
634
635 return ReleaseBuffer(buffer->GetUniqueId());
636 }
637
Clear()638 Status AVBufferQueueImpl::Clear()
639 {
640 MEDIA_LOG_E("AVBufferQueueImpl Clear");
641 std::lock_guard<std::mutex> lockGuard(queueMutex_);
642 dirtyBufferList_.clear();
643 for (auto it = cachedBufferMap_.begin(); it != cachedBufferMap_.end(); it++) {
644 if (it->second.state == AVBUFFER_STATE_PUSHED || it->second.state == AVBUFFER_STATE_RETURNED) {
645 it->second.state = AVBUFFER_STATE_RELEASED;
646 InsertFreeBufferInOrder(it->first);
647 }
648 }
649 requestCondition.notify_all();
650 return Status::OK;
651 }
652
SetBrokerListener(sptr<IBrokerListener> & listener)653 Status AVBufferQueueImpl::SetBrokerListener(sptr<IBrokerListener>& listener)
654 {
655 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
656 brokerListeners_.push_back(listener);
657 return Status::OK;
658 }
659
RemoveBrokerListener(sptr<IBrokerListener> & listener)660 Status AVBufferQueueImpl::RemoveBrokerListener(sptr<IBrokerListener>& listener)
661 {
662 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
663 if (!brokerListeners_.empty() && listener == brokerListeners_.back()) {
664 brokerListeners_.pop_back();
665 MEDIA_LOG_I("RemoveBrokerListener success, size: %{public}d", brokerListeners_.size());
666 } else {
667 MEDIA_LOG_E("removed item is not the back one.");
668 }
669 return Status::OK;
670 }
671
SetProducerListener(sptr<IProducerListener> & listener)672 Status AVBufferQueueImpl::SetProducerListener(sptr<IProducerListener>& listener)
673 {
674 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
675 producerListener_ = listener;
676
677 return Status::OK;
678 }
679
SetConsumerListener(sptr<IConsumerListener> & listener)680 Status AVBufferQueueImpl::SetConsumerListener(sptr<IConsumerListener>& listener)
681 {
682 std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
683 consumerListener_ = listener;
684
685 return Status::OK;
686 }
687
688 } // namespace Media
689 } // namespace OHOS
690