1 /*
2 * Copyright (C) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "thumbnail_generate_worker.h"
17
18 #include <pthread.h>
19
20 #include "medialibrary_errno.h"
21 #include "medialibrary_notify.h"
22 #include "media_log.h"
23
24 namespace OHOS {
25 namespace Media {
26 static constexpr int32_t THREAD_NUM_FOREGROUND = 4;
27 static constexpr int32_t THREAD_NUM_BACKGROUND = 2;
28 constexpr size_t TASK_INSERT_COUNT = 15;
29 constexpr size_t CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL = 270000;
30
~ThumbnailGenerateWorker()31 ThumbnailGenerateWorker::~ThumbnailGenerateWorker()
32 {
33 ClearWorkerThreads();
34
35 if (timerId_ != 0) {
36 timer_.Unregister(timerId_);
37 timer_.Shutdown();
38 timerId_ = 0;
39 }
40 }
41
Init(const ThumbnailTaskType & taskType)42 int32_t ThumbnailGenerateWorker::Init(const ThumbnailTaskType &taskType)
43 {
44 std::unique_lock<std::mutex> lock(taskMutex_);
45 if (!threads_.empty()) {
46 return E_OK;
47 }
48
49 MEDIA_INFO_LOG("threads empty, need to init, taskType:%{public}d", taskType);
50 int32_t threadNum;
51 std::string threadName;
52 taskType_ = taskType;
53 CpuAffinityType cpuAffinityType;
54 if (taskType == ThumbnailTaskType::FOREGROUND) {
55 threadNum = THREAD_NUM_FOREGROUND;
56 threadName = THREAD_NAME_FOREGROUND;
57 cpuAffinityType = CpuAffinityType::CPU_IDX_9;
58 } else if (taskType == ThumbnailTaskType::BACKGROUND) {
59 threadNum = THREAD_NUM_BACKGROUND;
60 threadName = THREAD_NAME_BACKGROUND;
61 cpuAffinityType = CpuAffinityType::CPU_IDX_9;
62 } else {
63 MEDIA_ERR_LOG("invalid task type");
64 return E_ERR;
65 }
66
67 isThreadRunning_ = true;
68 for (auto i = 0; i < threadNum; i++) {
69 std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus =
70 std::make_shared<ThumbnailGenerateThreadStatus>(i);
71 threadStatus->cpuAffinityType = cpuAffinityType;
72 std::thread thread([this, threadStatus] { this->StartWorker(threadStatus); });
73 pthread_setname_np(thread.native_handle(), threadName.c_str());
74 threads_.emplace_back(std::move(thread));
75 threadsStatus_.emplace_back(threadStatus);
76 }
77 lock.unlock();
78 RegisterWorkerTimer();
79 return E_OK;
80 }
81
ReleaseTaskQueue(const ThumbnailTaskPriority & taskPriority)82 int32_t ThumbnailGenerateWorker::ReleaseTaskQueue(const ThumbnailTaskPriority &taskPriority)
83 {
84 if (taskPriority == ThumbnailTaskPriority::HIGH) {
85 highPriorityTaskQueue_.Clear();
86 } else if (taskPriority == ThumbnailTaskPriority::MID) {
87 midPriorityTaskQueue_.Clear();
88 } else if (taskPriority == ThumbnailTaskPriority::LOW) {
89 lowPriorityTaskQueue_.Clear();
90 } else {
91 MEDIA_ERR_LOG("invalid task priority");
92 return E_ERR;
93 }
94 return E_OK;
95 }
96
AddTask(const std::shared_ptr<ThumbnailGenerateTask> & task,const ThumbnailTaskPriority & taskPriority)97 int32_t ThumbnailGenerateWorker::AddTask(
98 const std::shared_ptr<ThumbnailGenerateTask> &task, const ThumbnailTaskPriority &taskPriority)
99 {
100 IncreaseRequestIdTaskNum(task);
101 if (taskPriority == ThumbnailTaskPriority::HIGH) {
102 highPriorityTaskQueue_.Push(task);
103 } else if (taskPriority == ThumbnailTaskPriority::MID) {
104 midPriorityTaskQueue_.Push(task);
105 } else if (taskPriority == ThumbnailTaskPriority::LOW) {
106 lowPriorityTaskQueue_.Push(task);
107 } else {
108 MEDIA_ERR_LOG("invalid task priority");
109 return E_ERR;
110 }
111
112 Init(taskType_);
113 workerCv_.notify_one();
114 return E_OK;
115 }
116
IgnoreTaskByRequestId(int32_t requestId)117 void ThumbnailGenerateWorker::IgnoreTaskByRequestId(int32_t requestId)
118 {
119 if (highPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Empty()) {
120 MEDIA_INFO_LOG("task queue empty, no need to ignore task requestId: %{public}d", requestId);
121 return;
122 }
123 ignoreRequestId_.store(requestId);
124
125 std::unique_lock<std::mutex> lock(requestIdMapLock_);
126 requestIdTaskMap_.erase(requestId);
127 MEDIA_INFO_LOG("IgnoreTaskByRequestId, requestId: %{public}d", requestId);
128 }
129
WaitForTask(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)130 bool ThumbnailGenerateWorker::WaitForTask(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)
131 {
132 std::unique_lock<std::mutex> lock(workerLock_);
133 if (highPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Empty() &&
134 isThreadRunning_) {
135 ignoreRequestId_ = 0;
136 threadStatus->isThreadWaiting_ = true;
137 bool ret = workerCv_.wait_for(lock, std::chrono::milliseconds(CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL), [this]() {
138 return !isThreadRunning_ || !highPriorityTaskQueue_.Empty() || !midPriorityTaskQueue_.Empty() ||
139 !lowPriorityTaskQueue_.Empty();
140 });
141 if (!ret) {
142 MEDIA_INFO_LOG("Wait for task timeout");
143 return false;
144 }
145 }
146 threadStatus->isThreadWaiting_ = false;
147 return isThreadRunning_;
148 }
149
StartWorker(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)150 void ThumbnailGenerateWorker::StartWorker(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)
151 {
152 std::string name("ThumbnailGenerateWorker");
153 pthread_setname_np(pthread_self(), name.c_str());
154 MEDIA_INFO_LOG("ThumbnailGenerateWorker thread start, taskType:%{public}d, id:%{public}d, "
155 "cpuAffinityType:%{public}d", taskType_, threadStatus->threadId_, threadStatus->cpuAffinityType);
156 while (isThreadRunning_) {
157 if (!WaitForTask(threadStatus)) {
158 continue;
159 }
160 CpuUtils::SetSelfThreadAffinity(threadStatus->cpuAffinityType);
161 std::shared_ptr<ThumbnailGenerateTask> task;
162 if (!highPriorityTaskQueue_.Empty() && highPriorityTaskQueue_.Pop(task) && task != nullptr) {
163 if (NeedIgnoreTask(task->data_->requestId_)) {
164 continue;
165 }
166 task->executor_(task->data_);
167 ++(threadStatus->taskNum_);
168 DecreaseRequestIdTaskNum(task);
169 continue;
170 }
171
172 if (!midPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Pop(task) && task != nullptr) {
173 if (NeedIgnoreTask(task->data_->requestId_)) {
174 continue;
175 }
176 task->executor_(task->data_);
177 ++(threadStatus->taskNum_);
178 DecreaseRequestIdTaskNum(task);
179 continue;
180 }
181
182 if (!lowPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Pop(task) && task != nullptr) {
183 if (NeedIgnoreTask(task->data_->requestId_)) {
184 continue;
185 }
186 task->executor_(task->data_);
187 ++(threadStatus->taskNum_);
188 DecreaseRequestIdTaskNum(task);
189 }
190 }
191 MEDIA_INFO_LOG("ThumbnailGenerateWorker thread finish, taskType:%{public}d, id:%{public}d",
192 taskType_, threadStatus->threadId_);
193 }
194
NeedIgnoreTask(int32_t requestId)195 bool ThumbnailGenerateWorker::NeedIgnoreTask(int32_t requestId)
196 {
197 return ignoreRequestId_ != 0 && ignoreRequestId_ == requestId;
198 }
199
IncreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> & task)200 void ThumbnailGenerateWorker::IncreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> &task)
201 {
202 if (task == nullptr || task->data_->requestId_ == 0) {
203 // If requestId is 0, no need to manager this task.
204 return;
205 }
206
207 std::unique_lock<std::mutex> lock(requestIdMapLock_);
208 int32_t requestId = task->data_->requestId_;
209 auto pos = requestIdTaskMap_.find(requestId);
210 if (pos == requestIdTaskMap_.end()) {
211 requestIdTaskMap_.insert(std::make_pair(requestId, 1));
212 return;
213 }
214 ++(pos->second);
215 }
216
DecreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> & task)217 void ThumbnailGenerateWorker::DecreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> &task)
218 {
219 if (task == nullptr || task->data_->requestId_ == 0) {
220 // If requestId is 0, no need to manager this task.
221 return;
222 }
223
224 std::unique_lock<std::mutex> lock(requestIdMapLock_);
225 int32_t requestId = task->data_->requestId_;
226 auto pos = requestIdTaskMap_.find(requestId);
227 if (pos == requestIdTaskMap_.end()) {
228 return;
229 }
230
231 if (--(pos->second) == 0) {
232 requestIdTaskMap_.erase(requestId);
233 NotifyTaskFinished(requestId);
234 }
235 }
236
NotifyTaskFinished(int32_t requestId)237 void ThumbnailGenerateWorker::NotifyTaskFinished(int32_t requestId)
238 {
239 auto watch = MediaLibraryNotify::GetInstance();
240 if (watch == nullptr) {
241 MEDIA_ERR_LOG("watch is nullptr");
242 return;
243 }
244 std::string notifyUri = PhotoColumn::PHOTO_URI_PREFIX + std::to_string(requestId);
245 watch->Notify(notifyUri, NotifyType::NOTIFY_THUMB_UPDATE);
246 }
247
ClearWorkerThreads()248 void ThumbnailGenerateWorker::ClearWorkerThreads()
249 {
250 {
251 std::unique_lock<std::mutex> lock(workerLock_);
252 isThreadRunning_ = false;
253 }
254 ignoreRequestId_ = 0;
255 workerCv_.notify_all();
256 for (auto &thread : threads_) {
257 if (!thread.joinable()) {
258 continue;
259 }
260 thread.join();
261 }
262 threadsStatus_.clear();
263 threads_.clear();
264 MEDIA_INFO_LOG("Clear ThumbnailGenerateWorker threads successfully, taskType:%{public}d", taskType_);
265 }
266
IsAllThreadWaiting()267 bool ThumbnailGenerateWorker::IsAllThreadWaiting()
268 {
269 std::unique_lock<std::mutex> lock(workerLock_);
270 if (!highPriorityTaskQueue_.Empty() || !midPriorityTaskQueue_.Empty() || !lowPriorityTaskQueue_.Empty()) {
271 MEDIA_INFO_LOG("task queue is not empty, no need to clear worker threads, taskType:%{public}d", taskType_);
272 return false;
273 }
274
275 for (auto &threadStatus : threadsStatus_) {
276 if (!threadStatus->isThreadWaiting_) {
277 MEDIA_INFO_LOG("thread is running, taskType:%{public}d, id:%{public}d, taskNum:%{public}d",
278 taskType_, threadStatus->threadId_, threadStatus->taskNum_);
279 return false;
280 }
281 }
282 isThreadRunning_ = false;
283 return true;
284 }
285
TryClearWorkerThreads()286 void ThumbnailGenerateWorker::TryClearWorkerThreads()
287 {
288 std::unique_lock<std::mutex> lock(taskMutex_);
289 if (!IsAllThreadWaiting()) {
290 return;
291 }
292
293 ClearWorkerThreads();
294 }
295
RegisterWorkerTimer()296 void ThumbnailGenerateWorker::RegisterWorkerTimer()
297 {
298 Utils::Timer::TimerCallback timerCallback = [this]() {
299 MEDIA_INFO_LOG("ThumbnailGenerateWorker timerCallback, ClearWorkerThreads, taskType:%{public}d", taskType_);
300 TryClearWorkerThreads();
301 };
302
303 std::lock_guard<std::mutex> lock(timerMutex_);
304 if (timerId_ == 0) {
305 MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Setup, taskType:%{public}d", taskType_);
306 timer_.Setup();
307 } else {
308 timer_.Unregister(timerId_);
309 }
310
311 timerId_ = timer_.Register(timerCallback, CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL, false);
312 MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Restart, taskType:%{public}d, timeId:%{public}u",
313 taskType_, timerId_);
314 }
315
TryCloseTimer()316 void ThumbnailGenerateWorker::TryCloseTimer()
317 {
318 std::lock_guard<std::mutex> lock(timerMutex_);
319 if (threads_.empty() && timerId_ != 0) {
320 timer_.Unregister(timerId_);
321 timer_.Shutdown();
322 timerId_ = 0;
323 MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Shutdown, taskType:%{public}d", taskType_);
324 }
325 }
326 } // namespace Media
327 } // namespace OHOS