1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PREFERENCES_FRAMEWORKS_EXECUTOR_H
17 #define PREFERENCES_FRAMEWORKS_EXECUTOR_H
18 #include <condition_variable>
19 #include <mutex>
20 #include <queue>
21 #include <thread>
22 
23 #include "preferences_thread.h"
24 #include "priority_queue.h"
25 namespace OHOS {
26 namespace NativePreferences {
27 
28 class Executor : public std::enable_shared_from_this<Executor> {
29 public:
30     using TaskId = uint64_t;
31     using Task = std::function<void()>;
32     using Duration = std::chrono::steady_clock::duration;
33     using Time = std::chrono::steady_clock::time_point;
34     static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>();
35     static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0);
36     static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max();
37     static constexpr Duration INVALID_DELAY = std::chrono::seconds(0);
38     static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l);
39 
40     enum Status {
41         RUNNING,
42         IS_STOPPING,
43         STOPPED
44     };
45     struct InnerTask {
46         std::function<void()> exec = []() {};
47         Duration interval = INVALID_INTERVAL;
48         uint64_t times = UNLIMITED_TIMES;
49         TaskId taskId = INVALID_TASK_ID;
50         InnerTask() = default;
51 
ValidInnerTask52         bool Valid() const
53         {
54             return taskId != INVALID_TASK_ID;
55         }
56     };
57 
Executor(const std::string & name)58     Executor(const std::string &name)
59         : thread_([this, name] {
60               auto realName = std::string("task_queue_") + name;
61               PthreadSetNameNp(realName);
62               Run();
63               self_ = nullptr;
64           })
65     {
66         thread_.detach();
67     }
68 
Executor()69     Executor()
70         : thread_([this] {
71               PthreadSetNameNp("Executor");
72               Run();
73               self_ = nullptr;
74           })
75     {
76         thread_.detach();
77     }
78 
Bind(PriorityQueue<InnerTask,Time,TaskId> * queue,std::function<bool (std::shared_ptr<Executor>)> idle,std::function<bool (std::shared_ptr<Executor>,bool)> release)79     void Bind(PriorityQueue<InnerTask, Time, TaskId> *queue, std::function<bool(std::shared_ptr<Executor>)> idle,
80         std::function<bool(std::shared_ptr<Executor>, bool)> release)
81     {
82         std::unique_lock<decltype(mutex_)> lock(mutex_);
83         self_ = shared_from_this();
84         waits_ = queue;
85         idle_ = std::move(idle);
86         release_ = std::move(release);
87         condition_.notify_one();
88     }
89 
90     void Stop(bool wait = false) noexcept
91     {
92         std::unique_lock<decltype(mutex_)> lock(mutex_);
93         running_ = IS_STOPPING;
94         condition_.notify_one();
95         cond_.wait(lock, [this, wait]() { return !wait || running_ == STOPPED; });
96     }
97 
98 private:
99     static constexpr Duration TIME_OUT = std::chrono::seconds(2);
Run()100     void Run()
101     {
102         std::unique_lock<decltype(mutex_)> lock(mutex_);
103         do {
104             do {
105                 condition_.wait(lock, [this] {
106                     return running_ == IS_STOPPING || waits_ != nullptr;
107                 });
108                 while (running_ == RUNNING && waits_ != nullptr && waits_->Size() > 0) {
109                     auto currentTask = waits_->Pop();
110                     lock.unlock();
111                     currentTask.exec();
112                     lock.lock();
113                     waits_->Finish(currentTask.taskId);
114                 }
115                 if (!idle_(self_) && running_ == RUNNING) {
116                     continue;
117                 }
118                 waits_ = nullptr;
119             } while (running_ == RUNNING &&
120                      condition_.wait_until(lock, std::chrono::steady_clock::now() + TIME_OUT, [this]() {
121                          return waits_ != nullptr;
122                      }));
123         } while (!release_(self_, running_ == IS_STOPPING));
124         running_ = STOPPED;
125         cond_.notify_all();
126     }
127 
128     Status running_ = RUNNING;
129     std::mutex mutex_;
130     std::condition_variable condition_;
131     std::condition_variable cond_;
132     std::shared_ptr<Executor> self_;
133     PriorityQueue<InnerTask, Time, TaskId> *waits_ = nullptr;
134     std::function<bool(std::shared_ptr<Executor>)> idle_;
135     std::function<bool(std::shared_ptr<Executor>, bool)> release_;
136     std::thread thread_;
137 };
138 } // namespace NativePreferences
139 } // namespace OHOS
140 #endif // PREFERENCES_FRAMEWORKS_EXECUTOR_H