1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "threading/dispatcher_task_queue.h"
17 
18 #include <algorithm>
19 #include <cstddef>
20 #include <deque>
21 #include <mutex>
22 
23 #include <base/containers/array_view.h>
24 #include <base/containers/iterator.h>
25 #include <base/containers/unique_ptr.h>
26 #include <base/containers/vector.h>
27 #include <core/namespace.h>
28 #include <core/threading/intf_thread_pool.h>
29 
30 CORE_BEGIN_NAMESPACE()
31 using BASE_NS::vector;
32 
33 // -- Dispatcher task queue.
DispatcherTaskQueue(const IThreadPool::Ptr & threadPool)34 DispatcherTaskQueue::DispatcherTaskQueue(const IThreadPool::Ptr& threadPool) : TaskQueue(threadPool) {}
35 
~DispatcherTaskQueue()36 DispatcherTaskQueue::~DispatcherTaskQueue()
37 {
38     Wait();
39 }
40 
Remove(uint64_t taskIdentifier)41 void DispatcherTaskQueue::Remove(uint64_t taskIdentifier)
42 {
43     std::lock_guard lock(queueLock_);
44 
45     auto it = std::find(tasks_.begin(), tasks_.end(), taskIdentifier);
46     if (it != tasks_.end()) {
47         tasks_.erase(it);
48     }
49 }
50 
Clear()51 void DispatcherTaskQueue::Clear()
52 {
53     Wait();
54     {
55         std::lock_guard lock(queueLock_);
56 
57         tasks_.clear();
58     }
59 }
60 
Submit(uint64_t taskIdentifier,IThreadPool::ITask::Ptr && task)61 void DispatcherTaskQueue::Submit(uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task)
62 {
63     std::lock_guard lock(queueLock_);
64 
65     tasks_.emplace_back(taskIdentifier, move(task));
66 }
67 
SubmitAfter(uint64_t afterIdentifier,uint64_t taskIdentifier,IThreadPool::ITask::Ptr && task)68 void DispatcherTaskQueue::SubmitAfter(uint64_t afterIdentifier, uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task)
69 {
70     std::lock_guard lock(queueLock_);
71 
72     auto it = std::find(tasks_.begin(), tasks_.end(), afterIdentifier);
73     if (it != tasks_.end()) {
74         tasks_.emplace(++it, taskIdentifier, move(task));
75     } else {
76         tasks_.emplace_back(taskIdentifier, move(task));
77     }
78 }
79 
SubmitAfter(BASE_NS::array_view<const uint64_t> afterIdentifiers,uint64_t taskIdentifier,IThreadPool::ITask::Ptr && task)80 void DispatcherTaskQueue::SubmitAfter(
81     BASE_NS::array_view<const uint64_t> afterIdentifiers, uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task)
82 {
83     ptrdiff_t pos = -1;
84     for (const auto afterIdentifier : afterIdentifiers) {
85         auto it = std::find(tasks_.begin(), tasks_.end(), afterIdentifier);
86         if (it != tasks_.end()) {
87             pos = std::max(pos, std::distance(tasks_.begin(), it));
88         }
89     }
90     if (pos >= 0) {
91         tasks_.emplace(tasks_.begin() + (pos + 1), taskIdentifier, move(task));
92     } else {
93         tasks_.emplace_back(taskIdentifier, move(task));
94     }
95 }
96 
Execute()97 void DispatcherTaskQueue::Execute()
98 {
99     Entry entry;
100     bool hasTaskEntry = false;
101 
102     {
103         // Retrieve first task in task queue.
104         std::lock_guard lock(queueLock_);
105 
106         if (!tasks_.empty()) {
107             entry = BASE_NS::move(tasks_.front());
108             tasks_.pop_front();
109             hasTaskEntry = true;
110         }
111     }
112 
113     if (hasTaskEntry) {
114         // Execute.
115         (*entry.task)();
116 
117         {
118             // Move to completed list and finish.
119             std::lock_guard lock(queueLock_);
120             finishedTasks_.emplace_back(std::move(entry));
121         }
122     }
123 }
124 
CollectFinishedTasks()125 vector<uint64_t> DispatcherTaskQueue::CollectFinishedTasks()
126 {
127     std::lock_guard lock(queueLock_);
128 
129     vector<uint64_t> result;
130     result.reserve(finishedTasks_.size());
131     for (auto& entry : finishedTasks_) {
132         result.emplace_back(entry.identifier);
133     }
134 
135     finishedTasks_.clear();
136 
137     return result;
138 }
139 CORE_END_NAMESPACE()
140