1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef DFSU_THREAD_SAFE_QUEUE_H 17 #define DFSU_THREAD_SAFE_QUEUE_H 18 19 #include <algorithm> 20 #include <deque> 21 22 #include "dfsu_exception.h" 23 24 namespace OHOS { 25 namespace Storage { 26 namespace DistributedFile { 27 /** 28 * @brief A Thread-safe Queue. 29 * 30 * Design choices: 31 * 1) unlimited capacity 32 * 2) throw exception to indicate failues 33 * 3) blocking pop interface 34 * 3) hatlable 35 * 36 * @tparam T Any type. Aggregate data type is prefered 37 * 38 */ 39 template<typename T> 40 class DfsuThreadSafeQueue { 41 public: Push(std::unique_ptr<T> pt)42 void Push(std::unique_ptr<T> pt) 43 { 44 if (!pt) { 45 ThrowException(ERR_UTILS_ACTOR_INVALID_CMD, "Push an empty cmd"); 46 } 47 std::unique_lock<std::mutex> lock(mutex_); 48 if (!halted) { 49 queue_.emplace_back(std::move(pt)); 50 cv_.notify_one(); 51 } 52 } 53 PushFront(std::unique_ptr<T> pt)54 void PushFront(std::unique_ptr<T> pt) 55 { 56 if (!pt) { 57 ThrowException(ERR_UTILS_ACTOR_INVALID_CMD, "Push an empty cmd"); 58 } 59 std::unique_lock<std::mutex> lock(mutex_); 60 if (!halted) { 61 queue_.emplace_front(std::move(pt)); 62 cv_.notify_one(); 63 } 64 } 65 WaitAndPop()66 std::unique_ptr<T> WaitAndPop() 67 { 68 std::unique_lock<std::mutex> lock(mutex_); 69 cv_.wait(lock, [&] { return !queue_.empty() || halted; }); 70 if (halted && queue_.empty()) { 71 ThrowException(ERR_UTILS_ACTOR_QUEUE_STOP, "Queue was halted"); 72 } 73 74 auto res = std::move(queue_.front()); 75 queue_.pop_front(); 76 return std::move(res); 77 } 78 ForEach(std::function<void (const std::unique_ptr<T> &)> executor)79 void ForEach(std::function<void(const std::unique_ptr<T> &)> executor) 80 { 81 std::unique_lock<std::mutex> lock(mutex_); 82 std::for_each(queue_.begin(), queue_.end(), executor); 83 } 84 Halt()85 void Halt() 86 { 87 std::unique_lock<std::mutex> lock(mutex_); 88 halted = true; 89 cv_.notify_all(); 90 } 91 92 private: 93 std::deque<std::unique_ptr<T>> queue_; 94 std::mutex mutex_; 95 std::condition_variable cv_; 96 97 bool halted {false}; 98 }; 99 } // namespace DistributedFile 100 } // namespace Storage 101 } // namespace OHOS 102 #endif // DFSU_THREAD_SAFE_QUEUE_H