1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "BlockQueuePool"
17
18 #include "common/log.h"
19 #include "block_queue_pool.h"
20
21 namespace {
22 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_DEMUXER, "BlockQueuePool" };
23 }
24
25 namespace OHOS {
26 namespace Media {
27
~BlockQueuePool()28 BlockQueuePool::~BlockQueuePool()
29 {
30 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S, name_.c_str());
31 for (auto que : quePool_) {
32 FreeQueue(que.first);
33 }
34 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S, name_.c_str());
35 }
36
AddTrackQueue(uint32_t trackIndex)37 Status BlockQueuePool::AddTrackQueue(uint32_t trackIndex)
38 {
39 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
40 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
41 if (!HasQueue(trackIndex)) {
42 uint32_t queIndex = GetValidQueue();
43 queMap_[trackIndex] = std::vector<uint32_t>({ queIndex });
44 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", add track " PUBLIC_LOG_U32
45 ", get queue " PUBLIC_LOG_U32, name_.c_str(), trackIndex, queIndex);
46 sizeMap_[trackIndex] = 0;
47 } else {
48 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " is already in queue",
49 name_.c_str(), trackIndex);
50 }
51 return Status::OK;
52 }
53
RemoveTrackQueue(uint32_t trackIndex)54 Status BlockQueuePool::RemoveTrackQueue(uint32_t trackIndex)
55 {
56 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
57 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
58 if (!HasQueue(trackIndex)) {
59 MEDIA_LOG_D("Block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " not in queue", name_.c_str(), trackIndex);
60 } else {
61 for (auto queIndex : queMap_[trackIndex]) {
62 ResetQueue(queIndex);
63 }
64 queMap_[trackIndex].clear();
65 queMap_.erase(trackIndex);
66 sizeMap_.erase(trackIndex);
67 }
68 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S, name_.c_str());
69 return Status::OK;
70 }
71
GetCacheSize(uint32_t trackIndex)72 size_t BlockQueuePool::GetCacheSize(uint32_t trackIndex)
73 {
74 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
75 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
76 size_t size = 0;
77 for (auto queIndex : queMap_[trackIndex]) {
78 if (quePool_[queIndex].blockQue == nullptr) {
79 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
80 continue;
81 }
82 if (quePool_[queIndex].blockQue->Size() > 0) {
83 MEDIA_LOG_D("Block queue " PUBLIC_LOG_S " has cache", name_.c_str());
84 size += quePool_[queIndex].blockQue->Size();
85 }
86 }
87 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", size = " PUBLIC_LOG_ZU, name_.c_str(), size);
88 return size;
89 }
90
GetCacheDataSize(uint32_t trackIndex)91 uint32_t BlockQueuePool::GetCacheDataSize(uint32_t trackIndex)
92 {
93 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
94 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
95 uint32_t dataSize = 0;
96 for (auto queIndex : queMap_[trackIndex]) {
97 if (static_cast<uint64_t>(dataSize) + static_cast<uint64_t>(quePool_[queIndex].dataSize) > UINT32_MAX) {
98 MEDIA_LOG_D("DataSize(" PUBLIC_LOG_U64 ") is out of uint32",
99 static_cast<uint64_t>(dataSize) + static_cast<uint64_t>(quePool_[queIndex].dataSize));
100 return UINT32_MAX;
101 }
102 dataSize += quePool_[queIndex].dataSize;
103 }
104 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " cache size = " PUBLIC_LOG_U32,
105 name_.c_str(), trackIndex, dataSize);
106 return dataSize;
107 }
108
HasCache(uint32_t trackIndex)109 bool BlockQueuePool::HasCache(uint32_t trackIndex)
110 {
111 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
112 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
113 for (auto queIndex : queMap_[trackIndex]) {
114 if (quePool_[queIndex].blockQue == nullptr) {
115 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
116 continue;
117 }
118 if (quePool_[queIndex].blockQue->Size() > 0) {
119 MEDIA_LOG_D("Block queue " PUBLIC_LOG_S " have cache", name_.c_str());
120 return true;
121 }
122 }
123 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S " don't have cache", name_.c_str());
124 return false;
125 }
126
ResetQueue(uint32_t queueIndex)127 void BlockQueuePool::ResetQueue(uint32_t queueIndex)
128 {
129 if (quePool_.count(queueIndex) == 0) {
130 MEDIA_LOG_D("Error queueIndex");
131 return;
132 }
133 auto blockQue = quePool_[queueIndex].blockQue;
134 if (blockQue == nullptr) {
135 return;
136 }
137 blockQue->Clear();
138 quePool_[queueIndex].dataSize = 0;
139 quePool_[queueIndex].isValid = true;
140 return;
141 }
142
FreeQueue(uint32_t queueIndex)143 void BlockQueuePool::FreeQueue(uint32_t queueIndex)
144 {
145 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
146 ResetQueue(queueIndex);
147 quePool_[queueIndex].blockQue = nullptr;
148 }
149
Push(uint32_t trackIndex,std::shared_ptr<SamplePacket> block)150 bool BlockQueuePool::Push(uint32_t trackIndex, std::shared_ptr<SamplePacket> block)
151 {
152 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
153 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
154 if (!HasQueue(trackIndex)) {
155 Status ret = AddTrackQueue(trackIndex);
156 FALSE_RETURN_V_MSG_E(ret == Status::OK, false, "Add new queue error: " PUBLIC_LOG_D32, ret);
157 }
158 auto& queVector = queMap_[trackIndex];
159 uint32_t pushIndex;
160 if (queVector.size() > 0) {
161 pushIndex = queVector[queVector.size() - 1];
162 } else {
163 pushIndex = GetValidQueue();
164 queMap_[trackIndex].push_back(pushIndex);
165 MEDIA_LOG_D("Track has no queue, will request " PUBLIC_LOG_D32 " from pool", pushIndex);
166 }
167 if (InnerQueueIsFull(pushIndex)) {
168 pushIndex = GetValidQueue();
169 queMap_[trackIndex].push_back(pushIndex);
170 MEDIA_LOG_D("Track que is full, will request " PUBLIC_LOG_D32 " from pool", pushIndex);
171 }
172 if (quePool_[pushIndex].blockQue == nullptr) {
173 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, failed to push data", pushIndex);
174 return false;
175 }
176 sizeMap_[trackIndex] += 1;
177 for (auto pkt : block->pkts) {
178 quePool_[pushIndex].dataSize += static_cast<uint32_t>(pkt->size);
179 }
180 return quePool_[pushIndex].blockQue->Push(block);
181 }
182
Pop(uint32_t trackIndex)183 std::shared_ptr<SamplePacket> BlockQueuePool::Pop(uint32_t trackIndex)
184 {
185 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
186 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
187 if (!HasQueue(trackIndex)) {
188 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
189 return nullptr;
190 }
191 auto& queVector = queMap_[trackIndex];
192 for (auto index = 0; index < static_cast<int32_t>(queVector.size()); ++index) {
193 auto queIndex = queVector[index];
194 if (quePool_[queIndex].blockQue == nullptr) {
195 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
196 continue;
197 }
198 if (quePool_[queIndex].blockQue->Size() <= 0) {
199 continue;
200 }
201 auto block = quePool_[queIndex].blockQue->Pop();
202 if (block == nullptr) {
203 MEDIA_LOG_D("Block is nullptr");
204 continue;
205 }
206 for (auto pkt : block->pkts) {
207 if (pkt == nullptr) {
208 MEDIA_LOG_D("Pkt is nullptr, will find next");
209 continue;
210 }
211 uint32_t pktSize = static_cast<uint32_t>(pkt->size);
212 quePool_[queIndex].dataSize =
213 quePool_[queIndex].dataSize >= pktSize ? quePool_[queIndex].dataSize -= pktSize : 0;
214 }
215 if (quePool_[queIndex].blockQue->Empty()) {
216 ResetQueue(queIndex);
217 MEDIA_LOG_D("Track " PUBLIC_LOG_U32 " queue " PUBLIC_LOG_D32 " is empty, will return to pool",
218 trackIndex, queIndex);
219 queVector.erase(queVector.begin() + index);
220 }
221 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S " for track " PUBLIC_LOG_U32,
222 name_.c_str(), trackIndex);
223 if (sizeMap_[trackIndex] > 0) {
224 sizeMap_[trackIndex] -= 1;
225 }
226 return block;
227 }
228 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
229 return nullptr;
230 }
231
Front(uint32_t trackIndex)232 std::shared_ptr<SamplePacket> BlockQueuePool::Front(uint32_t trackIndex)
233 {
234 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
235 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
236 if (!HasQueue(trackIndex)) {
237 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
238 return nullptr;
239 }
240 auto queVector = queMap_[trackIndex];
241 for (int i = 0; i < static_cast<int32_t>(queVector.size()); ++i) {
242 auto queIndex = queVector[i];
243 if (quePool_[queIndex].blockQue == nullptr) {
244 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
245 continue;
246 }
247 if (quePool_[queIndex].blockQue->Size() > 0) {
248 auto block = quePool_[queIndex].blockQue->Front();
249 return block;
250 }
251 }
252 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
253 return nullptr;
254 }
255
Back(uint32_t trackIndex)256 std::shared_ptr<SamplePacket> BlockQueuePool::Back(uint32_t trackIndex)
257 {
258 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
259 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
260 if (!HasQueue(trackIndex)) {
261 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
262 return nullptr;
263 }
264 auto queVector = queMap_[trackIndex];
265 if (queVector.size() > 0) {
266 auto lastQueIndex = queVector[queVector.size() - 1];
267 if (quePool_[lastQueIndex].blockQue != nullptr && quePool_[lastQueIndex].blockQue->Size() > 0) {
268 auto block = quePool_[lastQueIndex].blockQue->Back();
269 return block;
270 }
271 }
272 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
273 return nullptr;
274 }
275
GetValidQueue()276 uint32_t BlockQueuePool::GetValidQueue()
277 {
278 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
279 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S, name_.c_str());
280 for (auto pair : quePool_) {
281 if (pair.second.isValid && pair.second.blockQue != nullptr && pair.second.blockQue->Empty()) {
282 quePool_[pair.first].isValid = false;
283 return pair.first;
284 }
285 }
286 quePool_[queCount_] = {
287 false,
288 0,
289 std::make_shared<BlockQueue<std::shared_ptr<SamplePacket>>>("source_que_" + std::to_string(queCount_),
290 singleQueSize_)
291 };
292 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", valid queue index: " PUBLIC_LOG_U32,
293 name_.c_str(), queCount_);
294 queCount_++;
295 return (queCount_ - 1);
296 }
297
InnerQueueIsFull(uint32_t queueIndex)298 bool BlockQueuePool::InnerQueueIsFull(uint32_t queueIndex)
299 {
300 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
301 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", queue " PUBLIC_LOG_U32, name_.c_str(), queueIndex);
302 if (quePool_[queueIndex].blockQue == nullptr) {
303 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_D32 " is nullptr", queueIndex);
304 return true;
305 }
306 return quePool_[queueIndex].blockQue->Size() >= quePool_[queueIndex].blockQue->Capacity();
307 }
308
HasQueue(uint32_t trackIndex)309 bool BlockQueuePool::HasQueue(uint32_t trackIndex)
310 {
311 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
312 return queMap_.count(trackIndex) > 0;
313 }
314 } // namespace Media
315 } // namespace OHOS