1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "BlockQueuePool"
17 
18 #include "common/log.h"
19 #include "block_queue_pool.h"
20 
21 namespace {
22 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_DEMUXER, "BlockQueuePool" };
23 }
24 
25 namespace OHOS {
26 namespace Media {
27 
~BlockQueuePool()28 BlockQueuePool::~BlockQueuePool()
29 {
30     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S, name_.c_str());
31     for (auto que : quePool_) {
32         FreeQueue(que.first);
33     }
34     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S, name_.c_str());
35 }
36 
AddTrackQueue(uint32_t trackIndex)37 Status BlockQueuePool::AddTrackQueue(uint32_t trackIndex)
38 {
39     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
40     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
41     if (!HasQueue(trackIndex)) {
42         uint32_t queIndex = GetValidQueue();
43         queMap_[trackIndex] = std::vector<uint32_t>({ queIndex });
44         MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", add track " PUBLIC_LOG_U32
45             ", get queue " PUBLIC_LOG_U32, name_.c_str(), trackIndex, queIndex);
46         sizeMap_[trackIndex] = 0;
47     } else {
48         MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " is already in queue",
49             name_.c_str(), trackIndex);
50     }
51     return Status::OK;
52 }
53 
RemoveTrackQueue(uint32_t trackIndex)54 Status BlockQueuePool::RemoveTrackQueue(uint32_t trackIndex)
55 {
56     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
57     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
58     if (!HasQueue(trackIndex)) {
59         MEDIA_LOG_D("Block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " not in queue", name_.c_str(), trackIndex);
60     } else {
61         for (auto queIndex : queMap_[trackIndex]) {
62             ResetQueue(queIndex);
63         }
64         queMap_[trackIndex].clear();
65         queMap_.erase(trackIndex);
66         sizeMap_.erase(trackIndex);
67     }
68     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S, name_.c_str());
69     return Status::OK;
70 }
71 
GetCacheSize(uint32_t trackIndex)72 size_t BlockQueuePool::GetCacheSize(uint32_t trackIndex)
73 {
74     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
75     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
76     size_t size = 0;
77     for (auto queIndex : queMap_[trackIndex]) {
78         if (quePool_[queIndex].blockQue == nullptr) {
79             MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
80             continue;
81         }
82         if (quePool_[queIndex].blockQue->Size() > 0) {
83             MEDIA_LOG_D("Block queue " PUBLIC_LOG_S " has cache", name_.c_str());
84             size += quePool_[queIndex].blockQue->Size();
85         }
86     }
87     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", size = " PUBLIC_LOG_ZU, name_.c_str(), size);
88     return size;
89 }
90 
GetCacheDataSize(uint32_t trackIndex)91 uint32_t BlockQueuePool::GetCacheDataSize(uint32_t trackIndex)
92 {
93     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
94     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
95     uint32_t dataSize = 0;
96     for (auto queIndex : queMap_[trackIndex]) {
97         if (static_cast<uint64_t>(dataSize) + static_cast<uint64_t>(quePool_[queIndex].dataSize) > UINT32_MAX) {
98             MEDIA_LOG_D("DataSize(" PUBLIC_LOG_U64 ") is out of uint32",
99                 static_cast<uint64_t>(dataSize) + static_cast<uint64_t>(quePool_[queIndex].dataSize));
100             return UINT32_MAX;
101         }
102         dataSize += quePool_[queIndex].dataSize;
103     }
104     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " cache size = " PUBLIC_LOG_U32,
105         name_.c_str(), trackIndex, dataSize);
106     return dataSize;
107 }
108 
HasCache(uint32_t trackIndex)109 bool BlockQueuePool::HasCache(uint32_t trackIndex)
110 {
111     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
112     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
113     for (auto queIndex : queMap_[trackIndex]) {
114         if (quePool_[queIndex].blockQue == nullptr) {
115             MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
116             continue;
117         }
118         if (quePool_[queIndex].blockQue->Size() > 0) {
119             MEDIA_LOG_D("Block queue " PUBLIC_LOG_S " have cache", name_.c_str());
120             return true;
121         }
122     }
123     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S " don't have cache", name_.c_str());
124     return false;
125 }
126 
ResetQueue(uint32_t queueIndex)127 void BlockQueuePool::ResetQueue(uint32_t queueIndex)
128 {
129     if (quePool_.count(queueIndex) == 0) {
130         MEDIA_LOG_D("Error queueIndex");
131         return;
132     }
133     auto blockQue = quePool_[queueIndex].blockQue;
134     if (blockQue == nullptr) {
135         return;
136     }
137     blockQue->Clear();
138     quePool_[queueIndex].dataSize = 0;
139     quePool_[queueIndex].isValid = true;
140     return;
141 }
142 
FreeQueue(uint32_t queueIndex)143 void BlockQueuePool::FreeQueue(uint32_t queueIndex)
144 {
145     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
146     ResetQueue(queueIndex);
147     quePool_[queueIndex].blockQue = nullptr;
148 }
149 
Push(uint32_t trackIndex,std::shared_ptr<SamplePacket> block)150 bool BlockQueuePool::Push(uint32_t trackIndex, std::shared_ptr<SamplePacket> block)
151 {
152     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
153     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
154     if (!HasQueue(trackIndex)) {
155         Status ret = AddTrackQueue(trackIndex);
156         FALSE_RETURN_V_MSG_E(ret == Status::OK, false, "Add new queue error: " PUBLIC_LOG_D32, ret);
157     }
158     auto& queVector = queMap_[trackIndex];
159     uint32_t pushIndex;
160     if (queVector.size() > 0) {
161         pushIndex = queVector[queVector.size() - 1];
162     } else {
163         pushIndex = GetValidQueue();
164         queMap_[trackIndex].push_back(pushIndex);
165         MEDIA_LOG_D("Track has no queue, will request " PUBLIC_LOG_D32 " from pool", pushIndex);
166     }
167     if (InnerQueueIsFull(pushIndex)) {
168         pushIndex = GetValidQueue();
169         queMap_[trackIndex].push_back(pushIndex);
170         MEDIA_LOG_D("Track que is full, will request " PUBLIC_LOG_D32 " from pool", pushIndex);
171     }
172     if (quePool_[pushIndex].blockQue == nullptr) {
173         MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, failed to push data", pushIndex);
174         return false;
175     }
176     sizeMap_[trackIndex] += 1;
177     for (auto pkt : block->pkts) {
178         quePool_[pushIndex].dataSize += static_cast<uint32_t>(pkt->size);
179     }
180     return quePool_[pushIndex].blockQue->Push(block);
181 }
182 
Pop(uint32_t trackIndex)183 std::shared_ptr<SamplePacket> BlockQueuePool::Pop(uint32_t trackIndex)
184 {
185     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
186     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
187     if (!HasQueue(trackIndex)) {
188         MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
189         return nullptr;
190     }
191     auto& queVector = queMap_[trackIndex];
192     for (auto index = 0; index < static_cast<int32_t>(queVector.size()); ++index) {
193         auto queIndex = queVector[index];
194         if (quePool_[queIndex].blockQue == nullptr) {
195             MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
196             continue;
197         }
198         if (quePool_[queIndex].blockQue->Size() <= 0) {
199             continue;
200         }
201         auto block = quePool_[queIndex].blockQue->Pop();
202         if (block == nullptr) {
203             MEDIA_LOG_D("Block is nullptr");
204             continue;
205         }
206         for (auto pkt : block->pkts) {
207             if (pkt == nullptr) {
208                 MEDIA_LOG_D("Pkt is nullptr, will find next");
209                 continue;
210             }
211             uint32_t pktSize = static_cast<uint32_t>(pkt->size);
212             quePool_[queIndex].dataSize =
213                 quePool_[queIndex].dataSize >= pktSize ? quePool_[queIndex].dataSize -= pktSize : 0;
214         }
215         if (quePool_[queIndex].blockQue->Empty()) {
216             ResetQueue(queIndex);
217             MEDIA_LOG_D("Track " PUBLIC_LOG_U32 " queue " PUBLIC_LOG_D32 " is empty, will return to pool",
218                 trackIndex, queIndex);
219             queVector.erase(queVector.begin() + index);
220         }
221         MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S " for track " PUBLIC_LOG_U32,
222             name_.c_str(), trackIndex);
223         if (sizeMap_[trackIndex] > 0) {
224             sizeMap_[trackIndex] -= 1;
225         }
226         return block;
227     }
228     MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
229     return nullptr;
230 }
231 
Front(uint32_t trackIndex)232 std::shared_ptr<SamplePacket> BlockQueuePool::Front(uint32_t trackIndex)
233 {
234     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
235     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
236     if (!HasQueue(trackIndex)) {
237         MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
238         return nullptr;
239     }
240     auto queVector = queMap_[trackIndex];
241     for (int i = 0; i < static_cast<int32_t>(queVector.size()); ++i) {
242         auto queIndex = queVector[i];
243         if (quePool_[queIndex].blockQue == nullptr) {
244             MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
245             continue;
246         }
247         if (quePool_[queIndex].blockQue->Size() > 0) {
248             auto block = quePool_[queIndex].blockQue->Front();
249             return block;
250         }
251     }
252     MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
253     return nullptr;
254 }
255 
Back(uint32_t trackIndex)256 std::shared_ptr<SamplePacket> BlockQueuePool::Back(uint32_t trackIndex)
257 {
258     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
259     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
260     if (!HasQueue(trackIndex)) {
261         MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
262         return nullptr;
263     }
264     auto queVector = queMap_[trackIndex];
265     if (queVector.size() > 0) {
266         auto lastQueIndex = queVector[queVector.size() - 1];
267         if (quePool_[lastQueIndex].blockQue != nullptr && quePool_[lastQueIndex].blockQue->Size() > 0) {
268             auto block = quePool_[lastQueIndex].blockQue->Back();
269             return block;
270         }
271     }
272     MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
273     return nullptr;
274 }
275 
GetValidQueue()276 uint32_t BlockQueuePool::GetValidQueue()
277 {
278     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
279     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S, name_.c_str());
280     for (auto pair : quePool_) {
281         if (pair.second.isValid && pair.second.blockQue != nullptr && pair.second.blockQue->Empty()) {
282             quePool_[pair.first].isValid = false;
283             return pair.first;
284         }
285     }
286     quePool_[queCount_] = {
287         false,
288         0,
289         std::make_shared<BlockQueue<std::shared_ptr<SamplePacket>>>("source_que_" + std::to_string(queCount_),
290             singleQueSize_)
291     };
292     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", valid queue index: " PUBLIC_LOG_U32,
293                  name_.c_str(), queCount_);
294     queCount_++;
295     return (queCount_ - 1);
296 }
297 
InnerQueueIsFull(uint32_t queueIndex)298 bool BlockQueuePool::InnerQueueIsFull(uint32_t queueIndex)
299 {
300     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
301     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", queue " PUBLIC_LOG_U32, name_.c_str(), queueIndex);
302     if (quePool_[queueIndex].blockQue == nullptr) {
303         MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_D32 " is nullptr", queueIndex);
304         return true;
305     }
306     return quePool_[queueIndex].blockQue->Size() >= quePool_[queueIndex].blockQue->Capacity();
307 }
308 
HasQueue(uint32_t trackIndex)309 bool BlockQueuePool::HasQueue(uint32_t trackIndex)
310 {
311     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
312     return queMap_.count(trackIndex) > 0;
313 }
314 } // namespace Media
315 } // namespace OHOS