1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef NETSTACK_THREAD_POOL
17 #define NETSTACK_THREAD_POOL
18 
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
24 
25 namespace OHOS::NetStack {
/**
 * A pool of detached worker threads that execute tasks in priority order.
 *
 * Task must be default-constructible and copyable, must be ordered by operator< (tasks are kept in a
 * std::priority_queue, so the greatest task is executed first) and must provide a member function Execute().
 *
 * The pool starts DEFAULT_THREAD_NUM workers and grows up to MAX_THREAD_NUM workers while no worker is idle.
 * A worker that stays idle for timeout seconds terminates if more than DEFAULT_THREAD_NUM workers are alive.
 *
 * All shared state is guarded by one mutex, so Push may be called from several threads at once.
 * Task::Execute() always runs without that mutex held, therefore a task may call Push on its own pool.
 * A task must not destroy the pool it is running on: the destructor waits for every worker to finish.
 */
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
    /**
     * disallow default constructor
     */
    ThreadPool() = delete;

    /**
     * disallow copy and move
     */
    ThreadPool(const ThreadPool &) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool &operator=(const ThreadPool &) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool(ThreadPool &&) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool &operator=(ThreadPool &&) = delete;

    /**
     * start DEFAULT_THREAD_NUM worker threads
     * @param timeout idle time in seconds after which a worker wakes up; the worker then terminates if more
     *                than DEFAULT_THREAD_NUM workers are alive, otherwise it goes back to waiting
     */
    explicit ThreadPool(uint32_t timeout) : timeout_(timeout)
    {
        // the workers block on mutex_ until the constructor has finished
        std::lock_guard<std::mutex> guard(mutex_);
        for (size_t i = 0; i < DEFAULT_THREAD_NUM; ++i) {
            StartWorkerLocked();
        }
    }

    /**
     * ask every worker to terminate and block until all of them have done so.
     * a task that is already executing is allowed to finish; tasks still waiting in the queue are discarded.
     */
    ~ThreadPool()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        needRun_ = false;
        needRunCondition_.notify_all();
        // sleep instead of spinning; the last worker to retire signals exitCondition_
        exitCondition_.wait(lock, [this] { return runningNum_ == 0; });
    }

    /**
     * put the task into the queue and wake the workers; if no worker is idle and fewer than MAX_THREAD_NUM
     * workers are alive, one more worker is started
     * @param task new task to Execute
     */
    void Push(const Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        taskQueue_.push(task);

        if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
            StartWorkerLocked();
        }

        needRunCondition_.notify_all();
    }

private:
    /**
     * Adds one to a worker counter and takes it back on destruction unless Commit() was called.
     * Keeps runningNum_ correct when the creation of a thread throws.
     */
    class WorkerReservation {
    public:
        explicit WorkerReservation(uint32_t &count) : count_(count)
        {
            ++count_;
        }

        ~WorkerReservation()
        {
            if (!committed_) {
                --count_;
            }
        }

        WorkerReservation(const WorkerReservation &) = delete;
        WorkerReservation &operator=(const WorkerReservation &) = delete;

        /**
         * the thread has been started, from now on the thread itself gives the count back
         */
        void Commit()
        {
            committed_ = true;
        }

    private:
        uint32_t &count_;
        bool committed_ = false;
    };

    /**
     * start one detached worker. mutex_ must be held by the caller.
     */
    void StartWorkerLocked()
    {
        // The worker is counted here and not by the new thread itself: otherwise the destructor could see
        // runningNum_ == 0 while a thread that has not started yet is about to use the destroyed pool.
        WorkerReservation reservation(runningNum_);
        std::thread([this] { RunTask(); }).detach();
        reservation.Commit();
    }

    /**
     * block until a task can be taken from the queue or the calling worker has to terminate
     * @param task receives the task with the highest priority
     * @return true if task was filled. false if the worker has been retired: it is no longer counted in
     *         runningNum_ and must return without touching the pool again
     */
    bool WaitForTask(Task &task)
    {
        std::unique_lock<std::mutex> lock(mutex_);

        while (needRun_) {
            if (!taskQueue_.empty()) {
                task = taskQueue_.top();
                taskQueue_.pop();
                return true;
            }

            // the worker counts as idle only while it is waiting
            ++idleThreadNum_;
            const bool woken = needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
                                                          [this] { return !needRun_ || !taskQueue_.empty(); });
            --idleThreadNum_;

            // timed out without work: only the workers above DEFAULT_THREAD_NUM terminate. the decision and
            // the decrement below happen under the same lock, so the pool never shrinks below the default.
            if (!woken && runningNum_ > DEFAULT_THREAD_NUM) {
                break;
            }
        }

        --runningNum_;
        // notify while mutex_ is still held: releasing the mutex is then the last access of this worker to
        // the pool, and the destructor cannot continue before that has happened
        exitCondition_.notify_all();
        return false;
    }

    /**
     * body of every worker thread: execute tasks until the worker is retired
     */
    void RunTask()
    {
        for (;;) {
            // created and destroyed without mutex_ held
            Task task;
            if (!WaitForTask(task)) {
                return;
            }
            task.Execute();
        }
    }

private:
    /**
     * guards taskQueue_, idleThreadNum_, runningNum_ and needRun_
     */
    std::mutex mutex_;
    /**
     * tasks waiting for execution, other threads put tasks here with Push
     */
    std::priority_queue<Task> taskQueue_;
    /**
     * maximum time in seconds a worker waits for a task before it checks whether it should terminate
     */
    uint32_t timeout_;
    /**
     * number of workers waiting for a task. if idleThreadNum_ is zero, Push makes a new thread
     */
    uint32_t idleThreadNum_ = 0;
    /**
     * number of workers that are alive, including workers that have been created but have not started yet.
     * when ThreadPool object is deleted, wait until runningNum_ is zero.
     */
    uint32_t runningNum_ = 0;
    /**
     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
     */
    bool needRun_ = true;
    /**
     * signalled when a task is pushed or when the workers have to terminate
     */
    std::condition_variable needRunCondition_;
    /**
     * signalled every time a worker is retired, the destructor waits on it
     */
    std::condition_variable exitCondition_;
};
212 } // namespace OHOS::NetStack
213 #endif /* NETSTACK_THREAD_POOL */
214