1 /*
2  * Copyright (C) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "medialibrary_async_worker.h"
17 
18 #include <pthread.h>
19 #include "media_log.h"
20 
21 using namespace std;
22 
23 namespace OHOS {
24 namespace Media {
25 static const int32_t SUCCESS = 0;
26 static const int32_t BG_SLEEP_COUNT = 500;
27 static const int32_t FG_SLEEP_COUNT = 50;
28 static const int32_t REST_FOR_MILLISECOND = 20;
29 static const int32_t REST_FOR_LONG_SECOND = 2;
30 static const int32_t THREAD_NUM = 3;
31 shared_ptr<MediaLibraryAsyncWorker> MediaLibraryAsyncWorker::asyncWorkerInstance_{nullptr};
32 mutex MediaLibraryAsyncWorker::instanceLock_;
33 
GetInstance()34 shared_ptr<MediaLibraryAsyncWorker> MediaLibraryAsyncWorker::GetInstance()
35 {
36     if (asyncWorkerInstance_ == nullptr) {
37         lock_guard<mutex> lockGuard(instanceLock_);
38         asyncWorkerInstance_ = shared_ptr<MediaLibraryAsyncWorker>(new MediaLibraryAsyncWorker());
39         if (asyncWorkerInstance_ != nullptr) {
40             asyncWorkerInstance_->Init();
41         }
42     }
43     return asyncWorkerInstance_;
44 }
45 
MediaLibraryAsyncWorker()46 MediaLibraryAsyncWorker::MediaLibraryAsyncWorker() : isThreadRunning_(false), doneTotal_(0)
47 {}
48 
~MediaLibraryAsyncWorker()49 MediaLibraryAsyncWorker::~MediaLibraryAsyncWorker()
50 {
51     isThreadRunning_ = false;
52     bgWorkCv_.notify_all();
53     for (auto &thread : threads_) {
54         if (thread.joinable()) {
55             thread.join();
56         }
57     }
58     asyncWorkerInstance_ = nullptr;
59 }
60 
Init()61 void MediaLibraryAsyncWorker::Init()
62 {
63     isThreadRunning_ = true;
64     doneTotal_ = 0;
65     for (auto i = 0; i < THREAD_NUM; i++) {
66         threads_.emplace_back(
67             std::thread([this, num = i]() { this->StartWorker(num); })
68         );
69     }
70 }
71 
Interrupt()72 void MediaLibraryAsyncWorker::Interrupt()
73 {
74     ReleaseBgTask();
75 }
76 
Stop()77 void MediaLibraryAsyncWorker::Stop()
78 {
79     ReleaseBgTask();
80     ReleaseFgTask();
81 }
82 
AddTask(const shared_ptr<MediaLibraryAsyncTask> & task,bool isFg)83 int32_t MediaLibraryAsyncWorker::AddTask(const shared_ptr<MediaLibraryAsyncTask> &task, bool isFg)
84 {
85     if (isFg) {
86         lock_guard<mutex> lockGuard(fgTaskLock_);
87         fgTaskQueue_.push(task);
88     } else {
89         lock_guard<mutex> lockGuard(bgTaskLock_);
90         bgTaskQueue_.push(task);
91     }
92 
93     bgWorkCv_.notify_one();
94     return SUCCESS;
95 }
96 
GetFgTask()97 shared_ptr<MediaLibraryAsyncTask> MediaLibraryAsyncWorker::GetFgTask()
98 {
99     lock_guard<mutex> lockGuard(fgTaskLock_);
100     if (fgTaskQueue_.empty()) {
101         return nullptr;
102     }
103     shared_ptr<MediaLibraryAsyncTask> task = fgTaskQueue_.front();
104     fgTaskQueue_.pop();
105     return task;
106 }
107 
ReleaseFgTask()108 void MediaLibraryAsyncWorker::ReleaseFgTask()
109 {
110     lock_guard<mutex> lockGuard(fgTaskLock_);
111     std::queue<std::shared_ptr<MediaLibraryAsyncTask>> tmp;
112     fgTaskQueue_.swap(tmp);
113 }
114 
GetBgTask()115 shared_ptr<MediaLibraryAsyncTask> MediaLibraryAsyncWorker::GetBgTask()
116 {
117     lock_guard<mutex> lockGuard(bgTaskLock_);
118     if (bgTaskQueue_.empty()) {
119         return nullptr;
120     }
121     shared_ptr<MediaLibraryAsyncTask> task = bgTaskQueue_.front();
122     bgTaskQueue_.pop();
123     return task;
124 }
125 
ReleaseBgTask()126 void MediaLibraryAsyncWorker::ReleaseBgTask()
127 {
128     lock_guard<mutex> lockGuard(bgTaskLock_);
129     std::queue<std::shared_ptr<MediaLibraryAsyncTask>> tmp;
130     bgTaskQueue_.swap(tmp);
131 }
132 
IsFgQueueEmpty()133 bool MediaLibraryAsyncWorker::IsFgQueueEmpty()
134 {
135     lock_guard<mutex> lock_Guard(fgTaskLock_);
136     return fgTaskQueue_.empty();
137 }
138 
IsBgQueueEmpty()139 bool MediaLibraryAsyncWorker::IsBgQueueEmpty()
140 {
141     lock_guard<mutex> lock_Guard(bgTaskLock_);
142     return bgTaskQueue_.empty();
143 }
144 
ClearRefreshTaskQueue()145 void MediaLibraryAsyncWorker::ClearRefreshTaskQueue()
146 {
147     lock_guard<mutex> lockGuardFg(fgTaskLock_);
148     std::queue<std::shared_ptr<MediaLibraryAsyncTask>> tmp;
149     while (!fgTaskQueue_.empty()) {
150         auto task = fgTaskQueue_.front();
151         fgTaskQueue_.pop();
152         if (task->taskType_ != TaskType::REFRESH_ALBUM) {
153             tmp.push(task);
154         }
155     }
156     fgTaskQueue_.swap(tmp);
157 
158     std::queue<std::shared_ptr<MediaLibraryAsyncTask>> tail;
159     tmp.swap(tail);
160 
161     lock_guard<mutex> lockGuardBg(bgTaskLock_);
162     while (!bgTaskQueue_.empty()) {
163         auto task = bgTaskQueue_.front();
164         bgTaskQueue_.pop();
165         if (task->taskType_ != TaskType::REFRESH_ALBUM) {
166             tmp.push(task);
167         }
168     }
169     bgTaskQueue_.swap(tmp);
170 }
171 
WaitForTask()172 void MediaLibraryAsyncWorker::WaitForTask()
173 {
174     std::unique_lock<std::mutex> lock(bgWorkLock_);
175     bgWorkCv_.wait(lock,
176         [this]() { return !isThreadRunning_ || !IsFgQueueEmpty() || !IsBgQueueEmpty(); });
177 }
178 
SleepFgWork()179 void MediaLibraryAsyncWorker::SleepFgWork()
180 {
181     if ((doneTotal_.load() % FG_SLEEP_COUNT) == 0) {
182         this_thread::sleep_for(chrono::milliseconds(REST_FOR_MILLISECOND));
183     }
184 }
185 
SleepBgWork()186 void MediaLibraryAsyncWorker::SleepBgWork()
187 {
188     this_thread::sleep_for(chrono::milliseconds(REST_FOR_MILLISECOND));
189     if ((doneTotal_.load() % BG_SLEEP_COUNT) == 0) {
190         this_thread::sleep_for(chrono::seconds(REST_FOR_LONG_SECOND));
191     }
192 }
193 
StartWorker(int num)194 void MediaLibraryAsyncWorker::StartWorker(int num)
195 {
196     string name("MediaLibraryAsyncWorker");
197     name.append(to_string(num));
198     pthread_setname_np(pthread_self(), name.c_str());
199     while (true) {
200         WaitForTask();
201         if (!isThreadRunning_) {
202             return;
203         }
204         if (!IsFgQueueEmpty()) {
205             shared_ptr<MediaLibraryAsyncTask> fgTask = GetFgTask();
206             if (fgTask != nullptr) {
207                 fgTask->executor_(fgTask->data_);
208                 fgTask = nullptr;
209                 doneTotal_++;
210             }
211         } else if (!IsBgQueueEmpty()) {
212             shared_ptr<MediaLibraryAsyncTask> bgTask = GetBgTask();
213             if (bgTask != nullptr) {
214                 bgTask->executor_(bgTask->data_);
215                 bgTask = nullptr;
216                 doneTotal_++;
217                 SleepBgWork();
218             }
219         }
220     }
221 }
222 } // namespace Media
223 } // namespace OHOS
224