1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H
17 #define OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H
#include <pthread.h>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include "priority_queue.h"
23 
24 namespace OHOS {
25 class Executor : public std::enable_shared_from_this<Executor> {
26 public:
27     using TaskId = uint64_t;
28     using Task = std::function<void()>;
29     using Duration = std::chrono::steady_clock::duration;
30     using Time = std::chrono::steady_clock::time_point;
31     static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>();
32     static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0);
33     static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max();
34     static constexpr Duration INVALID_DELAY = std::chrono::seconds(0);
35     static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l);
36 
37     enum Status {
38         RUNNING,
39         IS_STOPPING,
40         STOPPED
41     };
42     struct InnerTask {
43         std::function<void()> exec = []() {};
44         Duration interval = INVALID_INTERVAL;
45         uint64_t times = UNLIMITED_TIMES;
46         TaskId taskId = INVALID_TASK_ID;
47         InnerTask() = default;
48 
ValidInnerTask49         bool Valid() const
50         {
51             return taskId != INVALID_TASK_ID;
52         }
53     };
54 
Executor()55     Executor()
56         : thread_([this] {
57               pthread_setname_np(pthread_self(), "OS_TaskExecutor");
58               Run();
59               self_ = nullptr;
60           })
61     {
62         thread_.detach();
63     }
64 
Bind(PriorityQueue<InnerTask,Time,TaskId> * queue,std::function<bool (std::shared_ptr<Executor>)> idle,std::function<bool (std::shared_ptr<Executor>,bool)> release)65     void Bind(PriorityQueue<InnerTask, Time, TaskId> *queue, std::function<bool(std::shared_ptr<Executor>)> idle,
66         std::function<bool(std::shared_ptr<Executor>, bool)> release)
67     {
68         std::unique_lock<decltype(mutex_)> lock(mutex_);
69         self_ = shared_from_this();
70         waits_ = queue;
71         idle_ = std::move(idle);
72         release_ = std::move(release);
73         condition_.notify_one();
74     }
75 
76     void Stop(bool wait = false) noexcept
77     {
78         std::unique_lock<decltype(mutex_)> lock(mutex_);
79         running_ = IS_STOPPING;
80         condition_.notify_one();
81         cond_.wait(lock, [this, wait]() { return !wait || running_ == STOPPED; });
82     }
83 
84 private:
85     static constexpr Duration TIME_OUT = std::chrono::seconds(2);
Run()86     void Run()
87     {
88         std::unique_lock<decltype(mutex_)> lock(mutex_);
89         do {
90             do {
91                 condition_.wait(lock, [this] {
92                     return running_ == IS_STOPPING || waits_ != nullptr;
93                 });
94                 while (running_ == RUNNING && waits_ != nullptr && waits_->Size() > 0) {
95                     auto currentTask = waits_->Pop();
96                     lock.unlock();
97                     currentTask.exec();
98                     lock.lock();
99                     waits_->Finish(currentTask.taskId);
100                 }
101                 if (!idle_(self_) && running_ == RUNNING) {
102                     continue;
103                 }
104                 waits_ = nullptr;
105             } while (running_ == RUNNING &&
106                      condition_.wait_until(lock, std::chrono::steady_clock::now() + TIME_OUT, [this]() {
107                          return waits_ != nullptr;
108                      }));
109         } while (!release_(self_, running_ == IS_STOPPING));
110         running_ = STOPPED;
111         cond_.notify_all();
112     }
113 
114     Status running_ = RUNNING;
115     std::mutex mutex_;
116     std::condition_variable condition_;
117     std::condition_variable cond_;
118     std::shared_ptr<Executor> self_;
119     PriorityQueue<InnerTask, Time, TaskId> *waits_ = nullptr;
120     std::function<bool(std::shared_ptr<Executor>)> idle_;
121     std::function<bool(std::shared_ptr<Executor>, bool)> release_;
122     std::thread thread_;
123 };
124 } // namespace OHOS
125 #endif // OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H
126