1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef META_SRC_TASK_QUEUE_H
17 #define META_SRC_TASK_QUEUE_H
18
19 #include <mutex>
20 #include <thread>
21
22 #include <base/containers/vector.h>
23
24 #include <meta/interface/intf_clock.h>
25 #include <meta/interface/intf_task_queue.h>
26
META_BEGIN_NAMESPACE()27 META_BEGIN_NAMESPACE()
28
29 class TaskQueueImpl : public ITaskQueueExtend {
30 public:
31 using Token = ITaskQueue::Token;
32
33 void SetExtend(ITaskQueueExtend* extend) override
34 {
35 extend_ = extend ? extend : this;
36 }
37 void Shutdown() override
38 {
39 Close();
40 }
41
42 void CancelTask(Token token)
43 {
44 if (token != nullptr) {
45 std::unique_lock lock { mutex_ };
46 Token executingToken = execToken_;
47 if (token == execToken_) {
48 // Currently executing task is requested to cancel.
49 // Tasks are temporarily removed from the queue while execution, so the currently running task is not in
50 // the queue anymore. Setting execToken_ to null will cause the task to not be re-added.
51 execToken_ = nullptr;
52 }
53
54 // If we are currently executing the task in different thread, wait for it to complete.
55 if (std::this_thread::get_id() != execThread_) {
56 while (!terminate_ && token == executingToken) {
57 lock.unlock();
58 // sleep a bit.
59 std::this_thread::yield();
60 lock.lock();
61 executingToken = execToken_;
62 }
63 }
64
65 // Remove all tasks from the queue, with the same token. (if any)
66 // One can push the same task to the queue multiple times currently.
67 // (ie. you "can" schedule the same task with different "delays")
68 // So we remove all scheduled tasks with same token.
69 // Also redo/rearm might have add the task back while we were waiting/yielding.
70 for (auto it = tasks_.begin(); it != tasks_.end();) {
71 if (it->operation.get() == token) {
72 it = tasks_.erase(it);
73 } else {
74 it++;
75 }
76 }
77 }
78 }
79
80 Token AddTaskImpl(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
81 {
82 Token ret { p.get() };
83
84 if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
85 i->SetQueueAndToken(self_.lock(), ret);
86 }
87
88 // insertion sort the tasks
89 if (tasks_.empty()) {
90 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
91 } else if (tasks_.size() == 1) {
92 if (tasks_.front().executeTime >= excTime) {
93 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
94 } else {
95 tasks_.insert(tasks_.begin(), { delay, excTime, BASE_NS::move(p) });
96 }
97 } else {
98 bool found = false;
99 for (auto it = tasks_.end() - 1; it >= tasks_.begin(); --it) {
100 if (it->executeTime > excTime) {
101 // task in list should execute after us, so insert there.
102 tasks_.insert(it + 1, { delay, excTime, BASE_NS::move(p) });
103 found = true;
104 break;
105 }
106 }
107 if (!found) {
108 // add last then ..
109 tasks_.insert(tasks_.begin(), { delay, excTime, BASE_NS::move(p) });
110 }
111 }
112 return ret;
113 }
114
115 Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
116 {
117 if (p) {
118 std::unique_lock lock { mutex_ };
119 return AddTaskImpl(BASE_NS::move(p), delay, excTime);
120 }
121 return nullptr;
122 }
123
124 TimeSpan Time() const
125 {
126 using namespace std::chrono;
127 return TimeSpan::Microseconds(
128 duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch()).count());
129 }
130
131 void ProcessTasks(std::unique_lock<std::mutex>& lock, TimeSpan curTime)
132 {
133 // Must only be called while having the lock
134 BASE_NS::vector<Task> rearm;
135 while (!terminate_ && !tasks_.empty() && curTime >= tasks_.back().executeTime) {
136 auto task = BASE_NS::move(tasks_.back());
137 tasks_.pop_back();
138 execToken_ = task.operation.get();
139 lock.unlock();
140 bool redo = extend_->InvokeTask(task.operation);
141 lock.lock();
142 // Note execToken_ has been set to null if the executing task is cancelled.
143 if ((redo) && (execToken_ != nullptr)) {
144 // Reschedule the task again.
145 rearm.emplace_back(BASE_NS::move(task));
146 }
147 execToken_ = nullptr;
148 }
149
150 // rearm the tasks.. (if we are not shutting down)
151 if (!terminate_) {
152 for (auto it = rearm.rbegin(); it != rearm.rend(); ++it) {
153 auto& task = *it;
154 if (task.delay > TimeSpan()) {
155 // calculate the next executeTime in phase.. (ie. how many events missed)
156 uint64_t dt = task.delay.ToMicroseconds();
157 uint64_t et = task.executeTime.ToMicroseconds();
158 uint64_t ct = curTime.ToMicroseconds();
159 // calculate the next executeTime in phase..
160 et += dt;
161 if (et <= ct) {
162 // "ticks" how many events would have ran.. (rounded up)
163 auto ticks = ((ct - et) + (dt - 1)) / dt;
164 // and based on the "ticks" we can now count the next execution time.
165 et += (ticks * dt);
166 CORE_LOG_V("Skipped ticks %d", (int)ticks);
167 }
168 task.executeTime = TimeSpan::Microseconds(et);
169 } else {
170 task.executeTime = curTime;
171 }
172 AddTaskImpl(task.operation, task.delay, task.executeTime);
173 }
174 }
175 }
176
177 void Close()
178 {
179 std::unique_lock lock { mutex_ };
180 terminate_ = true;
181 tasks_.clear();
182 }
183
184 struct Task {
185 Task() = default;
186 Task(TimeSpan d, TimeSpan e, const ITaskQueueTask::Ptr& p) : delay(d), executeTime(e), operation(p) {}
187
188 TimeSpan delay;
189 TimeSpan executeTime;
190 ITaskQueueTask::Ptr operation { nullptr };
191 };
192
193 protected:
194 std::mutex mutex_;
195
196 ITaskQueueExtend* extend_ { this };
197 bool terminate_ {};
198 std::thread::id execThread_;
199 // currently running task..
200 Token execToken_ { nullptr };
201 BASE_NS::vector<Task> tasks_;
202 ITaskQueue::WeakPtr self_;
203 };
204
205 META_END_NAMESPACE()
206
207 #endif
208