1 /*
2  * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "base_task_group.h"
17 #include "dp_log.h"
18 
19 namespace OHOS {
20 namespace CameraStandard {
21 namespace DeferredProcessing {
BaseTaskGroup(const std::string & name,TaskFunc func,bool serial,const ThreadPool * threadPool)22 BaseTaskGroup::BaseTaskGroup(const std::string& name, TaskFunc func, bool serial, const ThreadPool* threadPool)
23     : name_(name),
24       func_(std::move(func)),
25       serial_(serial),
26       threadPool_(threadPool),
27       handle_(INVALID_TASK_GROUP_HANDLE),
28       inflight_(false),
29       que_(name + " queue")
30 {
31     DP_DEBUG_LOG("task group (%s).", name_.c_str());
32 }
33 
~BaseTaskGroup()34 BaseTaskGroup::~BaseTaskGroup()
35 {
36     DP_DEBUG_LOG("task group name: %s, handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
37     que_.SetActive(false);
38     que_.Clear();
39 }
40 
Initialize()41 void BaseTaskGroup::Initialize()
42 {
43     handle_ = GenerateHandle();
44     que_.SetActive(true);
45     DP_DEBUG_LOG("task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
46 }
47 
GetName()48 const std::string& BaseTaskGroup::GetName()
49 {
50     return name_;
51 }
52 
GetHandle()53 TaskGroupHandle BaseTaskGroup::GetHandle()
54 {
55     return handle_;
56 }
57 
SubmitTask(std::any param)58 bool BaseTaskGroup::SubmitTask(std::any param)
59 {
60     std::lock_guard<std::mutex> lock(mutex_);
61     if (que_.Full()) {
62         DP_WARNING_LOG("Submit task (%s), handle: %{public}d, que is full!", name_.c_str(), static_cast<int>(handle_));
63     } else {
64         DP_DEBUG_LOG("Submit task (%s), handle: %{public}d, size: %zu.", name_.c_str(),
65             static_cast<int>(handle_), que_.Size());
66     }
67     que_.Push(std::move(param));
68     DispatchTaskUnlocked();
69     return true;
70 }
71 
CancelAllTasks()72 void BaseTaskGroup::CancelAllTasks()
73 {
74     std::lock_guard<std::mutex> lock(mutex_);
75     DP_DEBUG_LOG("Cancel all tasks for task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
76     que_.Clear();
77     if (serial_) {
78         inflight_ = false;
79     }
80 }
81 
GetTaskCount()82 size_t BaseTaskGroup::GetTaskCount()
83 {
84     std::lock_guard<std::mutex> lock(mutex_);
85     DP_DEBUG_LOG("Get task count for task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
86     return que_.Size();
87 }
88 
GetTaskUnlocked()89 std::function<void()> BaseTaskGroup::GetTaskUnlocked()
90 {
91     if (que_.Empty()) {
92         DP_DEBUG_LOG("(%s) no available tasks.", name_.c_str());
93         return {};
94     }
95     std::weak_ptr<BaseTaskGroup> weakThis(shared_from_this());
96     auto task = [param = que_.Pop(), weakThis]() {
97         auto thiz = weakThis.lock();
98         if (thiz) {
99             thiz->func_(std::move(param));
100             thiz->OnTaskComplete();
101         }
102     };
103     DP_DEBUG_LOG("return one task %s, handle:%{public}d, size: %zu.", name_.c_str(), static_cast<int>(handle_),
104         que_.Size());
105     return task;
106 }
107 
GenerateHandle()108 TaskGroupHandle BaseTaskGroup::GenerateHandle()
109 {
110     static std::atomic<uint64_t> counter = 0;
111     DP_DEBUG_LOG("(%s) entered.", name_.c_str());
112     uint64_t prefix = std::hash<std::string>{}(name_);
113     uint64_t handle = ++counter;
114     return prefix | handle;
115 }
116 
DispatchTaskUnlocked()117 void BaseTaskGroup::DispatchTaskUnlocked()
118 {
119     DP_DEBUG_LOG("(%s) entered.", name_.c_str());
120     if (serial_ && inflight_.load()) {
121         DP_DEBUG_LOG("(%s), task is running, redispatch tasks after finish running.", name_.c_str());
122         return;
123     }
124     auto task = GetTaskUnlocked();
125     if (task) {
126         inflight_ = true;
127         threadPool_->Submit([task = std::move(task)] { task(); });
128     } else {
129         DP_DEBUG_LOG("all tasks have completed for %s handle:%{public}d.", name_.c_str(), static_cast<int>(handle_));
130     }
131 }
OnTaskComplete()132 void BaseTaskGroup::OnTaskComplete()
133 {
134     std::lock_guard<std::mutex> lock(mutex_);
135     if (serial_) {
136         inflight_ = false;
137     }
138     DispatchTaskUnlocked();
139 }
140 } //namespace DeferredProcessing
141 } // namespace CameraStandard
142 } // namespace OHOS
143