1 /*
2  * Copyright (C) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "thumbnail_generate_worker.h"
17 
18 #include <pthread.h>
19 
20 #include "medialibrary_errno.h"
21 #include "medialibrary_notify.h"
22 #include "media_log.h"
23 
24 namespace OHOS {
25 namespace Media {
// Number of worker threads started by Init() for a FOREGROUND worker.
static constexpr int32_t THREAD_NUM_FOREGROUND = 4;
// Number of worker threads started by Init() for a BACKGROUND worker.
static constexpr int32_t THREAD_NUM_BACKGROUND = 2;
// NOTE(review): not referenced anywhere in the visible part of this file.
static constexpr size_t TASK_INSERT_COUNT = 15;
// Idle period, in milliseconds, used both as the wait timeout of an idle worker thread and
// as the interval handed to the worker timer.
static constexpr size_t CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL = 270000;
30 
~ThumbnailGenerateWorker()31 ThumbnailGenerateWorker::~ThumbnailGenerateWorker()
32 {
33     ClearWorkerThreads();
34 
35     if (timerId_ != 0) {
36         timer_.Unregister(timerId_);
37         timer_.Shutdown();
38         timerId_ = 0;
39     }
40 }
41 
Init(const ThumbnailTaskType & taskType)42 int32_t ThumbnailGenerateWorker::Init(const ThumbnailTaskType &taskType)
43 {
44     std::unique_lock<std::mutex> lock(taskMutex_);
45     if (!threads_.empty()) {
46         return E_OK;
47     }
48 
49     MEDIA_INFO_LOG("threads empty, need to init, taskType:%{public}d", taskType);
50     int32_t threadNum;
51     std::string threadName;
52     taskType_ = taskType;
53     CpuAffinityType cpuAffinityType;
54     if (taskType == ThumbnailTaskType::FOREGROUND) {
55         threadNum = THREAD_NUM_FOREGROUND;
56         threadName = THREAD_NAME_FOREGROUND;
57         cpuAffinityType = CpuAffinityType::CPU_IDX_9;
58     } else if (taskType == ThumbnailTaskType::BACKGROUND) {
59         threadNum = THREAD_NUM_BACKGROUND;
60         threadName = THREAD_NAME_BACKGROUND;
61         cpuAffinityType = CpuAffinityType::CPU_IDX_9;
62     } else {
63         MEDIA_ERR_LOG("invalid task type");
64         return E_ERR;
65     }
66 
67     isThreadRunning_ = true;
68     for (auto i = 0; i < threadNum; i++) {
69         std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus =
70             std::make_shared<ThumbnailGenerateThreadStatus>(i);
71         threadStatus->cpuAffinityType = cpuAffinityType;
72         std::thread thread([this, threadStatus] { this->StartWorker(threadStatus); });
73         pthread_setname_np(thread.native_handle(), threadName.c_str());
74         threads_.emplace_back(std::move(thread));
75         threadsStatus_.emplace_back(threadStatus);
76     }
77     lock.unlock();
78     RegisterWorkerTimer();
79     return E_OK;
80 }
81 
ReleaseTaskQueue(const ThumbnailTaskPriority & taskPriority)82 int32_t ThumbnailGenerateWorker::ReleaseTaskQueue(const ThumbnailTaskPriority &taskPriority)
83 {
84     if (taskPriority == ThumbnailTaskPriority::HIGH) {
85         highPriorityTaskQueue_.Clear();
86     } else if (taskPriority == ThumbnailTaskPriority::MID) {
87         midPriorityTaskQueue_.Clear();
88     } else if (taskPriority == ThumbnailTaskPriority::LOW) {
89         lowPriorityTaskQueue_.Clear();
90     } else {
91         MEDIA_ERR_LOG("invalid task priority");
92         return E_ERR;
93     }
94     return E_OK;
95 }
96 
AddTask(const std::shared_ptr<ThumbnailGenerateTask> & task,const ThumbnailTaskPriority & taskPriority)97 int32_t ThumbnailGenerateWorker::AddTask(
98     const std::shared_ptr<ThumbnailGenerateTask> &task, const ThumbnailTaskPriority &taskPriority)
99 {
100     IncreaseRequestIdTaskNum(task);
101     if (taskPriority == ThumbnailTaskPriority::HIGH) {
102         highPriorityTaskQueue_.Push(task);
103     } else if (taskPriority == ThumbnailTaskPriority::MID) {
104         midPriorityTaskQueue_.Push(task);
105     } else if (taskPriority == ThumbnailTaskPriority::LOW) {
106         lowPriorityTaskQueue_.Push(task);
107     } else {
108         MEDIA_ERR_LOG("invalid task priority");
109         return E_ERR;
110     }
111 
112     Init(taskType_);
113     workerCv_.notify_one();
114     return E_OK;
115 }
116 
IgnoreTaskByRequestId(int32_t requestId)117 void ThumbnailGenerateWorker::IgnoreTaskByRequestId(int32_t requestId)
118 {
119     if (highPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Empty()) {
120         MEDIA_INFO_LOG("task queue empty, no need to ignore task requestId: %{public}d", requestId);
121         return;
122     }
123     ignoreRequestId_.store(requestId);
124 
125     std::unique_lock<std::mutex> lock(requestIdMapLock_);
126     requestIdTaskMap_.erase(requestId);
127     MEDIA_INFO_LOG("IgnoreTaskByRequestId, requestId: %{public}d", requestId);
128 }
129 
WaitForTask(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)130 bool ThumbnailGenerateWorker::WaitForTask(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)
131 {
132     std::unique_lock<std::mutex> lock(workerLock_);
133     if (highPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Empty() &&
134         isThreadRunning_) {
135         ignoreRequestId_ = 0;
136         threadStatus->isThreadWaiting_ = true;
137         bool ret = workerCv_.wait_for(lock, std::chrono::milliseconds(CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL), [this]() {
138             return !isThreadRunning_ || !highPriorityTaskQueue_.Empty() || !midPriorityTaskQueue_.Empty() ||
139                    !lowPriorityTaskQueue_.Empty();
140         });
141         if (!ret) {
142             MEDIA_INFO_LOG("Wait for task timeout");
143             return false;
144         }
145     }
146     threadStatus->isThreadWaiting_ = false;
147     return isThreadRunning_;
148 }
149 
StartWorker(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)150 void ThumbnailGenerateWorker::StartWorker(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)
151 {
152     std::string name("ThumbnailGenerateWorker");
153     pthread_setname_np(pthread_self(), name.c_str());
154     MEDIA_INFO_LOG("ThumbnailGenerateWorker thread start, taskType:%{public}d, id:%{public}d, "
155         "cpuAffinityType:%{public}d", taskType_, threadStatus->threadId_, threadStatus->cpuAffinityType);
156     while (isThreadRunning_) {
157         if (!WaitForTask(threadStatus)) {
158             continue;
159         }
160         CpuUtils::SetSelfThreadAffinity(threadStatus->cpuAffinityType);
161         std::shared_ptr<ThumbnailGenerateTask> task;
162         if (!highPriorityTaskQueue_.Empty() && highPriorityTaskQueue_.Pop(task) && task != nullptr) {
163             if (NeedIgnoreTask(task->data_->requestId_)) {
164                 continue;
165             }
166             task->executor_(task->data_);
167             ++(threadStatus->taskNum_);
168             DecreaseRequestIdTaskNum(task);
169             continue;
170         }
171 
172         if (!midPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Pop(task) && task != nullptr) {
173             if (NeedIgnoreTask(task->data_->requestId_)) {
174                 continue;
175             }
176             task->executor_(task->data_);
177             ++(threadStatus->taskNum_);
178             DecreaseRequestIdTaskNum(task);
179             continue;
180         }
181 
182         if (!lowPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Pop(task) && task != nullptr) {
183             if (NeedIgnoreTask(task->data_->requestId_)) {
184                 continue;
185             }
186             task->executor_(task->data_);
187             ++(threadStatus->taskNum_);
188             DecreaseRequestIdTaskNum(task);
189         }
190     }
191     MEDIA_INFO_LOG("ThumbnailGenerateWorker thread finish, taskType:%{public}d, id:%{public}d",
192         taskType_, threadStatus->threadId_);
193 }
194 
NeedIgnoreTask(int32_t requestId)195 bool ThumbnailGenerateWorker::NeedIgnoreTask(int32_t requestId)
196 {
197     return ignoreRequestId_ != 0 && ignoreRequestId_ == requestId;
198 }
199 
IncreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> & task)200 void ThumbnailGenerateWorker::IncreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> &task)
201 {
202     if (task == nullptr || task->data_->requestId_ == 0) {
203         // If requestId is 0, no need to manager this task.
204         return;
205     }
206 
207     std::unique_lock<std::mutex> lock(requestIdMapLock_);
208     int32_t requestId = task->data_->requestId_;
209     auto pos = requestIdTaskMap_.find(requestId);
210     if (pos == requestIdTaskMap_.end()) {
211         requestIdTaskMap_.insert(std::make_pair(requestId, 1));
212         return;
213     }
214     ++(pos->second);
215 }
216 
DecreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> & task)217 void ThumbnailGenerateWorker::DecreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> &task)
218 {
219     if (task == nullptr || task->data_->requestId_ == 0) {
220         // If requestId is 0, no need to manager this task.
221         return;
222     }
223 
224     std::unique_lock<std::mutex> lock(requestIdMapLock_);
225     int32_t requestId = task->data_->requestId_;
226     auto pos = requestIdTaskMap_.find(requestId);
227     if (pos == requestIdTaskMap_.end()) {
228         return;
229     }
230 
231     if (--(pos->second) == 0) {
232         requestIdTaskMap_.erase(requestId);
233         NotifyTaskFinished(requestId);
234     }
235 }
236 
NotifyTaskFinished(int32_t requestId)237 void ThumbnailGenerateWorker::NotifyTaskFinished(int32_t requestId)
238 {
239     auto watch = MediaLibraryNotify::GetInstance();
240     if (watch == nullptr) {
241         MEDIA_ERR_LOG("watch is nullptr");
242         return;
243     }
244     std::string notifyUri = PhotoColumn::PHOTO_URI_PREFIX + std::to_string(requestId);
245     watch->Notify(notifyUri, NotifyType::NOTIFY_THUMB_UPDATE);
246 }
247 
ClearWorkerThreads()248 void ThumbnailGenerateWorker::ClearWorkerThreads()
249 {
250     {
251         std::unique_lock<std::mutex> lock(workerLock_);
252         isThreadRunning_ = false;
253     }
254     ignoreRequestId_ = 0;
255     workerCv_.notify_all();
256     for (auto &thread : threads_) {
257         if (!thread.joinable()) {
258             continue;
259         }
260         thread.join();
261     }
262     threadsStatus_.clear();
263     threads_.clear();
264     MEDIA_INFO_LOG("Clear ThumbnailGenerateWorker threads successfully, taskType:%{public}d", taskType_);
265 }
266 
IsAllThreadWaiting()267 bool ThumbnailGenerateWorker::IsAllThreadWaiting()
268 {
269     std::unique_lock<std::mutex> lock(workerLock_);
270     if (!highPriorityTaskQueue_.Empty() || !midPriorityTaskQueue_.Empty() || !lowPriorityTaskQueue_.Empty()) {
271         MEDIA_INFO_LOG("task queue is not empty, no need to clear worker threads, taskType:%{public}d", taskType_);
272         return false;
273     }
274 
275     for (auto &threadStatus : threadsStatus_) {
276         if (!threadStatus->isThreadWaiting_) {
277             MEDIA_INFO_LOG("thread is running, taskType:%{public}d, id:%{public}d, taskNum:%{public}d",
278                 taskType_, threadStatus->threadId_, threadStatus->taskNum_);
279             return false;
280         }
281     }
282     isThreadRunning_ = false;
283     return true;
284 }
285 
TryClearWorkerThreads()286 void ThumbnailGenerateWorker::TryClearWorkerThreads()
287 {
288     std::unique_lock<std::mutex> lock(taskMutex_);
289     if (!IsAllThreadWaiting()) {
290         return;
291     }
292 
293     ClearWorkerThreads();
294 }
295 
RegisterWorkerTimer()296 void ThumbnailGenerateWorker::RegisterWorkerTimer()
297 {
298     Utils::Timer::TimerCallback timerCallback = [this]() {
299         MEDIA_INFO_LOG("ThumbnailGenerateWorker timerCallback, ClearWorkerThreads, taskType:%{public}d", taskType_);
300         TryClearWorkerThreads();
301     };
302 
303     std::lock_guard<std::mutex> lock(timerMutex_);
304     if (timerId_ == 0) {
305         MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Setup, taskType:%{public}d", taskType_);
306         timer_.Setup();
307     } else {
308         timer_.Unregister(timerId_);
309     }
310 
311     timerId_ = timer_.Register(timerCallback, CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL, false);
312     MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Restart, taskType:%{public}d, timeId:%{public}u",
313         taskType_, timerId_);
314 }
315 
TryCloseTimer()316 void ThumbnailGenerateWorker::TryCloseTimer()
317 {
318     std::lock_guard<std::mutex> lock(timerMutex_);
319     if (threads_.empty() && timerId_ != 0) {
320         timer_.Unregister(timerId_);
321         timer_.Shutdown();
322         timerId_ = 0;
323         MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Shutdown, taskType:%{public}d", taskType_);
324     }
325 }
326 } // namespace Media
327 } // namespace OHOS