1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "task_pool_impl.h"
16 #include "db_errno.h"
17 #include "log_print.h"
18
19 namespace DistributedDB {
20 constexpr int TaskPoolImpl::IDLE_WAIT_PERIOD;
21
TaskPoolImpl(int maxThreads,int minThreads)22 TaskPoolImpl::TaskPoolImpl(int maxThreads, int minThreads)
23 : genericTasks_(false),
24 genericTaskCount_(0),
25 queuedTaskCount_(0),
26 isStarted_(false),
27 isStopping_(false),
28 maxThreads_(maxThreads),
29 minThreads_(minThreads),
30 curThreads_(0),
31 idleThreads_(0)
32 {}
33
~TaskPoolImpl()34 TaskPoolImpl::~TaskPoolImpl()
35 {}
36
Start()37 int TaskPoolImpl::Start()
38 {
39 if (maxThreads_ < minThreads_) {
40 LOGE("Start task pool failed, maxThreads(%d) < minThreads(%d).",
41 maxThreads_, minThreads_);
42 return -E_INVALID_ARGS;
43 }
44 if (maxThreads_ <= 0) {
45 LOGE("Start task pool failed, maxThreads(%d) <= 0.", maxThreads_);
46 return -E_INVALID_ARGS;
47 }
48 if (minThreads_ < 0) {
49 LOGE("Start task pool failed, minThreads(%d) < 0.", minThreads_);
50 return -E_INVALID_ARGS;
51 }
52 LOGI("Start task pool min:%d, max:%d", minThreads_, maxThreads_);
53 std::lock_guard<std::mutex> guard(tasksMutex_);
54 isStarted_ = true; // parameters checked ok.
55 isStopping_ = false;
56 int errCode = SpawnThreads(true);
57 if (errCode != E_OK) {
58 LOGW("Spawn threads failed when starting the task pool.");
59 // ignore the error, we will try when schedule().
60 }
61 return E_OK;
62 }
63
Stop()64 void TaskPoolImpl::Stop()
65 {
66 std::unique_lock<std::mutex> lock(tasksMutex_);
67 if (!isStarted_) {
68 return;
69 }
70 isStopping_ = true;
71 hasTasks_.notify_all();
72 allThreadsExited_.wait(lock, [this]() {
73 return this->curThreads_ <= 0;
74 });
75 isStarted_ = false;
76 }
77
Schedule(const Task & task)78 int TaskPoolImpl::Schedule(const Task &task)
79 {
80 if (!task) {
81 return -E_INVALID_ARGS;
82 }
83 std::lock_guard<std::mutex> guard(tasksMutex_);
84 if (!isStarted_) {
85 LOGE("Schedule failed, the task pool is not started.");
86 return -E_NOT_PERMIT;
87 }
88 if (isStopping_) {
89 LOGI("Schedule failed, the task pool is stopping.");
90 return -E_STALE;
91 }
92 genericTasks_.PutTask(task);
93 ++genericTaskCount_;
94 hasTasks_.notify_one();
95 TryToSpawnThreads();
96 return E_OK;
97 }
98
Schedule(const std::string & queueTag,const Task & task)99 int TaskPoolImpl::Schedule(const std::string &queueTag, const Task &task)
100 {
101 if (!task) {
102 return -E_INVALID_ARGS;
103 }
104 std::lock_guard<std::mutex> guard(tasksMutex_);
105 if (!isStarted_) {
106 LOGE("Schedule failed, the task pool is not started.");
107 return -E_NOT_PERMIT;
108 }
109 if (isStopping_) {
110 LOGI("Schedule failed, the task pool is stopping.");
111 return -E_STALE;
112 }
113 queuedTasks_[queueTag].PutTask(task);
114 ++queuedTaskCount_;
115 hasTasks_.notify_all();
116 TryToSpawnThreads();
117 return E_OK;
118 }
119
ShrinkMemory(const std::string & tag)120 void TaskPoolImpl::ShrinkMemory(const std::string &tag)
121 {
122 std::lock_guard<std::mutex> guard(tasksMutex_);
123 auto iter = queuedTasks_.find(tag);
124 if (iter != queuedTasks_.end()) {
125 if (iter->second.IsEmptyAndUnlocked()) {
126 queuedTasks_.erase(iter);
127 }
128 }
129 }
130
IdleExit(std::unique_lock<std::mutex> & lock)131 bool TaskPoolImpl::IdleExit(std::unique_lock<std::mutex> &lock)
132 {
133 if (isStopping_) {
134 return true;
135 }
136 ++idleThreads_;
137 bool isGenericWorker = IsGenericWorker();
138 if (!isGenericWorker && (curThreads_ > minThreads_)) {
139 std::cv_status status = hasTasks_.wait_for(lock,
140 std::chrono::seconds(IDLE_WAIT_PERIOD));
141 if (status == std::cv_status::timeout && IsExecutingTasksEmpty()) {
142 --idleThreads_;
143 return true;
144 }
145 } else {
146 if (isGenericWorker) {
147 hasTasks_.notify_all();
148 }
149 hasTasks_.wait(lock);
150 }
151 --idleThreads_;
152 return false;
153 }
154
SetThreadFree()155 void TaskPoolImpl::SetThreadFree()
156 {
157 for (auto &pair : queuedTasks_) {
158 TaskQueue *tq = &pair.second;
159 tq->ReleaseLock();
160 }
161 }
162
ReapTask(TaskQueue * & queue)163 Task TaskPoolImpl::ReapTask(TaskQueue *&queue)
164 {
165 Task task = genericTasks_.GetTaskAutoLock();
166 if (task != nullptr) {
167 queue = nullptr;
168 return task;
169 }
170
171 queue = nullptr;
172 if (IsGenericWorker() && (curThreads_ > 1)) { // 1 indicates self.
173 SetThreadFree();
174 return nullptr;
175 }
176 for (auto &pair : queuedTasks_) {
177 TaskQueue *tq = &pair.second;
178 task = tq->GetTaskAutoLock();
179 if (task != nullptr) {
180 queue = tq;
181 return task;
182 }
183 }
184 return nullptr;
185 }
186
GetTask(Task & task,TaskQueue * & queue)187 void TaskPoolImpl::GetTask(Task &task, TaskQueue *&queue)
188 {
189 std::unique_lock<std::mutex> lock(tasksMutex_);
190
191 while (true) {
192 task = ReapTask(queue);
193 if (task != nullptr) {
194 return;
195 }
196
197 if (IdleExit(lock)) {
198 break;
199 }
200 }
201 if (task == nullptr) {
202 // Idle thread exit.
203 if (IsGenericWorker()) {
204 genericThread_ = std::thread::id();
205 }
206 --curThreads_;
207 }
208 }
209
SpawnThreads(bool isStart)210 int TaskPoolImpl::SpawnThreads(bool isStart)
211 {
212 if (!isStarted_) {
213 LOGE("Spawn task pool threads failed, pool is not started.");
214 return -E_NOT_PERMIT;
215 }
216 if (curThreads_ >= maxThreads_) {
217 // the pool is full of threads.
218 return E_OK;
219 }
220
221 int limits = isStart ? minThreads_ : (curThreads_ + 1);
222 while (curThreads_ < limits) {
223 ++curThreads_;
224 std::thread thread([this]() {
225 TaskWorker();
226 });
227 LOGI("Task pool spawn cur:%d idle:%d.", curThreads_, idleThreads_);
228 thread.detach();
229 }
230 return E_OK;
231 }
232
IsGenericWorker() const233 bool TaskPoolImpl::IsGenericWorker() const
234 {
235 return genericThread_ == std::this_thread::get_id();
236 }
237
BecomeGenericWorker()238 void TaskPoolImpl::BecomeGenericWorker()
239 {
240 std::lock_guard<std::mutex> guard(tasksMutex_);
241 if (genericThread_ == std::thread::id()) {
242 genericThread_ = std::this_thread::get_id();
243 }
244 }
245
ExitWorker()246 void TaskPoolImpl::ExitWorker()
247 {
248 std::lock_guard<std::mutex> guard(tasksMutex_);
249 allThreadsExited_.notify_all();
250 LOGI("Task pool thread exit, cur:%d idle:%d, genericTaskCount:%d, queuedTaskCount:%d.",
251 curThreads_, idleThreads_, genericTaskCount_, queuedTaskCount_);
252 }
253
TaskWorker()254 void TaskPoolImpl::TaskWorker()
255 {
256 BecomeGenericWorker();
257
258 while (true) {
259 TaskQueue *taskQueue = nullptr;
260 Task task = nullptr;
261
262 GetTask(task, taskQueue);
263 if (task == nullptr) {
264 // Idle thread exit.
265 break;
266 }
267
268 task();
269 FinishExecuteTask(taskQueue);
270 }
271
272 ExitWorker();
273 }
274
FinishExecuteTask(TaskQueue * taskQueue)275 void TaskPoolImpl::FinishExecuteTask(TaskQueue *taskQueue)
276 {
277 std::lock_guard<std::mutex> guard(tasksMutex_);
278 if (taskQueue != nullptr) {
279 taskQueue->ReleaseLock();
280 --queuedTaskCount_;
281 } else {
282 --genericTaskCount_;
283 }
284 }
285
TryToSpawnThreads()286 void TaskPoolImpl::TryToSpawnThreads()
287 {
288 if ((curThreads_ >= maxThreads_) ||
289 (curThreads_ >= (queuedTaskCount_ + genericTaskCount_))) {
290 return;
291 }
292 (void)(SpawnThreads(false));
293 }
294
IsExecutingTasksEmpty() const295 bool TaskPoolImpl::IsExecutingTasksEmpty() const
296 {
297 if (genericTaskCount_ > 0) {
298 return false;
299 }
300 for (const auto &item: queuedTasks_) {
301 if (item.second.CanGetTask()) {
302 return false;
303 }
304 }
305 return true;
306 }
307 } // namespace DistributedDB
308