1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PREFERENCES_FRAMEWORKS_EXECUTOR_POOL_H
17 #define PREFERENCES_FRAMEWORKS_EXECUTOR_POOL_H
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <queue>
22 #include <thread>
23 
24 #include "executor.h"
25 #include "log_print.h"
26 #include "pool.h"
27 #include "priority_queue.h"
28 
29 namespace OHOS {
30 namespace NativePreferences {
31 class ExecutorPool {
32 public:
33     using TaskId = Executor::TaskId;
34     using Task = Executor::Task;
35     using Duration = Executor::Duration;
36     using Time = Executor::Time;
37     using InnerTask = Executor::InnerTask;
38     using Status = Executor::Status;
39     using TaskQueue = PriorityQueue<InnerTask, Time, TaskId>;
40     static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>();
41     static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0);
42     static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max();
43     static constexpr Duration INVALID_DELAY = std::chrono::seconds(0);
44     static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l);
45 
ExecutorPool(size_t max,size_t min)46     ExecutorPool(size_t max, size_t min)
47         : pool_(max, min), delayTasks_(InnerTask(), NextTimer), taskId_(INVALID_TASK_ID)
48     {
49         // When max equals 1, timer thread schedules and executes tasks.
50         if (max > 1) {
51             execs_ = new (std::nothrow) TaskQueue(InnerTask());
52         }
53     }
54 
~ExecutorPool()55     ~ExecutorPool()
56     {
57         poolStatus = Status::IS_STOPPING;
58         if (execs_ != nullptr) {
59             execs_->Clean();
60         }
61         delayTasks_.Clean();
62         std::shared_ptr<Executor> scheduler;
63         {
64             std::lock_guard<decltype(mtx_)> scheduleLock(mtx_);
65             scheduler = std::move(scheduler_);
66         }
67         if (scheduler != nullptr) {
68             scheduler->Stop(true);
69         }
70         pool_.Clean([](std::shared_ptr<Executor> executor) {
71             executor->Stop(true);
72         });
73         delete execs_;
74         poolStatus = Status::STOPPED;
75     }
76 
Execute(Task task)77     TaskId Execute(Task task)
78     {
79         if (poolStatus != Status::RUNNING) {
80             LOG_ERROR("execute task failed.");
81             return INVALID_TASK_ID;
82         }
83 
84         if (execs_ == nullptr) {
85             return Schedule(std::move(task), INVALID_DELAY, INVALID_INTERVAL, UNLIMITED_TIMES);
86         }
87 
88         return Execute(std::move(task), GenTaskId());
89     }
90 
Schedule(Duration delay,Task task)91     TaskId Schedule(Duration delay, Task task)
92     {
93         return Schedule(std::move(task), delay, INVALID_INTERVAL, 1);
94     }
95 
Schedule(Task task,Duration interval)96     TaskId Schedule(Task task, Duration interval)
97     {
98         return Schedule(std::move(task), INVALID_DELAY, interval, UNLIMITED_TIMES);
99     }
100 
Schedule(Task task,Duration delay,Duration interval)101     TaskId Schedule(Task task, Duration delay, Duration interval)
102     {
103         return Schedule(std::move(task), delay, interval, UNLIMITED_TIMES);
104     }
105 
Schedule(Task task,Duration delay,Duration interval,uint64_t times)106     TaskId Schedule(Task task, Duration delay, Duration interval, uint64_t times)
107     {
108         InnerTask innerTask;
109         innerTask.exec = std::move(task);
110         innerTask.interval = interval;
111         innerTask.times = times;
112         innerTask.taskId = GenTaskId();
113         return Schedule(std::move(innerTask), std::chrono::steady_clock::now() + delay);
114     }
115 
116     bool Remove(TaskId taskId, bool wait = false)
117     {
118         bool res = true;
119         auto delay = delayTasks_.Find(taskId);
120         if (!delay.Valid()) {
121             res = false;
122         }
123         delayTasks_.Remove(taskId, wait);
124         if (execs_ != nullptr) {
125             execs_->Remove(taskId, wait);
126         }
127         return res;
128     }
129 
Reset(TaskId taskId,Duration interval)130     TaskId Reset(TaskId taskId, Duration interval)
131     {
132         auto updated = delayTasks_.Update(taskId, [interval](InnerTask &task) -> std::pair<bool, Time> {
133             if (task.interval != INVALID_INTERVAL) {
134                 task.interval = interval;
135             }
136             auto time = std::chrono::steady_clock::now() + interval;
137             return std::pair{ true, time };
138         });
139         return updated ? taskId : INVALID_TASK_ID;
140     }
141 
142 private:
Execute(Task task,TaskId taskId)143     TaskId Execute(Task task, TaskId taskId)
144     {
145         InnerTask innerTask;
146         innerTask.exec = task;
147         innerTask.taskId = taskId;
148         execs_->Push(std::move(innerTask), taskId, INVALID_TIME);
149         auto executor = pool_.Get();
150         if (executor == nullptr) {
151             return taskId;
152         }
153         executor->Bind(
154             execs_,
155             [this](std::shared_ptr<Executor> exe) {
156                 pool_.Idle(exe);
157                 return true;
158             },
159             [this](std::shared_ptr<Executor> exe, bool force) -> bool {
160                 return pool_.Release(exe, force);
161             });
162         return taskId;
163     }
164 
Schedule(InnerTask innerTask,Time delay)165     TaskId Schedule(InnerTask innerTask, Time delay)
166     {
167         auto id = innerTask.taskId;
168         if (execs_ != nullptr) {
169             auto func = innerTask.exec;
170             auto run = [this, func, id]() {
171                 Execute(func, id);
172             };
173             innerTask.exec = run;
174         }
175         delayTasks_.Push(std::move(innerTask), id, delay);
176         std::lock_guard<decltype(mtx_)> scheduleLock(mtx_);
177         if (scheduler_ == nullptr) {
178             scheduler_ = pool_.Get(true);
179             scheduler_->Bind(
180                 &delayTasks_,
181                 [this](std::shared_ptr<Executor> exe) {
182                     std::unique_lock<decltype(mtx_)> lock(mtx_);
183                     if (delayTasks_.Size() != 0) {
184                         return false;
185                     }
186                     scheduler_ = nullptr;
187                     pool_.Idle(exe);
188                     return true;
189                 },
190                 [this](std::shared_ptr<Executor> exe, bool force) -> bool {
191                     return pool_.Release(exe, force);
192                 });
193         }
194         return innerTask.taskId;
195     }
196 
GenTaskId()197     TaskId GenTaskId()
198     {
199         auto taskId = ++taskId_;
200         if (taskId == INVALID_TASK_ID) {
201             taskId = ++taskId_;
202         }
203         return taskId;
204     }
205 
NextTimer(InnerTask & task)206     static std::pair<bool, Time> NextTimer(InnerTask &task)
207     {
208         if (task.interval != INVALID_INTERVAL && --task.times > 0) {
209             auto time = std::chrono::steady_clock::now() + task.interval;
210             return { true, time };
211         }
212         return { false, INVALID_TIME };
213     }
214 
215     Status poolStatus = Status::RUNNING;
216     std::mutex mtx_;
217     Pool<Executor> pool_;
218     TaskQueue delayTasks_;
219     std::shared_ptr<Executor> scheduler_ = nullptr;
220     TaskQueue *execs_ = nullptr;
221     std::atomic<TaskId> taskId_;
222 };
223 } // namespace NativePreferences
224 } // namespace OHOS
225 
226 #endif // PREFERENCES_FRAMEWORKS_EXECUTOR_POOL_H