1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PREFERENCES_PRIORITY_QUEUE_H
17 #define PREFERENCES_PRIORITY_QUEUE_H
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <shared_mutex>
#include <utility>
24 
25 namespace OHOS {
26 namespace NativePreferences {
/**
 * Mutex-protected queue of tasks ordered by their scheduled time.
 *
 * A task is held in `tasks_` while it waits for its scheduled time and in
 * `running_` between Pop() and Finish(). `indexes_` maps an id to the task's
 * position inside `tasks_`. Every member function takes `pqMtx_`.
 *
 * @tparam _Tsk task type; must be copyable and provide `Valid()`.
 * @tparam _Tme scheduled time; it is compared with
 *              std::chrono::steady_clock::now() and handed to
 *              std::condition_variable::wait_until().
 * @tparam _Tid task identifier; used as a std::map key.
 */
template<typename _Tsk, typename _Tme, typename _Tid>
class PriorityQueue {
public:
    /** A task together with the id it was registered under. */
    struct PQMatrix {
        _Tsk task_;
        _Tid id_;
        PQMatrix(_Tsk task, _Tid id) : task_(std::move(task)), id_(std::move(id)) {}
    };
    // Must be the iterator type of `tasks_` (a multimap); map and multimap
    // iterators are not guaranteed to be the same type.
    using TskIndex = typename std::multimap<_Tme, PQMatrix>::iterator;
    /** Callback that may modify a task; returns {changed, new scheduled time}. */
    using TskUpdater = std::function<std::pair<bool, _Tme>(_Tsk &element)>;

    /**
     * @param task    value returned by Pop()/Find() when there is nothing to return.
     * @param updater invoked by Finish() to decide whether a finished task is
     *                queued again; when empty, finished tasks are never re-queued.
     */
    PriorityQueue(const _Tsk &task, TskUpdater updater = nullptr)
        : INVALID_TSK(task), updater_(std::move(updater))
    {
        if (!updater_) {
            updater_ = [](_Tsk &) { return std::pair{false, _Tme()}; };
        }
    }

    /**
     * Takes the task with the earliest scheduled time, sleeping until that
     * time has been reached. The task is recorded as running until Finish()
     * is called with its id.
     *
     * @return the task, or the invalid task as soon as the queue is found empty
     *         (an empty queue is not waited on).
     */
    _Tsk Pop()
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        while (!tasks_.empty()) {
            auto waitTme = tasks_.begin()->first;
            if (waitTme > std::chrono::steady_clock::now()) {
                // Woken early by Push/Update/Remove/Clean: re-read the head,
                // it may have changed while the lock was released.
                popCv_.wait_until(lock, waitTme);
                continue;
            }
            auto temp = tasks_.begin();
            auto id = temp->second.id_;
            // Keep a copy under running_ so Update()/Finish() can still reach it.
            running_.emplace(id, temp->second);
            auto res = std::move(temp->second.task_);
            tasks_.erase(temp);
            indexes_.erase(id);
            return res;
        }
        return INVALID_TSK;
    }

    /**
     * Queues `tsk` to become available at `tme`.
     *
     * NOTE: if `id` is already indexed, the new entry is still queued but the
     * index keeps pointing at the earlier entry (std::map::emplace does not
     * overwrite), so Find/Update/Remove will not reach the new one.
     *
     * @return false when `tsk.Valid()` is false (nothing is queued), true otherwise.
     */
    bool Push(_Tsk tsk, _Tid id, _Tme tme)
    {
        // `tsk` is a local copy, so it can be validated before taking the lock.
        if (!tsk.Valid()) {
            return false;
        }
        std::lock_guard<std::mutex> lock(pqMtx_);
        auto temp = tasks_.emplace(tme, PQMatrix(std::move(tsk), id));
        indexes_.emplace(id, temp);
        popCv_.notify_all();
        return true;
    }

    /** @return number of tasks waiting in the queue (running tasks are not counted). */
    size_t Size()
    {
        std::lock_guard<std::mutex> lock(pqMtx_);
        return tasks_.size();
    }

    /** @return a copy of the waiting task indexed under `id`, or the invalid task. */
    _Tsk Find(_Tid id)
    {
        std::lock_guard<std::mutex> lock(pqMtx_);
        auto index = indexes_.find(id);
        if (index != indexes_.end()) {
            return index->second->second.task_;
        }
        return INVALID_TSK;
    }

    /**
     * Applies `updater` to the task registered under `id`.
     *
     * A waiting task is re-queued at the time returned by `updater`; a running
     * task is only modified in place (the returned time is ignored).
     *
     * @return true when the task was found and `updater` reported a change;
     *         false otherwise, including when `updater` is empty.
     */
    bool Update(_Tid id, TskUpdater updater)
    {
        // Calling an empty std::function throws std::bad_function_call.
        if (!updater) {
            return false;
        }
        std::lock_guard<std::mutex> lock(pqMtx_);
        auto index = indexes_.find(id);
        if (index != indexes_.end()) {
            auto [updated, time] = updater(index->second->second.task_);
            if (!updated) {
                return false;
            }
            // The key of a multimap entry is immutable: move the payload out,
            // drop the old entry and insert it again under the new time.
            auto matrix = std::move(index->second->second);
            tasks_.erase(index->second);
            index->second = tasks_.emplace(time, std::move(matrix));
            popCv_.notify_all();
            return true;
        }

        auto running = running_.find(id);
        if (running != running_.end()) {
            return updater(running->second.task_).first;
        }

        return false;
    }

    /**
     * Removes the waiting task indexed under `id`.
     *
     * @param wait when true, first blocks until no task with this id is running.
     * @return true when a waiting task was removed, false when none was indexed.
     */
    bool Remove(_Tid id, bool wait)
    {
        std::unique_lock<std::mutex> lock(pqMtx_);
        removeCv_.wait(lock, [this, id, wait] {
            return !wait || running_.find(id) == running_.end();
        });
        auto index = indexes_.find(id);
        if (index == indexes_.end()) {
            return false;
        }
        tasks_.erase(index->second);
        indexes_.erase(index);
        popCv_.notify_all();
        return true;
    }

    /** Drops every waiting task; tasks recorded as running are left untouched. */
    void Clean()
    {
        std::lock_guard<std::mutex> lock(pqMtx_);
        indexes_.clear();
        tasks_.clear();
        popCv_.notify_all();
    }

    /**
     * Marks the running task `id` as finished. The updater given at
     * construction decides whether it is queued again and at which time.
     * Threads blocked in Remove(id, true) are woken up.
     */
    void Finish(_Tid id)
    {
        std::lock_guard<std::mutex> lock(pqMtx_);
        auto it = running_.find(id);
        if (it == running_.end()) {
            return;
        }
        auto [repeat, time] = updater_(it->second.task_);
        if (repeat) {
            indexes_.emplace(id, tasks_.emplace(time, std::move(it->second)));
        }
        running_.erase(it);
        removeCv_.notify_all();
    }

private:
    const _Tsk INVALID_TSK;                 // returned when there is no task to hand out
    std::mutex pqMtx_;                      // guards every container below
    std::condition_variable popCv_;         // signalled when tasks_ changes
    std::condition_variable removeCv_;      // signalled when a running task finishes
    std::multimap<_Tme, PQMatrix> tasks_;   // waiting tasks ordered by scheduled time
    std::map<_Tid, PQMatrix> running_;      // tasks handed out by Pop(), not yet finished
    std::map<_Tid, TskIndex> indexes_;      // id -> position inside tasks_
    TskUpdater updater_;
};
166 } //namespace NativePreferences
167 } // namespace OHOS
168 #endif //PREFERENCES_PRIORITY_QUEUE_H