1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "base_task_group.h"
17 #include "dp_log.h"
18
19 namespace OHOS {
20 namespace CameraStandard {
21 namespace DeferredProcessing {
BaseTaskGroup(const std::string & name,TaskFunc func,bool serial,const ThreadPool * threadPool)22 BaseTaskGroup::BaseTaskGroup(const std::string& name, TaskFunc func, bool serial, const ThreadPool* threadPool)
23 : name_(name),
24 func_(std::move(func)),
25 serial_(serial),
26 threadPool_(threadPool),
27 handle_(INVALID_TASK_GROUP_HANDLE),
28 inflight_(false),
29 que_(name + " queue")
30 {
31 DP_DEBUG_LOG("task group (%s).", name_.c_str());
32 }
33
~BaseTaskGroup()34 BaseTaskGroup::~BaseTaskGroup()
35 {
36 DP_DEBUG_LOG("task group name: %s, handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
37 que_.SetActive(false);
38 que_.Clear();
39 }
40
Initialize()41 void BaseTaskGroup::Initialize()
42 {
43 handle_ = GenerateHandle();
44 que_.SetActive(true);
45 DP_DEBUG_LOG("task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
46 }
47
GetName()48 const std::string& BaseTaskGroup::GetName()
49 {
50 return name_;
51 }
52
GetHandle()53 TaskGroupHandle BaseTaskGroup::GetHandle()
54 {
55 return handle_;
56 }
57
SubmitTask(std::any param)58 bool BaseTaskGroup::SubmitTask(std::any param)
59 {
60 std::lock_guard<std::mutex> lock(mutex_);
61 if (que_.Full()) {
62 DP_WARNING_LOG("Submit task (%s), handle: %{public}d, que is full!", name_.c_str(), static_cast<int>(handle_));
63 } else {
64 DP_DEBUG_LOG("Submit task (%s), handle: %{public}d, size: %zu.", name_.c_str(),
65 static_cast<int>(handle_), que_.Size());
66 }
67 que_.Push(std::move(param));
68 DispatchTaskUnlocked();
69 return true;
70 }
71
CancelAllTasks()72 void BaseTaskGroup::CancelAllTasks()
73 {
74 std::lock_guard<std::mutex> lock(mutex_);
75 DP_DEBUG_LOG("Cancel all tasks for task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
76 que_.Clear();
77 if (serial_) {
78 inflight_ = false;
79 }
80 }
81
GetTaskCount()82 size_t BaseTaskGroup::GetTaskCount()
83 {
84 std::lock_guard<std::mutex> lock(mutex_);
85 DP_DEBUG_LOG("Get task count for task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
86 return que_.Size();
87 }
88
GetTaskUnlocked()89 std::function<void()> BaseTaskGroup::GetTaskUnlocked()
90 {
91 if (que_.Empty()) {
92 DP_DEBUG_LOG("(%s) no available tasks.", name_.c_str());
93 return {};
94 }
95 std::weak_ptr<BaseTaskGroup> weakThis(shared_from_this());
96 auto task = [param = que_.Pop(), weakThis]() {
97 auto thiz = weakThis.lock();
98 if (thiz) {
99 thiz->func_(std::move(param));
100 thiz->OnTaskComplete();
101 }
102 };
103 DP_DEBUG_LOG("return one task %s, handle:%{public}d, size: %zu.", name_.c_str(), static_cast<int>(handle_),
104 que_.Size());
105 return task;
106 }
107
GenerateHandle()108 TaskGroupHandle BaseTaskGroup::GenerateHandle()
109 {
110 static std::atomic<uint64_t> counter = 0;
111 DP_DEBUG_LOG("(%s) entered.", name_.c_str());
112 uint64_t prefix = std::hash<std::string>{}(name_);
113 uint64_t handle = ++counter;
114 return prefix | handle;
115 }
116
DispatchTaskUnlocked()117 void BaseTaskGroup::DispatchTaskUnlocked()
118 {
119 DP_DEBUG_LOG("(%s) entered.", name_.c_str());
120 if (serial_ && inflight_.load()) {
121 DP_DEBUG_LOG("(%s), task is running, redispatch tasks after finish running.", name_.c_str());
122 return;
123 }
124 auto task = GetTaskUnlocked();
125 if (task) {
126 inflight_ = true;
127 threadPool_->Submit([task = std::move(task)] { task(); });
128 } else {
129 DP_DEBUG_LOG("all tasks have completed for %s handle:%{public}d.", name_.c_str(), static_cast<int>(handle_));
130 }
131 }
OnTaskComplete()132 void BaseTaskGroup::OnTaskComplete()
133 {
134 std::lock_guard<std::mutex> lock(mutex_);
135 if (serial_) {
136 inflight_ = false;
137 }
138 DispatchTaskUnlocked();
139 }
140 } //namespace DeferredProcessing
141 } // namespace CameraStandard
142 } // namespace OHOS
143