/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef PREFERENCES_PRIORITY_QUEUE_H
#define PREFERENCES_PRIORITY_QUEUE_H
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <shared_mutex>
#include <utility>

namespace OHOS {
namespace NativePreferences {
/**
 * Time-ordered task queue in which every task is addressed by an id.
 *
 * Tasks wait in a multimap keyed by their time value; Pop() hands out the entry with the smallest key
 * once that key is no longer later than std::chrono::steady_clock::now(). A popped task is remembered as
 * "running" until Finish() is called for its id. Every public method locks the same internal mutex.
 *
 * @tparam _Tsk task type; Push() calls Valid() on it, and it is copied and moved by the queue.
 * @tparam _Tme time key type; it is compared against std::chrono::steady_clock::now() and passed to
 *              std::condition_variable::wait_until().
 * @tparam _Tid id type; used as an ordered map key.
 */
template<typename _Tsk, typename _Tme, typename _Tid>
class PriorityQueue {
public:
    // A task together with the id it was pushed under.
    struct PQMatrix {
        _Tsk task_;
        _Tid id_;
        PQMatrix(_Tsk task, _Tid id) : task_(std::move(task)), id_(std::move(id)) {}
    };
    using TskIndex = typename std::map<_Tme, PQMatrix>::iterator;
    // Callback that may modify a task; returns {accepted, time key to (re)schedule the task at}.
    using TskUpdater = std::function<std::pair<bool, _Tme>(_Tsk &element)>;

    /**
     * @param task    value returned by Pop() and Find() when there is nothing to return.
     * @param updater callback run by Finish() to decide whether a finished task is queued again;
     *                when empty, a callback that never re-queues is installed.
     */
    PriorityQueue(const _Tsk &task, TskUpdater updater = nullptr)
        : INVALID_TSK(task), updater_(std::move(updater))
    {
        if (!updater_) {
            updater_ = [](_Tsk &) { return std::pair{ false, _Tme() }; };
        }
    }

    /**
     * Takes the task with the smallest time key out of the queue and records it as running.
     *
     * While the smallest key is still later than steady_clock::now() the call waits on popCv_, which
     * Push(), Update(), Remove() and Clean() notify; the head is re-examined after each wake-up.
     *
     * @return the task, or INVALID_TSK as soon as the queue is found empty (the call does not wait
     *         for a first task to arrive).
     */
    _Tsk Pop()
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        while (!tasks_.empty()) {
            auto waitTme = tasks_.begin()->first;
            if (waitTme > std::chrono::steady_clock::now()) {
                popCv_.wait_until(lock, waitTme);
                continue;
            }
            auto temp = tasks_.begin();
            auto id = temp->second.id_;
            // Keep a copy in running_ before the task itself is moved out to the caller.
            running_.emplace(id, temp->second);
            auto res = std::move(temp->second.task_);
            tasks_.erase(temp);
            indexes_.erase(id);
            return res;
        }
        return INVALID_TSK;
    }

    /**
     * Queues tsk under id with time key tme and wakes waiting Pop() callers.
     *
     * NOTE(review): when id is already indexed the task is still added to tasks_, but indexes_ keeps
     * pointing at the older entry, so the new one cannot be reached through Find/Update/Remove.
     * Confirm whether callers guarantee unique ids.
     *
     * @return false (nothing queued) when tsk.Valid() is false, true otherwise.
     */
    bool Push(_Tsk tsk, _Tid id, _Tme tme)
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        if (!tsk.Valid()) {
            return false;
        }
        auto temp = tasks_.emplace(tme, PQMatrix(std::move(tsk), id));
        indexes_.emplace(id, temp);
        popCv_.notify_all();
        return true;
    }

    /** @return number of queued tasks; running tasks are not counted. */
    size_t Size()
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        return tasks_.size();
    }

    /** @return a copy of the queued task indexed under id, or INVALID_TSK when id is not indexed. */
    _Tsk Find(_Tid id)
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        auto index = indexes_.find(id);
        if (index != indexes_.end()) {
            return index->second->second.task_;
        }
        return INVALID_TSK;
    }

    /**
     * Applies updater to the task registered under id.
     *
     * For a queued task, an accepted update moves the entry to the time key returned by updater and
     * wakes waiting Pop() callers. For a running task, updater is applied to the copy held in running_
     * and the returned time is ignored.
     *
     * @return the flag returned by updater, or false when id is neither queued nor running.
     */
    bool Update(_Tid id, TskUpdater updater)
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        auto index = indexes_.find(id);
        if (index != indexes_.end()) {
            auto [updated, time] = updater(index->second->second.task_);
            if (!updated) {
                return false;
            }
            // Re-insert under the new key; the index entry is repointed at the new multimap node.
            auto matrix = std::move(index->second->second);
            tasks_.erase(index->second);
            index->second = tasks_.emplace(time, std::move(matrix));
            popCv_.notify_all();
            return true;
        }

        auto running = running_.find(id);
        if (running != running_.end()) {
            return updater(running->second.task_).first;
        }

        return false;
    }

    /**
     * Removes the queued task indexed under id and wakes waiting Pop() callers.
     *
     * @param wait when true, first blocks until id is no longer recorded as running
     *             (Finish() notifies removeCv_).
     * @return true when a queued task was removed, false when id was not indexed.
     */
    bool Remove(_Tid id, bool wait)
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        removeCv_.wait(lock, [this, id, wait] {
            return !wait || running_.find(id) == running_.end();
        });
        auto index = indexes_.find(id);
        if (index == indexes_.end()) {
            return false;
        }
        tasks_.erase(index->second);
        indexes_.erase(index);
        popCv_.notify_all();
        return true;
    }

    /** Drops every queued task and wakes waiting Pop() callers; running_ is left untouched. */
    void Clean()
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        indexes_.clear();
        tasks_.clear();
        popCv_.notify_all();
    }

    /**
     * Marks the running task id as finished and wakes Remove(id, true) waiters.
     *
     * updater_ is consulted first: when it returns true the task is queued again at the returned time.
     * Does nothing when id is not recorded as running.
     *
     * NOTE(review): the re-queue path does not notify popCv_, and if id was pushed again while it was
     * running the re-queued entry is not indexed (indexes_.emplace keeps the existing entry).
     */
    void Finish(_Tid id)
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        auto it = running_.find(id);
        if (it == running_.end()) {
            return;
        }
        auto [repeat, time] = updater_(it->second.task_);
        if (repeat) {
            indexes_.emplace(id, tasks_.emplace(time, std::move(it->second)));
        }
        running_.erase(it);
        removeCv_.notify_all();
    }

private:
    const _Tsk INVALID_TSK;               // sentinel returned when there is no task to hand out
    std::mutex pqMtx_;                    // locked by every public method
    std::condition_variable popCv_;       // notified when the set of queued tasks changes
    std::condition_variable removeCv_;    // notified when a running task finishes
    std::multimap<_Tme, PQMatrix> tasks_; // queued tasks ordered by time key
    std::map<_Tid, PQMatrix> running_;    // tasks handed out by Pop() and not yet finished
    std::map<_Tid, TskIndex> indexes_;    // id -> position in tasks_
    TskUpdater updater_;                  // re-queue policy used by Finish()
};
} // namespace NativePreferences
} // namespace OHOS
#endif // PREFERENCES_PRIORITY_QUEUE_H