1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CORE_THREADING_TASK_QUEUE_H
17 #define CORE_THREADING_TASK_QUEUE_H
18 
#include <atomic>
#include <cstdint>
#include <functional>
#include <utility>

#include <base/containers/vector.h>
#include <core/namespace.h>
#include <core/threading/intf_thread_pool.h>
26 
CORE_BEGIN_NAMESPACE()27 CORE_BEGIN_NAMESPACE()
28 
29 // Helper class for running std::function as a ThreadPool task.
30 class FunctionTask final : public IThreadPool::ITask {
31 public:
32     static Ptr Create(std::function<void()> func)
33     {
34         return Ptr { new FunctionTask(move(func)) };
35     }
36 
37     explicit FunctionTask(std::function<void()> func) : func_(move(func)) {};
38 
39     void operator()() override
40     {
41         func_();
42     }
43 
44 protected:
45     void Destroy() override
46     {
47         delete this;
48     }
49 
50 private:
51     std::function<void()> func_;
52 };
53 
54 // Abstract base class for task queues.
55 /* Usage examples:
56  *  1. SequentialTaskQueue queue(threadManager.GetThreadPool());
57  *  2. queue.Submit("identifier A", FunctionTask::Create(function));
58  *  3. queue.Submit("identifier B", FunctionTask::Create(std::bind(&Classname::function, this)));
59  *  4. queue.Submit("identifier C", FunctionTask::Create([]() { <code> }));
60  */
61 class TaskQueue {
62 public:
63     /** Constructor for the task queue.
64         @param aThreadPool Optional thread pool, if support for threading is desired.
65     */
66     explicit TaskQueue(const IThreadPool::Ptr& threadPool);
67     TaskQueue(const TaskQueue& other) = delete;
68     TaskQueue& operator=(const TaskQueue& other) = delete;
69 
70     virtual ~TaskQueue();
71 
72     /** Submit task to end of execution queue.
73         @param taskIdentifier Identifier of the task, must be unique.
74         @param task Task to execute.
75     */
76     virtual void Submit(uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task) = 0;
77 
78     /** Remove task from execution queue.
79         @param taskIdentifier Identifier of the task.
80     */
81     virtual void Remove(uint64_t taskIdentifier) = 0;
82 
83     /** Remove all tasks from execution queue. */
84     virtual void Clear() = 0;
85 
86     /** Execute task queue in this thread. */
87     virtual void Execute() = 0;
88 
89     /** Execute task queue asynchronously in new thread. */
90     virtual void ExecuteAsync();
91 
92     /** Checks if task queue is running asynchronously.
93         @return True if task queue is currently running and can't be re-executed.
94     */
95     bool IsRunningAsync() const;
96 
97     /** Waits until task queue has completed asynchronous execution */
98     void Wait();
99 
100 protected:
101     class ExecuteAsyncTask final : public IThreadPool::ITask {
102     public:
103         explicit ExecuteAsyncTask(TaskQueue& queue);
104         void operator()() override;
105 
106     protected:
107         void Destroy() override;
108 
109     private:
110         TaskQueue& queue_;
111     };
112 
113     struct Entry {
114         Entry() = default;
115         Entry(uint64_t identifier, IThreadPool::ITask::Ptr task);
116         bool operator==(uint64_t identifier) const;
117         bool operator==(const Entry& other) const;
118 
119         IThreadPool::ITask::Ptr task;
120         uint64_t identifier {};
121         BASE_NS::vector<uint64_t> dependencies;
122     };
123 
124     IThreadPool::Ptr threadPool_;
125     IThreadPool::IResult::Ptr asyncOperation_;
126     std::atomic_bool isRunningAsync_;
127 };
128 CORE_END_NAMESPACE()
129 
130 #endif // CORE_THREADING_TASK_QUEUE_H
131