1 /*
2  * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define HST_LOG_TAG "DataPacker"
16 
17 #include "data_packer.h"
18 #include <cstring>
19 #include "foundation/log.h"
20 #include "foundation/utils/dump_buffer.h"
21 
22 namespace OHOS {
23 namespace Media {
24 #define EXEC_WHEN_GET(isGet, exec)     \
25     do {                               \
26         if (isGet) {                   \
27             exec;                      \
28         }                              \
29     } while (0)
30 
31 static const DataPacker::Position INVALID_POSITION = DataPacker::Position(-1, 0, 0);
32 static constexpr size_t MAX_BUFFER_NUMBER_IN_DATA_PACKER = 30;
33 
DataPacker()34 DataPacker::DataPacker() : mutex_(), que_(), size_(0), mediaOffset_(0), pts_(0), dts_(0),
35     prevGet_(INVALID_POSITION), currentGet_(INVALID_POSITION), capacity_(MAX_BUFFER_NUMBER_IN_DATA_PACKER)
36 {
37     MEDIA_LOG_I("DataPacker ctor...");
38 }
39 
~DataPacker()40 DataPacker::~DataPacker()
41 {
42     MEDIA_LOG_I("DataPacker dtor...");
43     cvEmpty_.NotifyAll();
44     cvFull_.NotifyAll();
45 }
46 
GetBufferSize(AVBufferPtr & ptr)47 inline static size_t GetBufferSize(AVBufferPtr& ptr)
48 {
49     return ptr->GetMemory()->GetSize();
50 }
51 
GetBufferWritableData(AVBufferPtr & ptr,size_t size,size_t position=0)52 inline static uint8_t* GetBufferWritableData(AVBufferPtr& ptr, size_t size, size_t position = 0)
53 {
54     return ptr->GetMemory()->GetWritableAddr(size, position);
55 }
56 
GetBufferReadOnlyData(AVBufferPtr & ptr)57 inline static const uint8_t* GetBufferReadOnlyData(AVBufferPtr& ptr)
58 {
59     return ptr->GetMemory()->GetReadOnlyData();
60 }
61 
PushData(AVBufferPtr bufferPtr,uint64_t offset)62 void DataPacker::PushData(AVBufferPtr bufferPtr, uint64_t offset)
63 {
64     size_t bufferSize = GetBufferSize(bufferPtr);
65     MEDIA_LOG_DD("DataPacker PushData begin... buffer (offset " PUBLIC_LOG_U64 ", size " PUBLIC_LOG_ZU ")",
66                  offset, bufferSize);
67     DUMP_BUFFER2LOG("DataPacker Push", bufferPtr, offset);
68     FALSE_RETURN_MSG(bufferSize > 0, "Can not push zero length buffer.");
69 
70     OSAL::ScopedLock lock(mutex_);
71     if (que_.size() >= capacity_) {
72         MEDIA_LOG_D("DataPacker is full, waiting for pop.");
73         do {
74             cvFull_.WaitFor(lock, 1000,  // 1000 ms
75                             [this] { return que_.size() < capacity_ || stopped_.load(); });
76             if (stopped_.load()) {
77                 MEDIA_LOG_D("DataPacker stopped, so return.");
78                 return;
79             }
80         } while (que_.size() >= capacity_);
81     }
82 
83     size_ += GetBufferSize(bufferPtr);
84     if (que_.empty()) {
85         mediaOffset_ = offset;
86         dts_ = bufferPtr->dts;
87         pts_ = bufferPtr->pts;
88     }
89     que_.emplace_back(std::move(bufferPtr));
90     cvEmpty_.NotifyOne();
91     MEDIA_LOG_DD("DataPacker PushData end. " PUBLIC_LOG_S, ToString().c_str());
92 }
93 
94 // curOffset: the offset end of this dataPacker. if IsDataAvailable() false, we can get data from source from curOffset
IsDataAvailable(uint64_t offset,uint32_t size,uint64_t & curOffset)95 bool DataPacker::IsDataAvailable(uint64_t offset, uint32_t size, uint64_t &curOffset)
96 {
97     MEDIA_LOG_DD("dataPacker (offset " PUBLIC_LOG_U64 ", size " PUBLIC_LOG_U32 "), curOffsetEnd is " PUBLIC_LOG_U64,
98                  mediaOffset_, size_.load(), mediaOffset_ + size_.load());
99     MEDIA_LOG_DD(PUBLIC_LOG_S, ToString().c_str());
100     OSAL::ScopedLock lock(mutex_);
101     auto curOffsetTemp = mediaOffset_;
102     if (que_.empty() || offset < curOffsetTemp || offset > curOffsetTemp + size_) { // 原有数据无法命中, 则删除原有数据
103         curOffset = offset;
104         FlushInternal();
105         MEDIA_LOG_DD("IsDataAvailable false, offset not in cached data, clear it.");
106         return false;
107     }
108     size_t bufCnt = que_.size();
109     uint64_t offsetEnd = offset + size;
110     uint64_t curOffsetEnd = mediaOffset_ + GetBufferSize(que_.front());
111     if (bufCnt == 1) {
112         curOffset = curOffsetEnd;
113         MEDIA_LOG_DD("IsDataAvailable bufCnt == 1, result " PUBLIC_LOG_D32, offsetEnd <= curOffsetEnd);
114         return offsetEnd <= curOffsetEnd;
115     }
116     auto preOffsetEnd = curOffsetEnd;
117     for (size_t i = 1; i < bufCnt; ++i) {
118         curOffsetEnd = preOffsetEnd + GetBufferSize(que_[i]);
119         if (curOffsetEnd >= offsetEnd) {
120             MEDIA_LOG_DD("IsDataAvailable true, last buffer index " PUBLIC_LOG_ZU ", offsetEnd " PUBLIC_LOG_U64
121                          ", curOffsetEnd " PUBLIC_LOG_U64, i, offsetEnd, curOffsetEnd);
122             return true;
123         } else {
124             preOffsetEnd = curOffsetEnd;
125         }
126     }
127     if (preOffsetEnd >= offsetEnd) {
128         MEDIA_LOG_DD("IsDataAvailable true, use all buffers, last buffer index " PUBLIC_LOG_ZU ", offsetEnd "
129                      PUBLIC_LOG_U64 ", curOffsetEnd " PUBLIC_LOG_U64, bufCnt - 1, offsetEnd, curOffsetEnd);
130         return true;
131     }
132     curOffset = preOffsetEnd;
133     MEDIA_LOG_DD("IsDataAvailable false, offsetEnd " PUBLIC_LOG_U64 ", curOffsetEnd " PUBLIC_LOG_U64,
134                  offsetEnd, preOffsetEnd);
135     return false;
136 }
137 
PeekRange(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr)138 bool DataPacker::PeekRange(uint64_t offset, uint32_t size, AVBufferPtr& bufferPtr)
139 {
140     OSAL::ScopedLock lock(mutex_);
141     if (que_.empty()) {
142         MEDIA_LOG_D("DataPacker is empty, waiting for push.");
143         cvEmpty_.Wait(lock, [this] { return !que_.empty(); });
144     }
145 
146     return PeekRangeInternal(offset, size, bufferPtr, false);
147 }
148 
149 // Should call IsDataAvailable() before to make sure there is enough buffer to copy.
150 // offset : the offset (of the media file) to peek ( 要peek的数据起始位置 在media file文件 中的 offset )
151 // size : the size of data to peek
152 // bufferPtr : out buffer
153 // isGet : is it called from GetRange.
PeekRangeInternal(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr,bool isGet)154 bool DataPacker::PeekRangeInternal(uint64_t offset, uint32_t size, AVBufferPtr &bufferPtr, bool isGet)
155 {
156     MEDIA_LOG_DD("PeekRangeInternal (offset, size) = (" PUBLIC_LOG_U64 ", " PUBLIC_LOG_U32 ")...", offset, size);
157     int32_t startIndex = 0; // The index of buffer that we first use
158     size_t copySize = 0;
159     uint32_t needCopySize = size;
160     uint32_t firstBufferOffset = 0;
161     uint8_t* dstPtr = GetBufferWritableData(bufferPtr, needCopySize);
162     FALSE_RETURN_V(dstPtr != nullptr, false);
163 
164     auto offsetEnd = offset + needCopySize;
165     auto curOffsetEnd = mediaOffset_ + GetBufferSize(que_[startIndex]);
166     if (offsetEnd <= curOffsetEnd) { // first buffer is enough
167         auto bufferOffset = static_cast<int32_t>(offset - mediaOffset_);
168         FALSE_RETURN_V_MSG_E(bufferOffset >= 0, false, "Copy buffer start position error.");
169         firstBufferOffset = bufferOffset;
170         copySize = CopyFirstBuffer(size, startIndex, dstPtr, bufferPtr, bufferOffset);
171         needCopySize -= copySize;
172         FALSE_LOG_MSG(needCopySize == 0, "First buffer is enough, but copySize is not enough");
173         EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
174         return true;
175     } else { // first buffer not enough
176         // Find the first buffer that should copy
177         uint64_t prevOffset; // The media offset of the startIndex buffer start byte
178         FALSE_RETURN_V_MSG_E(FindFirstBufferToCopy(offset, startIndex, prevOffset), false,
179             "Read offset(" PUBLIC_LOG_D64 ") size(" PUBLIC_LOG_D32 ") from " PUBLIC_LOG_S,
180             offset, size, ToString().c_str());
181         auto bufferOffset = static_cast<int32_t>(offset - prevOffset);
182         FALSE_RETURN_V_MSG_E(bufferOffset >= 0, false, "Copy buffer start position error.");
183         firstBufferOffset = bufferOffset;
184         copySize = CopyFirstBuffer(size, startIndex, dstPtr, bufferPtr, bufferOffset);
185 
186         needCopySize -= copySize;
187         if (needCopySize == 0) { // First buffer is enough
188             EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
189             return true;
190         }
191         dstPtr += copySize;
192 
193         // First buffer is not enough, copy from successive buffers
194         (void)CopyFromSuccessiveBuffer(prevOffset, offsetEnd, startIndex, dstPtr, needCopySize);
195     }
196     EXEC_WHEN_GET(isGet, currentGet_ = Position(startIndex, firstBufferOffset, offset));
197 
198     // Update to the real size, especially at the end.
199     bufferPtr->GetMemory()->UpdateDataSize(size - needCopySize);
200     return true;
201 }
202 
203 // Call IsDataAvailable() first before call GetRange
GetRange(uint64_t offset,uint32_t size,AVBufferPtr & bufferPtr)204 bool DataPacker::GetRange(uint64_t offset, uint32_t size, AVBufferPtr& bufferPtr)
205 {
206     MEDIA_LOG_DD("DataPacker GetRange(offset, size) = (" PUBLIC_LOG_U64 ", "
207                  PUBLIC_LOG_U32 ")...", offset, size);
208     DUMP_BUFFER2LOG("GetRange Input", bufferPtr, 0);
209     FALSE_RETURN_V_MSG_E(bufferPtr && (!bufferPtr->IsEmpty()) && bufferPtr->GetMemory()->GetCapacity() >= size, false,
210         "GetRange input bufferPtr empty or capacity not enough.");
211 
212     OSAL::ScopedLock lock(mutex_);
213     if (que_.empty()) {
214         MEDIA_LOG_D("DataPacker is empty, waiting for push");
215         cvEmpty_.Wait(lock, [this] { return !que_.empty(); });
216     }
217 
218     FALSE_RETURN_V(!que_.empty(), false);
219     prevGet_ = currentGet_; // store last get position to prevGet_
220 
221     FALSE_RETURN_V(PeekRangeInternal(offset, size, bufferPtr, true), false);
222     if (isEos_ && size_ <= size) { // Is EOS, and this time get all the data.
223         FlushInternal();
224     } else {
225         if (prevGet_ < currentGet_) {
226             RemoveOldData(currentGet_);
227         }
228     }
229 
230     if (que_.size() < capacity_) {
231         cvFull_.NotifyOne();
232     }
233     return true;
234 }
235 
236 // GetRange in live play mode
237 //  1. not use offset
238 //  2. remove the data have been read
GetRange(uint32_t size,AVBufferPtr & bufferPtr)239 bool DataPacker::GetRange(uint32_t size, AVBufferPtr& bufferPtr)
240 {
241     MEDIA_LOG_D("DataPacker live play GetRange(size) = (" PUBLIC_LOG_U32 ")...", size);
242     FALSE_RETURN_V_MSG_E(bufferPtr && (!bufferPtr->IsEmpty()) && bufferPtr->GetMemory()->GetCapacity() >= size, false,
243         "Live play GetRange input bufferPtr empty or capacity not enough.");
244 
245     OSAL::ScopedLock lock(mutex_);
246     if (que_.empty()) {
247         FALSE_RETURN_V_W(!isEos_, false);
248         MEDIA_LOG_D("DataPacker is empty, live play GetRange waiting for push");
249         cvEmpty_.Wait(lock, [this] { return !que_.empty() || isEos_; });
250         if (isEos_) {
251             MEDIA_LOG_D("Eos wakeup the cvEmpty ConditionVariable");
252             return false;
253         }
254     }
255 
256     FALSE_RETURN_V(!que_.empty(), false);
257 
258     int32_t needCopySize = static_cast<int32_t>(size);
259     int32_t index = 0;
260     uint32_t lastBufferOffsetEnd = 0;
261 
262     uint8_t* dstPtr = GetBufferWritableData(bufferPtr, size);
263     FALSE_RETURN_V(dstPtr != nullptr, false);
264 
265     while (static_cast<uint32_t>(index) < que_.size()) {
266         AVBufferPtr& buffer = que_[index];
267         size_t bufferSize = GetBufferSize(buffer);
268         int32_t currCopySize = std::min(static_cast<int32_t>(bufferSize), needCopySize);
269         currCopySize = CopyFirstBuffer(currCopySize, index, dstPtr, bufferPtr, 0);
270         lastBufferOffsetEnd = currCopySize;
271         dstPtr += currCopySize;
272         needCopySize -= currCopySize;
273         if (needCopySize <= 0) { // it is enough
274             break;
275         }
276         index++;
277         lastBufferOffsetEnd = 0;
278     }
279     FALSE_LOG(needCopySize >= 0);
280     if (needCopySize < 0) {
281         needCopySize = 0;
282     }
283     bufferPtr->GetMemory()->UpdateDataSize(size - needCopySize);
284 
285     auto endPosition = Position(index, lastBufferOffsetEnd, mediaOffset_ + size - needCopySize);
286     RemoveOldData(endPosition); // Live play, remove the got data
287     if (que_.size() < capacity_) {
288         cvFull_.NotifyOne();
289     }
290     return true;
291 }
292 
Flush()293 void DataPacker::Flush()
294 {
295     MEDIA_LOG_I("DataPacker Flush called.");
296     OSAL::ScopedLock lock(mutex_);
297     FlushInternal();
298 }
299 
SetEos()300 void DataPacker::SetEos()
301 {
302     MEDIA_LOG_I("DataPacker SetEos called.");
303     OSAL::ScopedLock lock(mutex_);
304     isEos_ = true;
305     cvEmpty_.NotifyOne();
306 }
307 
IsEmpty()308 bool DataPacker::IsEmpty()
309 {
310     OSAL::ScopedLock lock(mutex_);
311     return size_ > 0;
312 }
313 
Start()314 void DataPacker::Start()
315 {
316     MEDIA_LOG_I("DataPacker Start called.");
317     stopped_.store(false);
318 }
319 
Stop()320 void DataPacker::Stop()
321 {
322     MEDIA_LOG_I("DataPacker Stop called.");
323     stopped_.store(true);
324     cvEmpty_.NotifyAll(); // avoid some thread can not exit
325     cvFull_.NotifyAll();
326 }
327 
FlushInternal()328 void DataPacker::FlushInternal()
329 {
330     MEDIA_LOG_D("DataPacker FlushInternal called.");
331     que_.clear();
332     size_ = 0;
333     mediaOffset_ = 0;
334     dts_ = 0;
335     pts_ = 0;
336     isEos_ = false;
337     prevGet_ = INVALID_POSITION;
338     currentGet_ = INVALID_POSITION;
339 }
340 
341 // Remove first removeSize data in the buffer
RemoveBufferContent(std::shared_ptr<AVBuffer> & buffer,size_t removeSize)342 void DataPacker::RemoveBufferContent(std::shared_ptr<AVBuffer> &buffer, size_t removeSize)
343 {
344     if (removeSize == 0) {
345         return;
346     }
347     auto memory = buffer->GetMemory();
348     FALSE_RETURN(removeSize < memory->GetSize());
349     auto copySize = memory->GetSize() - removeSize;
350     FALSE_LOG_MSG(memmove_s(memory->GetWritableAddr(copySize), memory->GetCapacity(),
351         memory->GetReadOnlyData(removeSize), copySize) == EOK, "memmove failed.");
352     FALSE_RETURN(UpdateWhenFrontDataRemoved(removeSize));
353 }
354 
355 // Remove consumed data, and make the remaining data continuous
356 // Consumed data - between prevGet_.first and currentGet_.first
357 // In order to make remaining data continuous, also remove the data before prevGet_.first
358 // Update to support live play mode, Remove the data before position
RemoveOldData(const Position & position)359 void DataPacker::RemoveOldData(const Position& position)
360 {
361     MEDIA_LOG_DD("Before RemoveOldData " PUBLIC_LOG_S, ToString().c_str());
362     FALSE_LOG(RemoveTo(position));
363     if (que_.empty()) {
364         mediaOffset_ = 0;
365         size_ = 0;
366         pts_ = 0;
367         dts_ = 0;
368     } else {
369         pts_ = que_.front()->pts;
370         dts_ = que_.front()->dts;
371     }
372     MEDIA_LOG_DD("After RemoveOldData " PUBLIC_LOG_S, ToString().c_str());
373 }
374 
RemoveTo(const Position & position)375 bool DataPacker::RemoveTo(const Position& position)
376 {
377     MEDIA_LOG_DD("Remove to " PUBLIC_LOG_S, position.ToString().c_str());
378     size_t removeSize;
379     int32_t i = 0;
380     while (i < position.index && !que_.empty()) { // Remove all whole buffer before position.index
381         removeSize = GetBufferSize(que_.front());
382         FALSE_RETURN_V(UpdateWhenFrontDataRemoved(removeSize), false);
383         que_.pop_front();
384         i++;
385     }
386     FALSE_RETURN_V_W(!que_.empty(), true);
387 
388     // The last buffer
389     removeSize = GetBufferSize(que_.front());
390     // 1. If whole buffer should be removed
391     if (position.bufferOffset >= removeSize) {
392         FALSE_RETURN_V(UpdateWhenFrontDataRemoved(removeSize), false);
393         que_.pop_front();
394         return true;
395     }
396     // 2. Remove the front part of the buffer data
397     RemoveBufferContent(que_.front(), position.bufferOffset);
398     return true;
399 }
400 
UpdateWhenFrontDataRemoved(size_t removeSize)401 bool DataPacker::UpdateWhenFrontDataRemoved(size_t removeSize)
402 {
403     mediaOffset_ += removeSize;
404     FALSE_RETURN_V_MSG_E(size_.load() >= removeSize, false, "Total size(size_ " PUBLIC_LOG_U32
405         ") smaller than removeSize(" PUBLIC_LOG_ZU ")", size_.load(), removeSize);
406     size_ -= removeSize;
407     return true;
408 }
409 
410 // offset : from GetRange(offset, size)
411 // startIndex : out, find the first buffer should copy
412 // prevOffset : the first copied buffer's media offset.
FindFirstBufferToCopy(uint64_t offset,int32_t & startIndex,uint64_t & prevOffset)413 bool DataPacker::FindFirstBufferToCopy(uint64_t offset, int32_t &startIndex, uint64_t &prevOffset)
414 {
415     startIndex = 0;
416     prevOffset= mediaOffset_;
417     do {
418         if (offset >= prevOffset && offset - prevOffset < GetBufferSize(que_[startIndex])) {
419             return true;
420         }
421         prevOffset += GetBufferSize(que_[startIndex]);
422         startIndex++;
423     } while (static_cast<size_t>(startIndex) < que_.size());
424     return false;
425 }
426 
427 // size : the GetRange size
428 // dstPtr : copy data to here
429 // dstBufferPtr : the AVBuffer contains dstPtr, pass this parameter to update pts / dts.
430 // bufferOffset : the buffer offset that we start copy
CopyFirstBuffer(size_t size,int32_t index,uint8_t * dstPtr,AVBufferPtr & dstBufferPtr,int32_t bufferOffset)431 size_t DataPacker::CopyFirstBuffer(size_t size, int32_t index, uint8_t *dstPtr, AVBufferPtr &dstBufferPtr,
432                                    int32_t bufferOffset)
433 {
434     auto remainSize = static_cast<int32_t>(GetBufferSize(que_[index]) - bufferOffset);
435     FALSE_RETURN_V_MSG_E(remainSize > 0, 0, "Copy size can not be negative.");
436     size_t copySize = std::min(static_cast<size_t>(remainSize), size);
437     NZERO_LOG(memcpy_s(dstPtr, copySize,
438                        GetBufferReadOnlyData(que_[index]) + bufferOffset, copySize));
439 
440     dstBufferPtr->pts = que_[index]->pts;
441     dstBufferPtr->dts = que_[index]->dts;
442     return copySize;
443 }
444 
445 // prevOffset : the media offset of the first byte in the startIndex + 1 buffer
446 // offsetEnd : calculate from GetRange(offset, size), offsetEnd = offset + size.
447 // startIndex : the index start copy data for this GetRange. CopyFromSuccessiveBuffer process from startIndex + 1.
448 // dstPtr : copy data to here
449 // needCopySize : in and out, indicate how many bytes still need to copy.
CopyFromSuccessiveBuffer(uint64_t prevOffset,uint64_t offsetEnd,int32_t startIndex,uint8_t * dstPtr,uint32_t & needCopySize)450 int32_t DataPacker::CopyFromSuccessiveBuffer(uint64_t prevOffset, uint64_t offsetEnd, int32_t startIndex,
451                                              uint8_t *dstPtr, uint32_t &needCopySize)
452 {
453     size_t copySize;
454     int32_t usedCount = 0;
455     prevOffset = prevOffset + GetBufferSize(que_[startIndex]);
456     for (size_t i = startIndex + 1; i < que_.size(); ++i) {
457         usedCount++;
458         uint64_t curOffsetEnd = prevOffset + GetBufferSize(que_[i]);
459         if (curOffsetEnd >= offsetEnd) { // This buffer is enough
460             NZERO_LOG(memcpy_s(dstPtr, needCopySize, GetBufferReadOnlyData(que_[i]), needCopySize));
461             needCopySize = 0;
462             return usedCount; // Finished copy buffer
463         } else {
464             copySize = GetBufferSize(que_[i]);
465             NZERO_LOG(memcpy_s(dstPtr, copySize, GetBufferReadOnlyData(que_[i]), copySize));
466             dstPtr += copySize;
467             needCopySize -= copySize;
468             prevOffset += copySize;
469         }
470     }
471     MEDIA_LOG_W("Processed all cached buffers, still not meet offsetEnd, maybe EOS reached.");
472     return usedCount;
473 }
474 
ToString() const475 std::string DataPacker::ToString() const
476 {
477     return "DataPacker (offset " + std::to_string(mediaOffset_) + ", size " + std::to_string(size_) +
478            ", buffer count " + std::to_string(que_.size()) + ")";
479 }
480 } // namespace Media
481 } // namespace OHOS
482