1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef INTELL_VOICE_BUFFER_QUEUE_H
16 #define INTELL_VOICE_BUFFER_QUEUE_H
17 
18 #include <unistd.h>
19 #include <queue>
20 #include <mutex>
21 #include <condition_variable>
22 #include <chrono>
23 #include "array_buffer_util.h"
24 #include "intell_voice_log.h"
25 
26 #define LOG_TAG "QueueUtil"
27 
28 namespace OHOS {
29 namespace IntellVoiceUtils {
// Default upper bound on the number of queued elements; used as the default argument of QueueUtil::Init().
constexpr uint32_t MAX_CAPACITY = 500;
31 
32 template <typename T>
33 class QueueUtil {
34 public:
35     QueueUtil() = default;
~QueueUtil()36     ~QueueUtil()
37     {
38         Uninit();
39     }
40     bool Init(uint32_t capacity = MAX_CAPACITY)
41     {
42         std::unique_lock<std::mutex> lock(queueMutex_);
43         SetAvailable(true);
44         capacity_ = capacity;
45         return true;
46     }
47     bool Push(const T &element, bool isWait = true)
48     {
49         std::unique_lock<std::mutex> lock(queueMutex_);
50         CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
51 
52         while (queue_.size() >= capacity_) {
53             CHECK_CONDITION_RETURN_FALSE((!isWait), "queue is full, no need to wait");
54             notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); });
55             CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
56         }
57 
58         queue_.push(element);
59         notEmptyCv_.notify_one();
60         return true;
61     }
62     bool Push(T &&element, bool isWait = true)
63     {
64         std::unique_lock<std::mutex> lock(queueMutex_);
65         CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
66 
67         while (queue_.size() >= capacity_) {
68             CHECK_CONDITION_RETURN_FALSE((!isWait), "queue is full, no need to wait");
69             notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); });
70             CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
71         }
72 
73         queue_.push(std::move(element));
74         notEmptyCv_.notify_one();
75         return true;
76     }
Pop(T & element)77     bool Pop(T &element)
78     {
79         std::unique_lock<std::mutex> lock(queueMutex_);
80         CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
81 
82         while (queue_.empty()) {
83             notEmptyCv_.wait(lock, [&] { return (!queue_.empty() || !IsAvailable()); });
84             CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
85         }
86 
87         element = std::move(queue_.front());
88         queue_.pop();
89         notFullCv_.notify_one();
90         return true;
91     }
PopUntilTimeout(uint32_t timeLenMs,T & element)92     bool PopUntilTimeout(uint32_t timeLenMs, T &element)
93     {
94         std::unique_lock<std::mutex> lock(queueMutex_);
95         CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
96 
97         while (queue_.empty()) {
98             if (!(notEmptyCv_.wait_for(lock, std::chrono::milliseconds(timeLenMs),
99                 [&] { return (!queue_.empty() || !IsAvailable()); }))) {
100                 INTELL_VOICE_LOG_WARN("wait time out");
101                 return false;
102             }
103             CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available");
104         }
105 
106         element = std::move(queue_.front());
107         queue_.pop();
108         notFullCv_.notify_one();
109         return true;
110     }
Uninit()111     void Uninit()
112     {
113         {
114             std::unique_lock<std::mutex> lock(queueMutex_);
115             capacity_ = 0;
116             ClearQueue();
117             SetAvailable(false);
118         }
119         notEmptyCv_.notify_all();
120         notFullCv_.notify_all();
121     }
122 private:
IsAvailable()123     bool IsAvailable() const
124     {
125         return isAvailable_;
126     }
SetAvailable(bool isAvailable)127     void SetAvailable(bool isAvailable)
128     {
129         isAvailable_ = isAvailable;
130     }
ClearQueue()131     void ClearQueue()
132     {
133         while (!queue_.empty()) {
134             queue_.pop();
135         }
136     }
137 
138 private:
139     bool isAvailable_ = false;
140     uint32_t capacity_ = 0;
141     std::mutex queueMutex_;
142     std::condition_variable notEmptyCv_;
143     std::condition_variable notFullCv_;
144     std::queue<T> queue_;
145 };
146 
// Queue of owning pointers to Uint8ArrayBuffer. std::unique_ptr is move-only, so elements
// must be passed through the rvalue Push() overload.
using Uint8ArrayBufferQueue = QueueUtil<std::unique_ptr<Uint8ArrayBuffer>>;
148 }
149 }
150 
151 #undef LOG_TAG
152 
153 #endif