/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H
#define OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H
#include <pthread.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>

#include "visibility.h"
namespace OHOS {
class API_LOCAL TaskScheduler {
public:
    using TaskId = uint64_t;
    using Time = std::chrono::steady_clock::time_point;
    using Duration = std::chrono::steady_clock::duration;
    using Clock = std::chrono::steady_clock;
    using Task = std::function<void()>;
    inline static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l);
    inline static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0);
    inline static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max();
    TaskScheduler(size_t capacity, const std::string &name)
    {
        capacity_ = capacity;
        isRunning_ = true;
        taskId_ = INVALID_TASK_ID;
        running_ = InnerTask();
        thread_ = std::make_unique<std::thread>([this, name]() {
            auto realName = std::string("scheduler_") + name;
            pthread_setname_np(pthread_self(), realName.c_str());
            Loop();
        });
    }
    TaskScheduler(const std::string &name) : TaskScheduler(std::numeric_limits<size_t>::max(), name) {}
    TaskScheduler(size_t capacity = std::numeric_limits<size_t>::max()) : TaskScheduler(capacity, "") {}
    ~TaskScheduler()
    {
        isRunning_ = false;
        Clean();
        Execute([]() {});
        thread_->join();
    }
    // execute task at specific time
    TaskId At(const Time &begin, Task task, Duration interval = INVALID_INTERVAL, uint64_t times = UNLIMITED_TIMES)
    {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        if (tasks_.size() >= capacity_) {
            return INVALID_TASK_ID;
        }
        InnerTask innerTask;
        innerTask.times = times;
        innerTask.taskId = GenTaskId();
        innerTask.interval = interval;
        innerTask.exec = std::move(task);
        auto it = tasks_.insert({ begin, innerTask });
        if (it == tasks_.begin()) {
            condition_.notify_one();
        }
        indexes_[innerTask.taskId] = it;
        return innerTask.taskId;
    }
    // reschedule an existing task to run after the given interval; a periodic task's interval is updated as well
    TaskId Reset(TaskId taskId, const Duration &interval)
    {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        if (running_.taskId == taskId && running_.interval != INVALID_INTERVAL) {
            running_.interval = interval;
            return running_.taskId;
        }
        auto index = indexes_.find(taskId);
        if (index == indexes_.end()) {
            return INVALID_TASK_ID;
        }
        auto &innerTask = index->second->second;
        if (innerTask.interval != INVALID_INTERVAL) {
            innerTask.interval = interval;
        }
        auto it = tasks_.insert({ std::chrono::steady_clock::now() + interval, std::move(innerTask) });
        if (it == tasks_.begin() || index->second == tasks_.begin()) {
            condition_.notify_one();
        }
        tasks_.erase(index->second);
        indexes_[taskId] = it;
        return taskId;
    }
    // drop all pending tasks
    void Clean()
    {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        indexes_.clear();
        tasks_.clear();
    }
    // execute task periodically with duration
    TaskId Every(Duration interval, Task task)
    {
        return At(std::chrono::steady_clock::now() + interval, task, interval);
    }
    // remove a pending task; when wait is true, block until the task is no longer running
    void Remove(TaskId taskId, bool wait = false)
    {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        cond_.wait(lock, [this, taskId, wait]() {
            return (!wait || running_.taskId != taskId);
        });
        auto index = indexes_.find(taskId);
        if (index == indexes_.end()) {
            return;
        }
        tasks_.erase(index->second);
        indexes_.erase(index);
        condition_.notify_one();
    }
    // execute task periodically with duration after delay
    TaskId Every(Duration delay, Duration interval, Task task)
    {
        return At(std::chrono::steady_clock::now() + delay, task, interval);
    }
    // execute task for some times periodically with duration after delay
    TaskId Every(int32_t times, Duration delay, Duration interval, Task task)
    {
        return At(std::chrono::steady_clock::now() + delay, task, interval, times);
    }
    // execute task as soon as possible
    TaskId Execute(Task task)
    {
        return At(std::chrono::steady_clock::now(), std::move(task));
    }
private:
    struct InnerTask {
        TaskId taskId = INVALID_TASK_ID;
        Duration interval = INVALID_INTERVAL;
        uint64_t times = UNLIMITED_TIMES;
        std::function<void()> exec;
    };
    // worker loop: wait until the earliest task is due, run it outside the lock, then reschedule it if periodic
    void Loop()
    {
        while (isRunning_) {
            std::function<void()> exec;
            {
                std::unique_lock<decltype(mutex_)> lock(mutex_);
                condition_.wait(lock, [this] {
                    return !tasks_.empty();
                });
                if (tasks_.begin()->first > std::chrono::steady_clock::now()) {
                    auto time = tasks_.begin()->first;
                    condition_.wait_until(lock, time);
                    continue;
                }
                auto it = tasks_.begin();
                running_ = it->second;
                exec = running_.exec;
                indexes_.erase(running_.taskId);
                tasks_.erase(it);
                running_.times--;
            }
            if (exec) {
                exec();
            }
            {
                std::unique_lock<decltype(mutex_)> lock(mutex_);
                if (running_.interval != INVALID_INTERVAL && running_.times > 0) {
                    auto it = tasks_.insert({ std::chrono::steady_clock::now() + running_.interval, running_ });
                    indexes_[running_.taskId] = it;
                }
                running_ = InnerTask();
                cond_.notify_all();
            }
        }
    }

    // generate the next task id, skipping INVALID_TASK_ID on wrap-around
    TaskId GenTaskId()
    {
        auto taskId = ++taskId_;
        if (taskId == INVALID_TASK_ID) {
            return ++taskId_;
        }
        return taskId;
    }

    volatile bool isRunning_;
    size_t capacity_;
    std::multimap<Time, InnerTask> tasks_;
    std::map<TaskId, decltype(tasks_)::iterator> indexes_;
    InnerTask running_;
    std::mutex mutex_;
    std::unique_ptr<std::thread> thread_;
    std::condition_variable condition_; // wakes the worker loop when the earliest pending task may have changed
    std::condition_variable cond_;      // signals that the currently running task finished (used by Remove with wait)
    std::atomic<uint64_t> taskId_;
};
} // namespace OHOS
#endif // OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H
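// Usage sketch (illustrative only, not part of the original header). It assumes this file is
// included as "task_scheduler.h", that the capacity 16 and the name "demo" are arbitrary, and
// that the lambda bodies are placeholders. It shows one-shot execution, periodic scheduling,
// rescheduling, and removal:
//
//     #include <chrono>
//     #include "task_scheduler.h"
//
//     using namespace std::chrono_literals;
//
//     OHOS::TaskScheduler scheduler(16, "demo");    // hold at most 16 pending tasks
//     scheduler.Execute([] { /* runs as soon as the worker thread is free */ });
//     auto heartbeat = scheduler.Every(1s, [] { /* runs once per second */ });
//     scheduler.Reset(heartbeat, 5s);               // stretch the interval to five seconds
//     scheduler.Remove(heartbeat, true);            // wait if the task is currently running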