1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "threading/dispatcher_task_queue.h"
17
18 #include <algorithm>
19 #include <cstddef>
20 #include <deque>
21 #include <mutex>
22
23 #include <base/containers/array_view.h>
24 #include <base/containers/iterator.h>
25 #include <base/containers/unique_ptr.h>
26 #include <base/containers/vector.h>
27 #include <core/namespace.h>
28 #include <core/threading/intf_thread_pool.h>
29
30 CORE_BEGIN_NAMESPACE()
31 using BASE_NS::vector;
32
33 // -- Dispatcher task queue.
DispatcherTaskQueue(const IThreadPool::Ptr & threadPool)34 DispatcherTaskQueue::DispatcherTaskQueue(const IThreadPool::Ptr& threadPool) : TaskQueue(threadPool) {}
35
~DispatcherTaskQueue()36 DispatcherTaskQueue::~DispatcherTaskQueue()
37 {
38 Wait();
39 }
40
Remove(uint64_t taskIdentifier)41 void DispatcherTaskQueue::Remove(uint64_t taskIdentifier)
42 {
43 std::lock_guard lock(queueLock_);
44
45 auto it = std::find(tasks_.begin(), tasks_.end(), taskIdentifier);
46 if (it != tasks_.end()) {
47 tasks_.erase(it);
48 }
49 }
50
Clear()51 void DispatcherTaskQueue::Clear()
52 {
53 Wait();
54 {
55 std::lock_guard lock(queueLock_);
56
57 tasks_.clear();
58 }
59 }
60
Submit(uint64_t taskIdentifier,IThreadPool::ITask::Ptr && task)61 void DispatcherTaskQueue::Submit(uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task)
62 {
63 std::lock_guard lock(queueLock_);
64
65 tasks_.emplace_back(taskIdentifier, move(task));
66 }
67
SubmitAfter(uint64_t afterIdentifier,uint64_t taskIdentifier,IThreadPool::ITask::Ptr && task)68 void DispatcherTaskQueue::SubmitAfter(uint64_t afterIdentifier, uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task)
69 {
70 std::lock_guard lock(queueLock_);
71
72 auto it = std::find(tasks_.begin(), tasks_.end(), afterIdentifier);
73 if (it != tasks_.end()) {
74 tasks_.emplace(++it, taskIdentifier, move(task));
75 } else {
76 tasks_.emplace_back(taskIdentifier, move(task));
77 }
78 }
79
SubmitAfter(BASE_NS::array_view<const uint64_t> afterIdentifiers,uint64_t taskIdentifier,IThreadPool::ITask::Ptr && task)80 void DispatcherTaskQueue::SubmitAfter(
81 BASE_NS::array_view<const uint64_t> afterIdentifiers, uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task)
82 {
83 ptrdiff_t pos = -1;
84 for (const auto afterIdentifier : afterIdentifiers) {
85 auto it = std::find(tasks_.begin(), tasks_.end(), afterIdentifier);
86 if (it != tasks_.end()) {
87 pos = std::max(pos, std::distance(tasks_.begin(), it));
88 }
89 }
90 if (pos >= 0) {
91 tasks_.emplace(tasks_.begin() + (pos + 1), taskIdentifier, move(task));
92 } else {
93 tasks_.emplace_back(taskIdentifier, move(task));
94 }
95 }
96
Execute()97 void DispatcherTaskQueue::Execute()
98 {
99 Entry entry;
100 bool hasTaskEntry = false;
101
102 {
103 // Retrieve first task in task queue.
104 std::lock_guard lock(queueLock_);
105
106 if (!tasks_.empty()) {
107 entry = BASE_NS::move(tasks_.front());
108 tasks_.pop_front();
109 hasTaskEntry = true;
110 }
111 }
112
113 if (hasTaskEntry) {
114 // Execute.
115 (*entry.task)();
116
117 {
118 // Move to completed list and finish.
119 std::lock_guard lock(queueLock_);
120 finishedTasks_.emplace_back(std::move(entry));
121 }
122 }
123 }
124
CollectFinishedTasks()125 vector<uint64_t> DispatcherTaskQueue::CollectFinishedTasks()
126 {
127 std::lock_guard lock(queueLock_);
128
129 vector<uint64_t> result;
130 result.reserve(finishedTasks_.size());
131 for (auto& entry : finishedTasks_) {
132 result.emplace_back(entry.identifier);
133 }
134
135 finishedTasks_.clear();
136
137 return result;
138 }
139 CORE_END_NAMESPACE()
140