1 /* 2 * Copyright (C) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef UTILS_BLOCK_QUEUE_H 17 #define UTILS_BLOCK_QUEUE_H 18 #include <atomic> 19 #include <mutex> 20 #include <condition_variable> 21 #include <queue> 22 #include "avcodec_log.h" 23 24 namespace OHOS { 25 namespace MediaAVCodec { 26 namespace { 27 constexpr size_t DEFAULT_QUEUE_SIZE = 10; 28 } 29 30 template <typename T> 31 class BlockQueue { 32 public: 33 explicit BlockQueue(std::string name, size_t capacity = DEFAULT_QUEUE_SIZE) 34 : name_(std::move(name)), capacity_(capacity), isActive_(true) 35 { 36 } 37 38 ~BlockQueue() = default; 39 Size()40 size_t Size() 41 { 42 std::lock_guard<std::mutex> lock(mutex_); 43 return que_.size(); 44 } 45 Capacity()46 size_t Capacity() 47 { 48 return capacity_; 49 } 50 Empty()51 bool Empty() 52 { 53 std::lock_guard<std::mutex> lock(mutex_); 54 return que_.empty(); 55 } 56 Push(const T & block)57 bool Push(const T& block) 58 { 59 AVCODEC_LOGD("block queue %{public}s Push enter.", name_.c_str()); 60 std::unique_lock<std::mutex> lock(mutex_); 61 if (!isActive_) { 62 AVCODEC_LOGD("block queue %{public}s is inactive for Push.", name_.c_str()); 63 return false; 64 } 65 if (que_.size() >= capacity_) { 66 AVCODEC_LOGD("block queue %{public}s is full, please waiting for Pop.", name_.c_str()); 67 condFull_.wait(lock, [this] { return !isActive_ || que_.size() < capacity_; }); 68 } 69 if (!isActive_) { 70 AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, isFull: %{public}d.", 71 name_.c_str(), isActive_.load(), que_.size() < capacity_); 72 return false; 73 } 74 que_.push(block); 75 condEmpty_.notify_one(); 76 AVCODEC_LOGD("block queue %{public}s Push ok.", name_.c_str()); 77 return true; 78 } 79 Pop()80 T Pop() 81 { 82 AVCODEC_LOGD("block queue %{public}s Pop enter.", name_.c_str()); 83 std::unique_lock<std::mutex> lock(mutex_); 84 if (que_.empty() && !isActive_) { 85 AVCODEC_LOGD("block queue %{public}s is inactive for Pop.", name_.c_str()); 86 return {}; 87 } else if (que_.empty() && isActive_) { 88 AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str()); 89 condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); }); 90 } 91 if (que_.empty()) { 92 AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.", 93 name_.c_str(), isActive_.load(), que_.size()); 94 return {}; 95 } 96 T element = que_.front(); 97 que_.pop(); 98 condFull_.notify_one(); 99 AVCODEC_LOGD("block queue %{public}s Pop ok.", name_.c_str()); 100 return element; 101 } 102 Front()103 T Front() 104 { 105 AVCODEC_LOGD("block queue %{public}s Front enter.", name_.c_str()); 106 std::unique_lock<std::mutex> lock(mutex_); 107 if (que_.empty() && !isActive_) { 108 AVCODEC_LOGD("block queue %{public}s is inactive for Front.", name_.c_str()); 109 return {}; 110 } else if (que_.empty() && isActive_) { 111 AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str()); 112 condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); }); 113 } 114 if (que_.empty()) { 115 AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.", 116 name_.c_str(), isActive_.load(), que_.size()); 117 return {}; 118 } 119 T element = que_.front(); 120 condFull_.notify_one(); 121 AVCODEC_LOGD("block queue %{public}s Front ok.", name_.c_str()); 122 return element; 123 } 124 Clear()125 void Clear() 126 { 127 std::lock_guard<std::mutex> lock(mutex_); 128 ClearUnprotected(); 129 } 130 131 void SetActive(bool active, bool cleanData = true) 132 { 133 std::lock_guard<std::mutex> lock(mutex_); 134 AVCODEC_LOGD("SetActive %{public}s: %{public}d.", name_.c_str(), isActive_.load()); 135 isActive_ = active; 136 if (!active) { 137 if (cleanData) { 138 ClearUnprotected(); 139 } 140 condEmpty_.notify_one(); 141 } 142 } 143 144 private: ClearUnprotected()145 void ClearUnprotected() 146 { 147 if (que_.empty()) { 148 return; 149 } 150 bool needNotify = que_.size() == capacity_; 151 std::queue<T>().swap(que_); 152 if (needNotify) { 153 condFull_.notify_one(); 154 } 155 } 156 157 std::mutex mutex_; 158 std::condition_variable condFull_; 159 std::condition_variable condEmpty_; 160 std::queue<T> que_; 161 std::string name_; 162 const size_t capacity_; 163 std::atomic<bool> isActive_; 164 const OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_FRAMEWORK, "BlockQueue"}; 165 }; 166 } // namespace MediaAVCodec 167 } // namespace OHOS 168 #endif // !UTILS_BLOCK_QUEUE_H 169