1 /*
2  * Copyright (c) 2022-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "DataStreamImpl"
17 
18 #include "data_stream_impl.h"
19 #include "foundation/log.h"
20 
21 namespace OHOS {
22 namespace Media {
DataStreamImpl(size_t size,size_t count,MemoryType type)23 DataStreamImpl::DataStreamImpl(size_t size, size_t count, MemoryType type)
24 {
25     FALSE_LOG(type == MemoryType::VIRTUAL_ADDR);
26     for (size_t i = 0; i < count; ++i) {
27         auto buffer = std::make_shared<VirtualDataBuffer>(size);
28         emptyBuffers_.push(buffer);
29         allBuffers_.emplace_back(buffer);
30     }
31 }
32 
GetDataBuffer(std::shared_ptr<DataBuffer> & buffer,int timeout)33 bool DataStreamImpl::GetDataBuffer(std::shared_ptr<DataBuffer>& buffer, int timeout)
34 {
35     OSAL::ScopedLock lock(dataMutex_);
36     if (dataBuffers_.empty()) {
37         dataCondition_.Wait(lock, [this] { return !dataBuffers_.empty(); });
38     }
39     buffer = dataBuffers_.front();
40     dataBuffers_.pop();
41     return true;
42 }
43 
QueueEmptyBuffer(const std::shared_ptr<DataBuffer> & buffer)44 bool DataStreamImpl::QueueEmptyBuffer(const std::shared_ptr<DataBuffer>& buffer)
45 {
46     OSAL::ScopedLock lock(emptyMutex_);
47     emptyBuffers_.push(buffer);
48     emptyCondition_.NotifyAll();
49     return true;
50 }
51 
QueueEmptyBuffer(uint8_t * address)52 bool DataStreamImpl::QueueEmptyBuffer(uint8_t* address)
53 {
54     OSAL::ScopedLock lock(emptyMutex_);
55     for (size_t i = 0; i < allBuffers_.size(); i++) {
56         if (allBuffers_[i]->GetAddress() == address) {
57             emptyBuffers_.push(allBuffers_[i]);
58             emptyCondition_.NotifyAll();
59             return true;
60         }
61     }
62     MEDIA_LOG_E("Queue empty buffer address not in DataStream.");
63     return false;
64 }
65 
GetEmptyBuffer(std::shared_ptr<DataBuffer> & buffer,int timeout)66 bool DataStreamImpl::GetEmptyBuffer(std::shared_ptr<DataBuffer>& buffer, int timeout)
67 {
68     OSAL::ScopedLock lock(emptyMutex_);
69     if (emptyBuffers_.empty()) {
70         emptyCondition_.Wait(lock, [this] { return !emptyBuffers_.empty(); });
71     }
72     buffer = emptyBuffers_.front();
73     emptyBuffers_.pop();
74     return true;
75 }
76 
QueueDataBuffer(const std::shared_ptr<DataBuffer> & buffer)77 bool DataStreamImpl::QueueDataBuffer(const std::shared_ptr<DataBuffer>& buffer)
78 {
79     OSAL::ScopedLock lock(dataMutex_);
80     dataBuffers_.push(buffer);
81     dataCondition_.NotifyAll();
82     return true;
83 }
84 
VirtualDataBuffer(size_t capacity)85 VirtualDataBuffer::VirtualDataBuffer(size_t capacity) : capacity_(capacity)
86 {
87     address_ = new uint8_t[capacity];
88 }
89 
~VirtualDataBuffer()90 VirtualDataBuffer::~VirtualDataBuffer()
91 {
92     delete [] address_;
93 }
94 
IsEos()95 bool VirtualDataBuffer::IsEos()
96 {
97     return isEos_;
98 }
99 
SetEos(bool isEos)100 void VirtualDataBuffer::SetEos(bool isEos)
101 {
102     isEos_ = isEos;
103 }
104 
GetAddress()105 uint8_t* VirtualDataBuffer::GetAddress()
106 {
107     return address_;
108 }
109 
GetCapacity()110 size_t VirtualDataBuffer::GetCapacity()
111 {
112     return capacity_;
113 }
114 
GetSize()115 size_t VirtualDataBuffer::GetSize()
116 {
117     return size_;
118 }
119 
SetSize(size_t size)120 void VirtualDataBuffer::SetSize(size_t size)
121 {
122     size_ = size;
123 }
124 } // Media
125 } // OHOS