1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <mutex>
18 #include <thread>
19 
20 #include <base/containers/vector.h>
21 
22 #include <meta/base/interface_macros.h>
23 #include <meta/interface/intf_task_queue.h>
24 #include <meta/interface/intf_task_queue_registry.h>
25 
26 #include "future.h"
27 #include "meta_object.h"
28 #include "task_queue.h"
29 
30 META_BEGIN_NAMESPACE()
31 
32 class ThreadedTaskQueue
33     : public Internal::MetaObjectFwd<ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IThreadedTaskQueue, TaskQueueImpl> {
34 public:
35     using Super =
36         Internal::MetaObjectFwd<ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IThreadedTaskQueue, TaskQueueImpl>;
37     using Token = ITaskQueue::Token;
38 
39     META_NO_COPY_MOVE(ThreadedTaskQueue)
40 
41     ThreadedTaskQueue() = default;
~ThreadedTaskQueue()42     ~ThreadedTaskQueue() override
43     {
44         Shutdown();
45     }
46 
Build(const IMetadata::Ptr & data)47     bool Build(const IMetadata::Ptr& data) override
48     {
49         bool ret = Super::Build(data);
50         if (ret) {
51             self_ = GetSelf<ITaskQueue>();
52             thread_ = std::thread([this]() { ProcessTasks(); });
53         }
54         return ret;
55     }
56 
InvokeTask(const ITaskQueueTask::Ptr & task)57     bool InvokeTask(const ITaskQueueTask::Ptr& task) override
58     {
59         auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_);
60         auto ret = task->Invoke();
61         GetTaskQueueRegistry().SetCurrentTaskQueue(q);
62         return ret;
63     }
64 
Shutdown()65     void Shutdown() override
66     {
67         Close();
68         addCondition_.notify_one();
69         if (thread_.joinable()) {
70             thread_.join();
71         }
72     }
73 
CancelTask(Token token)74     void CancelTask(Token token) override
75     {
76         TaskQueueImpl::CancelTask(token);
77     }
78 
AddTask(ITaskQueueTask::Ptr p)79     Token AddTask(ITaskQueueTask::Ptr p) override
80     {
81         return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
82     }
83 
AddTask(ITaskQueueTask::Ptr p,const TimeSpan & delay)84     Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override
85     {
86         auto t = TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay);
87         if (t) {
88             addCondition_.notify_one();
89         }
90         return t;
91     }
92 
AddWaitableTask(ITaskQueueWaitableTask::Ptr p)93     IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override
94     {
95         IPromise::Ptr promise(new Promise);
96         BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise));
97         auto f = task->GetFuture();
98         AddTask(BASE_NS::move(task));
99         return f;
100     }
101 
ProcessTasks()102     void ProcessTasks()
103     {
104         std::unique_lock lock { mutex_ };
105         execThread_ = std::this_thread::get_id();
106         while (!terminate_) {
107             if (!tasks_.empty()) {
108                 TimeSpan delta = tasks_.back().executeTime - Time();
109                 // wait for next execute time (or trigger which ever is first). and see if we can now process things..
110                 // technically we will always be a bit late here. "it's a best effort"
111                 if (delta > TimeSpan::Microseconds(0)) {
112                     addCondition_.wait_for(lock, std::chrono::microseconds(delta.ToMicroseconds()));
113                 }
114             } else {
115                 // infinite wait, since the queue is empty..
116                 addCondition_.wait(lock);
117             }
118             auto curTime = Time();
119             TaskQueueImpl::ProcessTasks(lock, curTime);
120         }
121     }
122 
123 private:
124     std::condition_variable addCondition_;
125     std::thread thread_;
126 };
127 
128 namespace Internal {
129 
GetThreadedTaskQueueFactory()130 IObjectFactory::Ptr GetThreadedTaskQueueFactory()
131 {
132     return ThreadedTaskQueue::GetFactory();
133 }
134 
135 } // namespace Internal
136 
137 META_END_NAMESPACE()
138