1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "offline_pipeline.h"
17 #include "buffer_manager.h"
18 #include "ibuffer_pool.h"
19 #include <vector>
20
21 namespace OHOS::Camera {
OfflinePipeline()22 OfflinePipeline::OfflinePipeline() : calltimes_(0) {}
23
~OfflinePipeline()24 OfflinePipeline::~OfflinePipeline()
25 {
26 StopProcess();
27 bufferCache_.clear();
28 }
29
StartProcess()30 RetCode OfflinePipeline::StartProcess()
31 {
32 int origin = calltimes_.fetch_add(1);
33 if (origin != 0) {
34 // already called at most 1 time, no reenter
35 CAMERA_LOGE("Now will not start, current start %{public}d times", calltimes_.load());
36 return RC_ERROR;
37 }
38
39 running_ = true;
40 processThread_ = new std::thread([this]() {
41 prctl(PR_SET_NAME, "offlinepipeline");
42 CAMERA_LOGI("offlinepipeline thread start");
43 while (true) {
44 {
45 std::unique_lock<std::mutex> l(queueLock_);
46 if (running_ == false) {
47 CAMERA_LOGD("offlinepipeline thread break");
48 break;
49 }
50 }
51 HandleBuffers();
52 }
53 CAMERA_LOGI("offlinepipeline thread start");
54 });
55
56 if (processThread_ == nullptr) {
57 return RC_ERROR;
58 }
59 return RC_OK;
60 }
61
StopProcess()62 RetCode OfflinePipeline::StopProcess()
63 {
64 if (processThread_ == nullptr) {
65 CAMERA_LOGE("cannot stop.");
66 return RC_ERROR;
67 }
68 {
69 std::unique_lock<std::mutex> l(queueLock_);
70 if (running_.load() == false) {
71 return RC_OK;
72 }
73
74 running_ = false;
75 cv_.notify_one();
76 }
77
78 processThread_->detach();
79 delete processThread_;
80 processThread_ = nullptr;
81 return RC_OK;
82 }
83
BindOfflineStreamCallback(std::function<void (std::shared_ptr<IBuffer> &)> & callback)84 void OfflinePipeline::BindOfflineStreamCallback(std::function<void(std::shared_ptr<IBuffer>&)>& callback)
85 {
86 std::lock_guard<std::mutex> l(cbLock_);
87 callback_ = callback;
88
89 return;
90 }
91
SwitchToOfflineMode()92 void OfflinePipeline::SwitchToOfflineMode()
93 {
94 offlineMode_ = true;
95 }
96
CancelCapture(int32_t captureId)97 RetCode OfflinePipeline::CancelCapture(int32_t captureId)
98 {
99 CAMERA_LOGE("cancel capture begin");
100 if (bufferCache_.empty()) {
101 CAMERA_LOGE("cancel capture failed, capture id = %{public}d doesn't exist", captureId);
102 return RC_OK;
103 }
104
105 std::vector<std::shared_ptr<IBuffer>> cache;
106 {
107 std::unique_lock<std::mutex> l(queueLock_);
108 auto it = std::find_if(bufferCache_.begin(), bufferCache_.end(),
109 [&captureId](const std::vector<std::shared_ptr<IBuffer>>& c) {
110 for (auto b : c) {
111 if (b->GetCaptureId() == captureId) {
112 return true;
113 }
114 }
115 return false;
116 });
117 if (it == bufferCache_.end()) {
118 CAMERA_LOGE("cancel capture failed, capture id = %{public}d doesn't exist", captureId);
119 return RC_OK;
120 }
121 cache = *it;
122 bufferCache_.erase(it);
123 }
124 for (auto it : cache) {
125 it->SetBufferStatus(CAMERA_BUFFER_STATUS_DROP);
126 }
127 DeliverCancelCache(cache);
128 CAMERA_LOGE("cancel capture end");
129 return RC_OK;
130 }
131
FlushOfflineStream()132 RetCode OfflinePipeline::FlushOfflineStream()
133 {
134 if (!offlineMode_.load()) {
135 CAMERA_LOGE("can't flush in online mode");
136 return RC_ERROR;
137 }
138
139 if (!bufferCache_.empty()) {
140 std::unique_lock<std::mutex> l(queueLock_);
141 while (!bufferCache_.empty()) {
142 auto cache = bufferCache_.front();
143 bufferCache_.pop_front();
144
145 for (auto it : cache) {
146 it->SetBufferStatus(CAMERA_BUFFER_STATUS_DROP);
147 }
148 DeliverCancelCache(cache);
149 }
150 }
151
152 return RC_OK;
153 }
154
ReceiveCache(std::vector<std::shared_ptr<IBuffer>> & buffers)155 void OfflinePipeline::ReceiveCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
156 {
157 if (!buffers.empty() && buffers[0]->GetBufferStatus() != CAMERA_BUFFER_STATUS_OK) {
158 DeliverCancelCache(buffers);
159 return;
160 }
161
162 std::unique_lock<std::mutex> l(queueLock_);
163 bufferCache_.emplace_back(buffers);
164 cv_.notify_one();
165
166 return;
167 }
168
HandleBuffers()169 void OfflinePipeline::HandleBuffers()
170 {
171 std::vector<std::shared_ptr<IBuffer>> buffers = {};
172 {
173 std::unique_lock<std::mutex> l(queueLock_);
174 cv_.wait(l, [this] { return !(running_.load() && bufferCache_.empty()); });
175 if (running_ == false) {
176 return;
177 }
178 buffers = bufferCache_.front();
179 bufferCache_.pop_front();
180 if (buffers.empty()) {
181 return;
182 }
183 }
184 CAMERA_LOGI("OfflinePipeline::HandleBuffers, begin to ProcessCache buffer, size = %{public}d ", buffers.size());
185 ProcessCache(buffers);
186 return;
187 }
188
ProcessCache(std::vector<std::shared_ptr<IBuffer>> & buffers)189 void OfflinePipeline::ProcessCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
190 {
191 DeliverCache(buffers);
192 return;
193 }
DeliverCacheCheck(std::vector<std::shared_ptr<IBuffer>> & buffers)194 void OfflinePipeline::DeliverCacheCheck(std::vector<std::shared_ptr<IBuffer>>& buffers)
195 {
196 for (auto it : buffers) {
197 if (it == nullptr) {
198 continue;
199 }
200 auto bufferManager = BufferManager::GetInstance();
201 if (bufferManager == nullptr) {
202 CAMERA_LOGE("can't get buffer manager");
203 continue;
204 }
205 auto bufferPool = bufferManager->GetBufferPool(it->GetPoolId());
206 if (bufferPool == nullptr) {
207 CAMERA_LOGE("can't get buffer pool");
208 return;
209 }
210 bufferPool->ReturnBuffer(it);
211 }
212 }
DeliverCache(std::vector<std::shared_ptr<IBuffer>> & buffers)213 void OfflinePipeline::DeliverCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
214 {
215 DeliverCacheCheck(buffers);
216 if (offlineMode_.load()) {
217 std::shared_ptr<IBuffer> nullBuffer = nullptr;
218 DeliverOfflineBuffer(nullBuffer);
219 }
220 return;
221 }
222
DeliverCancelCache(std::vector<std::shared_ptr<IBuffer>> & buffers)223 void OfflinePipeline::DeliverCancelCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
224 {
225 DeliverCache(buffers);
226 return;
227 }
228
DeliverOfflineBuffer(std::shared_ptr<IBuffer> & buffer)229 void OfflinePipeline::DeliverOfflineBuffer(std::shared_ptr<IBuffer>& buffer)
230 {
231 if (!offlineMode_.load()) {
232 CAMERA_LOGE("cannot deliver buffer in online mode");
233 return;
234 }
235
236 if (callback_ == nullptr) {
237 CAMERA_LOGE("cannot deliver offline buffer, callback_ is null");
238 return;
239 }
240
241 callback_(buffer);
242
243 return;
244 }
245
CacheQueueDry()246 bool OfflinePipeline::CacheQueueDry()
247 {
248 std::unique_lock<std::mutex> l(queueLock_);
249 return bufferCache_.empty();
250 }
251
CheckOwnerOfCaptureId(int32_t captureId)252 bool OfflinePipeline::CheckOwnerOfCaptureId(int32_t captureId)
253 {
254 std::unique_lock<std::mutex> l(queueLock_);
255 for (auto it : bufferCache_) {
256 for (auto buffer : it) {
257 if (captureId == buffer->GetCaptureId()) {
258 return true;
259 }
260 }
261 }
262 return false;
263 }
264 } // namespace OHOS::Camera
265