1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "task_pool_impl.h"
16 #include "db_errno.h"
17 #include "log_print.h"
18 
19 namespace DistributedDB {
// Namespace-scope definition of the static constexpr data member. It has no
// initializer here, so the value must come from the in-class declaration
// (header not visible in this file). IdleExit() passes it to
// std::chrono::seconds, i.e. it is used as a number of seconds.
20 constexpr int TaskPoolImpl::IDLE_WAIT_PERIOD;
21 
TaskPoolImpl(int maxThreads,int minThreads)22 TaskPoolImpl::TaskPoolImpl(int maxThreads, int minThreads)
23     : genericTasks_(false),
24       genericTaskCount_(0),
25       queuedTaskCount_(0),
26       isStarted_(false),
27       isStopping_(false),
28       maxThreads_(maxThreads),
29       minThreads_(minThreads),
30       curThreads_(0),
31       idleThreads_(0)
32 {}
33 
~TaskPoolImpl()34 TaskPoolImpl::~TaskPoolImpl()
35 {}
36 
Start()37 int TaskPoolImpl::Start()
38 {
39     if (maxThreads_ < minThreads_) {
40         LOGE("Start task pool failed, maxThreads(%d) < minThreads(%d).",
41             maxThreads_, minThreads_);
42         return -E_INVALID_ARGS;
43     }
44     if (maxThreads_ <= 0) {
45         LOGE("Start task pool failed, maxThreads(%d) <= 0.", maxThreads_);
46         return -E_INVALID_ARGS;
47     }
48     if (minThreads_ < 0) {
49         LOGE("Start task pool failed, minThreads(%d) < 0.", minThreads_);
50         return -E_INVALID_ARGS;
51     }
52     LOGI("Start task pool min:%d, max:%d", minThreads_, maxThreads_);
53     std::lock_guard<std::mutex> guard(tasksMutex_);
54     isStarted_ = true; // parameters checked ok.
55     isStopping_ = false;
56     int errCode = SpawnThreads(true);
57     if (errCode != E_OK) {
58         LOGW("Spawn threads failed when starting the task pool.");
59         // ignore the error, we will try when schedule().
60     }
61     return E_OK;
62 }
63 
Stop()64 void TaskPoolImpl::Stop()
65 {
66     std::unique_lock<std::mutex> lock(tasksMutex_);
67     if (!isStarted_) {
68         return;
69     }
70     isStopping_ = true;
71     hasTasks_.notify_all();
72     allThreadsExited_.wait(lock, [this]() {
73         return this->curThreads_ <= 0;
74     });
75     isStarted_ = false;
76 }
77 
Schedule(const Task & task)78 int TaskPoolImpl::Schedule(const Task &task)
79 {
80     if (!task) {
81         return -E_INVALID_ARGS;
82     }
83     std::lock_guard<std::mutex> guard(tasksMutex_);
84     if (!isStarted_) {
85         LOGE("Schedule failed, the task pool is not started.");
86         return -E_NOT_PERMIT;
87     }
88     if (isStopping_) {
89         LOGI("Schedule failed, the task pool is stopping.");
90         return -E_STALE;
91     }
92     genericTasks_.PutTask(task);
93     ++genericTaskCount_;
94     hasTasks_.notify_one();
95     TryToSpawnThreads();
96     return E_OK;
97 }
98 
Schedule(const std::string & queueTag,const Task & task)99 int TaskPoolImpl::Schedule(const std::string &queueTag, const Task &task)
100 {
101     if (!task) {
102         return -E_INVALID_ARGS;
103     }
104     std::lock_guard<std::mutex> guard(tasksMutex_);
105     if (!isStarted_) {
106         LOGE("Schedule failed, the task pool is not started.");
107         return -E_NOT_PERMIT;
108     }
109     if (isStopping_) {
110         LOGI("Schedule failed, the task pool is stopping.");
111         return -E_STALE;
112     }
113     queuedTasks_[queueTag].PutTask(task);
114     ++queuedTaskCount_;
115     hasTasks_.notify_all();
116     TryToSpawnThreads();
117     return E_OK;
118 }
119 
ShrinkMemory(const std::string & tag)120 void TaskPoolImpl::ShrinkMemory(const std::string &tag)
121 {
122     std::lock_guard<std::mutex> guard(tasksMutex_);
123     auto iter = queuedTasks_.find(tag);
124     if (iter != queuedTasks_.end()) {
125         if (iter->second.IsEmptyAndUnlocked()) {
126             queuedTasks_.erase(iter);
127         }
128     }
129 }
130 
IdleExit(std::unique_lock<std::mutex> & lock)131 bool TaskPoolImpl::IdleExit(std::unique_lock<std::mutex> &lock)
132 {
133     if (isStopping_) {
134         return true;
135     }
136     ++idleThreads_;
137     bool isGenericWorker = IsGenericWorker();
138     if (!isGenericWorker && (curThreads_ > minThreads_)) {
139         std::cv_status status = hasTasks_.wait_for(lock,
140             std::chrono::seconds(IDLE_WAIT_PERIOD));
141         if (status == std::cv_status::timeout && IsExecutingTasksEmpty()) {
142             --idleThreads_;
143             return true;
144         }
145     } else {
146         if (isGenericWorker) {
147             hasTasks_.notify_all();
148         }
149         hasTasks_.wait(lock);
150     }
151     --idleThreads_;
152     return false;
153 }
154 
SetThreadFree()155 void TaskPoolImpl::SetThreadFree()
156 {
157     for (auto &pair : queuedTasks_) {
158         TaskQueue *tq = &pair.second;
159         tq->ReleaseLock();
160     }
161 }
162 
ReapTask(TaskQueue * & queue)163 Task TaskPoolImpl::ReapTask(TaskQueue *&queue)
164 {
165     Task task = genericTasks_.GetTaskAutoLock();
166     if (task != nullptr) {
167         queue = nullptr;
168         return task;
169     }
170 
171     queue = nullptr;
172     if (IsGenericWorker() && (curThreads_ > 1)) { // 1 indicates self.
173         SetThreadFree();
174         return nullptr;
175     }
176     for (auto &pair : queuedTasks_) {
177         TaskQueue *tq = &pair.second;
178         task = tq->GetTaskAutoLock();
179         if (task != nullptr) {
180             queue = tq;
181             return task;
182         }
183     }
184     return nullptr;
185 }
186 
GetTask(Task & task,TaskQueue * & queue)187 void TaskPoolImpl::GetTask(Task &task, TaskQueue *&queue)
188 {
189     std::unique_lock<std::mutex> lock(tasksMutex_);
190 
191     while (true) {
192         task = ReapTask(queue);
193         if (task != nullptr) {
194             return;
195         }
196 
197         if (IdleExit(lock)) {
198             break;
199         }
200     }
201     if (task == nullptr) {
202         // Idle thread exit.
203         if (IsGenericWorker()) {
204             genericThread_ = std::thread::id();
205         }
206         --curThreads_;
207     }
208 }
209 
SpawnThreads(bool isStart)210 int TaskPoolImpl::SpawnThreads(bool isStart)
211 {
212     if (!isStarted_) {
213         LOGE("Spawn task pool threads failed, pool is not started.");
214         return -E_NOT_PERMIT;
215     }
216     if (curThreads_ >= maxThreads_) {
217         // the pool is full of threads.
218         return E_OK;
219     }
220 
221     int limits = isStart ? minThreads_ : (curThreads_ + 1);
222     while (curThreads_ < limits) {
223         ++curThreads_;
224         std::thread thread([this]() {
225             TaskWorker();
226         });
227         LOGI("Task pool spawn cur:%d idle:%d.", curThreads_, idleThreads_);
228         thread.detach();
229     }
230     return E_OK;
231 }
232 
IsGenericWorker() const233 bool TaskPoolImpl::IsGenericWorker() const
234 {
235     return genericThread_ == std::this_thread::get_id();
236 }
237 
BecomeGenericWorker()238 void TaskPoolImpl::BecomeGenericWorker()
239 {
240     std::lock_guard<std::mutex> guard(tasksMutex_);
241     if (genericThread_ == std::thread::id()) {
242         genericThread_ = std::this_thread::get_id();
243     }
244 }
245 
ExitWorker()246 void TaskPoolImpl::ExitWorker()
247 {
248     std::lock_guard<std::mutex> guard(tasksMutex_);
249     allThreadsExited_.notify_all();
250     LOGI("Task pool thread exit, cur:%d idle:%d, genericTaskCount:%d, queuedTaskCount:%d.",
251         curThreads_, idleThreads_, genericTaskCount_, queuedTaskCount_);
252 }
253 
TaskWorker()254 void TaskPoolImpl::TaskWorker()
255 {
256     BecomeGenericWorker();
257 
258     while (true) {
259         TaskQueue *taskQueue = nullptr;
260         Task task = nullptr;
261 
262         GetTask(task, taskQueue);
263         if (task == nullptr) {
264             // Idle thread exit.
265             break;
266         }
267 
268         task();
269         FinishExecuteTask(taskQueue);
270     }
271 
272     ExitWorker();
273 }
274 
FinishExecuteTask(TaskQueue * taskQueue)275 void TaskPoolImpl::FinishExecuteTask(TaskQueue *taskQueue)
276 {
277     std::lock_guard<std::mutex> guard(tasksMutex_);
278     if (taskQueue != nullptr) {
279         taskQueue->ReleaseLock();
280         --queuedTaskCount_;
281     } else {
282         --genericTaskCount_;
283     }
284 }
285 
TryToSpawnThreads()286 void TaskPoolImpl::TryToSpawnThreads()
287 {
288     if ((curThreads_ >= maxThreads_) ||
289         (curThreads_ >= (queuedTaskCount_ + genericTaskCount_))) {
290         return;
291     }
292     (void)(SpawnThreads(false));
293 }
294 
IsExecutingTasksEmpty() const295 bool TaskPoolImpl::IsExecutingTasksEmpty() const
296 {
297     if (genericTaskCount_ > 0) {
298         return false;
299     }
300     for (const auto &item: queuedTasks_) {
301         if (item.second.CanGetTask()) {
302             return false;
303         }
304     }
305     return true;
306 }
307 } // namespace DistributedDB
308