1 /*
2  * Copyright (c) 2024-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define HST_LOG_TAG "Task"
16 #include "osal/task/task.h"
17 #include "osal/task/taskInner.h"
18 #include "osal/task/thread.h"
19 #include "osal/utils/util.h"
20 #include "cpp_ext/memory_ext.h"
21 #include "common/log.h"
22 
23 #include <mutex>
24 
25 namespace {
26 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "PipelineTreadPool" };
27 }
28 
29 namespace OHOS {
30 namespace Media {
namespace {
    // Slack in microseconds. PipeLineThread::Run() treats a job as due when it is no more than
    // this far in the future, and adds the same amount to the time it waits for a later job.
    constexpr int64_t ADJUST_US{500};
    // Divisor Run() applies to a microsecond span before handing it to WaitFor().
    constexpr int64_t US_PER_MS{1000};
}
35 
ConvertPriorityType(TaskPriority priority)36 static ThreadPriority ConvertPriorityType(TaskPriority priority)
37 {
38     switch (priority) {
39         case TaskPriority::LOW:
40             return ThreadPriority::LOW;
41         case TaskPriority::NORMAL:
42             return ThreadPriority::NORMAL;
43         case TaskPriority::MIDDLE:
44             return ThreadPriority::MIDDLE;
45         case TaskPriority::HIGHEST:
46             return ThreadPriority::HIGHEST;
47         default:
48             return ThreadPriority::NORMAL;
49     }
50 }
51 
TaskTypeConvert(TaskType type)52 static std::string TaskTypeConvert(TaskType type)
53 {
54     static const std::map<TaskType, std::string> table = {
55         {TaskType::GLOBAL, "G"},
56         {TaskType::VIDEO, "V"},
57         {TaskType::AUDIO, "A"},
58         {TaskType::SUBTITLE, "T"},
59         {TaskType::SINGLETON, "S"},
60     };
61     auto it = table.find(type);
62     if (it != table.end()) {
63         return it->second;
64     }
65     return "NA";
66 }
67 
// Returns the current std::chrono::steady_clock reading, expressed as whole microseconds
// since that clock's epoch.
static int64_t GetNowUs()
{
    using std::chrono::duration_cast;
    using std::chrono::microseconds;
    using std::chrono::steady_clock;

    const auto sinceEpoch = steady_clock::now().time_since_epoch();
    return duration_cast<microseconds>(sinceEpoch).count();
}
73 
GetInstance()74 PipeLineThreadPool& PipeLineThreadPool::GetInstance()
75 {
76     static PipeLineThreadPool instance;
77     return instance;
78 }
79 
FindThread(const std::string & groupId,TaskType taskType,TaskPriority priority)80 std::shared_ptr<PipeLineThread> PipeLineThreadPool::FindThread(const std::string &groupId,
81     TaskType taskType, TaskPriority priority)
82 {
83     AutoLock lock(mutex_);
84     if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
85         workerGroupMap[groupId] = std::make_shared<std::list<std::shared_ptr<PipeLineThread>>>();
86     }
87     std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList = workerGroupMap[groupId];
88     for (auto thread : *threadList.get()) {
89         if (thread->type_ == taskType) {
90             return thread;
91         }
92     }
93     std::shared_ptr<PipeLineThread> newThread = std::make_shared<PipeLineThread>(groupId, taskType, priority);
94     threadList->push_back(newThread);
95     return newThread;
96 }
97 
DestroyThread(const std::string & groupId)98 void PipeLineThreadPool::DestroyThread(const std::string &groupId)
99 {
100     MEDIA_LOG_I("DestroyThread groupId:" PUBLIC_LOG_S, groupId.c_str());
101     std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList;
102     {
103         AutoLock lock(mutex_);
104         if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
105             MEDIA_LOG_E("DestroyThread groupId not exist");
106             return;
107         }
108         threadList = workerGroupMap[groupId];
109         workerGroupMap.erase(groupId);
110     }
111     for (auto thread : *threadList.get()) {
112         thread->Exit();
113     }
114 }
115 
PipeLineThread(std::string groupId,TaskType type,TaskPriority priority)116 PipeLineThread::PipeLineThread(std::string groupId, TaskType type, TaskPriority priority)
117     : groupId_(groupId), type_(type)
118 {
119     MEDIA_LOG_I("PipeLineThread groupId:" PUBLIC_LOG_S " type:%{public}d created call", groupId_.c_str(), type);
120     loop_ = CppExt::make_unique<Thread>(ConvertPriorityType(priority));
121     name_ = groupId_ + "_" + TaskTypeConvert(type);
122     loop_->SetName(name_);
123     threadExit_ = false;
124     if (loop_->CreateThread([this] { Run(); })) {
125         threadExit_ = false;
126     } else {
127         threadExit_ = true;
128         loop_ = nullptr;
129         MEDIA_LOG_E("PipeLineThread " PUBLIC_LOG_S " create failed", name_.c_str());
130     }
131 }
132 
~PipeLineThread()133 PipeLineThread::~PipeLineThread()
134 {
135     Exit();
136 }
137 
Exit()138 void PipeLineThread::Exit()
139 {
140     {
141         AutoLock lock(mutex_);
142         FALSE_RETURN_W(!threadExit_.load() && loop_);
143 
144         MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " exit", name_.c_str());
145         threadExit_ = true;
146         syncCond_.NotifyAll();
147 
148         // trigger to quit thread in current running thread, must not wait,
149         // or else the current thread will be suspended and can not quit.
150         if (IsRunningInSelf()) {
151             return;
152         }
153     }
154     // loop_ destroy will wait thread join
155     loop_ = nullptr;
156 }
157 
Run()158 void PipeLineThread::Run()
159 {
160     MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " run enter", name_.c_str());
161     while (true) {
162         std::shared_ptr<TaskInner> nextTask;
163         {
164             AutoLock lock(mutex_);
165             if (threadExit_.load()) {
166                 break;
167             }
168             int64_t nextJobUs = INT64_MAX;
169             for (auto task: taskList_) {
170                 int64_t taskJobUs = task->NextJobUs();
171                 if (taskJobUs == -1) {
172                     continue;
173                 }
174                 if (taskJobUs < nextJobUs) {
175                     nextJobUs = taskJobUs;
176                     nextTask = task;
177                 }
178             }
179             if (nextTask == nullptr) {
180                 syncCond_.Wait(lock);
181                 continue;
182             }
183             int64_t nowUs = GetNowUs();
184             if (nextJobUs > (nowUs + ADJUST_US)) {
185                 syncCond_.WaitFor(lock, (nextJobUs - nowUs + ADJUST_US) / US_PER_MS);
186                 continue;
187             }
188         }
189         nextTask->HandleJob();
190     }
191 }
192 
AddTask(std::shared_ptr<TaskInner> task)193 void PipeLineThread::AddTask(std::shared_ptr<TaskInner> task)
194 {
195     AutoLock lock(mutex_);
196     taskList_.push_back(task);
197 }
198 
RemoveTask(std::shared_ptr<TaskInner> task)199 void PipeLineThread::RemoveTask(std::shared_ptr<TaskInner> task)
200 {
201     {
202         AutoLock lock(mutex_);
203         taskList_.remove(task);
204         FALSE_LOG_MSG(!taskList_.empty(),
205          "PipeLineThread " PUBLIC_LOG_S " remove all Task", name_.c_str());
206     }
207     if (type_ == TaskType::SINGLETON) {
208         PipeLineThreadPool::GetInstance().DestroyThread(groupId_);
209     }
210 }
211 
LockJobState()212 void PipeLineThread::LockJobState()
213 {
214     if (IsRunningInSelf()) {
215         return;
216     }
217     mutex_.lock();
218 }
219 
UnLockJobState(bool notifyChange)220 void PipeLineThread::UnLockJobState(bool notifyChange)
221 {
222     if (IsRunningInSelf()) {
223         return;
224     }
225     mutex_.unlock();
226     if (notifyChange) {
227         syncCond_.NotifyAll();
228     }
229 }
230 
IsRunningInSelf()231 bool PipeLineThread::IsRunningInSelf()
232 {
233     return loop_ ? loop_->IsRunningInSelf() : false;
234 }
235 } // namespace Media
236 } // namespace OHOS
237