1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "platform/threadpool/include/thread_pool.h"
17
18 #include "platform/time/include/time.h"
19 #include "platform/time/include/time_elapser.h"
20
21 namespace OHOS {
22 namespace AI {
23 std::mutex ThreadPool::mutex_;
24 ThreadPool *ThreadPool::instance_ = nullptr;
25
GetInstance()26 ThreadPool *ThreadPool::GetInstance()
27 {
28 CHK_RET(instance_ != nullptr, instance_);
29
30 std::lock_guard<std::mutex> lock(mutex_);
31 CHK_RET(instance_ != nullptr, instance_);
32
33 AIE_NEW(instance_, ThreadPool);
34
35 return instance_;
36 }
37
ReleaseInstance()38 void ThreadPool::ReleaseInstance()
39 {
40 std::lock_guard<std::mutex> lock(mutex_);
41
42 AIE_DELETE(instance_);
43 }
44
ThreadPool()45 ThreadPool::ThreadPool()
46 : stackSize_(THREAD_DEFAULT_STACK_SIZE)
47 {
48 }
49
~ThreadPool()50 ThreadPool::~ThreadPool()
51 {
52 StopThreads();
53 }
54
SetStackSize(size_t size)55 void ThreadPool::SetStackSize(size_t size)
56 {
57 stackSize_ = size;
58 }
59
getStackSize()60 int ThreadPool::getStackSize()
61 {
62 return stackSize_;
63 }
64
Pop()65 std::shared_ptr<Thread> ThreadPool::Pop()
66 {
67 std::lock_guard<std::mutex> guard(mutex4Inner_);
68
69 std::shared_ptr<Thread> thread;
70 if (idleThreads_.empty()) {
71 Thread *ptr = nullptr;
72 AIE_NEW(ptr, Thread);
73 CHK_RET(ptr == nullptr, nullptr);
74
75 ptr->SetStackSize(stackSize_);
76 thread.reset(ptr);
77 } else {
78 thread = idleThreads_.front();
79 idleThreads_.pop_front();
80 }
81
82 busyThreads_.push_back(thread);
83
84 return thread;
85 }
86
Push(std::shared_ptr<Thread> & thread)87 void ThreadPool::Push(std::shared_ptr<Thread> &thread)
88 {
89 thread->StopThread();
90
91 std::lock_guard<std::mutex> guard(mutex4Inner_);
92 for (auto iter = busyThreads_.begin(); iter != busyThreads_.end(); ++iter) {
93 if ((*iter)->GetThreadId() == thread->GetThreadId()) {
94 busyThreads_.erase(iter);
95 break;
96 }
97 }
98
99 idleThreads_.push_back(thread);
100 }
101
StopThreads()102 void ThreadPool::StopThreads()
103 {
104 (void)StopThreads(THREAD_STOP_TIMEOUT);
105 }
106
StopThreads(int32_t timeOut)107 bool ThreadPool::StopThreads(int32_t timeOut)
108 {
109 std::lock_guard<std::mutex> guard(mutex4Inner_);
110
111 for (auto &thread : busyThreads_) {
112 thread->running_ = false;
113 }
114
115 time_t now = GetCurTimeSec();
116
117 TimeElapser elapser;
118
119 while (true) {
120 // judge if the wait time over timeOut
121 CHK_RET(static_cast<int32_t>(elapser.ElapseMilli()) > timeOut, false);
122
123 // count the num of busy threads
124 int32_t activeThreadCount = 0;
125 for (auto &thread : busyThreads_) {
126 if (!thread->IsActive()) {
127 continue;
128 }
129 // if thread is Hung, stop failed
130 CHK_RET(thread->IsHung(now), false);
131 ++activeThreadCount;
132 }
133
134 if (activeThreadCount == 0) {
135 break;
136 }
137
138 (void)StepSleepMs(THREAD_SLEEP_TIME);
139 }
140
141 // WaitForEnd
142 for (auto &thread : busyThreads_) {
143 thread->WaitForEnd();
144 idleThreads_.push_back(thread);
145 }
146
147 busyThreads_.clear();
148
149 return true;
150 }
151 } // namespace AI
152 } // namespace OHOS
153