/*
 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * @file safe_block_queue.h
 *
 * Provides interfaces for thread-safe blocking queues in c_utils.
 * The file includes the SafeBlockQueue class and
 * the SafeBlockQueueTracking class for trackable tasks.
 */

#ifndef UTILS_BASE_BLOCK_QUEUE_H
#define UTILS_BASE_BLOCK_QUEUE_H

#include <climits>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <atomic>
#include <utility>

namespace OHOS {

/**
 * @brief Provides interfaces for thread-safe blocking queues.
 *
 * The interfaces can be used to perform blocking and non-blocking push and
 * pop operations on queues. Every operation takes the internal mutex.
 */
template <typename T>
class SafeBlockQueue {
public:
    /**
     * @brief Creates a queue that holds at most `capacity` elements.
     *
     * @param capacity Indicates the maximum number of queued elements.
     * The value is stored as `unsigned long`, so a negative argument wraps
     * to a very large limit instead of being rejected.
     */
    explicit SafeBlockQueue(int capacity) : maxSize_(capacity)
    {
    }

    /**
     * @brief Inserts an element at the end of this queue in blocking mode.
     *
     * If the queue is full, the calling thread is blocked until the queue
     * has space. After the element is inserted, one of the pop threads
     * (blocked when the queue is empty) is woken up.
     *
     * @param elem Indicates the element to insert.
     */
    virtual void Push(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // The predicate overload re-checks after every wake-up, so no outer loop is needed.
        cvNotFull_.wait(lock, [this] { return queueT_.size() < maxSize_; });
        queueT_.push(elem);
        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Removes the first element from this queue in blocking mode.
     *
     * If the queue is empty, the calling thread is blocked until the queue
     * has elements. After the element is removed, one of the push threads
     * (blocked when the queue is full) is woken up.
     *
     * @return Returns the element that was at the front of the queue.
     */
    T Pop()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvNotEmpty_.wait(lock, [this] { return !queueT_.empty(); });
        // The slot is destroyed by pop() right after, so move out of it instead of copying.
        T elem = std::move(queueT_.front());
        queueT_.pop();
        cvNotFull_.notify_one();
        return elem;
    }

    /**
     * @brief Inserts an element at the end of this queue in non-blocking mode.
     *
     * If the queue is full, false is returned directly.
     * If the queue is not full, the element is inserted, one of the
     * pop threads (blocked when the queue is empty) is woken up, and true
     * is returned.
     *
     * @param elem Indicates the element to insert.
     * @return Returns true if the element was inserted; false otherwise.
     */
    virtual bool PushNoWait(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }
        queueT_.push(elem);
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Removes the first element from this queue in non-blocking mode.
     *
     * If the queue is empty, false is returned directly and `outtask` is
     * left untouched.
     * If the queue is not empty, the first element is moved into `outtask`,
     * one of the push threads (blocked when the queue is full) is woken up,
     * and true is returned.
     *
     * @param outtask Indicates the data of the pop operation.
     * @return Returns true if an element was removed; false otherwise.
     */
    bool PopNotWait(T& outtask)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (queueT_.empty()) {
            return false;
        }
        outtask = std::move(queueT_.front());
        queueT_.pop();
        cvNotFull_.notify_one();
        return true;
    }

    /**
     * @brief Obtains the number of elements currently in the queue.
     */
    unsigned int Size()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        return static_cast<unsigned int>(queueT_.size());
    }

    /**
     * @brief Checks whether the queue currently holds no elements.
     */
    bool IsEmpty()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        return queueT_.empty();
    }

    /**
     * @brief Checks whether the queue has reached its capacity.
     *
     * Uses the same `>=` comparison as the push operations.
     */
    bool IsFull()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        return queueT_.size() >= maxSize_;
    }

    virtual ~SafeBlockQueue() = default;

protected:
    unsigned long maxSize_;  // Capacity of the queue
    std::mutex mutexLock_;   // Guards queueT_
    std::condition_variable cvNotEmpty_;  // Notified after an element is pushed
    std::condition_variable cvNotFull_;   // Notified after an element is popped
    std::queue<T> queueT_;
};

/**
 * @brief Provides interfaces for operating the thread-safe blocking queues
 * and tracking the number of pending tasks.
 * This class inherits from SafeBlockQueue.
 */
template <typename T>
class SafeBlockQueueTracking : public SafeBlockQueue<T> {
public:
    /**
     * @brief Creates a tracking queue with no unfinished tasks.
     *
     * @param capacity Indicates the maximum number of queued elements.
     */
    explicit SafeBlockQueueTracking(int capacity) : SafeBlockQueue<T>(capacity), unfinishedTaskCount_(0)
    {
    }

    ~SafeBlockQueueTracking() override = default;

    /**
     * @brief Inserts an element at the end of this queue in blocking mode.
     *
     * If the queue is full, the calling thread is blocked until the queue
     * has space. After the element is inserted, one of the pop threads
     * (blocked when the queue is empty) is woken up.
     *
     * @param elem Indicates the element to insert.
     */
    void Push(T const& elem) override
    {
        // Counted before the mutex is taken, so a producer that is blocked
        // on a full queue is already part of the unfinished count.
        unfinishedTaskCount_++;
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvNotFull_.wait(lock, [this] { return queueT_.size() < maxSize_; });
        queueT_.push(elem);
        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Inserts an element at the end of this queue in non-blocking mode.
     *
     * If the queue is full, false is returned directly and the count of
     * unfinished tasks is not changed.
     * If the queue is not full, the element is inserted, the count of
     * unfinished tasks increments by 1, one of the pop threads (blocked
     * when the queue is empty) is woken up, and true is returned.
     *
     * @param elem Indicates the element to insert.
     * @return Returns true if the element was inserted; false otherwise.
     */
    bool PushNoWait(T const& elem) override
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }
        queueT_.push(elem);
        unfinishedTaskCount_++;
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Called to return the result when a task is complete.
     *
     * If the count of unfinished tasks < 1, false is returned directly.
     * If the count of unfinished tasks = 1, the count decrements by 1,
     * all the threads blocked by calling Join() are woken up,
     * and true is returned.
     * If the count of unfinished tasks > 1,
     * the count decrements by 1, and true is returned.
     *
     * @return Returns true if a task was marked done; false if this was
     * called more times than tasks were pushed.
     */
    bool OneTaskDone()
    {
        // Held so that a Join() caller cannot miss the notification between
        // evaluating its predicate and going to sleep.
        std::unique_lock<std::mutex> lock(mutexLock_);
        // Push() increments the counter without holding the mutex, so the
        // decrement has to be a single atomic step; a plain load/store pair
        // could overwrite a concurrent increment.
        int current = unfinishedTaskCount_.load();
        do {
            if (current <= 0) {
                return false;  // OneTaskDone() was called too many times
            }
        } while (!unfinishedTaskCount_.compare_exchange_weak(current, current - 1));
        if (current == 1) {
            cvAllTasksDone_.notify_all();
        }
        return true;
    }

    /**
     * @brief Waits for all tasks to complete.
     *
     * If there is any task not completed, the current thread will be
     * blocked even if it is just woken up.
     */
    void Join()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvAllTasksDone_.wait(lock, [this] { return unfinishedTaskCount_ == 0; });
    }

    /**
     * @brief Obtains the number of unfinished tasks.
     */
    int GetUnfinishTaskNum()
    {
        return unfinishedTaskCount_;
    }

protected:
    using SafeBlockQueue<T>::maxSize_;
    using SafeBlockQueue<T>::mutexLock_;
    using SafeBlockQueue<T>::cvNotEmpty_;
    using SafeBlockQueue<T>::cvNotFull_;
    using SafeBlockQueue<T>::queueT_;
    std::atomic<int> unfinishedTaskCount_;    // Tasks pushed but not yet reported via OneTaskDone()
    std::condition_variable cvAllTasksDone_;  // Notified when the count drops to zero
};

} // namespace OHOS

#endif