1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H 17 #define OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H 18 #include <condition_variable> 19 #include <mutex> 20 #include <queue> 21 #include <thread> 22 #include "priority_queue.h" 23 24 namespace OHOS { 25 class Executor : public std::enable_shared_from_this<Executor> { 26 public: 27 using TaskId = uint64_t; 28 using Task = std::function<void()>; 29 using Duration = std::chrono::steady_clock::duration; 30 using Time = std::chrono::steady_clock::time_point; 31 static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>(); 32 static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0); 33 static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max(); 34 static constexpr Duration INVALID_DELAY = std::chrono::seconds(0); 35 static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l); 36 37 enum Status { 38 RUNNING, 39 IS_STOPPING, 40 STOPPED 41 }; 42 struct InnerTask { 43 std::function<void()> exec = []() {}; 44 Duration interval = INVALID_INTERVAL; 45 uint64_t times = UNLIMITED_TIMES; 46 TaskId taskId = INVALID_TASK_ID; 47 InnerTask() = default; 48 ValidInnerTask49 bool Valid() const 50 { 51 return taskId != INVALID_TASK_ID; 52 } 53 }; 54 Executor()55 Executor() 56 : thread_([this] { 57 pthread_setname_np(pthread_self(), "OS_TaskExecutor"); 58 Run(); 59 self_ = nullptr; 60 }) 61 { 62 thread_.detach(); 63 } 64 Bind(PriorityQueue<InnerTask,Time,TaskId> * queue,std::function<bool (std::shared_ptr<Executor>)> idle,std::function<bool (std::shared_ptr<Executor>,bool)> release)65 void Bind(PriorityQueue<InnerTask, Time, TaskId> *queue, std::function<bool(std::shared_ptr<Executor>)> idle, 66 std::function<bool(std::shared_ptr<Executor>, bool)> release) 67 { 68 std::unique_lock<decltype(mutex_)> lock(mutex_); 69 self_ = shared_from_this(); 70 waits_ = queue; 71 idle_ = std::move(idle); 72 release_ = std::move(release); 73 condition_.notify_one(); 74 } 75 76 void Stop(bool wait = false) noexcept 77 { 78 std::unique_lock<decltype(mutex_)> lock(mutex_); 79 running_ = IS_STOPPING; 80 condition_.notify_one(); 81 cond_.wait(lock, [this, wait]() { return !wait || running_ == STOPPED; }); 82 } 83 84 private: 85 static constexpr Duration TIME_OUT = std::chrono::seconds(2); Run()86 void Run() 87 { 88 std::unique_lock<decltype(mutex_)> lock(mutex_); 89 do { 90 do { 91 condition_.wait(lock, [this] { 92 return running_ == IS_STOPPING || waits_ != nullptr; 93 }); 94 while (running_ == RUNNING && waits_ != nullptr && waits_->Size() > 0) { 95 auto currentTask = waits_->Pop(); 96 lock.unlock(); 97 currentTask.exec(); 98 lock.lock(); 99 waits_->Finish(currentTask.taskId); 100 } 101 if (!idle_(self_) && running_ == RUNNING) { 102 continue; 103 } 104 waits_ = nullptr; 105 } while (running_ == RUNNING && 106 condition_.wait_until(lock, std::chrono::steady_clock::now() + TIME_OUT, [this]() { 107 return waits_ != nullptr; 108 })); 109 } while (!release_(self_, running_ == IS_STOPPING)); 110 running_ = STOPPED; 111 cond_.notify_all(); 112 } 113 114 Status running_ = RUNNING; 115 std::mutex mutex_; 116 std::condition_variable condition_; 117 std::condition_variable cond_; 118 std::shared_ptr<Executor> self_; 119 PriorityQueue<InnerTask, Time, TaskId> *waits_ = nullptr; 120 std::function<bool(std::shared_ptr<Executor>)> idle_; 121 std::function<bool(std::shared_ptr<Executor>, bool)> release_; 122 std::thread thread_; 123 }; 124 } // namespace OHOS 125 #endif // OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H 126