1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "offline_pipeline.h"
17 #include "buffer_manager.h"
18 #include "ibuffer_pool.h"
19 #include <vector>
20 
21 namespace OHOS::Camera {
OfflinePipeline()22 OfflinePipeline::OfflinePipeline() : calltimes_(0) {}
23 
~OfflinePipeline()24 OfflinePipeline::~OfflinePipeline()
25 {
26     StopProcess();
27     bufferCache_.clear();
28 }
29 
StartProcess()30 RetCode OfflinePipeline::StartProcess()
31 {
32     int origin = calltimes_.fetch_add(1);
33     if (origin != 0) {
34         // already called at most 1 time, no reenter
35         CAMERA_LOGE("Now will not start, current start %{public}d times", calltimes_.load());
36         return RC_ERROR;
37     }
38 
39     running_ = true;
40     processThread_ = new std::thread([this]() {
41         prctl(PR_SET_NAME, "offlinepipeline");
42         CAMERA_LOGI("offlinepipeline thread start");
43         while (true) {
44             {
45                 std::unique_lock<std::mutex> l(queueLock_);
46                 if (running_ == false) {
47                     CAMERA_LOGD("offlinepipeline thread break");
48                     break;
49                 }
50             }
51             HandleBuffers();
52         }
53         CAMERA_LOGI("offlinepipeline thread start");
54     });
55 
56     if (processThread_ == nullptr) {
57         return RC_ERROR;
58     }
59     return RC_OK;
60 }
61 
StopProcess()62 RetCode OfflinePipeline::StopProcess()
63 {
64     if (processThread_ == nullptr) {
65         CAMERA_LOGE("cannot stop.");
66         return RC_ERROR;
67     }
68     {
69         std::unique_lock<std::mutex> l(queueLock_);
70         if (running_.load() == false) {
71             return RC_OK;
72         }
73 
74         running_ = false;
75         cv_.notify_one();
76     }
77 
78     processThread_->detach();
79     delete processThread_;
80     processThread_ = nullptr;
81     return RC_OK;
82 }
83 
BindOfflineStreamCallback(std::function<void (std::shared_ptr<IBuffer> &)> & callback)84 void OfflinePipeline::BindOfflineStreamCallback(std::function<void(std::shared_ptr<IBuffer>&)>& callback)
85 {
86     std::lock_guard<std::mutex> l(cbLock_);
87     callback_ = callback;
88 
89     return;
90 }
91 
SwitchToOfflineMode()92 void OfflinePipeline::SwitchToOfflineMode()
93 {
94     offlineMode_ = true;
95 }
96 
CancelCapture(int32_t captureId)97 RetCode OfflinePipeline::CancelCapture(int32_t captureId)
98 {
99     CAMERA_LOGE("cancel capture begin");
100     if (bufferCache_.empty()) {
101         CAMERA_LOGE("cancel capture failed, capture id = %{public}d doesn't exist", captureId);
102         return RC_OK;
103     }
104 
105     std::vector<std::shared_ptr<IBuffer>> cache;
106     {
107         std::unique_lock<std::mutex> l(queueLock_);
108         auto it = std::find_if(bufferCache_.begin(), bufferCache_.end(),
109             [&captureId](const std::vector<std::shared_ptr<IBuffer>>& c) {
110                 for (auto b : c) {
111                     if (b->GetCaptureId() == captureId) {
112                         return true;
113                     }
114                 }
115                 return false;
116             });
117         if (it == bufferCache_.end()) {
118             CAMERA_LOGE("cancel capture failed, capture id = %{public}d doesn't exist", captureId);
119             return RC_OK;
120         }
121         cache = *it;
122         bufferCache_.erase(it);
123     }
124     for (auto it : cache) {
125         it->SetBufferStatus(CAMERA_BUFFER_STATUS_DROP);
126     }
127     DeliverCancelCache(cache);
128     CAMERA_LOGE("cancel capture end");
129     return RC_OK;
130 }
131 
FlushOfflineStream()132 RetCode OfflinePipeline::FlushOfflineStream()
133 {
134     if (!offlineMode_.load()) {
135         CAMERA_LOGE("can't flush in online mode");
136         return RC_ERROR;
137     }
138 
139     if (!bufferCache_.empty()) {
140         std::unique_lock<std::mutex> l(queueLock_);
141         while (!bufferCache_.empty()) {
142             auto cache = bufferCache_.front();
143             bufferCache_.pop_front();
144 
145             for (auto it : cache) {
146                 it->SetBufferStatus(CAMERA_BUFFER_STATUS_DROP);
147             }
148             DeliverCancelCache(cache);
149         }
150     }
151 
152     return RC_OK;
153 }
154 
ReceiveCache(std::vector<std::shared_ptr<IBuffer>> & buffers)155 void OfflinePipeline::ReceiveCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
156 {
157     if (!buffers.empty() && buffers[0]->GetBufferStatus() != CAMERA_BUFFER_STATUS_OK) {
158         DeliverCancelCache(buffers);
159         return;
160     }
161 
162     std::unique_lock<std::mutex> l(queueLock_);
163     bufferCache_.emplace_back(buffers);
164     cv_.notify_one();
165 
166     return;
167 }
168 
HandleBuffers()169 void OfflinePipeline::HandleBuffers()
170 {
171     std::vector<std::shared_ptr<IBuffer>> buffers = {};
172     {
173         std::unique_lock<std::mutex> l(queueLock_);
174         cv_.wait(l, [this] { return !(running_.load() && bufferCache_.empty()); });
175         if (running_ == false) {
176             return;
177         }
178         buffers = bufferCache_.front();
179         bufferCache_.pop_front();
180         if (buffers.empty()) {
181             return;
182         }
183     }
184     CAMERA_LOGI("OfflinePipeline::HandleBuffers, begin to ProcessCache buffer, size = %{public}d ", buffers.size());
185     ProcessCache(buffers);
186     return;
187 }
188 
ProcessCache(std::vector<std::shared_ptr<IBuffer>> & buffers)189 void OfflinePipeline::ProcessCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
190 {
191     DeliverCache(buffers);
192     return;
193 }
DeliverCacheCheck(std::vector<std::shared_ptr<IBuffer>> & buffers)194 void OfflinePipeline::DeliverCacheCheck(std::vector<std::shared_ptr<IBuffer>>& buffers)
195 {
196     for (auto it : buffers) {
197         if (it == nullptr) {
198             continue;
199         }
200         auto bufferManager = BufferManager::GetInstance();
201         if (bufferManager == nullptr) {
202             CAMERA_LOGE("can't get buffer manager");
203             continue;
204         }
205         auto bufferPool = bufferManager->GetBufferPool(it->GetPoolId());
206         if (bufferPool == nullptr) {
207             CAMERA_LOGE("can't get buffer pool");
208             return;
209         }
210         bufferPool->ReturnBuffer(it);
211     }
212 }
DeliverCache(std::vector<std::shared_ptr<IBuffer>> & buffers)213 void OfflinePipeline::DeliverCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
214 {
215     DeliverCacheCheck(buffers);
216     if (offlineMode_.load()) {
217         std::shared_ptr<IBuffer> nullBuffer = nullptr;
218         DeliverOfflineBuffer(nullBuffer);
219     }
220     return;
221 }
222 
DeliverCancelCache(std::vector<std::shared_ptr<IBuffer>> & buffers)223 void OfflinePipeline::DeliverCancelCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
224 {
225     DeliverCache(buffers);
226     return;
227 }
228 
DeliverOfflineBuffer(std::shared_ptr<IBuffer> & buffer)229 void OfflinePipeline::DeliverOfflineBuffer(std::shared_ptr<IBuffer>& buffer)
230 {
231     if (!offlineMode_.load()) {
232         CAMERA_LOGE("cannot deliver buffer in online mode");
233         return;
234     }
235 
236     if (callback_ == nullptr) {
237         CAMERA_LOGE("cannot deliver offline buffer, callback_ is null");
238         return;
239     }
240 
241     callback_(buffer);
242 
243     return;
244 }
245 
CacheQueueDry()246 bool OfflinePipeline::CacheQueueDry()
247 {
248     std::unique_lock<std::mutex> l(queueLock_);
249     return bufferCache_.empty();
250 }
251 
CheckOwnerOfCaptureId(int32_t captureId)252 bool OfflinePipeline::CheckOwnerOfCaptureId(int32_t captureId)
253 {
254     std::unique_lock<std::mutex> l(queueLock_);
255     for (auto it : bufferCache_) {
256         for (auto buffer : it) {
257             if (captureId == buffer->GetCaptureId()) {
258                 return true;
259             }
260         }
261     }
262     return false;
263 }
264 } // namespace OHOS::Camera
265