1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef UTILS_BLOCK_QUEUE_H
17 #define UTILS_BLOCK_QUEUE_H
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include "avcodec_log.h"
23 
24 namespace OHOS {
25 namespace MediaAVCodec {
namespace {
// Default element capacity for a BlockQueue constructed without an explicit capacity.
constexpr size_t DEFAULT_QUEUE_SIZE{10};
} // namespace
29 
30 template <typename T>
31 class BlockQueue {
32 public:
33     explicit BlockQueue(std::string name, size_t capacity = DEFAULT_QUEUE_SIZE)
34         : name_(std::move(name)), capacity_(capacity), isActive_(true)
35     {
36     }
37 
38     ~BlockQueue() = default;
39 
Size()40     size_t Size()
41     {
42         std::lock_guard<std::mutex> lock(mutex_);
43         return que_.size();
44     }
45 
Capacity()46     size_t Capacity()
47     {
48         return capacity_;
49     }
50 
Empty()51     bool Empty()
52     {
53         std::lock_guard<std::mutex> lock(mutex_);
54         return que_.empty();
55     }
56 
Push(const T & block)57     bool Push(const T& block)
58     {
59         AVCODEC_LOGD("block queue %{public}s Push enter.", name_.c_str());
60         std::unique_lock<std::mutex> lock(mutex_);
61         if (!isActive_) {
62             AVCODEC_LOGD("block queue %{public}s is inactive for Push.", name_.c_str());
63             return false;
64         }
65         if (que_.size() >= capacity_) {
66             AVCODEC_LOGD("block queue %{public}s is full, please waiting for Pop.", name_.c_str());
67             condFull_.wait(lock, [this] { return !isActive_ || que_.size() < capacity_; });
68         }
69         if (!isActive_) {
70             AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, isFull: %{public}d.",
71                 name_.c_str(), isActive_.load(), que_.size() < capacity_);
72             return false;
73         }
74         que_.push(block);
75         condEmpty_.notify_one();
76         AVCODEC_LOGD("block queue %{public}s Push ok.", name_.c_str());
77         return true;
78     }
79 
Pop()80     T Pop()
81     {
82         AVCODEC_LOGD("block queue %{public}s Pop enter.", name_.c_str());
83         std::unique_lock<std::mutex> lock(mutex_);
84         if (que_.empty() && !isActive_) {
85             AVCODEC_LOGD("block queue %{public}s is inactive for Pop.", name_.c_str());
86             return {};
87         } else if (que_.empty() && isActive_) {
88             AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str());
89             condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); });
90         }
91         if (que_.empty()) {
92             AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.",
93                 name_.c_str(), isActive_.load(), que_.size());
94             return {};
95         }
96         T element = que_.front();
97         que_.pop();
98         condFull_.notify_one();
99         AVCODEC_LOGD("block queue %{public}s Pop ok.", name_.c_str());
100         return element;
101     }
102 
Front()103     T Front()
104     {
105         AVCODEC_LOGD("block queue %{public}s Front enter.", name_.c_str());
106         std::unique_lock<std::mutex> lock(mutex_);
107         if (que_.empty() && !isActive_) {
108             AVCODEC_LOGD("block queue %{public}s is inactive for Front.", name_.c_str());
109             return {};
110         } else if (que_.empty() && isActive_) {
111             AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str());
112             condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); });
113         }
114         if (que_.empty()) {
115             AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.",
116                 name_.c_str(), isActive_.load(), que_.size());
117             return {};
118         }
119         T element = que_.front();
120         condFull_.notify_one();
121         AVCODEC_LOGD("block queue %{public}s Front ok.", name_.c_str());
122         return element;
123     }
124 
Clear()125     void Clear()
126     {
127         std::lock_guard<std::mutex> lock(mutex_);
128         ClearUnprotected();
129     }
130 
131     void SetActive(bool active, bool cleanData = true)
132     {
133         std::lock_guard<std::mutex> lock(mutex_);
134         AVCODEC_LOGD("SetActive %{public}s: %{public}d.", name_.c_str(), isActive_.load());
135         isActive_ = active;
136         if (!active) {
137             if (cleanData) {
138                 ClearUnprotected();
139             }
140             condEmpty_.notify_one();
141         }
142     }
143 
144 private:
ClearUnprotected()145     void ClearUnprotected()
146     {
147         if (que_.empty()) {
148             return;
149         }
150         bool needNotify = que_.size() == capacity_;
151         std::queue<T>().swap(que_);
152         if (needNotify) {
153             condFull_.notify_one();
154         }
155     }
156 
157     std::mutex mutex_;
158     std::condition_variable condFull_;
159     std::condition_variable condEmpty_;
160     std::queue<T> que_;
161     std::string name_;
162     const size_t capacity_;
163     std::atomic<bool> isActive_;
164     const OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_FRAMEWORK, "BlockQueue"};
165 };
166 } // namespace MediaAVCodec
167 } // namespace OHOS
168 #endif // !UTILS_BLOCK_QUEUE_H
169