/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H
#define COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H

#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <thread>

namespace OHOS::NetManagerStandard {
/**
 * @brief Slot-based delayed executor driven by a single worker thread.
 *
 * The queue keeps ARRAY_SIZE + DELAYED_COUNT slots. A worker thread started by the constructor
 * drains the current slot (calling Execute() on every non-null element in it), sleeps for one
 * second, advances to the next slot, and repeats until the queue is destroyed.
 *
 * Put() places an element DELAYED_COUNT slots ahead of the current one. Putting an element that is
 * already queued moves it to the new slot instead of queueing it twice. Elements are identified by
 * shared ownership (std::owner_less), not by pointer value.
 *
 * @tparam T Element type; must provide a member function callable as `Execute()`.
 * @tparam ARRAY_SIZE Number of base slots.
 * @tparam DELAYED_COUNT Number of slots (ticks) between Put() and the slot the element lands in.
 */
template <typename T, size_t ARRAY_SIZE, size_t DELAYED_COUNT> class DelayedQueue {
public:
    /**
     * @brief Creates an empty queue and starts the worker thread.
     */
    DelayedQueue() : index_(0), needRun_(true)
    {
        // Started in the body (not the initializer list) so every member exists before Run() touches it.
        pthread_ = std::thread([this]() { Run(); });
    }

    /**
     * @brief Stops the worker thread and waits for it to finish.
     *
     * Elements still queued are dropped without being executed.
     */
    ~DelayedQueue()
    {
        {
            // The flag must change under needRunMutex_: otherwise the store + notify could land between
            // the worker's predicate check and its blocking in wait_for, the wakeup would be lost and
            // the join below would stall for a whole tick.
            std::lock_guard<std::mutex> needRunLock(needRunMutex_);
            needRun_ = false;
        }
        needRunCondition_.notify_all();
        if (pthread_.joinable()) {
            pthread_.join();
        }
    }

    // The worker thread captures `this`, so the queue must stay at a fixed address.
    DelayedQueue(const DelayedQueue &) = delete;
    DelayedQueue &operator=(const DelayedQueue &) = delete;

    /**
     * @brief Schedules @p elem DELAYED_COUNT slots ahead of the current slot.
     *
     * If @p elem is already queued, its previous entry is removed first, so it is executed only
     * once, from the new slot.
     *
     * @param elem Element to schedule. A null pointer is stored but skipped when its slot is drained.
     */
    void Put(const std::shared_ptr<T> &elem)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        const auto queued = indexMap_.find(elem);
        if (queued != indexMap_.end()) {
            // indexMap_ only ever stores values below SLOT_COUNT, so the old index is always valid.
            elems_[queued->second].erase(elem);
        }
        const size_t index = (index_ + DELAYED_COUNT) % SLOT_COUNT;
        elems_[index].insert(elem);
        indexMap_.insert_or_assign(elem, index);
    }

private:
    static constexpr size_t SLOT_COUNT = ARRAY_SIZE + DELAYED_COUNT;
    static_assert(SLOT_COUNT > 0, "DelayedQueue needs at least one slot (ARRAY_SIZE + DELAYED_COUNT > 0)");

    using ElemSet = std::set<std::shared_ptr<T>, std::owner_less<std::shared_ptr<T>>>;

    // Worker loop: drain the current slot, wait one tick (or until stopped), advance to the next slot.
    void Run()
    {
        while (needRun_) {
            ExecuteDueElems();
            {
                std::unique_lock<std::mutex> needRunLock(needRunMutex_);
                // wait_for returns the predicate's value: true means a stop was requested.
                if (needRunCondition_.wait_for(needRunLock, std::chrono::seconds(1),
                                               [this] { return !needRun_; })) {
                    break;
                }
            }
            std::lock_guard<std::mutex> guard(mutex_);
            index_ = (index_ + 1) % SLOT_COUNT;
        }
    }

    // Detaches the current slot under mutex_, then runs its elements with the lock released.
    void ExecuteDueElems()
    {
        ElemSet dueElems;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            dueElems.swap(elems_[index_]);
            for (const auto &elem : dueElems) {
                indexMap_.erase(elem);
            }
        }
        // Execute() is user code: running it without mutex_ lets it call Put() (e.g. to re-arm itself)
        // without relocking a non-recursive mutex, and keeps other Put() callers from blocking on it.
        for (const auto &elem : dueElems) {
            if (elem) {
                elem->Execute();
            }
        }
    }

    std::thread pthread_;
    size_t index_; // slot currently being served; guarded by mutex_
    std::mutex mutex_; // guards index_, elems_ and indexMap_
    std::atomic_bool needRun_; // false once destruction has begun; written under needRunMutex_
    std::condition_variable needRunCondition_;
    std::mutex needRunMutex_;
    std::array<ElemSet, SLOT_COUNT> elems_;
    std::map<std::shared_ptr<T>, size_t, std::owner_less<std::shared_ptr<T>>> indexMap_; // element -> slot
};
} // namespace OHOS::NetManagerStandard

#endif // COMMUNICATION_NETMANAGER_BASE_DELAYED_QUEUE_H