1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef AVCODEC_LOCK_FREE_QUEUE_H
17 #define AVCODEC_LOCK_FREE_QUEUE_H
18 
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include "utils.h"
25 
26 
27 namespace OHOS {
28 namespace MediaAVCodec {
29 
/*
    Status codes reported by queue operations.
    The enumerators are listed in the same order as the strings in QUEUE_RESULT_DESCRIPTION.
*/
enum QueueResult {
    OK = 0,         // the operation completed
    FULL = 1,       // queue is full (not returned by the blocking operations in this header)
    EMPTY = 2,      // queue is empty (not returned by the blocking operations in this header)
    INACTIVE = 3,   // the queue has been deactivated
    NO_MEMORY = 4   // the element storage has not been allocated
};
// Number of entries in QUEUE_RESULT_DESCRIPTION.
constexpr size_t QUEUE_RESULT_NUM = 5;

// Printable names, listed in the same order as the QueueResult enumerators.
constexpr const char* QUEUE_RESULT_DESCRIPTION[QUEUE_RESULT_NUM] = {
    "OK", "Full", "Empty", "Inactive", "NoMemory",
};
45 
/*
    A simple lock-free ring-buffer queue for one producer and one consumer.
    The head and tail indices are atomic; mutexes and condition variables are used for the blocking waits.
*/
49 
50 template<typename T, std::size_t N>
51 class LockFreeQueue {
52 public:
53     using UnderlyingType = T;
54 
LockFreeQueue(const std::string & name)55     explicit LockFreeQueue(const std::string& name) : name_(name) {}
~LockFreeQueue()56     ~LockFreeQueue()
57     {
58         AVCODEC_LOGD("Queue %{public}s dtor", name_.data());
59     }
60 
61     static std::shared_ptr<LockFreeQueue<T, N>> Create(const std::string& name = "")
62     {
63         auto p = std::make_unique<LockFreeQueue<T, N>>(name);
64         CHECK_AND_RETURN_RET_LOG(p && p->Alloc(), nullptr, "Create queue failed");
65         return p;
66     }
67 
PushWait(const T & data)68     QueueResult PushWait(const T& data)
69     {
70         CHECK_AND_RETURN_RET_LOG(data_, QueueResult::NO_MEMORY, "Queue %{public}s has no memory", name_.data());
71         if (!active_.load()) {
72             AVCODEC_LOGD("Queue %{public}s is inactive", name_.data());
73             return QueueResult::INACTIVE;
74         }
75         size_t currentTail{0};
76         size_t newTail{0};
77         bool canPush{false};
78         std::unique_lock<std::mutex> lock(canPushMtx_, std::defer_lock);
79         do {
80             currentTail = tail_.load();
81             newTail = (currentTail + 1) % queueSize_;
82             // when queue is full, wait until at least 1 data is popped
83             if (newTail == head_.load()) {
84                 lock.lock();
85                 canPushCv_.wait(lock, [&newTail, this]() { return newTail != head_.load() || !active_.load(); });
86                 lock.unlock();
87             }
88             if (!active_.load()) {
89                 AVCODEC_LOGD("Queue %{public}s is inactive", name_.data());
90                 return QueueResult::INACTIVE;
91             }
92             canPush = tail_.compare_exchange_strong(currentTail, newTail);
93         } while (!canPush);
94 
95         data_[currentTail].data = data;
96         canPopCv_.notify_one();
97 
98         return QueueResult::OK;
99     }
100 
PopWait(T & data)101     QueueResult PopWait(T& data)
102     {
103         CHECK_AND_RETURN_RET_LOG(data_, QueueResult::NO_MEMORY, "Queue %{public}s has no memory", name_.data());
104         if (!active_.load()) {
105             AVCODEC_LOGD("Queue %{public}s is inactive", name_.data());
106             return QueueResult::INACTIVE;
107         }
108         size_t currentHead{0};
109         size_t newHead{0};
110         bool canPop{false};
111         std::unique_lock<std::mutex> lock(canPopMtx_, std::defer_lock);
112 
113         do {
114             currentHead = head_.load();
115             newHead = (currentHead + 1) % queueSize_;
116             // when queue is empty, wait until at least 1 data is pushed.
117             if (currentHead == tail_.load()) {
118                 lock.lock();
119                 canPopCv_.wait(lock, [&currentHead, this]() { return currentHead != tail_.load() || !active_.load(); });
120                 lock.unlock();
121             }
122             if (!active_.load()) {
123                 AVCODEC_LOGD("Queue %{public}s is inactive", name_.data());
124                 return QueueResult::INACTIVE;
125             }
126             canPop = head_.compare_exchange_strong(currentHead, newHead);
127         } while (!canPop);
128 
129         data = data_[currentHead].data;
130         canPushCv_.notify_one();
131 
132         return QueueResult::OK;
133     }
134 
Empty()135     bool Empty() const
136     {
137         return head_.load() == tail_.load();
138     }
139 
Full()140     bool Full() const
141     {
142         return head_.load() == (tail_.load() + 1) % queueSize_;
143     }
144 
Clear()145     void Clear()
146     {
147         Deactivate();
148         head_.store(0);
149         tail_.store(0);
150     }
151 
Deactivate()152     void Deactivate()
153     {
154         active_.store(false);
155         canPushCv_.notify_all();
156         canPopCv_.notify_all();
157     }
158 
Activate()159     void Activate()
160     {
161         active_.store(true);
162     }
163 private:
164     struct Node {
165         T data;
166     };
167 
Alloc()168     bool Alloc()
169     {
170         // when queue is full, there is a void element between head and tail. So the real length of the data is N + 1.
171         data_ = std::make_unique<Node[]>(N + 1);
172         return data_ != nullptr;
173     }
174 
175     std::atomic<size_t> head_{0};
176     std::atomic<size_t> tail_{0};
177     std::atomic<bool> active_{true};
178     std::unique_ptr<Node[]> data_{nullptr};
179     static constexpr std::size_t queueSize_{N + 1};
180     std::mutex canPushMtx_;
181     std::mutex canPopMtx_;
182     std::condition_variable canPushCv_;
183     std::condition_variable canPopCv_;
184     std::string name_;
185     static constexpr HiviewDFX::HiLogLabel LABEL{LogLabel("LockFreeQueue")};
186 };
187 
188 
189 } // namespace MediaAVCodec
190 } // namespace OHOS
191 
192 #endif // AVCODEC_LOCK_FREE_QUEUE_H
193 
194