1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef AVCODEC_LOCK_FREE_QUEUE_H 17 #define AVCODEC_LOCK_FREE_QUEUE_H 18 19 #include <atomic> 20 #include <condition_variable> 21 #include <mutex> 22 #include <memory> 23 #include <type_traits> 24 #include "utils.h" 25 26 27 namespace OHOS { 28 namespace MediaAVCodec { 29 30 enum QueueResult { 31 OK, 32 FULL, 33 EMPTY, 34 INACTIVE, 35 NO_MEMORY 36 }; 37 constexpr size_t QUEUE_RESULT_NUM{5}; 38 constexpr const char* QUEUE_RESULT_DESCRIPTION[QUEUE_RESULT_NUM]{ 39 "OK", 40 "Full", 41 "Empty", 42 "Inactive", 43 "NoMemory" 44 }; 45 46 /* 47 A simple lock free ring buffer queue for 1 producer and 1 consumer 48 */ 49 50 template<typename T, std::size_t N> 51 class LockFreeQueue { 52 public: 53 using UnderlyingType = T; 54 LockFreeQueue(const std::string & name)55 explicit LockFreeQueue(const std::string& name) : name_(name) {} ~LockFreeQueue()56 ~LockFreeQueue() 57 { 58 AVCODEC_LOGD("Queue %{public}s dtor", name_.data()); 59 } 60 61 static std::shared_ptr<LockFreeQueue<T, N>> Create(const std::string& name = "") 62 { 63 auto p = std::make_unique<LockFreeQueue<T, N>>(name); 64 CHECK_AND_RETURN_RET_LOG(p && p->Alloc(), nullptr, "Create queue failed"); 65 return p; 66 } 67 PushWait(const T & data)68 QueueResult PushWait(const T& data) 69 { 70 CHECK_AND_RETURN_RET_LOG(data_, QueueResult::NO_MEMORY, "Queue %{public}s has no memory", name_.data()); 71 if (!active_.load()) { 72 AVCODEC_LOGD("Queue %{public}s is inactive", name_.data()); 73 return QueueResult::INACTIVE; 74 } 75 size_t currentTail{0}; 76 size_t newTail{0}; 77 bool canPush{false}; 78 std::unique_lock<std::mutex> lock(canPushMtx_, std::defer_lock); 79 do { 80 currentTail = tail_.load(); 81 newTail = (currentTail + 1) % queueSize_; 82 // when queue is full, wait until at least 1 data is popped 83 if (newTail == head_.load()) { 84 lock.lock(); 85 canPushCv_.wait(lock, [&newTail, this]() { return newTail != head_.load() || !active_.load(); }); 86 lock.unlock(); 87 } 88 if (!active_.load()) { 89 AVCODEC_LOGD("Queue %{public}s is inactive", name_.data()); 90 return QueueResult::INACTIVE; 91 } 92 canPush = tail_.compare_exchange_strong(currentTail, newTail); 93 } while (!canPush); 94 95 data_[currentTail].data = data; 96 canPopCv_.notify_one(); 97 98 return QueueResult::OK; 99 } 100 PopWait(T & data)101 QueueResult PopWait(T& data) 102 { 103 CHECK_AND_RETURN_RET_LOG(data_, QueueResult::NO_MEMORY, "Queue %{public}s has no memory", name_.data()); 104 if (!active_.load()) { 105 AVCODEC_LOGD("Queue %{public}s is inactive", name_.data()); 106 return QueueResult::INACTIVE; 107 } 108 size_t currentHead{0}; 109 size_t newHead{0}; 110 bool canPop{false}; 111 std::unique_lock<std::mutex> lock(canPopMtx_, std::defer_lock); 112 113 do { 114 currentHead = head_.load(); 115 newHead = (currentHead + 1) % queueSize_; 116 // when queue is empty, wait until at least 1 data is pushed. 117 if (currentHead == tail_.load()) { 118 lock.lock(); 119 canPopCv_.wait(lock, [¤tHead, this]() { return currentHead != tail_.load() || !active_.load(); }); 120 lock.unlock(); 121 } 122 if (!active_.load()) { 123 AVCODEC_LOGD("Queue %{public}s is inactive", name_.data()); 124 return QueueResult::INACTIVE; 125 } 126 canPop = head_.compare_exchange_strong(currentHead, newHead); 127 } while (!canPop); 128 129 data = data_[currentHead].data; 130 canPushCv_.notify_one(); 131 132 return QueueResult::OK; 133 } 134 Empty()135 bool Empty() const 136 { 137 return head_.load() == tail_.load(); 138 } 139 Full()140 bool Full() const 141 { 142 return head_.load() == (tail_.load() + 1) % queueSize_; 143 } 144 Clear()145 void Clear() 146 { 147 Deactivate(); 148 head_.store(0); 149 tail_.store(0); 150 } 151 Deactivate()152 void Deactivate() 153 { 154 active_.store(false); 155 canPushCv_.notify_all(); 156 canPopCv_.notify_all(); 157 } 158 Activate()159 void Activate() 160 { 161 active_.store(true); 162 } 163 private: 164 struct Node { 165 T data; 166 }; 167 Alloc()168 bool Alloc() 169 { 170 // when queue is full, there is a void element between head and tail. So the real length of the data is N + 1. 171 data_ = std::make_unique<Node[]>(N + 1); 172 return data_ != nullptr; 173 } 174 175 std::atomic<size_t> head_{0}; 176 std::atomic<size_t> tail_{0}; 177 std::atomic<bool> active_{true}; 178 std::unique_ptr<Node[]> data_{nullptr}; 179 static constexpr std::size_t queueSize_{N + 1}; 180 std::mutex canPushMtx_; 181 std::mutex canPopMtx_; 182 std::condition_variable canPushCv_; 183 std::condition_variable canPopCv_; 184 std::string name_; 185 static constexpr HiviewDFX::HiLogLabel LABEL{LogLabel("LockFreeQueue")}; 186 }; 187 188 189 } // namespace MediaAVCodec 190 } // namespace OHOS 191 192 #endif // AVCODEC_LOCK_FREE_QUEUE_H 193 194