/* * Copyright (c) 2022 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef NETSTACK_THREAD_POOL #define NETSTACK_THREAD_POOL #include #include #include #include #include namespace OHOS::NetStack { template class ThreadPool { public: /** * disallow default constructor */ ThreadPool() = delete; /** * disallow copy and move */ ThreadPool(const ThreadPool &) = delete; /** * disallow copy and move */ ThreadPool &operator=(const ThreadPool &) = delete; /** * disallow copy and move */ ThreadPool(ThreadPool &&) = delete; /** * disallow copy and move */ ThreadPool &operator=(ThreadPool &&) = delete; /** * make DEFAULT_THREAD_NUM threads * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated */ explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true) { for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) { std::thread([this] { RunTask(); }).detach(); } } /** * if ~ThreadPool, terminate all thread */ ~ThreadPool() { // set needRun_ = false, and notify all the thread to wake and terminate needRun_ = false; while (runningNum_ > 0) { needRunCondition_.notify_all(); } } /** * push it to taskQueue_ and notify a thread to run it * @param task new task to Execute */ void Push(const Task &task) { PushTask(task); if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) { std::thread([this] { RunTask(); }).detach(); } needRunCondition_.notify_all(); } private: bool IsQueueEmpty() { std::lock_guard guard(mutex_); return taskQueue_.empty(); } bool GetTask(Task &task) { std::lock_guard guard(mutex_); // if taskQueue_ is empty, means timeout if (taskQueue_.empty()) { return false; } // if run to this line, means that taskQueue_ is not empty task = taskQueue_.top(); taskQueue_.pop(); return true; } void PushTask(const Task &task) { std::lock_guard guard(mutex_); taskQueue_.push(task); } class NumWrapper { public: NumWrapper() = delete; explicit NumWrapper(std::atomic &num) : num_(num) { ++num_; } ~NumWrapper() { --num_; } private: std::atomic &num_; }; void Sleep() { std::mutex needRunMutex; std::unique_lock lock(needRunMutex); /** * if the thread is waiting, it is idle * if wake up, this thread is not idle: * 1 this thread should return * 2 this thread should run task * 3 this thread should go to next loop */ NumWrapper idleWrapper(idleThreadNum_); (void)idleWrapper; needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_), [this] { return !needRun_ || !IsQueueEmpty(); }); } void RunTask() { NumWrapper runningWrapper(runningNum_); (void)runningWrapper; while (needRun_) { Task task; if (GetTask(task)) { task.Execute(); continue; } Sleep(); if (!needRun_) { return; } if (GetTask(task)) { task.Execute(); continue; } if (runningNum_ > DEFAULT_THREAD_NUM) { return; } } } private: /** * other thread put a task to the taskQueue_ */ std::mutex mutex_; std::priority_queue taskQueue_; /** * 1 terminate the thread if it is idle for timeout_ seconds * 2 wait for the thread started util timeout_ * 3 wait for the thread notified util timeout_ * 4 wait for the thread terminated util timeout_ */ uint32_t timeout_; /** * if idleThreadNum_ is zero, make a new thread */ std::atomic idleThreadNum_; /** * when ThreadPool object is deleted, wait until runningNum_ is zero. */ std::atomic runningNum_; /** * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated */ std::atomic_bool needRun_; std::condition_variable needRunCondition_; }; } // namespace OHOS::NetStack #endif /* NETSTACK_THREAD_POOL */