/*
 * Copyright (c) 2024-2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <mutex>
#include <list>
#include <algorithm>
#include <cassert>
#include <limits>
#include <securec.h>
#include "media_cached_buffer.h"
#include "common/log.h"
#include "avcodec_log.h"
#include "avcodec_errors.h"

namespace {
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_STREAM_SOURCE, "HiStreamer" };
}

namespace OHOS {
namespace Media {

constexpr size_t CACHE_FRAGMENT_MAX_NUM_DEFAULT = 300; // Default maximum number of fragment nodes
constexpr size_t CACHE_FRAGMENT_MAX_NUM_LARGE = 10;    // Maximum number of fragment nodes for a large offset span
constexpr size_t CACHE_FRAGMENT_MIN_NUM_DEFAULT = 3;   // Minimum number of fragment nodes
constexpr double NEW_FRAGMENT_INIT_CHUNK_NUM = 128.0;  // Restricts the cache size of a seek operation, 128 chunks = 2MB
constexpr double NEW_FRAGMENT_NIT_DEFAULT_DENOMINATOR = 0.25;
constexpr double CACHE_RELEASE_FACTOR_DEFAULT = 10;
constexpr double TO_PERCENT = 100;
constexpr int64_t MAX_TOTAL_READ_SIZE = 2000000;
constexpr int64_t UP_LIMIT_MAX_TOTAL_READ_SIZE = 3000000;
constexpr int64_t ACCESS_OFFSET_MAX_LENGTH = 2 * 1024;

inline constexpr bool BoundedIntervalComp(int64_t mid, uint64_t start, int64_t end)
{
    return (static_cast<int64_t>(start) <= mid && mid <= end);
}

inline constexpr bool LeftBoundedRightOpenComp(int64_t mid, uint64_t start, int64_t end)
{
    return (static_cast<int64_t>(start) <= mid && mid < end);
}

inline void IncreaseStep(uint8_t*& src, uint64_t& offset, size_t& writeSize, size_t step)
{
    src += step;
    offset += static_cast<uint64_t>(step);
    writeSize += step;
}

inline void InitChunkInfo(CacheChunk& chunkInfo, uint64_t offset)
{
    chunkInfo.offset = offset;
    chunkInfo.dataLength = 0;
}

CacheMediaChunkBufferImpl::CacheMediaChunkBufferImpl()
    : totalBuffSize_(0), totalReadSize_(0), chunkMaxNum_(0), chunkSize_(0), bufferAddr_(nullptr),
      fragmentMaxNum_(CACHE_FRAGMENT_MAX_NUM_DEFAULT),
      lruCache_(CACHE_FRAGMENT_MAX_NUM_DEFAULT) {}

CacheMediaChunkBufferImpl::~CacheMediaChunkBufferImpl()
{
    std::lock_guard lock(mutex_);
    freeChunks_.clear();
    fragmentCacheBuffer_.clear();
    readPos_ = fragmentCacheBuffer_.end();
    writePos_ = fragmentCacheBuffer_.end();
    chunkMaxNum_ = 0;
    totalReadSize_ = 0;
    if (bufferAddr_ != nullptr) {
        free(bufferAddr_);
        bufferAddr_ = nullptr;
    }
}

bool CacheMediaChunkBufferImpl::Init(uint64_t totalBuffSize, uint32_t chunkSize)
{
    if (isLargeOffsetSpan_) {
        lruCache_.ReCacheSize(CACHE_FRAGMENT_MAX_NUM_LARGE);
    } else {
        lruCache_.ReCacheSize(CACHE_FRAGMENT_MAX_NUM_DEFAULT);
    }

    if (totalBuffSize == 0 || chunkSize == 0 || totalBuffSize < chunkSize) {
        return false;
    }

    double newFragmentInitChunkNum = NEW_FRAGMENT_INIT_CHUNK_NUM;
    uint64_t diff = (totalBuffSize + chunkSize) > 1 ? (totalBuffSize + chunkSize) - 1 : 0;
    int64_t chunkNum = static_cast<int64_t>(diff / chunkSize) + 1;
    if ((chunkNum - static_cast<int64_t>(newFragmentInitChunkNum)) < 0) {
        return false;
    }
    if (newFragmentInitChunkNum > static_cast<double>(chunkNum) * NEW_FRAGMENT_NIT_DEFAULT_DENOMINATOR) {
        newFragmentInitChunkNum = std::max(1.0, static_cast<double>(chunkNum) * NEW_FRAGMENT_NIT_DEFAULT_DENOMINATOR);
    }
    std::lock_guard lock(mutex_);
    if (bufferAddr_ != nullptr) {
        return false;
    }

    readPos_ = fragmentCacheBuffer_.end();
    writePos_ = fragmentCacheBuffer_.end();
    size_t sizePerChunk = sizeof(CacheChunk) + chunkSize;
    FALSE_RETURN_V_MSG_E(static_cast<int64_t>(sizePerChunk) * chunkNum > 0, false,
        "Invalid sizePerChunk and chunkNum.");
    bufferAddr_ = static_cast<uint8_t*>(malloc(sizePerChunk * chunkNum));
    if (bufferAddr_ == nullptr) {
        return false;
    }

    uint8_t* temp = bufferAddr_;
    for (auto i = 0; i < chunkNum; ++i) {
        auto chunkInfo = reinterpret_cast<CacheChunk*>(temp);
        chunkInfo->offset = 0;
        chunkInfo->dataLength = 0;
        chunkInfo->chunkSize = static_cast<uint32_t>(chunkSize);
        freeChunks_.push_back(chunkInfo);
        temp += sizePerChunk;
    }
    chunkMaxNum_ = chunkNum >= 1 ? static_cast<uint32_t>(chunkNum) - 1 : 0; // -1
    totalBuffSize_ = totalBuffSize;
    chunkSize_ = chunkSize;
    initReadSizeFactor_ = newFragmentInitChunkNum / (chunkMaxNum_ - newFragmentInitChunkNum);
    return true;
}
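
// Illustrative sizing example (the input values below are assumptions, not taken from the original source):
// with totalBuffSize = 40 MiB and chunkSize = 16 KiB, the formulas in Init() give
//   chunkNum = (40 MiB + 16 KiB - 1) / 16 KiB + 1 = 2561,
//   newFragmentInitChunkNum stays at 128 (since 128 < 2561 * 0.25),
//   chunkMaxNum_ = 2560 and initReadSizeFactor_ = 128 / (2560 - 128) ≈ 0.053.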

// Update the chunk read position within the fragment
void CacheMediaChunkBufferImpl::UpdateAccessPos(FragmentIterator& fragmentPos, ChunkIterator& chunkPos,
    uint64_t offsetChunk)
{
    if (chunkPos == fragmentPos->chunks.end()) {
        auto preChunkPos = std::prev(chunkPos);
        if (((*preChunkPos)->offset + (*preChunkPos)->chunkSize) == offsetChunk) {
            fragmentPos->accessPos = chunkPos;
        } else {
            fragmentPos->accessPos = preChunkPos;
        }
    } else if ((*chunkPos)->offset == offsetChunk) {
        fragmentPos->accessPos = chunkPos;
    } else {
        fragmentPos->accessPos = std::prev(chunkPos);
    }
}

size_t CacheMediaChunkBufferImpl::Read(void* ptr, uint64_t offset, size_t readSize)
{
    std::lock_guard lock(mutex_);
    size_t hasReadSize = 0;
    uint8_t* dst = static_cast<uint8_t*>(ptr);
    uint64_t hasReadOffset = offset;
    size_t oneReadSize = ReadInner(dst, hasReadOffset, readSize);
    hasReadSize = oneReadSize;
    while (hasReadSize < readSize && oneReadSize != 0) {
        dst += oneReadSize;
        hasReadOffset += static_cast<uint64_t>(oneReadSize);
        oneReadSize = ReadInner(dst, hasReadOffset, readSize - hasReadSize);
        hasReadSize += oneReadSize;
    }
    return hasReadSize;
}

size_t CacheMediaChunkBufferImpl::ReadInner(void* ptr, uint64_t offset, size_t readSize)
{
    auto fragmentPos = GetOffsetFragmentCache(readPos_, offset, LeftBoundedRightOpenComp);
    if (readSize == 0 || fragmentPos == fragmentCacheBuffer_.end()) {
        return 0;
    }
    auto chunkPos = fragmentPos->accessPos;
    if (chunkPos == fragmentPos->chunks.end() ||
        offset < (*chunkPos)->offset ||
        offset > (*chunkPos)->offset + (*chunkPos)->dataLength) {
        chunkPos = GetOffsetChunkCache(fragmentPos->chunks, offset, LeftBoundedRightOpenComp);
    }

    uint8_t* dst = static_cast<uint8_t*>(ptr);
    uint64_t offsetChunk = offset;
    if (chunkPos != fragmentPos->chunks.end()) {
        uint64_t readOffset = offset > fragmentPos->offsetBegin ? offset - fragmentPos->offsetBegin : 0;
        uint64_t temp = readOffset > static_cast<uint64_t>(fragmentPos->accessLength) ?
            readOffset - static_cast<uint64_t>(fragmentPos->accessLength) : 0;
        if (temp >= ACCESS_OFFSET_MAX_LENGTH) {
            chunkPos = SplitFragmentCacheBuffer(fragmentPos, offset, chunkPos);
        }
        size_t hasReadSize = 0;
        while (hasReadSize < readSize && chunkPos != fragmentPos->chunks.end()) {
            auto chunkInfo = *chunkPos;
            uint64_t diff = offsetChunk > chunkInfo->offset ? offsetChunk - chunkInfo->offset : 0;
            if (offsetChunk < chunkInfo->offset || diff > chunkInfo->dataLength) {
                DumpAndCheckInner();
                return 0;
            }
            uint64_t readDiff = chunkInfo->dataLength > diff ? chunkInfo->dataLength - diff : 0;
            auto readOne = std::min(static_cast<size_t>(readDiff), readSize - hasReadSize);
            errno_t res = memcpy_s(dst + hasReadSize, readOne, (*chunkPos)->data + diff, readOne);
            FALSE_RETURN_V_MSG_E(res == EOK, 0, "memcpy_s data err");
            hasReadSize += readOne;
            offsetChunk += static_cast<uint64_t>(readOne);
            chunkPos++;
        }
        UpdateAccessPos(fragmentPos, chunkPos, offsetChunk);
        uint64_t lengthDiff = offsetChunk > fragmentPos->offsetBegin ? offsetChunk - fragmentPos->offsetBegin : 0;
        fragmentPos->accessLength = static_cast<int64_t>(lengthDiff);
        fragmentPos->readTime = Clock::now();
        fragmentPos->totalReadSize += hasReadSize;
        totalReadSize_ += hasReadSize;
        readPos_ = fragmentPos;
        lruCache_.Refer(fragmentPos->offsetBegin, fragmentPos);
        return hasReadSize;
    }
    return 0;
}

bool CacheMediaChunkBufferImpl::WriteInPlace(FragmentIterator& fragmentPos, uint8_t* ptr, uint64_t inOffset,
    size_t inWriteSize, size_t& outWriteSize)
{
    uint64_t offset = inOffset;
    size_t writeSize = inWriteSize;
    uint8_t* src = ptr;
    auto& chunkList = fragmentPos->chunks;
    outWriteSize = 0;
    ChunkIterator chunkPos = std::upper_bound(chunkList.begin(), chunkList.end(), offset,
        [](auto inputOffset, const CacheChunk* chunk) {
            return (inputOffset <= chunk->offset + chunk->dataLength);
        });
    if (chunkPos == chunkList.end()) {
        DumpInner(0);
        return false;
    }
    size_t writeSizeTmp = 0;
    auto chunkInfoTmp = *chunkPos;
    uint64_t accessLengthTmp = inOffset > writePos_->offsetBegin ? inOffset - writePos_->offsetBegin : 0;
    if (chunkInfoTmp->offset <= offset &&
        offset < chunkInfoTmp->offset + static_cast<uint64_t>(chunkInfoTmp->dataLength)) {
        size_t diff = static_cast<size_t>(offset > chunkInfoTmp->offset ? offset - chunkInfoTmp->offset : 0);
        size_t copyLen = static_cast<size_t>(chunkInfoTmp->dataLength - diff);
        copyLen = std::min(copyLen, writeSize);
        errno_t res = memcpy_s(chunkInfoTmp->data + diff, copyLen, src, copyLen);
        FALSE_RETURN_V_MSG_E(res == EOK, false, "memcpy_s data err");
        IncreaseStep(src, offset, writeSizeTmp, copyLen);
        if (writePos_->accessLength > static_cast<int64_t>(accessLengthTmp)) {
            writePos_->accessPos = chunkPos;
            writePos_->accessLength = static_cast<int64_t>(accessLengthTmp);
        }
    } else if (writePos_->accessLength > static_cast<int64_t>(accessLengthTmp)) {
        writePos_->accessPos = std::next(chunkPos);
        writePos_->accessLength = static_cast<int64_t>(accessLengthTmp);
    }
    ++chunkPos;
    while (writeSizeTmp < writeSize && chunkPos != chunkList.end()) {
        chunkInfoTmp = *chunkPos;
        auto copyLen = std::min(chunkInfoTmp->dataLength, (uint32_t)(writeSize - writeSizeTmp));
        errno_t res = memcpy_s(chunkInfoTmp->data, copyLen, src, copyLen);
        FALSE_RETURN_V_MSG_E(res == EOK, false, "memcpy_s data err");
        IncreaseStep(src, offset, writeSizeTmp, copyLen);
        ++chunkPos;
    }
    outWriteSize = writeSizeTmp;
    return true;
}

bool CacheMediaChunkBufferImpl::WriteMergerPre(uint64_t offset, size_t writeSize, FragmentIterator& nextFragmentPos)
{
    nextFragmentPos = std::next(writePos_);
    bool isLoop = true;
    while (isLoop) {
        if (nextFragmentPos == fragmentCacheBuffer_.end() ||
            offset + static_cast<uint64_t>(writeSize) < nextFragmentPos->offsetBegin) {
            nextFragmentPos = fragmentCacheBuffer_.end();
            isLoop = false;
            break;
        }
        if (offset + static_cast<uint64_t>(writeSize) <
            nextFragmentPos->offsetBegin + static_cast<uint64_t>(nextFragmentPos->dataLength)) {
            auto endPos = GetOffsetChunkCache(nextFragmentPos->chunks,
                offset + static_cast<uint64_t>(writeSize), LeftBoundedRightOpenComp);
            freeChunks_.splice(freeChunks_.end(), nextFragmentPos->chunks, nextFragmentPos->chunks.begin(), endPos);
            if (endPos == nextFragmentPos->chunks.end()) {
                nextFragmentPos = EraseFragmentCache(nextFragmentPos);
                DumpInner(0);
                return false;
            }
            auto &chunkInfo = *endPos;
            uint64_t newOffset = offset + static_cast<uint64_t>(writeSize);
            uint64_t dataLength = static_cast<uint64_t>(chunkInfo->dataLength);
            uint64_t moveLen = (chunkInfo->offset + dataLength) > newOffset ?
                (chunkInfo->offset + dataLength) - newOffset : 0;
            auto mergeDataLen = chunkInfo->dataLength > moveLen ? chunkInfo->dataLength - moveLen : 0;
            errno_t res = memmove_s(chunkInfo->data, moveLen, chunkInfo->data + mergeDataLen, moveLen);
            FALSE_RETURN_V_MSG_E(res == EOK, false, "memmove_s data err");
            chunkInfo->offset = newOffset;
            chunkInfo->dataLength = static_cast<uint32_t>(moveLen);
            uint64_t lostLength = newOffset > nextFragmentPos->offsetBegin ?
                newOffset - nextFragmentPos->offsetBegin : 0;
            nextFragmentPos->dataLength -= static_cast<int64_t>(lostLength);
            lruCache_.Update(nextFragmentPos->offsetBegin, newOffset, nextFragmentPos);
            nextFragmentPos->offsetBegin = newOffset;
            nextFragmentPos->accessLength = 0;
            nextFragmentPos->accessPos = nextFragmentPos->chunks.end();
            isLoop = false;
            break;
        } else {
            freeChunks_.splice(freeChunks_.end(), nextFragmentPos->chunks);
            writePos_->totalReadSize += nextFragmentPos->totalReadSize;
            nextFragmentPos->totalReadSize = 0; // transferred to writePos_, so EraseFragmentCache does not subtract it again
            nextFragmentPos = EraseFragmentCache(nextFragmentPos);
        }
    }
    return true;
}

void CacheMediaChunkBufferImpl::WriteMergerPost(FragmentIterator& nextFragmentPos)
{
    if (nextFragmentPos == fragmentCacheBuffer_.end() || writePos_->chunks.empty() ||
        nextFragmentPos->chunks.empty()) {
        return;
    }
    auto preChunkInfo = writePos_->chunks.back();
    auto nextChunkInfo = nextFragmentPos->chunks.front();
    if (preChunkInfo->offset + preChunkInfo->dataLength != nextChunkInfo->offset) {
        DumpAndCheckInner();
        return;
    }
    writePos_->dataLength += nextFragmentPos->dataLength;
    writePos_->totalReadSize += nextFragmentPos->totalReadSize;
    nextFragmentPos->totalReadSize = 0; // transferred to writePos_, so EraseFragmentCache does not subtract it again
    writePos_->chunks.splice(writePos_->chunks.end(), nextFragmentPos->chunks);
    EraseFragmentCache(nextFragmentPos);
}

size_t CacheMediaChunkBufferImpl::Write(void* ptr, uint64_t inOffset, size_t inWriteSize)
{
    std::lock_guard lock(mutex_);
    uint64_t offset = inOffset;
    size_t writeSize = inWriteSize;
    uint8_t* src = static_cast<uint8_t*>(ptr);
    size_t dupWriteSize = 0;

    auto fragmentPos = GetOffsetFragmentCache(writePos_, offset, BoundedIntervalComp);
    ChunkIterator chunkPos;
    if (fragmentPos != fragmentCacheBuffer_.end()) {
        auto& chunkList = fragmentPos->chunks;
        writePos_ = fragmentPos;
        if ((fragmentPos->offsetBegin + static_cast<uint64_t>(fragmentPos->dataLength)) != offset) {
            auto ret = WriteInPlace(fragmentPos, src, offset, writeSize, dupWriteSize);
            if (!ret || dupWriteSize >= writeSize) {
                return writeSize;
            }
            src += dupWriteSize;
            offset += dupWriteSize;
            writeSize -= dupWriteSize;
        }
        chunkPos = std::prev(chunkList.end());
    } else {
        if (freeChunks_.empty()) {
            MEDIA_LOG_D("no free chunk.");
        }
        MEDIA_LOG_D("not find fragment.");
        chunkPos = AddFragmentCacheBuffer(offset);
    }
    FragmentIterator nextFragmentPos = fragmentCacheBuffer_.end();
    auto success = WriteMergerPre(offset, writeSize, nextFragmentPos);
    if (!success) {
        return dupWriteSize;
    }
    auto writeSizeTmp = WriteChunk(*writePos_, chunkPos, src, offset, writeSize);
    if (writeSize != writeSizeTmp) {
        nextFragmentPos = fragmentCacheBuffer_.end();
    }
    WriteMergerPost(nextFragmentPos);
    return writeSizeTmp + dupWriteSize;
}

bool CacheMediaChunkBufferImpl::Seek(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    auto readPos = GetOffsetFragmentCache(readPos_, offset, BoundedIntervalComp);
    if (readPos != fragmentCacheBuffer_.end()) {
        readPos_ = readPos;
        bool isSeekHit = false;
        auto chunkPos = GetOffsetChunkCache(readPos->chunks, offset, LeftBoundedRightOpenComp);
        if (chunkPos != readPos->chunks.end()) {
            auto readOffset = offset > readPos->offsetBegin ? offset - readPos->offsetBegin : 0;
            uint64_t diff = readOffset > static_cast<uint64_t>(readPos->accessLength) ?
                readOffset - static_cast<uint64_t>(readPos->accessLength) : 0;
            if (diff >= ACCESS_OFFSET_MAX_LENGTH) {
                chunkPos = SplitFragmentCacheBuffer(readPos, offset, chunkPos);
            }

            if (chunkPos == readPos->chunks.end()) {
                return false;
            }
            lruCache_.Refer(readPos->offsetBegin, readPos);
            (*readPos).accessPos = chunkPos;
            auto tmpLength = offset > (*readPos).offsetBegin ? offset - (*readPos).offsetBegin : 0;
            (*readPos).accessLength = static_cast<int64_t>(tmpLength);
            readPos->readTime = Clock::now();
            isSeekHit = true;
        }
        ResetReadSizeAlloc();
        uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
        readPos->totalReadSize += newReadSizeInit;
        totalReadSize_ += newReadSizeInit;
        return isSeekHit;
    }
    return false;
}

size_t CacheMediaChunkBufferImpl::GetBufferSize(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    auto readPos = GetOffsetFragmentCache(readPos_, offset, LeftBoundedRightOpenComp);
    size_t bufferSize = 0;
    while (readPos != fragmentCacheBuffer_.end()) {
        uint64_t nextOffsetBegin = readPos->offsetBegin + static_cast<uint64_t>(readPos->dataLength);
        bufferSize = static_cast<size_t>(nextOffsetBegin > offset ? nextOffsetBegin - offset : 0);
        readPos++;
        if (readPos == fragmentCacheBuffer_.end() || nextOffsetBegin != readPos->offsetBegin) {
            break;
        }
    }
    return bufferSize;
}

void CacheMediaChunkBufferImpl::HandleFragmentPos(FragmentIterator& fragmentIter)
{
    uint64_t nextOffsetBegin = fragmentIter->offsetBegin + static_cast<uint64_t>(fragmentIter->dataLength);
    ++fragmentIter;
    while (fragmentIter != fragmentCacheBuffer_.end()) {
        if (nextOffsetBegin != fragmentIter->offsetBegin) {
            break;
        }
        nextOffsetBegin = fragmentIter->offsetBegin + static_cast<uint64_t>(fragmentIter->dataLength);
        ++fragmentIter;
    }
}

uint64_t CacheMediaChunkBufferImpl::GetNextBufferOffset(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    auto fragmentIter = std::upper_bound(fragmentCacheBuffer_.begin(), fragmentCacheBuffer_.end(), offset,
        [](auto inputOffset, const FragmentCacheBuffer& fragment) {
            return (inputOffset < fragment.offsetBegin + fragment.dataLength);
        });
    if (fragmentIter != fragmentCacheBuffer_.end()) {
        if (LeftBoundedRightOpenComp(offset, fragmentIter->offsetBegin,
            fragmentIter->offsetBegin + fragmentIter->dataLength)) {
            HandleFragmentPos(fragmentIter);
        }
    }
    if (fragmentIter != fragmentCacheBuffer_.end()) {
        return fragmentIter->offsetBegin;
    }
    return 0;
}

FragmentIterator CacheMediaChunkBufferImpl::EraseFragmentCache(const FragmentIterator& iter)
{
    if (iter == readPos_) {
        readPos_ = fragmentCacheBuffer_.end();
    }
    if (iter == writePos_) {
        writePos_ = fragmentCacheBuffer_.end();
    }
    totalReadSize_ -= iter->totalReadSize;
    lruCache_.Delete(iter->offsetBegin);
    return fragmentCacheBuffer_.erase(iter);
}

inline size_t WriteOneChunkData(CacheChunk& chunkInfo, uint8_t* src, uint64_t offset, size_t writeSize)
{
    uint64_t copyBegin = offset > chunkInfo.offset ? offset - chunkInfo.offset : 0;
    if (copyBegin < 0 || copyBegin > chunkInfo.chunkSize) {
        return 0;
    }
    size_t writePerOne = static_cast<size_t>(chunkInfo.chunkSize - static_cast<size_t>(copyBegin));
    writePerOne = std::min(writePerOne, writeSize);
    errno_t res = memcpy_s(chunkInfo.data + copyBegin, writePerOne, src, writePerOne);
    FALSE_RETURN_V_MSG_E(res == EOK, 0, "memcpy_s data err");
    chunkInfo.dataLength = static_cast<uint32_t>(static_cast<size_t>(copyBegin) + writePerOne);
    return writePerOne;
}

inline CacheChunk* PopFreeCacheChunk(CacheChunkList& freeChunks, uint64_t offset)
{
    if (freeChunks.empty()) {
        return nullptr;
    }
    auto tmp = freeChunks.front();
    freeChunks.pop_front();
    InitChunkInfo(*tmp, offset);
    return tmp;
}

size_t CacheMediaChunkBufferImpl::WriteChunk(FragmentCacheBuffer& fragmentCacheBuffer, ChunkIterator& chunkPos,
    void* ptr, uint64_t offset, size_t writeSize)
{
    if (chunkPos == fragmentCacheBuffer.chunks.end()) {
        MEDIA_LOG_D("invalid input.");
        return 0;
    }
    size_t writedTmp = 0;
    auto chunkInfo = *chunkPos;
    uint8_t* src = static_cast<uint8_t*>(ptr);
    if (chunkInfo->chunkSize > chunkInfo->dataLength) {
        writedTmp += WriteOneChunkData(*chunkInfo, src, offset, writeSize);
        fragmentCacheBuffer.dataLength += static_cast<int64_t>(writedTmp);
    }
    while (writedTmp < writeSize) {
        auto chunkOffset = offset + static_cast<uint64_t>(writedTmp);
        auto freeChunk = GetFreeCacheChunk(chunkOffset);
        if (freeChunk == nullptr) {
            return writedTmp;
        }
        auto writePerOne = WriteOneChunkData(*freeChunk, src + writedTmp, chunkOffset, writeSize - writedTmp);
        fragmentCacheBuffer.chunks.push_back(freeChunk);
        writedTmp += writePerOne;
        fragmentCacheBuffer.dataLength += static_cast<int64_t>(writePerOne);

        if (fragmentCacheBuffer.accessPos == fragmentCacheBuffer.chunks.end()) {
            fragmentCacheBuffer.accessPos = std::prev(fragmentCacheBuffer.chunks.end());
        }
    }
    return writedTmp;
}

CacheChunk* CacheMediaChunkBufferImpl::UpdateFragmentCacheForDelHead(FragmentIterator& fragmentIter)
{
    FragmentCacheBuffer& fragment = *fragmentIter;
    if (fragment.chunks.empty()) {
        return nullptr;
    }
    auto cacheChunk = fragment.chunks.front();
    fragment.chunks.pop_front();

    auto oldOffsetBegin = fragment.offsetBegin;
    int64_t dataLength = static_cast<int64_t>(cacheChunk->dataLength);
    fragment.offsetBegin += static_cast<uint64_t>(dataLength);
    fragment.dataLength -= dataLength;
    if (fragment.accessLength > dataLength) {
        fragment.accessLength -= dataLength;
    } else {
        fragment.accessLength = 0;
    }
    lruCache_.Update(oldOffsetBegin, fragmentIter->offsetBegin, fragmentIter);
    return cacheChunk;
}

CacheChunk* UpdateFragmentCacheForDelTail(FragmentCacheBuffer& fragment)
{
    if (fragment.chunks.empty()) {
        return nullptr;
    }
    if (fragment.accessPos == std::prev(fragment.chunks.end())) {
        fragment.accessPos = fragment.chunks.end();
    }

    auto cacheChunk = fragment.chunks.back();
    fragment.chunks.pop_back();

    auto dataLength = cacheChunk->dataLength;
    if (fragment.accessLength > fragment.dataLength - static_cast<int64_t>(dataLength)) {
        fragment.accessLength = fragment.dataLength - static_cast<int64_t>(dataLength);
    }
    fragment.dataLength -= static_cast<int64_t>(dataLength);
    return cacheChunk;
}

bool CacheMediaChunkBufferImpl::CheckThresholdFragmentCacheBuffer(FragmentIterator& currWritePos)
{
    int64_t offset = -1;
    FragmentIterator fragmentIterator = fragmentCacheBuffer_.end();
    auto ret = lruCache_.GetLruNode(offset, fragmentIterator);
    if (!ret) {
        return false;
    }
    if (fragmentIterator == fragmentCacheBuffer_.end()) {
        return false;
    }
    if (currWritePos == fragmentIterator) {
        lruCache_.Refer(offset, currWritePos);
        ret = lruCache_.GetLruNode(offset, fragmentIterator);
        if (!ret) {
            return false;
        }
        if (fragmentIterator == fragmentCacheBuffer_.end()) {
            return false;
        }
    }
    freeChunks_.splice(freeChunks_.end(), fragmentIterator->chunks);
    EraseFragmentCache(fragmentIterator);
    return true;
}

/***
 * Overall strategy:
 * Compute the maximum allowed number of fragments; if it exceeds FRAGMENT_MAX_NUM (4), evict the fragment that
 * has not been read recently (excluding the fragment currently being written to).
 * A newly allocated fragment is given a fixed number of chunks, derived from a formula, so that it can keep
 * downloading.
 * Maximum number of chunks allowed per fragment:
 *   (bytes read from this fragment (fragmentReadSize) / total bytes read (totalReadSize)) * total chunk count
 * Compute the maximum number of chunks the fragment is allowed to hold.
 * If the limit is exceeded, release its already-read chunks; if there are no read chunks left and it is still
 * over the limit, refuse further writes and return failure (the fragment cannot hold any more data).
 * If the limit is not exceeded, take a chunk from the free list. If the free list is empty:
 *   loop over the other fragments, compute each one's maximum allowed chunk count, and release the
 *   already-read chunks of any fragment that exceeds it.
 * If that is still not enough:
 *   loop over the other fragments again and release the unread tail chunks of any fragment that exceeds
 *   its maximum allowed chunk count.
 * If there is still no free chunk, return failure.
 *
 * Note: consider whether to try the free list first at the very beginning and only continue when it is empty.
 */
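// Illustrative quota calculation (the numbers are assumptions, not from the original source): with 2560 chunks
// in total, totalReadSize_ = 1,000,000 and a fragment whose totalReadSize is 250,000, the per-fragment limit is
// roughly (250,000 / 1,000,000) * 2560 = 640 chunks; chunks beyond that are reclaimed as described above.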
void CacheMediaChunkBufferImpl::DeleteHasReadFragmentCacheBuffer(FragmentIterator& fragmentIter, size_t allowChunkNum)
{
    auto& fragmentCacheChunks = *fragmentIter;
    while (fragmentCacheChunks.chunks.size() >= allowChunkNum &&
        fragmentCacheChunks.accessLength > static_cast<int64_t>(static_cast<double>(fragmentCacheChunks.dataLength) *
        CACHE_RELEASE_FACTOR_DEFAULT / TO_PERCENT)) {
        if (fragmentCacheChunks.accessPos != fragmentCacheChunks.chunks.begin()) {
            auto tmp = UpdateFragmentCacheForDelHead(fragmentIter);
            if (tmp != nullptr) {
                freeChunks_.push_back(tmp);
            }
        } else {
            MEDIA_LOG_D("judge has read finish.");
            break;
        }
    }
}

void CacheMediaChunkBufferImpl::DeleteUnreadFragmentCacheBuffer(FragmentIterator& fragmentIter, size_t allowChunkNum)
{
    auto& fragmentCacheChunks = *fragmentIter;
    while (fragmentCacheChunks.chunks.size() > allowChunkNum) {
        if (!fragmentCacheChunks.chunks.empty()) {
            auto tmp = UpdateFragmentCacheForDelTail(fragmentCacheChunks);
            if (tmp != nullptr) {
                freeChunks_.push_back(tmp);
            }
        } else {
            break;
        }
    }
}

CacheChunk* CacheMediaChunkBufferImpl::GetFreeCacheChunk(uint64_t offset, bool checkAllowFailContinue)
{
    if (writePos_ == fragmentCacheBuffer_.end()) {
        return nullptr;
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    auto currWritePos = GetOffsetFragmentCache(writePos_, offset, BoundedIntervalComp);
    size_t allowChunkNum = 0;
    if (currWritePos != fragmentCacheBuffer_.end()) {
        allowChunkNum = CalcAllowMaxChunkNum(currWritePos->totalReadSize, currWritePos->offsetBegin);
        DeleteHasReadFragmentCacheBuffer(currWritePos, allowChunkNum);
        if (currWritePos->chunks.size() >= allowChunkNum && !checkAllowFailContinue) {
            return nullptr;
        }
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); ++iter) {
        if (iter != currWritePos) {
            allowChunkNum = CalcAllowMaxChunkNum(iter->totalReadSize, iter->offsetBegin);
            DeleteHasReadFragmentCacheBuffer(iter, allowChunkNum);
        }
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    while (fragmentCacheBuffer_.size() > CACHE_FRAGMENT_MIN_NUM_DEFAULT) {
        auto result = CheckThresholdFragmentCacheBuffer(currWritePos);
        if (!freeChunks_.empty()) {
            return PopFreeCacheChunk(freeChunks_, offset);
        }
        if (!result) {
            break;
        }
    }
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); ++iter) {
        if (iter != currWritePos) {
            allowChunkNum = CalcAllowMaxChunkNum(iter->totalReadSize, iter->offsetBegin);
            DeleteUnreadFragmentCacheBuffer(iter, allowChunkNum);
        }
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    return nullptr;
}

CacheChunk* CacheMediaChunkBufferHlsImpl::GetFreeCacheChunk(uint64_t offset, bool checkAllowFailContinue)
{
    if (writePos_ == fragmentCacheBuffer_.end()) {
        return nullptr;
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    auto currWritePos = GetOffsetFragmentCache(writePos_, offset, BoundedIntervalComp);
    size_t allowChunkNum = 0;
    if (currWritePos != fragmentCacheBuffer_.end()) {
        allowChunkNum = CalcAllowMaxChunkNum(currWritePos->totalReadSize, currWritePos->offsetBegin);
        DeleteHasReadFragmentCacheBuffer(currWritePos, allowChunkNum);
        if (currWritePos->chunks.size() >= allowChunkNum && !checkAllowFailContinue) {
            MEDIA_LOG_D("allowChunkNum limit.");
            return nullptr;
        }
    } else {
        MEDIA_LOG_D("curr write is new fragment.");
    }
    MEDIA_LOG_D("clear other fragment has read chunk.");
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); ++iter) {
        if (iter != currWritePos) {
            allowChunkNum = CalcAllowMaxChunkNum(iter->totalReadSize, iter->offsetBegin);
            DeleteHasReadFragmentCacheBuffer(iter, allowChunkNum);
        }
    }
    if (!freeChunks_.empty()) {
        return PopFreeCacheChunk(freeChunks_, offset);
    }
    return nullptr;
}

FragmentIterator CacheMediaChunkBufferImpl::GetFragmentIterator(FragmentIterator& currFragmentIter,
    uint64_t offset, ChunkIterator chunkPos, CacheChunk* splitHead, CacheChunk*& chunkInfo)
{
    auto newFragmentPos = fragmentCacheBuffer_.emplace(std::next(currFragmentIter), offset);
    if (splitHead == nullptr) {
        newFragmentPos->chunks.splice(newFragmentPos->chunks.end(), currFragmentIter->chunks, chunkPos,
            currFragmentIter->chunks.end());
    } else {
        splitHead->dataLength = 0;
        newFragmentPos->chunks.splice(newFragmentPos->chunks.end(), currFragmentIter->chunks, std::next(chunkPos),
            currFragmentIter->chunks.end());
        newFragmentPos->chunks.push_front(splitHead);
        splitHead->offset = offset;
        uint64_t diff = offset > chunkInfo->offset ? offset - chunkInfo->offset : 0;
        if (chunkInfo->dataLength >= diff) {
            splitHead->dataLength = chunkInfo->dataLength - static_cast<uint32_t>(diff);
            chunkInfo->dataLength = static_cast<uint32_t>(diff);
            memcpy_s(splitHead->data, splitHead->dataLength, chunkInfo->data + diff, splitHead->dataLength);
        }
    }
    newFragmentPos->offsetBegin = offset;
    uint64_t diff = offset > currFragmentIter->offsetBegin ? offset - currFragmentIter->offsetBegin : 0;
    newFragmentPos->dataLength = currFragmentIter->dataLength > static_cast<int64_t>(diff) ?
        currFragmentIter->dataLength - static_cast<int64_t>(diff) : 0;
    newFragmentPos->accessLength = 0;
    uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
    newReadSizeInit = std::max(newReadSizeInit, currFragmentIter->totalReadSize);

    newFragmentPos->totalReadSize = newReadSizeInit;
    totalReadSize_ += newReadSizeInit;
    newFragmentPos->readTime = Clock::now();
    newFragmentPos->accessPos = newFragmentPos->chunks.begin();
    newFragmentPos->isSplit = currFragmentIter->isSplit;
    currFragmentIter->isSplit = true;
    currFragmentIter->dataLength = static_cast<int64_t>(offset > currFragmentIter->offsetBegin ?
        offset - currFragmentIter->offsetBegin : 0);
    return newFragmentPos;
}

ChunkIterator CacheMediaChunkBufferImpl::SplitFragmentCacheBuffer(FragmentIterator& currFragmentIter,
    uint64_t offset, ChunkIterator chunkPos)
{
    ResetReadSizeAlloc();
    auto& chunkInfo = *chunkPos;
    CacheChunk* splitHead = nullptr;
    if (offset != chunkInfo->offset) {
        splitHead = freeChunks_.empty() ? GetFreeCacheChunk(offset, true) : PopFreeCacheChunk(freeChunks_, offset);
        if (splitHead == nullptr) {
            return chunkPos;
        }
    }
    auto newFragmentPos = GetFragmentIterator(currFragmentIter, offset, chunkPos, splitHead, chunkInfo);
    currFragmentIter = newFragmentPos;
    if (fragmentCacheBuffer_.size() > CACHE_FRAGMENT_MAX_NUM_DEFAULT) {
        CheckThresholdFragmentCacheBuffer(currFragmentIter);
    }
    lruCache_.Refer(newFragmentPos->offsetBegin, newFragmentPos);
    return newFragmentPos->accessPos;
}

ChunkIterator CacheMediaChunkBufferHlsImpl::SplitFragmentCacheBuffer(FragmentIterator& currFragmentIter,
    uint64_t offset, ChunkIterator chunkPos)
{
    ResetReadSizeAlloc();
    auto& chunkInfo = *chunkPos;
    CacheChunk* splitHead = nullptr;
    if (offset != chunkInfo->offset) {
        splitHead = freeChunks_.empty() ? GetFreeCacheChunk(offset, true) : PopFreeCacheChunk(freeChunks_, offset);
        if (splitHead == nullptr) {
            return chunkPos;
        }
    }
    auto newFragmentPos = fragmentCacheBuffer_.emplace(std::next(currFragmentIter), offset);
    if (splitHead == nullptr) {
        newFragmentPos->chunks.splice(newFragmentPos->chunks.end(), currFragmentIter->chunks, chunkPos,
            currFragmentIter->chunks.end());
    } else {
        newFragmentPos->chunks.splice(newFragmentPos->chunks.end(), currFragmentIter->chunks, std::next(chunkPos),
            currFragmentIter->chunks.end());
        newFragmentPos->chunks.push_front(splitHead);
        splitHead->offset = offset;
        uint64_t diff = offset > chunkInfo->offset ? offset - chunkInfo->offset : 0;
        if (chunkInfo->dataLength >= diff) {
            splitHead->dataLength = chunkInfo->dataLength > static_cast<uint32_t>(diff) ?
                chunkInfo->dataLength - static_cast<uint32_t>(diff) : 0;
            chunkInfo->dataLength = static_cast<uint32_t>(diff);
            memcpy_s(splitHead->data, splitHead->dataLength, chunkInfo->data + diff, splitHead->dataLength);
        } else {
            splitHead->dataLength = 0; // It can't happen. us_asan can check.
        }
    }
    newFragmentPos->offsetBegin = offset;
    uint64_t diff = offset > currFragmentIter->offsetBegin ? offset - currFragmentIter->offsetBegin : 0;
    newFragmentPos->dataLength = currFragmentIter->dataLength > static_cast<int64_t>(diff) ?
        currFragmentIter->dataLength - static_cast<int64_t>(diff) : 0;
    newFragmentPos->accessLength = 0;
    uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
    if (currFragmentIter->totalReadSize > newReadSizeInit) {
        newReadSizeInit = currFragmentIter->totalReadSize;
    }
    newFragmentPos->totalReadSize = newReadSizeInit;
    totalReadSize_ += newReadSizeInit;
    newFragmentPos->readTime = Clock::now();
    newFragmentPos->accessPos = newFragmentPos->chunks.begin();
    currFragmentIter->dataLength = static_cast<int64_t>(offset > diff ? offset - diff : 0);
    currFragmentIter = newFragmentPos;

    lruCache_.Refer(newFragmentPos->offsetBegin, newFragmentPos);
    return newFragmentPos->accessPos;
}

ChunkIterator CacheMediaChunkBufferImpl::AddFragmentCacheBuffer(uint64_t offset)
{
    size_t fragmentThreshold = CACHE_FRAGMENT_MAX_NUM_DEFAULT;
    if (isLargeOffsetSpan_) {
        fragmentThreshold = CACHE_FRAGMENT_MAX_NUM_LARGE;
    }
    if (fragmentCacheBuffer_.size() >= fragmentThreshold) {
        auto fragmentIterTmp = fragmentCacheBuffer_.end();
        CheckThresholdFragmentCacheBuffer(fragmentIterTmp);
    }
    ResetReadSizeAlloc();
    auto fragmentInsertPos = std::upper_bound(fragmentCacheBuffer_.begin(), fragmentCacheBuffer_.end(), offset,
        [](auto mediaOffset, const FragmentCacheBuffer& fragment) {
            if (mediaOffset <= fragment.offsetBegin + fragment.dataLength) {
                return true;
            }
            return false;
        });
    auto newFragmentPos = fragmentCacheBuffer_.emplace(fragmentInsertPos, offset);
    uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
    totalReadSize_ += newReadSizeInit;
    newFragmentPos->totalReadSize = newReadSizeInit;
    writePos_ = newFragmentPos;
    writePos_->accessPos = writePos_->chunks.end();
    lruCache_.Refer(newFragmentPos->offsetBegin, newFragmentPos);
    auto freeChunk = GetFreeCacheChunk(offset);
    if (freeChunk == nullptr) {
        MEDIA_LOG_D("get free cache chunk fail.");
        return writePos_->chunks.end();
    }
    writePos_->accessPos = newFragmentPos->chunks.emplace(newFragmentPos->chunks.end(), freeChunk);
    return writePos_->accessPos;
}

ChunkIterator CacheMediaChunkBufferHlsImpl::AddFragmentCacheBuffer(uint64_t offset)
{
    ResetReadSizeAlloc();
    auto fragmentInsertPos = std::upper_bound(fragmentCacheBuffer_.begin(), fragmentCacheBuffer_.end(), offset,
        [](auto mediaOffset, const FragmentCacheBuffer& fragment) {
            if (mediaOffset <= fragment.offsetBegin + fragment.dataLength) {
                return true;
            }
            return false;
        });
    auto newFragmentPos = fragmentCacheBuffer_.emplace(fragmentInsertPos, offset);
    uint64_t newReadSizeInit = static_cast<uint64_t>(1 + initReadSizeFactor_ * static_cast<double>(totalReadSize_));
    totalReadSize_ += newReadSizeInit;
    newFragmentPos->totalReadSize = newReadSizeInit;
    writePos_ = newFragmentPos;
    writePos_->accessPos = writePos_->chunks.end();
    lruCache_.Refer(newFragmentPos->offsetBegin, newFragmentPos);
    auto freeChunk = GetFreeCacheChunk(offset);
    if (freeChunk == nullptr) {
        MEDIA_LOG_D("get free cache chunk fail.");
        return writePos_->chunks.end();
    }
    writePos_->accessPos = newFragmentPos->chunks.emplace(newFragmentPos->chunks.end(), freeChunk);
    return writePos_->accessPos;
}

void CacheMediaChunkBufferImpl::ResetReadSizeAlloc()
{
    size_t chunkNum = chunkMaxNum_ + 1 >= freeChunks_.size() ?
        chunkMaxNum_ + 1 - freeChunks_.size() : 0;
    if (totalReadSize_ > static_cast<size_t>(UP_LIMIT_MAX_TOTAL_READ_SIZE) && chunkNum > 0) {
        size_t preChunkSize = static_cast<size_t>(MAX_TOTAL_READ_SIZE - 1) / chunkNum;
        for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); ++iter) {
            iter->totalReadSize = preChunkSize * iter->chunks.size();
        }
        totalReadSize_ = preChunkSize * chunkNum;
    }
}

void CacheMediaChunkBufferImpl::Dump(uint64_t param)
{
    std::lock_guard lock(mutex_);
    DumpInner(param);
}

void CacheMediaChunkBufferImpl::DumpInner(uint64_t param)
{
    (void)param;
    MEDIA_LOG_D("cacheBuff total buffer size : " PUBLIC_LOG_U64, totalBuffSize_);
    MEDIA_LOG_D("cacheBuff total chunk size : " PUBLIC_LOG_U32, chunkSize_);
    MEDIA_LOG_D("cacheBuff total chunk num : " PUBLIC_LOG_U32, chunkMaxNum_);
    MEDIA_LOG_D("cacheBuff total read size : " PUBLIC_LOG_U64, totalReadSize_);
    MEDIA_LOG_D("cacheBuff read size factor : " PUBLIC_LOG_F, initReadSizeFactor_);
    MEDIA_LOG_D("cacheBuff free chunk num : " PUBLIC_LOG_ZU, freeChunks_.size());
    MEDIA_LOG_D("cacheBuff fragment num : " PUBLIC_LOG_ZU, fragmentCacheBuffer_.size());
    for (auto const & fragment : fragmentCacheBuffer_) {
        MEDIA_LOG_D("cacheBuff - fragment offset : " PUBLIC_LOG_U64, fragment.offsetBegin);
        MEDIA_LOG_D("cacheBuff fragment length : " PUBLIC_LOG_D64, fragment.dataLength);
        MEDIA_LOG_D("cacheBuff chunk num : " PUBLIC_LOG_ZU, fragment.chunks.size());
        MEDIA_LOG_D("cacheBuff access length : " PUBLIC_LOG_U64, fragment.accessLength);
        MEDIA_LOG_D("cacheBuff read size : " PUBLIC_LOG_U64, fragment.totalReadSize);
        if (fragment.accessPos != fragment.chunks.end()) {
            auto &chunkInfo = *fragment.accessPos;
            MEDIA_LOG_D("cacheBuff access offset: " PUBLIC_LOG_D64 ", len: " PUBLIC_LOG_U32,
                chunkInfo->offset, chunkInfo->dataLength);
        } else {
            MEDIA_LOG_D("cacheBuff access ended");
        }
        if (!fragment.chunks.empty()) {
            auto &chunkInfo = fragment.chunks.back();
            MEDIA_LOG_D("cacheBuff last chunk offset: " PUBLIC_LOG_D64 ", len: " PUBLIC_LOG_U32,
                chunkInfo->offset, chunkInfo->dataLength);
        }
        MEDIA_LOG_D("cacheBuff ");
    }
}

bool CacheMediaChunkBufferImpl::Check()
{
    std::lock_guard lock(mutex_);
    return CheckInner();
}

void CacheMediaChunkBufferImpl::Clear()
{
    std::lock_guard lock(mutex_);
    auto iter = fragmentCacheBuffer_.begin();
    while (iter != fragmentCacheBuffer_.end()) {
        freeChunks_.splice(freeChunks_.end(), iter->chunks);
        iter = EraseFragmentCache(iter);
    }
    lruCache_.Reset();
    totalReadSize_ = 0;
}

uint64_t CacheMediaChunkBufferImpl::GetFreeSize()
{
    std::lock_guard lock(mutex_);
    uint64_t totalFreeSize = totalBuffSize_;
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); iter++) {
        uint64_t fragmentDataLen = static_cast<uint64_t>(iter->dataLength);
        totalFreeSize = totalFreeSize > fragmentDataLen ? totalFreeSize - fragmentDataLen : 0;
    }
    return totalFreeSize;
}

// Release all chunks before the offset within the fragment that contains the offset.
bool CacheMediaChunkBufferImpl::ClearChunksOfFragment(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    bool res = false;
    auto fragmentPos = GetOffsetFragmentCache(readPos_, offset, LeftBoundedRightOpenComp);
    if (fragmentPos == fragmentCacheBuffer_.end()) {
        return false;
    }
    auto& fragment = *fragmentPos;
    uint32_t chunkSize = fragment.chunks.size();
    for (uint32_t i = 0; i < chunkSize; ++i) {
        auto chunkIter = fragment.chunks.front();
        if (fragmentPos->accessPos == fragmentPos->chunks.end() || chunkIter == nullptr ||
            chunkIter->offset + chunkIter->dataLength >= offset) {
            break;
        }

        auto chunkPos = fragmentPos->accessPos;
        if (chunkIter->offset >= (*chunkPos)->offset) { // Update accessPos of the fragment
            chunkPos = GetOffsetChunkCache(fragmentPos->chunks, chunkIter->offset + chunkIter->dataLength,
                LeftBoundedRightOpenComp);
            (*fragmentPos).accessPos = chunkPos;
        }

        MEDIA_LOG_D("ClearChunksOfFragment clear chunk, offsetBegin: " PUBLIC_LOG_U64 " offsetEnd " PUBLIC_LOG_U64,
            chunkIter->offset, chunkIter->offset + chunkIter->dataLength);
        auto tmp = UpdateFragmentCacheForDelHead(fragmentPos);
        if (tmp != nullptr) {
            res = true;
            freeChunks_.push_back(tmp);
        }
    }
    return res;
}

// Release all fragments that end before the offset.
bool CacheMediaChunkBufferImpl::ClearFragmentBeforeOffset(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    bool res = false;
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end();) {
        if (iter->offsetBegin >= offset) {
            break;
        }
        if (iter->offsetBegin + static_cast<uint64_t>(iter->dataLength) <= offset) {
            MEDIA_LOG_D("ClearFragmentBeforeOffset clear fragment, offsetBegin: " PUBLIC_LOG_U64 " offsetEnd "
                PUBLIC_LOG_U64, iter->offsetBegin, iter->offsetBegin + iter->dataLength);
            freeChunks_.splice(freeChunks_.end(), iter->chunks);
            iter = EraseFragmentCache(iter);
            res = true;
            continue;
        }
        iter++;
    }
    return res;
}

// Release the read chunks of the fragments that lie between minOffset and maxOffset.
bool CacheMediaChunkBufferImpl::ClearMiddleReadFragment(uint64_t minOffset, uint64_t maxOffset)
{
    std::lock_guard lock(mutex_);
    bool res = false;
    for (auto iter = fragmentCacheBuffer_.begin(); iter != fragmentCacheBuffer_.end(); iter++) {
        if (iter->offsetBegin + static_cast<uint64_t>(iter->dataLength) < minOffset) {
            continue;
        }
        if (iter->offsetBegin > maxOffset) {
            break;
        }
        if (iter->accessLength <= chunkSize_) {
            continue;
        }
        MEDIA_LOG_D("ClearMiddleReadFragment, minOffset: " PUBLIC_LOG_U64 " maxOffset: "
            PUBLIC_LOG_U64 " offsetBegin: " PUBLIC_LOG_U64 " dataLength: " PUBLIC_LOG_D64 " accessLength "
            PUBLIC_LOG_D64, minOffset, maxOffset, iter->offsetBegin, iter->dataLength, iter->accessLength);
        auto& fragment = *iter;
        uint32_t chunksSize = fragment.chunks.size();
        for (uint32_t i = 0; i < chunksSize; ++i) {
            auto chunkIter = fragment.chunks.front();
            if (chunkIter->dataLength >= iter->accessLength ||
                (chunkIter->offset + chunkIter->dataLength >= maxOffset &&
                chunkIter->offset <= minOffset)) {
                break;
            }
            auto tmp = UpdateFragmentCacheForDelHead(iter);
            if (tmp != nullptr) {
                freeChunks_.push_back(tmp);
            }
        }
    }
    return res;
}

bool CacheMediaChunkBufferImpl::IsReadSplit(uint64_t offset)
{
    std::lock_guard lock(mutex_);
    auto readPos = GetOffsetFragmentCache(readPos_, offset, LeftBoundedRightOpenComp);
    if (readPos != fragmentCacheBuffer_.end()) {
        return readPos->isSplit;
    }
    return false;
}

void CacheMediaChunkBufferImpl::SetIsLargeOffsetSpan(bool isLargeOffsetSpan)
{
    isLargeOffsetSpan_ = isLargeOffsetSpan;
}

bool CacheMediaChunkBufferImpl::DumpAndCheckInner()
{
    DumpInner(0);
    return CheckInner();
}

void CacheMediaChunkBufferImpl::CheckFragment(const FragmentCacheBuffer& fragment, bool& checkSuccess)
{
    if (fragment.accessPos != fragment.chunks.end()) {
        auto& accessChunk = *fragment.accessPos;
        auto accessLength = accessChunk->offset > fragment.offsetBegin ?
            accessChunk->offset - fragment.offsetBegin : 0;
        if (fragment.accessLength < accessLength ||
            fragment.accessLength >
            (static_cast<int64_t>(accessLength) + static_cast<int64_t>(accessChunk->dataLength))) {
            checkSuccess = false;
        }
    }
}

bool CacheMediaChunkBufferImpl::CheckInner()
{
    uint64_t chunkNum = 0;
    uint64_t totalReadSize = 0;
    bool checkSuccess = true;
    chunkNum = freeChunks_.size();
    for (auto const& fragment : fragmentCacheBuffer_) {
        int64_t dataLength = 0;
        chunkNum += fragment.chunks.size();
        totalReadSize += fragment.totalReadSize;

        auto prev = fragment.chunks.begin();
        auto next = fragment.chunks.end();
        if (!fragment.chunks.empty()) {
            dataLength += static_cast<int64_t>((*prev)->dataLength);
            next = std::next(prev);
            if ((*prev)->offset != fragment.offsetBegin) {
                checkSuccess = false;
            }
        }
        while (next != fragment.chunks.end()) {
            auto &chunkPrev = *prev;
            auto &chunkNext = *next;
            dataLength += static_cast<int64_t>(chunkNext->dataLength);
            if (chunkPrev->offset + chunkPrev->dataLength != chunkNext->offset) {
                checkSuccess = false;
            }
            ++next;
            ++prev;
        }
        if (dataLength != fragment.dataLength) {
            checkSuccess = false;
        }
        CheckFragment(fragment, checkSuccess);
    }
    if (chunkNum != chunkMaxNum_ + 1) {
        checkSuccess = false;
    }

    if (totalReadSize != totalReadSize_) {
        checkSuccess = false;
    }
    return checkSuccess;
}


CacheMediaChunkBuffer::CacheMediaChunkBuffer()
{
    MEDIA_LOG_D("enter");
    impl_ = std::make_unique<CacheMediaChunkBufferImpl>();
}

CacheMediaChunkBuffer::~CacheMediaChunkBuffer()
{
    MEDIA_LOG_D("exit");
}

bool CacheMediaChunkBuffer::Init(uint64_t totalBuffSize, uint32_t chunkSize)
{
    return impl_->Init(totalBuffSize, chunkSize);
}

size_t CacheMediaChunkBuffer::Read(void* ptr, uint64_t offset, size_t readSize)
{
    return impl_->Read(ptr, offset, readSize);
}

size_t CacheMediaChunkBuffer::Write(void* ptr, uint64_t offset, size_t writeSize)
{
    return impl_->Write(ptr, offset, writeSize);
}

bool CacheMediaChunkBuffer::Seek(uint64_t offset)
{
    return impl_->Seek(offset);
}

size_t CacheMediaChunkBuffer::GetBufferSize(uint64_t offset)
{
    return impl_->GetBufferSize(offset);
}

uint64_t CacheMediaChunkBuffer::GetNextBufferOffset(uint64_t offset)
{
    return impl_->GetNextBufferOffset(offset);
}

void CacheMediaChunkBuffer::Clear()
{
    return impl_->Clear();
}

uint64_t CacheMediaChunkBuffer::GetFreeSize()
{
    return impl_->GetFreeSize();
}

bool CacheMediaChunkBuffer::ClearFragmentBeforeOffset(uint64_t offset)
{
    return impl_->ClearFragmentBeforeOffset(offset);
}

bool CacheMediaChunkBuffer::ClearChunksOfFragment(uint64_t offset)
{
    return impl_->ClearChunksOfFragment(offset);
}

bool CacheMediaChunkBuffer::ClearMiddleReadFragment(uint64_t minOffset, uint64_t maxOffset)
{
    return impl_->ClearMiddleReadFragment(minOffset, maxOffset);
}

bool CacheMediaChunkBuffer::IsReadSplit(uint64_t offset)
{
    return impl_->IsReadSplit(offset);
}

void CacheMediaChunkBuffer::SetIsLargeOffsetSpan(bool isLargeOffsetSpan)
{
    return impl_->SetIsLargeOffsetSpan(isLargeOffsetSpan);
}

void CacheMediaChunkBuffer::SetReadBlocking(bool isReadBlockingAllowed)
{
    (void)isReadBlockingAllowed;
}

void CacheMediaChunkBuffer::Dump(uint64_t param)
{
    return impl_->Dump(param);
}

bool CacheMediaChunkBuffer::Check()
{
    return impl_->Check();
}
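
// Minimal usage sketch (illustrative only, not part of the original file; the buffer sizes are assumptions):
//     CacheMediaChunkBuffer cache;
//     if (cache.Init(40 * 1024 * 1024, 16 * 1024)) {    // 40 MiB cache made of 16 KiB chunks
//         uint8_t payload[4096] = {0};
//         cache.Write(payload, 0, sizeof(payload));     // cache downloaded data at media offset 0
//         cache.Seek(0);                                // position the reader before reading
//         uint8_t out[4096] = {0};
//         size_t got = cache.Read(out, 0, sizeof(out)); // returns the number of bytes actually copied
//         (void)got;
//     }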
} // namespace Media
} // namespace OHOS