1 /*
2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define HST_LOG_TAG "DataPacker"
16
17 #include "data_packer.h"
18 #include <cstring>
19 #include "foundation/log.h"
20 #include "foundation/utils/dump_buffer.h"
21
22 namespace OHOS {
23 namespace Media {
24 #define EXEC_WHEN_GET(isGet, exec) \
25 do { \
26 if (isGet) { \
27 exec; \
28 } \
29 } while (0)
30
31 static const DataPacker::Position INVALID_POSITION = DataPacker::Position(-1, 0, 0);
32 static constexpr size_t MAX_BUFFER_NUMBER_IN_DATA_PACKER = 30;
33
DataPacker()34 DataPacker::DataPacker() : mutex_(), que_(), size_(0), mediaOffset_(0), pts_(0), dts_(0),
35 prevGet_(INVALID_POSITION), currentGet_(INVALID_POSITION), capacity_(MAX_BUFFER_NUMBER_IN_DATA_PACKER)
36 {
37 MEDIA_LOG_I("DataPacker ctor...");
38 }
39
~DataPacker()40 DataPacker::~DataPacker()
41 {
42 MEDIA_LOG_I("DataPacker dtor...");
43 cvEmpty_.NotifyAll();
44 cvFull_.NotifyAll();
45 }
46
GetBufferSize(AVBufferPtr & ptr)47 inline static size_t GetBufferSize(AVBufferPtr& ptr)
48 {
49 return ptr->GetMemory()->GetSize();
50 }
51
GetBufferWritableData(AVBufferPtr & ptr,size_t size,size_t position=0)52 inline static uint8_t* GetBufferWritableData(AVBufferPtr& ptr, size_t size, size_t position = 0)
53 {
54 return ptr->GetMemory()->GetWritableAddr(size, position);
55 }
56
GetBufferReadOnlyData(AVBufferPtr & ptr)57 inline static const uint8_t* GetBufferReadOnlyData(AVBufferPtr& ptr)
58 {
59 return ptr->GetMemory()->GetReadOnlyData();
60 }
61
PushData(AVBufferPtr bufferPtr,uint64_t offset)62 void DataPacker::PushData(AVBufferPtr bufferPtr, uint64_t offset)
63 {
64 size_t bufferSize = GetBufferSize(bufferPtr);
65 MEDIA_LOG_DD("DataPacker PushData begin... buffer (offset " PUBLIC_LOG_U64 ", size " PUBLIC_LOG_ZU ")",
66 offset, bufferSize);
67 DUMP_BUFFER2LOG("DataPacker Push", bufferPtr, offset);
68 FALSE_RETURN_MSG(bufferSize > 0, "Can not push zero length buffer.");
69
70 OSAL::ScopedLock lock(mutex_);
71 if (que_.size() >= capacity_) {
72 MEDIA_LOG_D("DataPacker is full, waiting for pop.");
73 do {
74 cvFull_.WaitFor(lock, 1000, // 1000 ms
75 [this] { return que_.size() < capacity_ || stopped_.load(); });
76 if (stopped_.load()) {
77 MEDIA_LOG_D("DataPacker stopped, so return.");
78 return;
79 }
80 } while (que_.size() >= capacity_);
81 }
82
83 size_ += GetBufferSize(bufferPtr);
84 if (que_.empty()) {
85 mediaOffset_ = offset;
86 dts_ = bufferPtr->dts;
87 pts_ = bufferPtr->pts;
88 }
89 que_.emplace_back(std::move(bufferPtr));
90 cvEmpty_.NotifyOne();
91 MEDIA_LOG_DD("DataPacker PushData end. " PUBLIC_LOG_S, ToString().c_str());
92 }
93
94 // curOffset: the offset end of this dataPacker. if IsDataAvailable() false, we can get data from source from curOffset
IsDataAvailable(uint64_t offset,uint32_t size,uint64_t & curOffset)95 bool DataPacker::IsDataAvailable(uint64_t offset, uint32_t size, uint64_t &curOffset)
96 {
97 MEDIA_LOG_DD("dataPacker (offset " PUBLIC_LOG_U64 ", size " PUBLIC_LOG_U32 "), curOffsetEnd is " PUBLIC_LOG_U64,
98 mediaOffset_, size_.load(), mediaOffset_ + size_.load());
99 MEDIA_LOG_DD(PUBLIC_LOG_S, ToString().c_str());
100 OSAL::ScopedLock lock(mutex_);
101 auto curOffsetTemp = mediaOffset_;
102 if (que_.empty() || offset < curOffsetTemp || offset > curOffsetTemp + size_) { // 原有数据无法命中, 则删除原有数据
103 curOffset = offset;
104 FlushInternal();
105 MEDIA_LOG_DD("IsDataAvailable false, offset not in cached data, clear it.");
106 return false;
107 }
108 size_t bufCnt = que_.size();
109 uint64_t offsetEnd = offset + size;
110 uint64_t curOffsetEnd = mediaOffset_ + GetBufferSize(que_.front());
111 if (bufCnt == 1) {
112 curOffset = curOffsetEnd;
113 MEDIA_LOG_DD("IsDataAvailable bufCnt == 1, result " PUBLIC_LOG_D32, offsetEnd <= curOffsetEnd);
114 return offsetEnd <= curOffsetEnd;
115 }
116 auto preOffsetEnd = curOffsetEnd;
117 for (size_t i = 1; i < bufCnt; ++i) {
118 curOffsetEnd = preOffsetEnd + GetBufferSize(que_[i]);
119 if (curOffsetEnd >= offsetEnd) {
120 MEDIA_LOG_DD("IsDataAvailable true, last buffer index " PUBLIC_LOG_ZU ", offsetEnd " PUBLIC_LOG_U64
121 ", curOffsetEnd " PUBLIC_LOG_U64, i, offsetEnd, curOffsetEnd);
122 return true;
123 } else {
124 preOffsetEnd = curOffsetEnd;
125 }
126 }
127 if (preOffsetEnd >= offsetEnd) {
128 MEDIA_LOG_DD("IsDataAvailable true, use all buffers, last buffer index " PUBLIC_LOG_ZU ", offsetEnd "
129 PUBLIC_LOG_U64 ", curOffsetEnd " PUBLIC_LOG_U64, bufCnt - 1, offsetEnd, curOffsetEnd);
130 return true;
131 }
132 curOffset = preOffsetEnd;
133 MEDIA_LOG_DD("IsDataAvailable false, offsetEnd " PUBLIC_LOG_U64 ", curOffsetEnd " PUBLIC_LOG_U64,
134 offsetEnd, preOffsetEnd);
135 return false;
136 }
137
PeekRange(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr)138 bool DataPacker::PeekRange(uint64_t offset, uint32_t size, AVBufferPtr& bufferPtr)
139 {
140 OSAL::ScopedLock lock(mutex_);
141 if (que_.empty()) {
142 MEDIA_LOG_D("DataPacker is empty, waiting for push.");
143 cvEmpty_.Wait(lock, [this] { return !que_.empty(); });
144 }
145
146 return PeekRangeInternal(offset, size, bufferPtr, false);
147 }
148
149 // Should call IsDataAvailable() before to make sure there is enough buffer to copy.
150 // offset : the offset (of the media file) to peek ( 要peek的数据起始位置 在media file文件 中的 offset )
151 // size : the size of data to peek
152 // bufferPtr : out buffer
153 // isGet : is it called from GetRange.
PeekRangeInternal(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr,bool isGet)154 bool DataPacker::PeekRangeInternal(uint64_t offset, uint32_t size, AVBufferPtr &bufferPtr, bool isGet)
155 {
156 MEDIA_LOG_DD("PeekRangeInternal (offset, size) = (" PUBLIC_LOG_U64 ", " PUBLIC_LOG_U32 ")...", offset, size);
157 int32_t startIndex = 0; // The index of buffer that we first use
158 size_t copySize = 0;
159 uint32_t needCopySize = size;
160 uint32_t firstBufferOffset = 0;
161 uint8_t* dstPtr = GetBufferWritableData(bufferPtr, needCopySize);
162 FALSE_RETURN_V(dstPtr != nullptr, false);
163
164 auto offsetEnd = offset + needCopySize;
165 auto curOffsetEnd = mediaOffset_ + GetBufferSize(que_[startIndex]);
166 if (offsetEnd <= curOffsetEnd) { // first buffer is enough
167 auto bufferOffset = static_cast<int32_t>(offset - mediaOffset_);
168 FALSE_RETURN_V_MSG_E(bufferOffset >= 0, false, "Copy buffer start position error.");
169 firstBufferOffset = bufferOffset;
170 copySize = CopyFirstBuffer(size, startIndex, dstPtr, bufferPtr, bufferOffset);
171 needCopySize -= copySize;
172 FALSE_LOG_MSG(needCopySize == 0, "First buffer is enough, but copySize is not enough");
173 EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
174 return true;
175 } else { // first buffer not enough
176 // Find the first buffer that should copy
177 uint64_t prevOffset; // The media offset of the startIndex buffer start byte
178 FALSE_RETURN_V_MSG_E(FindFirstBufferToCopy(offset, startIndex, prevOffset), false,
179 "Read offset(" PUBLIC_LOG_D64 ") size(" PUBLIC_LOG_D32 ") from " PUBLIC_LOG_S,
180 offset, size, ToString().c_str());
181 auto bufferOffset = static_cast<int32_t>(offset - prevOffset);
182 FALSE_RETURN_V_MSG_E(bufferOffset >= 0, false, "Copy buffer start position error.");
183 firstBufferOffset = bufferOffset;
184 copySize = CopyFirstBuffer(size, startIndex, dstPtr, bufferPtr, bufferOffset);
185
186 needCopySize -= copySize;
187 if (needCopySize == 0) { // First buffer is enough
188 EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
189 return true;
190 }
191 dstPtr += copySize;
192
193 // First buffer is not enough, copy from successive buffers
194 (void)CopyFromSuccessiveBuffer(prevOffset, offsetEnd, startIndex, dstPtr, needCopySize);
195 }
196 EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
197
198 // Update to the real size, especially at the end.
199 bufferPtr->GetMemory()->UpdateDataSize(size - needCopySize);
200 return true;
201 }
202
203 // Call IsDataAvailable() first before call GetRange
GetRange(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr)204 bool DataPacker::GetRange(uint64_t offset, uint32_t size, AVBufferPtr& bufferPtr)
205 {
206 MEDIA_LOG_DD("DataPacker GetRange(offset, size) = (" PUBLIC_LOG_U64 ", "
207 PUBLIC_LOG_U32 ")...", offset, size);
208 DUMP_BUFFER2LOG("GetRange Input", bufferPtr, 0);
209 FALSE_RETURN_V_MSG_E(bufferPtr && (!bufferPtr->IsEmpty()) && bufferPtr->GetMemory()->GetCapacity() >= size, false,
210 "GetRange input bufferPtr empty or capacity not enough.");
211
212 OSAL::ScopedLock lock(mutex_);
213 if (que_.empty()) {
214 MEDIA_LOG_D("DataPacker is empty, waiting for push");
215 cvEmpty_.Wait(lock, [this] { return !que_.empty(); });
216 }
217
218 FALSE_RETURN_V(!que_.empty(), false);
219 prevGet_ = currentGet_; // store last get position to prevGet_
220
221 FALSE_RETURN_V(PeekRangeInternal(offset, size, bufferPtr, true), false);
222 if (isEos_ && size_ <= size) { // Is EOS, and this time get all the data.
223 FlushInternal();
224 } else {
225 if (prevGet_ < currentGet_) {
226 RemoveOldData(currentGet_);
227 }
228 }
229
230 if (que_.size() < capacity_) {
231 cvFull_.NotifyOne();
232 }
233 return true;
234 }
235
236 // GetRange in live play mode
237 // 1. not use offset
238 // 2. remove the data have been read
GetRange(uint32_t size,AVBufferPtr & bufferPtr)239 bool DataPacker::GetRange(uint32_t size, AVBufferPtr& bufferPtr)
240 {
241 MEDIA_LOG_D("DataPacker live play GetRange(size) = (" PUBLIC_LOG_U32 ")...", size);
242 FALSE_RETURN_V_MSG_E(bufferPtr && (!bufferPtr->IsEmpty()) && bufferPtr->GetMemory()->GetCapacity() >= size, false,
243 "Live play GetRange input bufferPtr empty or capacity not enough.");
244
245 OSAL::ScopedLock lock(mutex_);
246 if (que_.empty()) {
247 FALSE_RETURN_V_W(!isEos_, false);
248 MEDIA_LOG_D("DataPacker is empty, live play GetRange waiting for push");
249 cvEmpty_.Wait(lock, [this] { return !que_.empty() || isEos_; });
250 if (isEos_) {
251 MEDIA_LOG_D("Eos wakeup the cvEmpty ConditionVariable");
252 return false;
253 }
254 }
255
256 FALSE_RETURN_V(!que_.empty(), false);
257
258 int32_t needCopySize = static_cast<int32_t>(size);
259 int32_t index = 0;
260 uint32_t lastBufferOffsetEnd = 0;
261
262 uint8_t* dstPtr = GetBufferWritableData(bufferPtr, size);
263 FALSE_RETURN_V(dstPtr != nullptr, false);
264
265 while (static_cast<uint32_t>(index) < que_.size()) {
266 AVBufferPtr& buffer = que_[index];
267 size_t bufferSize = GetBufferSize(buffer);
268 int32_t currCopySize = std::min(static_cast<int32_t>(bufferSize), needCopySize);
269 currCopySize = CopyFirstBuffer(currCopySize, index, dstPtr, bufferPtr, 0);
270 lastBufferOffsetEnd = currCopySize;
271 dstPtr += currCopySize;
272 needCopySize -= currCopySize;
273 if (needCopySize <= 0) { // it is enough
274 break;
275 }
276 index++;
277 lastBufferOffsetEnd = 0;
278 }
279 FALSE_LOG(needCopySize >= 0);
280 if (needCopySize < 0) {
281 needCopySize = 0;
282 }
283 bufferPtr->GetMemory()->UpdateDataSize(size - needCopySize);
284
285 auto endPosition = Position(index, lastBufferOffsetEnd, mediaOffset_ + size - needCopySize);
286 RemoveOldData(endPosition); // Live play, remove the got data
287 if (que_.size() < capacity_) {
288 cvFull_.NotifyOne();
289 }
290 return true;
291 }
292
Flush()293 void DataPacker::Flush()
294 {
295 MEDIA_LOG_I("DataPacker Flush called.");
296 OSAL::ScopedLock lock(mutex_);
297 FlushInternal();
298 }
299
SetEos()300 void DataPacker::SetEos()
301 {
302 MEDIA_LOG_I("DataPacker SetEos called.");
303 OSAL::ScopedLock lock(mutex_);
304 isEos_ = true;
305 cvEmpty_.NotifyOne();
306 }
307
IsEmpty()308 bool DataPacker::IsEmpty()
309 {
310 OSAL::ScopedLock lock(mutex_);
311 return size_ > 0;
312 }
313
Start()314 void DataPacker::Start()
315 {
316 MEDIA_LOG_I("DataPacker Start called.");
317 stopped_.store(false);
318 }
319
Stop()320 void DataPacker::Stop()
321 {
322 MEDIA_LOG_I("DataPacker Stop called.");
323 stopped_.store(true);
324 cvEmpty_.NotifyAll(); // avoid some thread can not exit
325 cvFull_.NotifyAll();
326 }
327
FlushInternal()328 void DataPacker::FlushInternal()
329 {
330 MEDIA_LOG_D("DataPacker FlushInternal called.");
331 que_.clear();
332 size_ = 0;
333 mediaOffset_ = 0;
334 dts_ = 0;
335 pts_ = 0;
336 isEos_ = false;
337 prevGet_ = INVALID_POSITION;
338 currentGet_ = INVALID_POSITION;
339 }
340
341 // Remove first removeSize data in the buffer
RemoveBufferContent(std::shared_ptr<AVBuffer> & buffer,size_t removeSize)342 void DataPacker::RemoveBufferContent(std::shared_ptr<AVBuffer> &buffer, size_t removeSize)
343 {
344 if (removeSize == 0) {
345 return;
346 }
347 auto memory = buffer->GetMemory();
348 FALSE_RETURN(removeSize < memory->GetSize());
349 auto copySize = memory->GetSize() - removeSize;
350 FALSE_LOG_MSG(memmove_s(memory->GetWritableAddr(copySize), memory->GetCapacity(),
351 memory->GetReadOnlyData(removeSize), copySize) == EOK, "memmove failed.");
352 FALSE_RETURN(UpdateWhenFrontDataRemoved(removeSize));
353 }
354
355 // Remove consumed data, and make the remaining data continuous
356 // Consumed data - between prevGet_.first and currentGet_.first
357 // In order to make remaining data continuous, also remove the data before prevGet_.first
358 // Update to support live play mode, Remove the data before position
RemoveOldData(const Position & position)359 void DataPacker::RemoveOldData(const Position& position)
360 {
361 MEDIA_LOG_DD("Before RemoveOldData " PUBLIC_LOG_S, ToString().c_str());
362 FALSE_LOG(RemoveTo(position));
363 if (que_.empty()) {
364 mediaOffset_ = 0;
365 size_ = 0;
366 pts_ = 0;
367 dts_ = 0;
368 } else {
369 pts_ = que_.front()->pts;
370 dts_ = que_.front()->dts;
371 }
372 MEDIA_LOG_DD("After RemoveOldData " PUBLIC_LOG_S, ToString().c_str());
373 }
374
RemoveTo(const Position & position)375 bool DataPacker::RemoveTo(const Position& position)
376 {
377 MEDIA_LOG_DD("Remove to " PUBLIC_LOG_S, position.ToString().c_str());
378 size_t removeSize;
379 int32_t i = 0;
380 while (i < position.index && !que_.empty()) { // Remove all whole buffer before position.index
381 removeSize = GetBufferSize(que_.front());
382 FALSE_RETURN_V(UpdateWhenFrontDataRemoved(removeSize), false);
383 que_.pop_front();
384 i++;
385 }
386 FALSE_RETURN_V_W(!que_.empty(), true);
387
388 // The last buffer
389 removeSize = GetBufferSize(que_.front());
390 // 1. If whole buffer should be removed
391 if (position.bufferOffset >= removeSize) {
392 FALSE_RETURN_V(UpdateWhenFrontDataRemoved(removeSize), false);
393 que_.pop_front();
394 return true;
395 }
396 // 2. Remove the front part of the buffer data
397 RemoveBufferContent(que_.front(), position.bufferOffset);
398 return true;
399 }
400
UpdateWhenFrontDataRemoved(size_t removeSize)401 bool DataPacker::UpdateWhenFrontDataRemoved(size_t removeSize)
402 {
403 mediaOffset_ += removeSize;
404 FALSE_RETURN_V_MSG_E(size_.load() >= removeSize, false, "Total size(size_ " PUBLIC_LOG_U32
405 ") smaller than removeSize(" PUBLIC_LOG_ZU ")", size_.load(), removeSize);
406 size_ -= removeSize;
407 return true;
408 }
409
410 // offset : from GetRange(offset, size)
411 // startIndex : out, find the first buffer should copy
412 // prevOffset : the first copied buffer's media offset.
FindFirstBufferToCopy(uint64_t offset,int32_t & startIndex,uint64_t & prevOffset)413 bool DataPacker::FindFirstBufferToCopy(uint64_t offset, int32_t &startIndex, uint64_t &prevOffset)
414 {
415 startIndex = 0;
416 prevOffset= mediaOffset_;
417 do {
418 if (offset >= prevOffset && offset - prevOffset < GetBufferSize(que_[startIndex])) {
419 return true;
420 }
421 prevOffset += GetBufferSize(que_[startIndex]);
422 startIndex++;
423 } while (static_cast<size_t>(startIndex) < que_.size());
424 return false;
425 }
426
427 // size : the GetRange size
428 // dstPtr : copy data to here
429 // dstBufferPtr : the AVBuffer contains dstPtr, pass this parameter to update pts / dts.
430 // bufferOffset : the buffer offset that we start copy
CopyFirstBuffer(size_t size,int32_t index,uint8_t * dstPtr,AVBufferPtr & dstBufferPtr,int32_t bufferOffset)431 size_t DataPacker::CopyFirstBuffer(size_t size, int32_t index, uint8_t *dstPtr, AVBufferPtr &dstBufferPtr,
432 int32_t bufferOffset)
433 {
434 auto remainSize = static_cast<int32_t>(GetBufferSize(que_[index]) - bufferOffset);
435 FALSE_RETURN_V_MSG_E(remainSize > 0, 0, "Copy size can not be negative.");
436 size_t copySize = std::min(static_cast<size_t>(remainSize), size);
437 NZERO_LOG(memcpy_s(dstPtr, copySize,
438 GetBufferReadOnlyData(que_[index]) + bufferOffset, copySize));
439
440 dstBufferPtr->pts = que_[index]->pts;
441 dstBufferPtr->dts = que_[index]->dts;
442 return copySize;
443 }
444
445 // prevOffset : the media offset of the first byte in the startIndex + 1 buffer
446 // offsetEnd : calculate from GetRange(offset, size), offsetEnd = offset + size.
447 // startIndex : the index start copy data for this GetRange. CopyFromSuccessiveBuffer process from startIndex + 1.
448 // dstPtr : copy data to here
449 // needCopySize : in and out, indicate how many bytes still need to copy.
CopyFromSuccessiveBuffer(uint64_t prevOffset,uint64_t offsetEnd,int32_t startIndex,uint8_t * dstPtr,uint32_t & needCopySize)450 int32_t DataPacker::CopyFromSuccessiveBuffer(uint64_t prevOffset, uint64_t offsetEnd, int32_t startIndex,
451 uint8_t *dstPtr, uint32_t &needCopySize)
452 {
453 size_t copySize;
454 int32_t usedCount = 0;
455 prevOffset = prevOffset + GetBufferSize(que_[startIndex]);
456 for (size_t i = startIndex + 1; i < que_.size(); ++i) {
457 usedCount++;
458 uint64_t curOffsetEnd = prevOffset + GetBufferSize(que_[i]);
459 if (curOffsetEnd >= offsetEnd) { // This buffer is enough
460 NZERO_LOG(memcpy_s(dstPtr, needCopySize, GetBufferReadOnlyData(que_[i]), needCopySize));
461 needCopySize = 0;
462 return usedCount; // Finished copy buffer
463 } else {
464 copySize = GetBufferSize(que_[i]);
465 NZERO_LOG(memcpy_s(dstPtr, copySize, GetBufferReadOnlyData(que_[i]), copySize));
466 dstPtr += copySize;
467 needCopySize -= copySize;
468 prevOffset += copySize;
469 }
470 }
471 MEDIA_LOG_W("Processed all cached buffers, still not meet offsetEnd, maybe EOS reached.");
472 return usedCount;
473 }
474
ToString() const475 std::string DataPacker::ToString() const
476 {
477 return "DataPacker (offset " + std::to_string(mediaOffset_) + ", size " + std::to_string(size_) +
478 ", buffer count " + std::to_string(que_.size()) + ")";
479 }
480 } // namespace Media
481 } // namespace OHOS
482