1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PREFERENCES_FRAMEWORKS_EXECUTOR_POOL_H 17 #define PREFERENCES_FRAMEWORKS_EXECUTOR_POOL_H 18 #include <atomic> 19 #include <condition_variable> 20 #include <mutex> 21 #include <queue> 22 #include <thread> 23 24 #include "executor.h" 25 #include "log_print.h" 26 #include "pool.h" 27 #include "priority_queue.h" 28 29 namespace OHOS { 30 namespace NativePreferences { 31 class ExecutorPool { 32 public: 33 using TaskId = Executor::TaskId; 34 using Task = Executor::Task; 35 using Duration = Executor::Duration; 36 using Time = Executor::Time; 37 using InnerTask = Executor::InnerTask; 38 using Status = Executor::Status; 39 using TaskQueue = PriorityQueue<InnerTask, Time, TaskId>; 40 static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>(); 41 static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0); 42 static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max(); 43 static constexpr Duration INVALID_DELAY = std::chrono::seconds(0); 44 static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l); 45 ExecutorPool(size_t max,size_t min)46 ExecutorPool(size_t max, size_t min) 47 : pool_(max, min), delayTasks_(InnerTask(), NextTimer), taskId_(INVALID_TASK_ID) 48 { 49 // When max equals 1, timer thread schedules and executes tasks. 50 if (max > 1) { 51 execs_ = new (std::nothrow) TaskQueue(InnerTask()); 52 } 53 } 54 ~ExecutorPool()55 ~ExecutorPool() 56 { 57 poolStatus = Status::IS_STOPPING; 58 if (execs_ != nullptr) { 59 execs_->Clean(); 60 } 61 delayTasks_.Clean(); 62 std::shared_ptr<Executor> scheduler; 63 { 64 std::lock_guard<decltype(mtx_)> scheduleLock(mtx_); 65 scheduler = std::move(scheduler_); 66 } 67 if (scheduler != nullptr) { 68 scheduler->Stop(true); 69 } 70 pool_.Clean([](std::shared_ptr<Executor> executor) { 71 executor->Stop(true); 72 }); 73 delete execs_; 74 poolStatus = Status::STOPPED; 75 } 76 Execute(Task task)77 TaskId Execute(Task task) 78 { 79 if (poolStatus != Status::RUNNING) { 80 LOG_ERROR("execute task failed."); 81 return INVALID_TASK_ID; 82 } 83 84 if (execs_ == nullptr) { 85 return Schedule(std::move(task), INVALID_DELAY, INVALID_INTERVAL, UNLIMITED_TIMES); 86 } 87 88 return Execute(std::move(task), GenTaskId()); 89 } 90 Schedule(Duration delay,Task task)91 TaskId Schedule(Duration delay, Task task) 92 { 93 return Schedule(std::move(task), delay, INVALID_INTERVAL, 1); 94 } 95 Schedule(Task task,Duration interval)96 TaskId Schedule(Task task, Duration interval) 97 { 98 return Schedule(std::move(task), INVALID_DELAY, interval, UNLIMITED_TIMES); 99 } 100 Schedule(Task task,Duration delay,Duration interval)101 TaskId Schedule(Task task, Duration delay, Duration interval) 102 { 103 return Schedule(std::move(task), delay, interval, UNLIMITED_TIMES); 104 } 105 Schedule(Task task,Duration delay,Duration interval,uint64_t times)106 TaskId Schedule(Task task, Duration delay, Duration interval, uint64_t times) 107 { 108 InnerTask innerTask; 109 innerTask.exec = std::move(task); 110 innerTask.interval = interval; 111 innerTask.times = times; 112 innerTask.taskId = GenTaskId(); 113 return Schedule(std::move(innerTask), std::chrono::steady_clock::now() + delay); 114 } 115 116 bool Remove(TaskId taskId, bool wait = false) 117 { 118 bool res = true; 119 auto delay = delayTasks_.Find(taskId); 120 if (!delay.Valid()) { 121 res = false; 122 } 123 delayTasks_.Remove(taskId, wait); 124 if (execs_ != nullptr) { 125 execs_->Remove(taskId, wait); 126 } 127 return res; 128 } 129 Reset(TaskId taskId,Duration interval)130 TaskId Reset(TaskId taskId, Duration interval) 131 { 132 auto updated = delayTasks_.Update(taskId, [interval](InnerTask &task) -> std::pair<bool, Time> { 133 if (task.interval != INVALID_INTERVAL) { 134 task.interval = interval; 135 } 136 auto time = std::chrono::steady_clock::now() + interval; 137 return std::pair{ true, time }; 138 }); 139 return updated ? taskId : INVALID_TASK_ID; 140 } 141 142 private: Execute(Task task,TaskId taskId)143 TaskId Execute(Task task, TaskId taskId) 144 { 145 InnerTask innerTask; 146 innerTask.exec = task; 147 innerTask.taskId = taskId; 148 execs_->Push(std::move(innerTask), taskId, INVALID_TIME); 149 auto executor = pool_.Get(); 150 if (executor == nullptr) { 151 return taskId; 152 } 153 executor->Bind( 154 execs_, 155 [this](std::shared_ptr<Executor> exe) { 156 pool_.Idle(exe); 157 return true; 158 }, 159 [this](std::shared_ptr<Executor> exe, bool force) -> bool { 160 return pool_.Release(exe, force); 161 }); 162 return taskId; 163 } 164 Schedule(InnerTask innerTask,Time delay)165 TaskId Schedule(InnerTask innerTask, Time delay) 166 { 167 auto id = innerTask.taskId; 168 if (execs_ != nullptr) { 169 auto func = innerTask.exec; 170 auto run = [this, func, id]() { 171 Execute(func, id); 172 }; 173 innerTask.exec = run; 174 } 175 delayTasks_.Push(std::move(innerTask), id, delay); 176 std::lock_guard<decltype(mtx_)> scheduleLock(mtx_); 177 if (scheduler_ == nullptr) { 178 scheduler_ = pool_.Get(true); 179 scheduler_->Bind( 180 &delayTasks_, 181 [this](std::shared_ptr<Executor> exe) { 182 std::unique_lock<decltype(mtx_)> lock(mtx_); 183 if (delayTasks_.Size() != 0) { 184 return false; 185 } 186 scheduler_ = nullptr; 187 pool_.Idle(exe); 188 return true; 189 }, 190 [this](std::shared_ptr<Executor> exe, bool force) -> bool { 191 return pool_.Release(exe, force); 192 }); 193 } 194 return innerTask.taskId; 195 } 196 GenTaskId()197 TaskId GenTaskId() 198 { 199 auto taskId = ++taskId_; 200 if (taskId == INVALID_TASK_ID) { 201 taskId = ++taskId_; 202 } 203 return taskId; 204 } 205 NextTimer(InnerTask & task)206 static std::pair<bool, Time> NextTimer(InnerTask &task) 207 { 208 if (task.interval != INVALID_INTERVAL && --task.times > 0) { 209 auto time = std::chrono::steady_clock::now() + task.interval; 210 return { true, time }; 211 } 212 return { false, INVALID_TIME }; 213 } 214 215 Status poolStatus = Status::RUNNING; 216 std::mutex mtx_; 217 Pool<Executor> pool_; 218 TaskQueue delayTasks_; 219 std::shared_ptr<Executor> scheduler_ = nullptr; 220 TaskQueue *execs_ = nullptr; 221 std::atomic<TaskId> taskId_; 222 }; 223 } // namespace NativePreferences 224 } // namespace OHOS 225 226 #endif // PREFERENCES_FRAMEWORKS_EXECUTOR_POOL_H