1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <mutex>
18 #include <thread>
19
20 #include <base/containers/vector.h>
21
22 #include <meta/base/interface_macros.h>
23 #include <meta/interface/intf_task_queue.h>
24 #include <meta/interface/intf_task_queue_registry.h>
25
26 #include "future.h"
27 #include "meta_object.h"
28 #include "task_queue.h"
29
30 META_BEGIN_NAMESPACE()
31
32 class ThreadedTaskQueue
33 : public Internal::MetaObjectFwd<ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IThreadedTaskQueue, TaskQueueImpl> {
34 public:
35 using Super =
36 Internal::MetaObjectFwd<ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IThreadedTaskQueue, TaskQueueImpl>;
37 using Token = ITaskQueue::Token;
38
39 META_NO_COPY_MOVE(ThreadedTaskQueue)
40
41 ThreadedTaskQueue() = default;
~ThreadedTaskQueue()42 ~ThreadedTaskQueue() override
43 {
44 Shutdown();
45 }
46
Build(const IMetadata::Ptr & data)47 bool Build(const IMetadata::Ptr& data) override
48 {
49 bool ret = Super::Build(data);
50 if (ret) {
51 self_ = GetSelf<ITaskQueue>();
52 thread_ = std::thread([this]() { ProcessTasks(); });
53 }
54 return ret;
55 }
56
InvokeTask(const ITaskQueueTask::Ptr & task)57 bool InvokeTask(const ITaskQueueTask::Ptr& task) override
58 {
59 auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_);
60 auto ret = task->Invoke();
61 GetTaskQueueRegistry().SetCurrentTaskQueue(q);
62 return ret;
63 }
64
Shutdown()65 void Shutdown() override
66 {
67 Close();
68 addCondition_.notify_one();
69 if (thread_.joinable()) {
70 thread_.join();
71 }
72 }
73
CancelTask(Token token)74 void CancelTask(Token token) override
75 {
76 TaskQueueImpl::CancelTask(token);
77 }
78
AddTask(ITaskQueueTask::Ptr p)79 Token AddTask(ITaskQueueTask::Ptr p) override
80 {
81 return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
82 }
83
AddTask(ITaskQueueTask::Ptr p,const TimeSpan & delay)84 Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override
85 {
86 auto t = TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay);
87 if (t) {
88 addCondition_.notify_one();
89 }
90 return t;
91 }
92
AddWaitableTask(ITaskQueueWaitableTask::Ptr p)93 IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override
94 {
95 IPromise::Ptr promise(new Promise);
96 BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise));
97 auto f = task->GetFuture();
98 AddTask(BASE_NS::move(task));
99 return f;
100 }
101
ProcessTasks()102 void ProcessTasks()
103 {
104 std::unique_lock lock { mutex_ };
105 execThread_ = std::this_thread::get_id();
106 while (!terminate_) {
107 if (!tasks_.empty()) {
108 TimeSpan delta = tasks_.back().executeTime - Time();
109 // wait for next execute time (or trigger which ever is first). and see if we can now process things..
110 // technically we will always be a bit late here. "it's a best effort"
111 if (delta > TimeSpan::Microseconds(0)) {
112 addCondition_.wait_for(lock, std::chrono::microseconds(delta.ToMicroseconds()));
113 }
114 } else {
115 // infinite wait, since the queue is empty..
116 addCondition_.wait(lock);
117 }
118 auto curTime = Time();
119 TaskQueueImpl::ProcessTasks(lock, curTime);
120 }
121 }
122
123 private:
124 std::condition_variable addCondition_;
125 std::thread thread_;
126 };
127
128 namespace Internal {
129
GetThreadedTaskQueueFactory()130 IObjectFactory::Ptr GetThreadedTaskQueueFactory()
131 {
132 return ThreadedTaskQueue::GetFactory();
133 }
134
135 } // namespace Internal
136
137 META_END_NAMESPACE()
138