1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "thread_pool.h"
17 #include <sys/time.h>
18 #include <algorithm>
19 #include "thread_utils.h"
20 #include "dp_log.h"
21
22 namespace OHOS {
23 namespace CameraStandard {
24 namespace DeferredProcessing {
Create(const std::string & name,uint32_t numThreads)25 std::unique_ptr<ThreadPool> ThreadPool::Create(const std::string& name, uint32_t numThreads)
26 {
27 auto pool = std::make_unique<ThreadPool>(name, numThreads);
28 if (pool) {
29 pool->Initialize();
30 }
31 return pool;
32 }
33
ThreadPool(const std::string & name,uint32_t numThreads)34 ThreadPool::ThreadPool(const std::string& name, uint32_t numThreads)
35 : name_(name), numThreads_(numThreads), workers_(), isStopped_(false), mutex_(), taskCv_(), tasks_()
36 {
37 if (numThreads_ == 0) {
38 numThreads_ = 1;
39 }
40 numThreads_ = std::min(numThreads_, static_cast<uint32_t>(std::thread::hardware_concurrency()));
41 DP_DEBUG_LOG("name: %s, numThreads, orig: %u, new: %u.", name.c_str(), numThreads, numThreads_);
42 }
43
~ThreadPool()44 ThreadPool::~ThreadPool()
45 {
46 CAMERA_DP_SYNC_TRACE;
47 DP_DEBUG_LOG("name: %s.", name_.c_str());
48 isStopped_ = true;
49 taskCv_.notify_all();
50 for (auto& threadInfo : workers_) {
51 if (threadInfo.thread.joinable()) {
52 DP_DEBUG_LOG("joining thread (%s).", threadInfo.name.c_str());
53 threadInfo.thread.join();
54 }
55 }
56 }
57
Initialize()58 void ThreadPool::Initialize()
59 {
60 DP_DEBUG_LOG("entered.");
61 workers_.reserve(numThreads_);
62 std::string threadNamePrefix = name_ + "_DPS_Worker_";
63 for (uint32_t i = 0; i < numThreads_; ++i) {
64 auto threadName = threadNamePrefix + std::to_string(i);
65 workers_.emplace_back(threadName, [this, threadName]() { WorkerLoop(threadName); });
66 SetThreadName(workers_.back().thread.native_handle(), workers_.back().name);
67 }
68 PrintThreadInfo();
69 }
70
WorkerLoop(const std::string & threadName)71 void ThreadPool::WorkerLoop(const std::string& threadName)
72 {
73 DP_DEBUG_LOG("(%s) entered.", threadName.c_str());
74 while (!isStopped_.load()) {
75 DP_DEBUG_LOG("(%s) task excute start entered.", threadName.c_str());
76 auto task = GetTask();
77 if (task) {
78 DP_DEBUG_LOG("(%s) task excuting entered.", threadName.c_str());
79 task();
80 DP_DEBUG_LOG("(%s) task excuting complete.", threadName.c_str());
81 } else {
82 DP_DEBUG_LOG("empty task.");
83 }
84 }
85 DP_DEBUG_LOG("(%s) exited.", threadName.c_str());
86 }
87
BeginBackgroundTasks() const88 void ThreadPool::BeginBackgroundTasks() const
89 {
90 DP_DEBUG_LOG("entered.");
91 for (auto& workerInfo : workers_) {
92 SetThreadPriority(workerInfo.thread.native_handle(), PRIORITY_BACKGROUND);
93 }
94 }
95
EndBackgroundTasks() const96 void ThreadPool::EndBackgroundTasks() const
97 {
98 DP_DEBUG_LOG("entered.");
99 for (auto& workerInfo : workers_) {
100 SetThreadPriority(workerInfo.thread.native_handle(), PRIORITY_NORMAL);
101 }
102 }
103
SetThreadPoolPriority(int priority)104 void ThreadPool::SetThreadPoolPriority(int priority)
105 {
106 DP_DEBUG_LOG("entered.");
107 for (auto& workerInfo : workers_) {
108 SetThreadPriority(workerInfo.thread.native_handle(), priority);
109 }
110 }
111
GetThreadPoolPriority() const112 int ThreadPool::GetThreadPoolPriority() const
113 {
114 return GetThreadPriority(workers_[0].thread.native_handle());
115 }
116
PrintThreadInfo()117 void ThreadPool::PrintThreadInfo()
118 {
119 struct sched_param sch;
120 int policy = -1;
121 for (auto& workerInfo : workers_) {
122 int ret = pthread_getschedparam(workerInfo.thread.native_handle(), &policy, &sch);
123 if (ret == 0) {
124 DP_DEBUG_LOG("thread (%s) priority: %{public}d, policy = %{public}d(0:OTHER, 1:FIFO, 2:RR)",
125 workerInfo.name.c_str(), sch.sched_priority, policy);
126 } else {
127 DP_DEBUG_LOG("thread (%s) pthread_getschedparam failed, ret = %{public}d.", workerInfo.name.c_str(), ret);
128 }
129 }
130 }
131
GetTask() const132 ThreadPool::Task ThreadPool::GetTask() const
133 {
134 std::unique_lock<std::mutex> lock(mutex_);
135 taskCv_.wait(lock, [this] { return isStopped_.load() || !tasks_.empty(); });
136 if (isStopped_.load()) {
137 return {};
138 }
139 auto task = std::move(tasks_.front());
140 tasks_.pop_front();
141 return task;
142 }
143
HasPendingTasks() const144 bool ThreadPool::HasPendingTasks() const
145 {
146 std::unique_lock<std::mutex> lock(mutex_);
147 return !tasks_.empty();
148 }
149
Submit(Task func,bool isUrgent) const150 bool ThreadPool::Submit(Task func, bool isUrgent) const
151 {
152 DP_DEBUG_LOG("entered.");
153 if (!isStopped_.load()) {
154 {
155 std::unique_lock<std::mutex> lock(mutex_);
156 if (isUrgent) {
157 tasks_.emplace_front([task = std::move(func)] { task(); });
158 } else {
159 tasks_.emplace_back([task = std::move(func)] { task(); });
160 }
161 }
162 taskCv_.notify_one();
163 } else {
164 DP_ERR_LOG("failed due to thread pool has been stopped.");
165 return false;
166 }
167 return true;
168 }
169 } // namespace DeferredProcessing
170 } // namespace CameraStandard
171 } // namespace OHOS
172