1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef INTELL_VOICE_BUFFER_QUEUE_H 16 #define INTELL_VOICE_BUFFER_QUEUE_H 17 18 #include <unistd.h> 19 #include <queue> 20 #include <mutex> 21 #include <condition_variable> 22 #include <chrono> 23 #include "array_buffer_util.h" 24 #include "intell_voice_log.h" 25 26 #define LOG_TAG "QueueUtil" 27 28 namespace OHOS { 29 namespace IntellVoiceUtils { 30 constexpr uint32_t MAX_CAPACITY = 500; 31 32 template <typename T> 33 class QueueUtil { 34 public: 35 QueueUtil() = default; ~QueueUtil()36 ~QueueUtil() 37 { 38 Uninit(); 39 } 40 bool Init(uint32_t capacity = MAX_CAPACITY) 41 { 42 std::unique_lock<std::mutex> lock(queueMutex_); 43 SetAvailable(true); 44 capacity_ = capacity; 45 return true; 46 } 47 bool Push(const T &element, bool isWait = true) 48 { 49 std::unique_lock<std::mutex> lock(queueMutex_); 50 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 51 52 while (queue_.size() >= capacity_) { 53 CHECK_CONDITION_RETURN_FALSE((!isWait), "queue is full, no need to wait"); 54 notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); }); 55 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 56 } 57 58 queue_.push(element); 59 notEmptyCv_.notify_one(); 60 return true; 61 } 62 bool Push(T &&element, bool isWait = true) 63 { 64 std::unique_lock<std::mutex> lock(queueMutex_); 65 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 66 67 while (queue_.size() >= capacity_) { 68 CHECK_CONDITION_RETURN_FALSE((!isWait), "queue is full, no need to wait"); 69 notFullCv_.wait(lock, [&]() { return ((queue_.size() < capacity_) || (!IsAvailable())); }); 70 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 71 } 72 73 queue_.push(std::move(element)); 74 notEmptyCv_.notify_one(); 75 return true; 76 } Pop(T & element)77 bool Pop(T &element) 78 { 79 std::unique_lock<std::mutex> lock(queueMutex_); 80 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 81 82 while (queue_.empty()) { 83 notEmptyCv_.wait(lock, [&] { return (!queue_.empty() || !IsAvailable()); }); 84 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 85 } 86 87 element = std::move(queue_.front()); 88 queue_.pop(); 89 notFullCv_.notify_one(); 90 return true; 91 } PopUntilTimeout(uint32_t timeLenMs,T & element)92 bool PopUntilTimeout(uint32_t timeLenMs, T &element) 93 { 94 std::unique_lock<std::mutex> lock(queueMutex_); 95 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 96 97 while (queue_.empty()) { 98 if (!(notEmptyCv_.wait_for(lock, std::chrono::milliseconds(timeLenMs), 99 [&] { return (!queue_.empty() || !IsAvailable()); }))) { 100 INTELL_VOICE_LOG_WARN("wait time out"); 101 return false; 102 } 103 CHECK_CONDITION_RETURN_FALSE(!IsAvailable(), "queue is not available"); 104 } 105 106 element = std::move(queue_.front()); 107 queue_.pop(); 108 notFullCv_.notify_one(); 109 return true; 110 } Uninit()111 void Uninit() 112 { 113 { 114 std::unique_lock<std::mutex> lock(queueMutex_); 115 capacity_ = 0; 116 ClearQueue(); 117 SetAvailable(false); 118 } 119 notEmptyCv_.notify_all(); 120 notFullCv_.notify_all(); 121 } 122 private: IsAvailable()123 bool IsAvailable() const 124 { 125 return isAvailable_; 126 } SetAvailable(bool isAvailable)127 void SetAvailable(bool isAvailable) 128 { 129 isAvailable_ = isAvailable; 130 } ClearQueue()131 void ClearQueue() 132 { 133 while (!queue_.empty()) { 134 queue_.pop(); 135 } 136 } 137 138 private: 139 bool isAvailable_ = false; 140 uint32_t capacity_ = 0; 141 std::mutex queueMutex_; 142 std::condition_variable notEmptyCv_; 143 std::condition_variable notFullCv_; 144 std::queue<T> queue_; 145 }; 146 147 using Uint8ArrayBufferQueue = QueueUtil<std::unique_ptr<Uint8ArrayBuffer>>; 148 } 149 } 150 151 #undef LOG_TAG 152 153 #endif