1 /*
2  * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "avbuffer_queue_consumer_impl.h"
17 #include "avbuffer_queue_impl.h"
18 #include "avbuffer_queue_producer_impl.h"
19 #include "common/log.h"
20 #include "meta/media_types.h"
21 
namespace {
// File-local HiLog label carrying the "AVBufferQueue" tag.
// NOTE(review): presumably picked up implicitly by the MEDIA_LOG_* macros from common/log.h — confirm.
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "AVBufferQueue" };
}
25 
26 namespace OHOS {
27 namespace Media {
28 
Create(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)29 std::shared_ptr<AVBufferQueue> AVBufferQueue::Create(
30     uint32_t size, MemoryType type, const std::string& name, bool disableAlloc)
31 {
32     MEDIA_LOG_D("AVBufferQueue::Create size = %u, type = %u, name = %s",
33                 size, static_cast<uint32_t>(type), name.c_str());
34     return std::make_shared<AVBufferQueueImpl>(size, type, name, disableAlloc);
35 }
36 
GetLocalProducer()37 std::shared_ptr<AVBufferQueueProducer> AVBufferQueueImpl::GetLocalProducer()
38 {
39     std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
40     std::shared_ptr<AVBufferQueueProducerImpl> producer = nullptr;
41     if (localProducer_.expired()) {
42         auto shared_this = shared_from_this();
43         FALSE_RETURN_V(shared_this != nullptr, nullptr);
44         producer = std::make_shared<AVBufferQueueProducerImpl>(shared_this);
45         localProducer_ = producer;
46     }
47 
48     return localProducer_.lock();
49 }
50 
GetLocalConsumer()51 std::shared_ptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetLocalConsumer()
52 {
53     std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
54     std::shared_ptr<AVBufferQueueConsumerImpl> consumer = nullptr;
55     if (localConsumer_.expired()) {
56         auto shared_this = shared_from_this();
57         FALSE_RETURN_V(shared_this != nullptr, nullptr);
58         consumer = std::make_shared<AVBufferQueueConsumerImpl>(shared_this);
59         localConsumer_ = consumer;
60     }
61     return localConsumer_.lock();
62 }
63 
GetProducer()64 sptr<AVBufferQueueProducer> AVBufferQueueImpl::GetProducer()
65 {
66     std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
67     sptr<AVBufferQueueProducerImpl> producer = nullptr;
68     if (producer_ == nullptr || producer_->GetSptrRefCount() <= 0) {
69         auto shared_this = shared_from_this();
70         FALSE_RETURN_V(shared_this != nullptr, nullptr);
71         producer = new AVBufferQueueProducerImpl(shared_this);
72         producer_ = producer;
73     }
74 
75     return producer_.promote();
76 }
77 
GetConsumer()78 sptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetConsumer()
79 {
80     std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
81     sptr<AVBufferQueueConsumerImpl> consumer = nullptr;
82     if (consumer_ == nullptr || consumer_->GetSptrRefCount() <= 0) {
83         auto shared_this = shared_from_this();
84         FALSE_RETURN_V(shared_this != nullptr, nullptr);
85         consumer = new AVBufferQueueConsumerImpl(shared_this);
86         consumer_ = consumer;
87     }
88 
89     return consumer_.promote();
90 }
91 
AVBufferQueueImpl(const std::string & name)92 AVBufferQueueImpl::AVBufferQueueImpl(const std::string &name)
93     : AVBufferQueue(), name_(name), size_(0), memoryType_(MemoryType::UNKNOWN_MEMORY), disableAlloc_(false) {}
94 
AVBufferQueueImpl(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)95 AVBufferQueueImpl::AVBufferQueueImpl(uint32_t size, MemoryType type, const std::string &name, bool disableAlloc)
96     : AVBufferQueue(), name_(name), size_(size), memoryType_(type), disableAlloc_(disableAlloc)
97 {
98     if (size_ > AVBUFFER_QUEUE_MAX_QUEUE_SIZE) {
99         size_ = AVBUFFER_QUEUE_MAX_QUEUE_SIZE;
100     }
101 }
102 
GetQueueSize()103 uint32_t AVBufferQueueImpl::GetQueueSize()
104 {
105     return size_;
106 }
107 
SetQueueSize(uint32_t size)108 Status AVBufferQueueImpl::SetQueueSize(uint32_t size)
109 {
110     FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_,
111                    Status::ERROR_INVALID_BUFFER_SIZE);
112 
113     if (size > size_) {
114         size_ = size;
115         if (!disableAlloc_) {
116             requestCondition.notify_all();
117         }
118     } else {
119         std::lock_guard<std::mutex> lockGuard(queueMutex_);
120         DeleteBuffers(size_ - size);
121         size_ = size;
122     }
123 
124     return Status::OK;
125 }
126 
IsBufferInQueue(const std::shared_ptr<AVBuffer> & buffer)127 bool AVBufferQueueImpl::IsBufferInQueue(const std::shared_ptr<AVBuffer>& buffer)
128 {
129     FALSE_RETURN_V(buffer != nullptr, false);
130     auto uniqueId = buffer->GetUniqueId();
131     return cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end();
132 }
133 
GetCachedBufferCount() const134 uint32_t AVBufferQueueImpl::GetCachedBufferCount() const
135 {
136     // 确保cachedBufferMap_.size()不会超过MAX_UINT32
137     return static_cast<uint32_t>(cachedBufferMap_.size());
138 }
139 
PopFromFreeBufferList(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)140 Status AVBufferQueueImpl::PopFromFreeBufferList(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
141 {
142     for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
143         if (config <= cachedBufferMap_[*it].config) {
144             buffer = cachedBufferMap_[*it].buffer;
145             freeBufferList_.erase(it);
146             return Status::OK;
147         }
148     }
149 
150     if (freeBufferList_.empty()) {
151         buffer = nullptr;
152         // 没有可以重用的freeBuffer
153         return Status::ERROR_NO_FREE_BUFFER;
154     }
155 
156     buffer = cachedBufferMap_[freeBufferList_.front()].buffer;
157     freeBufferList_.pop_front();
158 
159     return Status::OK;
160 }
161 
PopFromDirtyBufferList(std::shared_ptr<AVBuffer> & buffer)162 Status AVBufferQueueImpl::PopFromDirtyBufferList(std::shared_ptr<AVBuffer>& buffer)
163 {
164     FALSE_RETURN_V(!dirtyBufferList_.empty(), Status::ERROR_NO_DIRTY_BUFFER);
165 
166     buffer = cachedBufferMap_[dirtyBufferList_.front()].buffer;
167     dirtyBufferList_.pop_front();
168     return Status::OK;
169 }
170 
AllocBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)171 Status AVBufferQueueImpl::AllocBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
172 {
173     auto bufferImpl = AVBuffer::CreateAVBuffer(config);
174     FALSE_RETURN_V(bufferImpl != nullptr, Status::ERROR_CREATE_BUFFER);
175 
176     auto uniqueId = bufferImpl->GetUniqueId();
177     AVBufferElement ele = {
178         .config = bufferImpl->GetConfig(),
179         .state = AVBUFFER_STATE_RELEASED,
180         .isDeleting = false,
181         .buffer = bufferImpl,
182     };
183     cachedBufferMap_[uniqueId] = ele;
184     buffer = bufferImpl;
185 
186     return Status::OK;
187 }
188 
RequestReuseBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)189 Status AVBufferQueueImpl::RequestReuseBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
190 {
191     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
192 
193     auto uniqueId = buffer->GetUniqueId();
194     FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_CREATE_BUFFER);
195 
196     if (config <= cachedBufferMap_[uniqueId].config) {
197         // 不需要重新分配,直接更新buffer大小
198         cachedBufferMap_[uniqueId].config.size = config.size;
199     } else {
200         // 重新分配
201         DeleteCachedBufferById(uniqueId);
202         NOK_RETURN(AllocBuffer(buffer, config));
203     }
204 
205     // 注意这里的uniqueId可能因为重新分配buffer而更新,所以需要再次获取
206     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
207     return Status::OK;
208 }
209 
DeleteBuffers(uint32_t count)210 void AVBufferQueueImpl::DeleteBuffers(uint32_t count)
211 {
212     FALSE_RETURN(count > 0);
213 
214     while (!freeBufferList_.empty()) {
215         DeleteCachedBufferById(freeBufferList_.front());
216         freeBufferList_.pop_front();
217         count--;
218         if (count <= 0) {
219             return;
220         }
221     }
222 
223     while (!dirtyBufferList_.empty()) {
224         DeleteCachedBufferById(dirtyBufferList_.front());
225         dirtyBufferList_.pop_front();
226         count--;
227         if (count <= 0) {
228             return;
229         }
230     }
231 
232     for (auto&& ele : cachedBufferMap_) {
233         ele.second.isDeleting = true;
234         // we don't have to do anything
235         count--;
236         if (count <= 0) {
237             break;
238         }
239     }
240 }
241 
DeleteCachedBufferById(uint64_t uniqueId)242 void AVBufferQueueImpl::DeleteCachedBufferById(uint64_t uniqueId)
243 {
244     auto it = cachedBufferMap_.find(uniqueId);
245     if (it != cachedBufferMap_.end()) {
246         cachedBufferMap_.erase(it);
247     }
248 }
249 
CheckConfig(const AVBufferConfig & config)250 Status AVBufferQueueImpl::CheckConfig(const AVBufferConfig& config)
251 {
252     if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
253         MEDIA_LOG_D("config.memoryType != MemoryType::UNKNOWN_MEMORY");
254         return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
255     }
256     // memoryType_初始化之后将无法改变。
257     if (memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_) {
258         MEDIA_LOG_D("memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_");
259         return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
260     }
261     memoryType_ = config.memoryType;
262     return Status::OK;
263 }
264 
wait_for(std::unique_lock<std::mutex> & lock,int32_t timeoutMs)265 bool AVBufferQueueImpl::wait_for(std::unique_lock<std::mutex>& lock, int32_t timeoutMs)
266 {
267     MEDIA_LOG_D("wait for free buffer, timeout = %d", timeoutMs);
268     if (timeoutMs > 0) {
269         return requestCondition.wait_for(
270             lock, std::chrono::milliseconds(timeoutMs), [this]() {
271                 return !freeBufferList_.empty() || (GetCachedBufferCount() < GetQueueSize());
272             });
273     } else if (timeoutMs < 0) {
274         requestCondition.wait(lock);
275     }
276     return true;
277 }
278 
RequestBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int32_t timeoutMs)279 Status AVBufferQueueImpl::RequestBuffer(
280     std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int32_t timeoutMs)
281 {
282     auto configCopy = config;
283     if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
284         MEDIA_LOG_D("AVBufferQueueImpl::RequestBuffer config.memoryType unknown, "
285                     "memoryType_ = %u", static_cast<uint32_t>(memoryType_));
286         configCopy.memoryType = memoryType_;
287     }
288 
289     // check param
290     std::unique_lock<std::mutex> lock(queueMutex_);
291     auto res = CheckConfig(configCopy);
292     FALSE_RETURN_V_MSG(res == Status::OK,
293         res, "CheckConfig not OK, code %{public}d", static_cast<int32_t>(res));
294     // dequeue from free list
295     auto ret = PopFromFreeBufferList(buffer, configCopy);
296     if (ret == Status::OK) {
297         return RequestReuseBuffer(buffer, configCopy);
298     }
299 
300     // check queue size
301     if (GetCachedBufferCount() >= GetQueueSize()) {
302         if (!wait_for(lock, timeoutMs)) {
303             MEDIA_LOG_D("FALSE_RETURN_V wait_for(lock, timeoutMs)");
304             return Status::ERROR_WAIT_TIMEOUT;
305         }
306         // 被条件唤醒后,再次尝试从freeBufferList中取buffer
307         ret = PopFromFreeBufferList(buffer, configCopy);
308         if (ret == Status::OK) {
309             return RequestReuseBuffer(buffer, configCopy);
310         }
311         if (GetCachedBufferCount() >= GetQueueSize()) {
312             return Status::ERROR_NO_FREE_BUFFER;
313         }
314     }
315 
316     NOK_RETURN(AllocBuffer(buffer, configCopy));
317     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
318 
319     return Status::OK;
320 }
321 
InsertFreeBufferInOrder(uint64_t uniqueId)322 void AVBufferQueueImpl::InsertFreeBufferInOrder(uint64_t uniqueId)
323 {
324     for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
325         if ((*it != uniqueId) &&
326                 (cachedBufferMap_[*it].config.capacity >= cachedBufferMap_[uniqueId].config.capacity)) {
327             freeBufferList_.insert(it, uniqueId);
328             return;
329         }
330     }
331     freeBufferList_.emplace_back(uniqueId);
332 }
333 
CancelBuffer(uint64_t uniqueId)334 Status AVBufferQueueImpl::CancelBuffer(uint64_t uniqueId)
335 {
336     FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
337 
338     FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_REQUESTED ||
339                    cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
340                    Status::ERROR_INVALID_BUFFER_STATE);
341 
342     InsertFreeBufferInOrder(uniqueId);
343 
344     cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
345 
346     requestCondition.notify_all();
347 
348     MEDIA_LOG_D("cancel buffer id = %llu", uniqueId);
349 
350     return Status::OK;
351 }
352 
PushBuffer(uint64_t uniqueId,bool available)353 Status AVBufferQueueImpl::PushBuffer(uint64_t uniqueId, bool available)
354 {
355     std::shared_ptr<AVBuffer> buffer = nullptr;
356     {
357         std::lock_guard<std::mutex> lockGuard(queueMutex_);
358         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
359                        Status::ERROR_INVALID_BUFFER_ID);
360 
361         auto& ele = cachedBufferMap_[uniqueId];
362         if (ele.isDeleting) {
363             DeleteCachedBufferById(uniqueId);
364             MEDIA_LOG_D("delete push buffer uniqueId(%llu)", uniqueId);
365             return Status::OK;
366         }
367 
368         if (available) {
369             FALSE_RETURN_V(ele.buffer->GetConfig().size >= 0, Status::ERROR_INVALID_BUFFER_SIZE);
370         }
371 
372         FALSE_RETURN_V(ele.state == AVBUFFER_STATE_REQUESTED || ele.state == AVBUFFER_STATE_ATTACHED,
373                        Status::ERROR_INVALID_BUFFER_STATE);
374 
375         ele.state = AVBUFFER_STATE_PUSHED;
376         buffer = cachedBufferMap_[uniqueId].buffer;
377     }
378 
379     if (available) {
380         std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
381         if (!brokerListeners_.empty() && brokerListeners_.back() != nullptr) {
382             brokerListeners_.back()->OnBufferFilled(buffer);
383             return Status::OK;
384         }
385     }
386 
387     return ReturnBuffer(uniqueId, available);
388 }
389 
PushBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)390 Status AVBufferQueueImpl::PushBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
391 {
392     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
393 
394     return PushBuffer(buffer->GetUniqueId(), available);
395 }
396 
ReturnBuffer(uint64_t uniqueId,bool available)397 Status __attribute__((no_sanitize("cfi"))) AVBufferQueueImpl::ReturnBuffer(uint64_t uniqueId, bool available)
398 {
399     {
400         std::lock_guard<std::mutex> lockGuard(queueMutex_);
401         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
402                        Status::ERROR_INVALID_BUFFER_ID);
403 
404         if (cachedBufferMap_[uniqueId].isDeleting) {
405             DeleteCachedBufferById(uniqueId);
406             MEDIA_LOG_D("delete return buffer uniqueId(%llu)", uniqueId);
407             return Status::OK;
408         }
409 
410         FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
411                        Status::ERROR_INVALID_BUFFER_STATE);
412 
413         if (!available) {
414             NOK_RETURN(CancelBuffer(uniqueId));
415         } else {
416             auto& config = cachedBufferMap_[uniqueId].buffer->GetConfig();
417             bool isEosBuffer = cachedBufferMap_[uniqueId].buffer->flag_ & (uint32_t)(Plugins::AVBufferFlag::EOS);
418             if (!isEosBuffer) {
419                 FALSE_RETURN_V(config.size > 0, Status::ERROR_INVALID_BUFFER_SIZE);
420             }
421             cachedBufferMap_[uniqueId].config = config;
422             cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RETURNED;
423             dirtyBufferList_.push_back(uniqueId);
424         }
425     }
426 
427     if (!available) {
428         std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
429         if (producerListener_ != nullptr) {
430             producerListener_->OnBufferAvailable();
431         }
432         return Status::OK;
433     }
434 
435     std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
436     FALSE_RETURN_V(consumerListener_ != nullptr, Status::ERROR_NO_CONSUMER_LISTENER);
437     consumerListener_->OnBufferAvailable();
438 
439     return Status::OK;
440 }
441 
ReturnBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)442 Status AVBufferQueueImpl::ReturnBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
443 {
444     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
445 
446     return ReturnBuffer(buffer->GetUniqueId(), available);
447 }
448 
SetQueueSizeAndAttachBuffer(uint32_t size,std::shared_ptr<AVBuffer> & buffer,bool isFilled)449 Status AVBufferQueueImpl::SetQueueSizeAndAttachBuffer(uint32_t size,
450     std::shared_ptr<AVBuffer>& buffer, bool isFilled)
451 {
452     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
453     auto config = buffer->GetConfig();
454     auto uniqueId = buffer->GetUniqueId();
455     {
456         std::lock_guard<std::mutex> lockGuard(queueMutex_);
457         if (size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_) {
458             SetQueueSizeBeforeAttachBufferLocked(size);
459         }
460         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
461                        Status::ERROR_INVALID_BUFFER_ID);
462         NOK_RETURN(CheckConfig(config));
463         Status result = AttachAvailableBufferLocked(buffer);
464         FALSE_RETURN_V(result == Status::OK, result);
465     }
466     if (isFilled) {
467         return PushBufferOnFilled(uniqueId, isFilled);
468     }
469     return ReleaseBuffer(uniqueId);
470 }
471 
AttachAvailableBufferLocked(std::shared_ptr<AVBuffer> & buffer)472 Status AVBufferQueueImpl::AttachAvailableBufferLocked(std::shared_ptr<AVBuffer>& buffer)
473 {
474     auto config = buffer->GetConfig();
475     auto uniqueId = buffer->GetUniqueId();
476     AVBufferElement ele = {
477         .config = config,
478         .state = AVBUFFER_STATE_ATTACHED,
479         .isDeleting = false,
480         .buffer = buffer
481     };
482 
483     auto cachedCount = GetCachedBufferCount();
484     auto queueSize = GetQueueSize();
485     if (cachedCount >= queueSize) {
486         auto validCount = static_cast<uint32_t>(dirtyBufferList_.size() + freeBufferList_.size());
487         auto toBeDeleteCount = cachedCount - queueSize;
488         // 这里表示有可以删除的buffer,或者
489         if (validCount > toBeDeleteCount) {
490             // 在什么场景下需要在此处删除buffer?
491             DeleteBuffers(toBeDeleteCount + 1); // 多删除一个,用于attach当前buffer
492             cachedBufferMap_[uniqueId] = ele;
493             MEDIA_LOG_D("uniqueId(%llu) attached with delete", uniqueId);
494         } else {
495             MEDIA_LOG_E("attach failed, out of range");
496             return Status::ERROR_OUT_OF_RANGE;
497         }
498     } else {
499         cachedBufferMap_[uniqueId] = ele;
500         MEDIA_LOG_D("uniqueId(%llu) attached without delete", uniqueId);
501     }
502     return Status::OK;
503 }
504 
PushBufferOnFilled(uint64_t uniqueId,bool isFilled)505 Status AVBufferQueueImpl::PushBufferOnFilled(uint64_t uniqueId, bool isFilled)
506 {
507     auto ret = PushBuffer(uniqueId, isFilled);
508     if (ret != Status::OK) {
509         // PushBuffer失败,强制Detach
510         DetachBuffer(uniqueId, true);
511     }
512     return ret;
513 }
514 
SetQueueSizeBeforeAttachBufferLocked(uint32_t size)515 void AVBufferQueueImpl::SetQueueSizeBeforeAttachBufferLocked(uint32_t size)
516 {
517     if (size > size_) {
518         size_ = size;
519         if (!disableAlloc_) {
520             requestCondition.notify_all();
521         }
522     } else {
523         DeleteBuffers(size_ - size);
524         size_ = size;
525     }
526 }
527 
AttachBuffer(std::shared_ptr<AVBuffer> & buffer,bool isFilled)528 Status AVBufferQueueImpl::AttachBuffer(std::shared_ptr<AVBuffer>& buffer, bool isFilled)
529 {
530     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
531 
532     auto config = buffer->GetConfig();
533     auto uniqueId = buffer->GetUniqueId();
534     {
535         std::lock_guard<std::mutex> lockGuard(queueMutex_);
536         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
537                        Status::ERROR_INVALID_BUFFER_ID);
538 
539         NOK_RETURN(CheckConfig(config));
540 
541         Status result = AttachAvailableBufferLocked(buffer);
542         FALSE_RETURN_V(result == Status::OK, result);
543     }
544 
545     if (isFilled) {
546         return PushBufferOnFilled(uniqueId, isFilled);
547     }
548 
549     return ReleaseBuffer(uniqueId);
550 }
551 
DetachBuffer(uint64_t uniqueId,bool force)552 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId, bool force)
553 {
554     FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
555 
556     const auto& ele = cachedBufferMap_[uniqueId];
557 
558     if (!force) {
559         // 只有生产者或消费者在获取到buffer后才能detach
560         if (ele.state == AVBUFFER_STATE_REQUESTED) {
561             MEDIA_LOG_D("detach buffer(%llu) on state requested", uniqueId);
562         } else if (ele.state == AVBUFFER_STATE_ACQUIRED) {
563             MEDIA_LOG_D("detach buffer(%llu) on state acquired", uniqueId);
564         } else {
565             MEDIA_LOG_W("detach buffer(%llu) on state %d forbidden", uniqueId, ele.state);
566             return Status::ERROR_INVALID_BUFFER_STATE;
567         }
568     }
569 
570     cachedBufferMap_.erase(uniqueId);
571 
572     return Status::OK;
573 }
574 
DetachBuffer(uint64_t uniqueId)575 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId)
576 {
577     std::lock_guard<std::mutex> lockGuard(queueMutex_);
578     return DetachBuffer(uniqueId, false);
579 }
580 
DetachBuffer(const std::shared_ptr<AVBuffer> & buffer)581 Status AVBufferQueueImpl::DetachBuffer(const std::shared_ptr<AVBuffer>& buffer)
582 {
583     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
584 
585     return DetachBuffer(buffer->GetUniqueId());
586 }
587 
AcquireBuffer(std::shared_ptr<AVBuffer> & buffer)588 Status AVBufferQueueImpl::AcquireBuffer(std::shared_ptr<AVBuffer>& buffer)
589 {
590     std::lock_guard<std::mutex> lockGuard(queueMutex_);
591     auto ret = PopFromDirtyBufferList(buffer);
592     if (ret != Status::OK) {
593         MEDIA_LOG_E("acquire buffer failed");
594         return ret;
595     }
596 
597     cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_ACQUIRED;
598 
599     return Status::OK;
600 }
601 
ReleaseBuffer(uint64_t uniqueId)602 Status AVBufferQueueImpl::ReleaseBuffer(uint64_t uniqueId)
603 {
604     {
605         std::lock_guard<std::mutex> lockGuard(queueMutex_);
606         FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
607 
608         FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ACQUIRED ||
609             cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ATTACHED, Status::ERROR_INVALID_BUFFER_STATE);
610 
611         cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
612         if (cachedBufferMap_[uniqueId].isDeleting) {
613             DeleteCachedBufferById(uniqueId);
614             return Status::OK;
615         }
616 
617         InsertFreeBufferInOrder(uniqueId);
618 
619         requestCondition.notify_all();
620     }
621 
622     // 注意:此时通知生产者有buffer可用,但实际有可能已经被request wait的生产者获取
623     std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
624     if (producerListener_ != nullptr) {
625         producerListener_->OnBufferAvailable();
626     }
627 
628     return Status::OK;
629 }
630 
ReleaseBuffer(const std::shared_ptr<AVBuffer> & buffer)631 Status AVBufferQueueImpl::ReleaseBuffer(const std::shared_ptr<AVBuffer>& buffer)
632 {
633     FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
634 
635     return ReleaseBuffer(buffer->GetUniqueId());
636 }
637 
Clear()638 Status AVBufferQueueImpl::Clear()
639 {
640     MEDIA_LOG_E("AVBufferQueueImpl Clear");
641     std::lock_guard<std::mutex> lockGuard(queueMutex_);
642     dirtyBufferList_.clear();
643     for (auto it = cachedBufferMap_.begin(); it != cachedBufferMap_.end(); it++) {
644         if (it->second.state == AVBUFFER_STATE_PUSHED || it->second.state == AVBUFFER_STATE_RETURNED) {
645             it->second.state = AVBUFFER_STATE_RELEASED;
646             InsertFreeBufferInOrder(it->first);
647         }
648     }
649     requestCondition.notify_all();
650     return Status::OK;
651 }
652 
SetBrokerListener(sptr<IBrokerListener> & listener)653 Status AVBufferQueueImpl::SetBrokerListener(sptr<IBrokerListener>& listener)
654 {
655     std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
656     brokerListeners_.push_back(listener);
657     return Status::OK;
658 }
659 
RemoveBrokerListener(sptr<IBrokerListener> & listener)660 Status AVBufferQueueImpl::RemoveBrokerListener(sptr<IBrokerListener>& listener)
661 {
662     std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
663     if (!brokerListeners_.empty() && listener == brokerListeners_.back()) {
664         brokerListeners_.pop_back();
665         MEDIA_LOG_I("RemoveBrokerListener success, size: %{public}d", brokerListeners_.size());
666     } else {
667         MEDIA_LOG_E("removed item is not the back one.");
668     }
669     return Status::OK;
670 }
671 
SetProducerListener(sptr<IProducerListener> & listener)672 Status AVBufferQueueImpl::SetProducerListener(sptr<IProducerListener>& listener)
673 {
674     std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
675     producerListener_ = listener;
676 
677     return Status::OK;
678 }
679 
SetConsumerListener(sptr<IConsumerListener> & listener)680 Status AVBufferQueueImpl::SetConsumerListener(sptr<IConsumerListener>& listener)
681 {
682     std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
683     consumerListener_ = listener;
684 
685     return Status::OK;
686 }
687 
688 } // namespace Media
689 } // namespace OHOS
690