/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef NETSTACK_THREAD_POOL
#define NETSTACK_THREAD_POOL

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace OHOS::NetStack {
/**
 * A pool of detached worker threads that execute prioritized tasks.
 *
 * @tparam Task task type; it must be default constructible, copy assignable, comparable with operator<
 *         (std::priority_queue runs the greatest task first) and provide a callable Execute() member
 * @tparam DEFAULT_THREAD_NUM number of threads started by the constructor and kept alive while idle
 * @tparam MAX_THREAD_NUM upper bound checked by Push() before it starts an additional thread
 */
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
    /**
     * disallow default constructor
     */
    ThreadPool() = delete;

    /**
     * disallow copy and move
     */
    ThreadPool(const ThreadPool &) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool &operator=(const ThreadPool &) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool(ThreadPool &&) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool &operator=(ThreadPool &&) = delete;

    /**
     * make DEFAULT_THREAD_NUM threads
     * @param timeout seconds an idle thread waits for a task; after the timeout the thread terminates
     *        if more than DEFAULT_THREAD_NUM threads are running
     */
    explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), runningNum_(0), needRun_(true)
    {
        for (size_t i = 0; i < DEFAULT_THREAD_NUM; ++i) {
            StartThread();
        }
    }

    /**
     * if ~ThreadPool, terminate all thread
     * blocks until every thread counted in runningNum_ has left RunTask()
     */
    ~ThreadPool()
    {
        {
            // change needRun_ under mutex_ so that a thread which is about to wait can not miss the change
            std::lock_guard<std::mutex> guard(mutex_);
            needRun_ = false;
        }

        // notify all the threads to wake and terminate; threads that are executing a task finish it first
        while (runningNum_ > 0) {
            needRunCondition_.notify_all();
            std::this_thread::yield();
        }
    }

    /**
     * push it to taskQueue_ and notify a thread to run it
     * a new thread is started when no thread is idle and fewer than MAX_THREAD_NUM threads are running
     * @param task new task to Execute
     */
    void Push(const Task &task)
    {
        PushTask(task);

        if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
            StartThread();
        }

        needRunCondition_.notify_all();
    }

private:
    /**
     * take the task on top of taskQueue_
     * @param task receives the task when one is available
     * @return false if taskQueue_ is empty, true if task has been filled
     */
    bool GetTask(Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);

        // if taskQueue_ is empty, there is nothing to run
        if (taskQueue_.empty()) {
            return false;
        }

        // if run to this line, means that taskQueue_ is not empty
        task = taskQueue_.top();
        taskQueue_.pop();
        return true;
    }

    /**
     * put a copy of task into taskQueue_ while holding mutex_
     */
    void PushTask(const Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        taskQueue_.push(task);
    }

    /**
     * increments the counter when constructed and decrements it when destroyed,
     * unless Dismiss() has been called in between
     */
    class NumWrapper {
    public:
        NumWrapper() = delete;

        NumWrapper(const NumWrapper &) = delete;

        NumWrapper &operator=(const NumWrapper &) = delete;

        explicit NumWrapper(std::atomic<uint32_t> &num) : num_(&num)
        {
            ++(*num_);
        }

        ~NumWrapper()
        {
            if (num_ != nullptr) {
                --(*num_);
            }
        }

        /**
         * keep the increment: the destructor will not decrement the counter any more
         */
        void Dismiss()
        {
            num_ = nullptr;
        }

    private:
        std::atomic<uint32_t> *num_;
    };

    /**
     * start one detached thread which runs RunTask()
     * the thread is counted in runningNum_ before it is started, so that Push() sees threads which
     * have not run yet and the destructor waits for them as well
     */
    void StartThread()
    {
        // takes the increment back if std::thread fails to start and throws
        NumWrapper pending(runningNum_);
        std::thread([this] {
            RunTask();
            // must be the last access to the pool: the destructor may return as soon as this reaches zero
            --runningNum_;
        }).detach();
        pending.Dismiss();
    }

    /**
     * wait until a task is pushed, the pool is being destroyed or timeout_ seconds have passed
     */
    void Sleep()
    {
        // all threads must wait on needRunCondition_ with the same mutex, and it has to be the mutex
        // which protects taskQueue_, otherwise a notification sent by Push() can be missed
        std::unique_lock<std::mutex> lock(mutex_);

        /**
         * if the thread is waiting, it is idle
         * if wake up, this thread is not idle:
         *     1 this thread should return
         *     2 this thread should run task
         *     3 this thread should go to next loop
         */
        NumWrapper idleWrapper(idleThreadNum_);
        (void)idleWrapper;

        // mutex_ is held whenever the predicate runs, so taskQueue_ can be read directly
        needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
                                   [this] { return !needRun_ || !taskQueue_.empty(); });
    }

    /**
     * thread body: execute tasks until the pool is destroyed, or until this thread
     * found no task after waiting while more than DEFAULT_THREAD_NUM threads are running
     */
    void RunTask()
    {
        while (needRun_) {
            Task task;
            if (GetTask(task)) {
                task.Execute();
                continue;
            }

            Sleep();

            if (!needRun_) {
                return;
            }

            if (GetTask(task)) {
                task.Execute();
                continue;
            }

            // nothing to do after waiting: only DEFAULT_THREAD_NUM threads are kept
            if (runningNum_ > DEFAULT_THREAD_NUM) {
                return;
            }
        }
    }

private:
    /**
     * other thread put a task to the taskQueue_
     * mutex_ protects taskQueue_ and is the mutex used to wait on needRunCondition_
     */
    std::mutex mutex_;
    std::priority_queue<Task> taskQueue_;
    /**
     * an idle thread waits for a new task for at most timeout_ seconds
     */
    uint32_t timeout_;
    /**
     * if idleThreadNum_ is zero, make a new thread
     */
    std::atomic<uint32_t> idleThreadNum_;
    /**
     * number of threads which have been started and have not finished yet
     * when ThreadPool object is deleted, wait until runningNum_ is zero.
     */
    std::atomic<uint32_t> runningNum_;
    /**
     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
     */
    std::atomic_bool needRun_;
    std::condition_variable needRunCondition_;
};
} // namespace OHOS::NetStack
#endif /* NETSTACK_THREAD_POOL */