1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef META_SRC_TASK_QUEUE_H
17 #define META_SRC_TASK_QUEUE_H
18 
#include <algorithm>
#include <chrono>
#include <mutex>
#include <thread>

#include <base/containers/vector.h>

#include <meta/interface/intf_clock.h>
#include <meta/interface/intf_task_queue.h>
26 
META_BEGIN_NAMESPACE()
28 
29 class TaskQueueImpl : public ITaskQueueExtend {
30 public:
31     using Token = ITaskQueue::Token;
32 
33     void SetExtend(ITaskQueueExtend* extend) override
34     {
35         extend_ = extend ? extend : this;
36     }
37     void Shutdown() override
38     {
39         Close();
40     }
41 
42     void CancelTask(Token token)
43     {
44         if (token != nullptr) {
45             std::unique_lock lock { mutex_ };
46             Token executingToken = execToken_;
47             if (token == execToken_) {
48                 // Currently executing task is requested to cancel.
49                 // Tasks are temporarily removed from the queue while execution, so the currently running task is not in
50                 // the queue anymore. Setting execToken_ to null will cause the task to not be re-added.
51                 execToken_ = nullptr;
52             }
53 
54             // If we are currently executing the task in different thread, wait for it to complete.
55             if (std::this_thread::get_id() != execThread_) {
56                 while (!terminate_ && token == executingToken) {
57                     lock.unlock();
58                     // sleep a bit.
59                     std::this_thread::yield();
60                     lock.lock();
61                     executingToken = execToken_;
62                 }
63             }
64 
65             // Remove all tasks from the queue, with the same token. (if any)
66             // One can push the same task to the queue multiple times currently.
67             // (ie. you "can" schedule the same task with different "delays")
68             // So we remove all scheduled tasks with same token.
69             // Also redo/rearm might have add the task back while we were waiting/yielding.
70             for (auto it = tasks_.begin(); it != tasks_.end();) {
71                 if (it->operation.get() == token) {
72                     it = tasks_.erase(it);
73                 } else {
74                     it++;
75                 }
76             }
77         }
78     }
79 
80     Token AddTaskImpl(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
81     {
82         Token ret { p.get() };
83 
84         if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
85             i->SetQueueAndToken(self_.lock(), ret);
86         }
87 
88         // insertion sort the tasks
89         if (tasks_.empty()) {
90             tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
91         } else if (tasks_.size() == 1) {
92             if (tasks_.front().executeTime >= excTime) {
93                 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
94             } else {
95                 tasks_.insert(tasks_.begin(), { delay, excTime, BASE_NS::move(p) });
96             }
97         } else {
98             bool found = false;
99             for (auto it = tasks_.end() - 1; it >= tasks_.begin(); --it) {
100                 if (it->executeTime > excTime) {
101                     // task in list should execute after us, so insert there.
102                     tasks_.insert(it + 1, { delay, excTime, BASE_NS::move(p) });
103                     found = true;
104                     break;
105                 }
106             }
107             if (!found) {
108                 // add last then ..
109                 tasks_.insert(tasks_.begin(), { delay, excTime, BASE_NS::move(p) });
110             }
111         }
112         return ret;
113     }
114 
115     Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
116     {
117         if (p) {
118             std::unique_lock lock { mutex_ };
119             return AddTaskImpl(BASE_NS::move(p), delay, excTime);
120         }
121         return nullptr;
122     }
123 
124     TimeSpan Time() const
125     {
126         using namespace std::chrono;
127         return TimeSpan::Microseconds(
128             duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch()).count());
129     }
130 
131     void ProcessTasks(std::unique_lock<std::mutex>& lock, TimeSpan curTime)
132     {
133         // Must only be called while having the lock
134         BASE_NS::vector<Task> rearm;
135         while (!terminate_ && !tasks_.empty() && curTime >= tasks_.back().executeTime) {
136             auto task = BASE_NS::move(tasks_.back());
137             tasks_.pop_back();
138             execToken_ = task.operation.get();
139             lock.unlock();
140             bool redo = extend_->InvokeTask(task.operation);
141             lock.lock();
142             // Note execToken_ has been set to null if the executing task is cancelled.
143             if ((redo) && (execToken_ != nullptr)) {
144                 // Reschedule the task again.
145                 rearm.emplace_back(BASE_NS::move(task));
146             }
147             execToken_ = nullptr;
148         }
149 
150         // rearm the tasks.. (if we are not shutting down)
151         if (!terminate_) {
152             for (auto it = rearm.rbegin(); it != rearm.rend(); ++it) {
153                 auto& task = *it;
154                 if (task.delay > TimeSpan()) {
155                     // calculate the next executeTime in phase.. (ie. how many events missed)
156                     uint64_t dt = task.delay.ToMicroseconds();
157                     uint64_t et = task.executeTime.ToMicroseconds();
158                     uint64_t ct = curTime.ToMicroseconds();
159                     // calculate the next executeTime in phase..
160                     et += dt;
161                     if (et <= ct) {
162                         // "ticks" how many events would have ran.. (rounded up)
163                         auto ticks = ((ct - et) + (dt - 1)) / dt;
164                         // and based on the "ticks" we can now count the next execution time.
165                         et += (ticks * dt);
166                         CORE_LOG_V("Skipped ticks %d", (int)ticks);
167                     }
168                     task.executeTime = TimeSpan::Microseconds(et);
169                 } else {
170                     task.executeTime = curTime;
171                 }
172                 AddTaskImpl(task.operation, task.delay, task.executeTime);
173             }
174         }
175     }
176 
177     void Close()
178     {
179         std::unique_lock lock { mutex_ };
180         terminate_ = true;
181         tasks_.clear();
182     }
183 
184     struct Task {
185         Task() = default;
186         Task(TimeSpan d, TimeSpan e, const ITaskQueueTask::Ptr& p) : delay(d), executeTime(e), operation(p) {}
187 
188         TimeSpan delay;
189         TimeSpan executeTime;
190         ITaskQueueTask::Ptr operation { nullptr };
191     };
192 
193 protected:
194     std::mutex mutex_;
195 
196     ITaskQueueExtend* extend_ { this };
197     bool terminate_ {};
198     std::thread::id execThread_;
199     // currently running task..
200     Token execToken_ { nullptr };
201     BASE_NS::vector<Task> tasks_;
202     ITaskQueue::WeakPtr self_;
203 };
204 
205 META_END_NAMESPACE()
206 
207 #endif
208