1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef CORE_THREADING_TASK_QUEUE_H
17 #define CORE_THREADING_TASK_QUEUE_H
18
19 #include <atomic>
20 #include <cstdint>
21 #include <functional>
22
23 #include <base/containers/vector.h>
24 #include <core/namespace.h>
25 #include <core/threading/intf_thread_pool.h>
26
CORE_BEGIN_NAMESPACE()27 CORE_BEGIN_NAMESPACE()
28
29 // Helper class for running std::function as a ThreadPool task.
30 class FunctionTask final : public IThreadPool::ITask {
31 public:
32 static Ptr Create(std::function<void()> func)
33 {
34 return Ptr { new FunctionTask(move(func)) };
35 }
36
37 explicit FunctionTask(std::function<void()> func) : func_(move(func)) {};
38
39 void operator()() override
40 {
41 func_();
42 }
43
44 protected:
45 void Destroy() override
46 {
47 delete this;
48 }
49
50 private:
51 std::function<void()> func_;
52 };
53
54 // Abstract base class for task queues.
55 /* Usage examples:
56 * 1. SequentialTaskQueue queue(threadManager.GetThreadPool());
57 * 2. queue.Submit("identifier A", FunctionTask::Create(function));
58 * 3. queue.Submit("identifier B", FunctionTask::Create(std::bind(&Classname::function, this)));
59 * 4. queue.Submit("identifier C", FunctionTask::Create([]() { <code> }));
60 */
61 class TaskQueue {
62 public:
63 /** Constructor for the task queue.
64 @param aThreadPool Optional thread pool, if support for threading is desired.
65 */
66 explicit TaskQueue(const IThreadPool::Ptr& threadPool);
67 TaskQueue(const TaskQueue& other) = delete;
68 TaskQueue& operator=(const TaskQueue& other) = delete;
69
70 virtual ~TaskQueue();
71
72 /** Submit task to end of execution queue.
73 @param taskIdentifier Identifier of the task, must be unique.
74 @param task Task to execute.
75 */
76 virtual void Submit(uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task) = 0;
77
78 /** Remove task from execution queue.
79 @param taskIdentifier Identifier of the task.
80 */
81 virtual void Remove(uint64_t taskIdentifier) = 0;
82
83 /** Remove all tasks from execution queue. */
84 virtual void Clear() = 0;
85
86 /** Execute task queue in this thread. */
87 virtual void Execute() = 0;
88
89 /** Execute task queue asynchronously in new thread. */
90 virtual void ExecuteAsync();
91
92 /** Checks if task queue is running asynchronously.
93 @return True if task queue is currently running and can't be re-executed.
94 */
95 bool IsRunningAsync() const;
96
97 /** Waits until task queue has completed asynchronous execution */
98 void Wait();
99
100 protected:
101 class ExecuteAsyncTask final : public IThreadPool::ITask {
102 public:
103 explicit ExecuteAsyncTask(TaskQueue& queue);
104 void operator()() override;
105
106 protected:
107 void Destroy() override;
108
109 private:
110 TaskQueue& queue_;
111 };
112
113 struct Entry {
114 Entry() = default;
115 Entry(uint64_t identifier, IThreadPool::ITask::Ptr task);
116 bool operator==(uint64_t identifier) const;
117 bool operator==(const Entry& other) const;
118
119 IThreadPool::ITask::Ptr task;
120 uint64_t identifier {};
121 BASE_NS::vector<uint64_t> dependencies;
122 };
123
124 IThreadPool::Ptr threadPool_;
125 IThreadPool::IResult::Ptr asyncOperation_;
126 std::atomic_bool isRunningAsync_;
127 };
128 CORE_END_NAMESPACE()
129
130 #endif // CORE_THREADING_TASK_QUEUE_H
131