1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dcamera_stream_data_process.h"
17
18 #include "anonymous_string.h"
19 #include "distributed_camera_constants.h"
20 #include "distributed_camera_errno.h"
21 #include "distributed_hardware_log.h"
22
23 #include "dcamera_pipeline_source.h"
24 #include "dcamera_stream_data_process_pipeline_listener.h"
25
26 namespace OHOS {
27 namespace DistributedHardware {
DCameraStreamDataProcess(std::string devId,std::string dhId,DCStreamType streamType)28 DCameraStreamDataProcess::DCameraStreamDataProcess(std::string devId, std::string dhId, DCStreamType streamType)
29 : devId_(devId), dhId_(dhId), streamType_(streamType)
30 {
31 DHLOGI("DCameraStreamDataProcess Constructor devId %{public}s dhId %{public}s", GetAnonyString(devId_).c_str(),
32 GetAnonyString(dhId_).c_str());
33 pipeline_ = nullptr;
34 listener_ = nullptr;
35 }
36
~DCameraStreamDataProcess()37 DCameraStreamDataProcess::~DCameraStreamDataProcess()
38 {
39 DHLOGI("DCameraStreamDataProcess Destructor devId %{public}s dhId %{public}s streamType: %{public}d",
40 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_);
41 streamIds_.clear();
42 producers_.clear();
43 if (pipeline_ != nullptr) {
44 pipeline_->DestroyDataProcessPipeline();
45 }
46 }
47
FeedStream(std::shared_ptr<DataBuffer> & buffer)48 void DCameraStreamDataProcess::FeedStream(std::shared_ptr<DataBuffer>& buffer)
49 {
50 for (auto streamId : streamIds_) {
51 uint64_t buffersSize = static_cast<uint64_t>(buffer->Size());
52 DHLOGD("FeedStream devId %{public}s dhId %{public}s streamId %{public}d streamType %{public}d streamSize: "
53 "%{public}" PRIu64, GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamId,
54 streamType_, buffersSize);
55 }
56 switch (streamType_) {
57 case SNAPSHOT_FRAME: {
58 FeedStreamToSnapShot(buffer);
59 break;
60 }
61 case CONTINUOUS_FRAME: {
62 FeedStreamToContinue(buffer);
63 break;
64 }
65 default:
66 break;
67 }
68 }
69
ConfigStreams(std::shared_ptr<DCameraStreamConfig> & dstConfig,std::set<int32_t> & streamIds)70 void DCameraStreamDataProcess::ConfigStreams(std::shared_ptr<DCameraStreamConfig>& dstConfig,
71 std::set<int32_t>& streamIds)
72 {
73 for (auto streamId : streamIds) {
74 DHLOGI("ConfigStreams devId %{public}s dhId %{public}s streamId %{public}d, width: %{public}d, height: "
75 "%{public}d, format: %{public}d, dataspace: %{public}d, encodeType: %{public}d, streamType: %{public}d",
76 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamId, dstConfig->width_,
77 dstConfig->height_, dstConfig->format_, dstConfig->dataspace_, dstConfig->encodeType_, dstConfig->type_);
78 }
79 dstConfig_ = dstConfig;
80 streamIds_ = streamIds;
81 }
82
ReleaseStreams(std::set<int32_t> & streamIds)83 void DCameraStreamDataProcess::ReleaseStreams(std::set<int32_t>& streamIds)
84 {
85 for (auto streamId : streamIds) {
86 DHLOGI("ReleaseStreams devId %{public}s dhId %{public}s streamId %{public}d streamType %{public}d",
87 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamId, streamType_);
88 }
89 std::lock_guard<std::mutex> autoLock(producerMutex_);
90 for (auto iter = streamIds.begin(); iter != streamIds.end(); iter++) {
91 int32_t streamId = *iter;
92 DHLOGI("ReleaseStreams devId %{public}s dhId %{public}s streamId: %{public}d", GetAnonyString(devId_).c_str(),
93 GetAnonyString(dhId_).c_str(), streamId);
94 streamIds_.erase(streamId);
95 auto producerIter = producers_.find(streamId);
96 if (producerIter == producers_.end()) {
97 continue;
98 }
99 producerIter->second->Stop();
100 producers_.erase(streamId);
101 }
102 }
103
StartCapture(std::shared_ptr<DCameraStreamConfig> & srcConfig,std::set<int32_t> & streamIds)104 void DCameraStreamDataProcess::StartCapture(std::shared_ptr<DCameraStreamConfig>& srcConfig,
105 std::set<int32_t>& streamIds)
106 {
107 for (auto iter = streamIds.begin(); iter != streamIds.end(); iter++) {
108 DHLOGI("StartCapture devId %{public}s dhId %{public}s streamType: %{public}d streamId: %{public}d, "
109 "srcConfig: width: %{public}d, height: %{public}d, format: %{public}d, dataspace: %{public}d, "
110 "streamType: %{public}d, encodeType: %{public}d",
111 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, *iter,
112 srcConfig->width_, srcConfig->height_, srcConfig->format_, srcConfig->dataspace_,
113 srcConfig->type_, srcConfig->encodeType_);
114 }
115 srcConfig_ = srcConfig;
116 if (streamType_ == CONTINUOUS_FRAME) {
117 CreatePipeline();
118 }
119 {
120 std::lock_guard<std::mutex> autoLock(producerMutex_);
121 for (auto iter = streamIds_.begin(); iter != streamIds_.end(); iter++) {
122 uint32_t streamId = *iter;
123 DHLOGI("StartCapture streamId: %{public}d", streamId);
124 if (streamIds.find(streamId) == streamIds.end()) {
125 continue;
126 }
127
128 DHLOGI("StartCapture findProducer devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
129 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
130 auto producerIter = producers_.find(streamId);
131 if (producerIter != producers_.end()) {
132 continue;
133 }
134 DHLOGI("StartCapture CreateProducer devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
135 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
136 producers_[streamId] =
137 std::make_shared<DCameraStreamDataProcessProducer>(devId_, dhId_, streamId, streamType_);
138 producers_[streamId]->Start();
139 }
140 }
141 }
142
StopCapture(std::set<int32_t> & streamIds)143 void DCameraStreamDataProcess::StopCapture(std::set<int32_t>& streamIds)
144 {
145 for (auto iter = streamIds.begin(); iter != streamIds.end(); iter++) {
146 DHLOGI("StopCapture devId %{public}s dhId %{public}s streamType: %{public}d streamId: %{public}d",
147 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, *iter);
148 }
149 {
150 std::lock_guard<std::mutex> autoLock(producerMutex_);
151 for (auto iter = streamIds_.begin(); iter != streamIds_.end(); iter++) {
152 uint32_t streamId = *iter;
153 DHLOGI("StopCapture streamId: %{public}d", streamId);
154 if (streamIds.find(streamId) == streamIds.end()) {
155 continue;
156 }
157
158 DHLOGI("StopCapture findProducer devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
159 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
160 auto producerIter = producers_.find(streamId);
161 if (producerIter == producers_.end()) {
162 DHLOGE("StopCapture no producer, devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
163 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
164 continue;
165 }
166 DHLOGI("StopCapture stop producer, devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
167 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
168 producerIter->second->Stop();
169 producerIter = producers_.erase(producerIter);
170 }
171 }
172 }
173
GetAllStreamIds(std::set<int32_t> & streamIds)174 void DCameraStreamDataProcess::GetAllStreamIds(std::set<int32_t>& streamIds)
175 {
176 streamIds = streamIds_;
177 }
178
GetProducerSize()179 int32_t DCameraStreamDataProcess::GetProducerSize()
180 {
181 DHLOGI("DCameraStreamDataProcess GetProducerSize devId %{public}s dhId %{public}s",
182 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str());
183 std::lock_guard<std::mutex> autoLock(producerMutex_);
184 return producers_.size();
185 }
186
FeedStreamToSnapShot(const std::shared_ptr<DataBuffer> & buffer)187 void DCameraStreamDataProcess::FeedStreamToSnapShot(const std::shared_ptr<DataBuffer>& buffer)
188 {
189 uint64_t buffersSize = static_cast<uint64_t>(buffer->Size());
190 DHLOGD("DCameraStreamDataProcess FeedStreamToSnapShot devId %{public}s dhId %{public}s streamType %{public}d "
191 "streamSize: %{public}" PRIu64, GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(),
192 streamType_, buffersSize);
193 std::lock_guard<std::mutex> autoLock(producerMutex_);
194 for (auto iter = producers_.begin(); iter != producers_.end(); iter++) {
195 iter->second->FeedStream(buffer);
196 }
197 }
198
FeedStreamToContinue(const std::shared_ptr<DataBuffer> & buffer)199 void DCameraStreamDataProcess::FeedStreamToContinue(const std::shared_ptr<DataBuffer>& buffer)
200 {
201 uint64_t buffersSize = static_cast<uint64_t>(buffer->Size());
202 DHLOGD("DCameraStreamDataProcess FeedStreamToContinue devId %{public}s dhId %{public}s streamType %{public}d "
203 "streamSize: %{public}" PRIu64, GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(),
204 streamType_, buffersSize);
205 std::lock_guard<std::mutex> autoLock(pipelineMutex_);
206 std::vector<std::shared_ptr<DataBuffer>> buffers;
207 buffers.push_back(buffer);
208 if (pipeline_ == nullptr) {
209 buffersSize = static_cast<uint64_t>(buffer->Size());
210 DHLOGE("pipeline null devId %{public}s dhId %{public}s type: %{public}d streamSize: %{public}" PRIu64,
211 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, buffersSize);
212 return;
213 }
214 int32_t ret = pipeline_->ProcessData(buffers);
215 if (ret != DCAMERA_OK) {
216 DHLOGE("pipeline ProcessData failed, ret: %{public}d", ret);
217 }
218 }
219
OnProcessedVideoBuffer(const std::shared_ptr<DataBuffer> & videoResult)220 void DCameraStreamDataProcess::OnProcessedVideoBuffer(const std::shared_ptr<DataBuffer>& videoResult)
221 {
222 uint64_t resultSize = static_cast<uint64_t>(videoResult->Size());
223 DHLOGI("DCameraStreamDataProcess OnProcessedVideoBuffer devId %{public}s dhId %{public}s streamType: %{public}d "
224 "streamSize: %{public}" PRIu64, GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(),
225 streamType_, resultSize);
226 std::lock_guard<std::mutex> autoLock(producerMutex_);
227 for (auto iter = producers_.begin(); iter != producers_.end(); iter++) {
228 iter->second->FeedStream(videoResult);
229 }
230 }
231
OnError(const DataProcessErrorType errorType)232 void DCameraStreamDataProcess::OnError(const DataProcessErrorType errorType)
233 {
234 DHLOGE("DCameraStreamDataProcess OnError pipeline errorType: %{public}d", errorType);
235 }
236
CreatePipeline()237 void DCameraStreamDataProcess::CreatePipeline()
238 {
239 DHLOGI("DCameraStreamDataProcess CreatePipeline devId %{public}s dhId %{public}s",
240 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str());
241 std::lock_guard<std::mutex> autoLock(pipelineMutex_);
242 if (pipeline_ != nullptr) {
243 DHLOGI("DCameraStreamDataProcess CreatePipeline already exist, devId %{public}s dhId %{public}s",
244 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str());
245 return;
246 }
247 pipeline_ = std::make_shared<DCameraPipelineSource>();
248 auto process = std::shared_ptr<DCameraStreamDataProcess>(shared_from_this());
249 listener_ = std::make_shared<DCameraStreamDataProcessPipelineListener>(process);
250 VideoConfigParams srcParams(GetPipelineCodecType(srcConfig_->encodeType_), GetPipelineFormat(srcConfig_->format_),
251 DCAMERA_PRODUCER_FPS_DEFAULT, srcConfig_->width_, srcConfig_->height_);
252 VideoConfigParams dstParams(GetPipelineCodecType(dstConfig_->encodeType_), GetPipelineFormat(dstConfig_->format_),
253 DCAMERA_PRODUCER_FPS_DEFAULT, dstConfig_->width_, dstConfig_->height_);
254 int32_t ret = pipeline_->CreateDataProcessPipeline(PipelineType::VIDEO, srcParams, dstParams, listener_);
255 if (ret != DCAMERA_OK) {
256 DHLOGE("DCameraStreamDataProcess CreateDataProcessPipeline type: %{public}d failed, ret: %{public}d",
257 PipelineType::VIDEO, ret);
258 }
259 }
260
DestroyPipeline()261 void DCameraStreamDataProcess::DestroyPipeline()
262 {
263 DHLOGI("DCameraStreamDataProcess DestroyPipeline devId %{public}s dhId %{public}s",
264 GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str());
265 std::lock_guard<std::mutex> autoLock(pipelineMutex_);
266 if (pipeline_ == nullptr) {
267 return;
268 }
269 pipeline_->DestroyDataProcessPipeline();
270 pipeline_ = nullptr;
271 }
272
GetPipelineCodecType(DCEncodeType encodeType)273 VideoCodecType DCameraStreamDataProcess::GetPipelineCodecType(DCEncodeType encodeType)
274 {
275 VideoCodecType codecType;
276 switch (encodeType) {
277 case ENCODE_TYPE_H264:
278 codecType = VideoCodecType::CODEC_H264;
279 break;
280 case ENCODE_TYPE_H265:
281 codecType = VideoCodecType::CODEC_H265;
282 break;
283 case ENCODE_TYPE_MPEG4_ES:
284 codecType = VideoCodecType::CODEC_MPEG4_ES;
285 break;
286 default:
287 codecType = VideoCodecType::NO_CODEC;
288 break;
289 }
290 return codecType;
291 }
292
GetPipelineFormat(int32_t format)293 Videoformat DCameraStreamDataProcess::GetPipelineFormat(int32_t format)
294 {
295 Videoformat videoFormat;
296 switch (format) {
297 case OHOS_CAMERA_FORMAT_RGBA_8888:
298 videoFormat = Videoformat::RGBA_8888;
299 break;
300 default:
301 videoFormat = Videoformat::NV21;
302 break;
303 }
304 return videoFormat;
305 }
306 } // namespace DistributedHardware
307 } // namespace OHOS
308