1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PREFERENCES_FRAMEWORKS_EXECUTOR_H 17 #define PREFERENCES_FRAMEWORKS_EXECUTOR_H 18 #include <condition_variable> 19 #include <mutex> 20 #include <queue> 21 #include <thread> 22 23 #include "preferences_thread.h" 24 #include "priority_queue.h" 25 namespace OHOS { 26 namespace NativePreferences { 27 28 class Executor : public std::enable_shared_from_this<Executor> { 29 public: 30 using TaskId = uint64_t; 31 using Task = std::function<void()>; 32 using Duration = std::chrono::steady_clock::duration; 33 using Time = std::chrono::steady_clock::time_point; 34 static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>(); 35 static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0); 36 static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max(); 37 static constexpr Duration INVALID_DELAY = std::chrono::seconds(0); 38 static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l); 39 40 enum Status { 41 RUNNING, 42 IS_STOPPING, 43 STOPPED 44 }; 45 struct InnerTask { 46 std::function<void()> exec = []() {}; 47 Duration interval = INVALID_INTERVAL; 48 uint64_t times = UNLIMITED_TIMES; 49 TaskId taskId = INVALID_TASK_ID; 50 InnerTask() = default; 51 ValidInnerTask52 bool Valid() const 53 { 54 return taskId != INVALID_TASK_ID; 55 } 56 }; 57 Executor(const std::string & name)58 Executor(const std::string &name) 59 : thread_([this, name] { 60 auto realName = std::string("task_queue_") + name; 61 PthreadSetNameNp(realName); 62 Run(); 63 self_ = nullptr; 64 }) 65 { 66 thread_.detach(); 67 } 68 Executor()69 Executor() 70 : thread_([this] { 71 PthreadSetNameNp("Executor"); 72 Run(); 73 self_ = nullptr; 74 }) 75 { 76 thread_.detach(); 77 } 78 Bind(PriorityQueue<InnerTask,Time,TaskId> * queue,std::function<bool (std::shared_ptr<Executor>)> idle,std::function<bool (std::shared_ptr<Executor>,bool)> release)79 void Bind(PriorityQueue<InnerTask, Time, TaskId> *queue, std::function<bool(std::shared_ptr<Executor>)> idle, 80 std::function<bool(std::shared_ptr<Executor>, bool)> release) 81 { 82 std::unique_lock<decltype(mutex_)> lock(mutex_); 83 self_ = shared_from_this(); 84 waits_ = queue; 85 idle_ = std::move(idle); 86 release_ = std::move(release); 87 condition_.notify_one(); 88 } 89 90 void Stop(bool wait = false) noexcept 91 { 92 std::unique_lock<decltype(mutex_)> lock(mutex_); 93 running_ = IS_STOPPING; 94 condition_.notify_one(); 95 cond_.wait(lock, [this, wait]() { return !wait || running_ == STOPPED; }); 96 } 97 98 private: 99 static constexpr Duration TIME_OUT = std::chrono::seconds(2); Run()100 void Run() 101 { 102 std::unique_lock<decltype(mutex_)> lock(mutex_); 103 do { 104 do { 105 condition_.wait(lock, [this] { 106 return running_ == IS_STOPPING || waits_ != nullptr; 107 }); 108 while (running_ == RUNNING && waits_ != nullptr && waits_->Size() > 0) { 109 auto currentTask = waits_->Pop(); 110 lock.unlock(); 111 currentTask.exec(); 112 lock.lock(); 113 waits_->Finish(currentTask.taskId); 114 } 115 if (!idle_(self_) && running_ == RUNNING) { 116 continue; 117 } 118 waits_ = nullptr; 119 } while (running_ == RUNNING && 120 condition_.wait_until(lock, std::chrono::steady_clock::now() + TIME_OUT, [this]() { 121 return waits_ != nullptr; 122 })); 123 } while (!release_(self_, running_ == IS_STOPPING)); 124 running_ = STOPPED; 125 cond_.notify_all(); 126 } 127 128 Status running_ = RUNNING; 129 std::mutex mutex_; 130 std::condition_variable condition_; 131 std::condition_variable cond_; 132 std::shared_ptr<Executor> self_; 133 PriorityQueue<InnerTask, Time, TaskId> *waits_ = nullptr; 134 std::function<bool(std::shared_ptr<Executor>)> idle_; 135 std::function<bool(std::shared_ptr<Executor>, bool)> release_; 136 std::thread thread_; 137 }; 138 } // namespace NativePreferences 139 } // namespace OHOS 140 #endif // PREFERENCES_FRAMEWORKS_EXECUTOR_H