1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dcamera_stream_data_process.h"
17 
18 #include "anonymous_string.h"
19 #include "distributed_camera_constants.h"
20 #include "distributed_camera_errno.h"
21 #include "distributed_hardware_log.h"
22 
23 #include "dcamera_pipeline_source.h"
24 #include "dcamera_stream_data_process_pipeline_listener.h"
25 
26 namespace OHOS {
27 namespace DistributedHardware {
DCameraStreamDataProcess(std::string devId,std::string dhId,DCStreamType streamType)28 DCameraStreamDataProcess::DCameraStreamDataProcess(std::string devId, std::string dhId, DCStreamType streamType)
29     : devId_(devId), dhId_(dhId), streamType_(streamType)
30 {
31     DHLOGI("DCameraStreamDataProcess Constructor devId %{public}s dhId %{public}s", GetAnonyString(devId_).c_str(),
32         GetAnonyString(dhId_).c_str());
33     pipeline_ = nullptr;
34     listener_ = nullptr;
35 }
36 
~DCameraStreamDataProcess()37 DCameraStreamDataProcess::~DCameraStreamDataProcess()
38 {
39     DHLOGI("DCameraStreamDataProcess Destructor devId %{public}s dhId %{public}s streamType: %{public}d",
40         GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_);
41     streamIds_.clear();
42     producers_.clear();
43     if (pipeline_ != nullptr) {
44         pipeline_->DestroyDataProcessPipeline();
45     }
46 }
47 
FeedStream(std::shared_ptr<DataBuffer> & buffer)48 void DCameraStreamDataProcess::FeedStream(std::shared_ptr<DataBuffer>& buffer)
49 {
50     for (auto streamId : streamIds_) {
51         uint64_t buffersSize = static_cast<uint64_t>(buffer->Size());
52         DHLOGD("FeedStream devId %{public}s dhId %{public}s streamId %{public}d streamType %{public}d streamSize: "
53             "%{public}" PRIu64, GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamId,
54             streamType_, buffersSize);
55     }
56     switch (streamType_) {
57         case SNAPSHOT_FRAME: {
58             FeedStreamToSnapShot(buffer);
59             break;
60         }
61         case CONTINUOUS_FRAME: {
62             FeedStreamToContinue(buffer);
63             break;
64         }
65         default:
66             break;
67     }
68 }
69 
ConfigStreams(std::shared_ptr<DCameraStreamConfig> & dstConfig,std::set<int32_t> & streamIds)70 void DCameraStreamDataProcess::ConfigStreams(std::shared_ptr<DCameraStreamConfig>& dstConfig,
71     std::set<int32_t>& streamIds)
72 {
73     for (auto streamId : streamIds) {
74         DHLOGI("ConfigStreams devId %{public}s dhId %{public}s streamId %{public}d, width: %{public}d, height: "
75             "%{public}d, format: %{public}d, dataspace: %{public}d, encodeType: %{public}d, streamType: %{public}d",
76             GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamId, dstConfig->width_,
77             dstConfig->height_, dstConfig->format_, dstConfig->dataspace_, dstConfig->encodeType_, dstConfig->type_);
78     }
79     dstConfig_ = dstConfig;
80     streamIds_ = streamIds;
81 }
82 
ReleaseStreams(std::set<int32_t> & streamIds)83 void DCameraStreamDataProcess::ReleaseStreams(std::set<int32_t>& streamIds)
84 {
85     for (auto streamId : streamIds) {
86         DHLOGI("ReleaseStreams devId %{public}s dhId %{public}s streamId %{public}d streamType %{public}d",
87             GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamId, streamType_);
88     }
89     std::lock_guard<std::mutex> autoLock(producerMutex_);
90     for (auto iter = streamIds.begin(); iter != streamIds.end(); iter++) {
91         int32_t streamId = *iter;
92         DHLOGI("ReleaseStreams devId %{public}s dhId %{public}s streamId: %{public}d", GetAnonyString(devId_).c_str(),
93             GetAnonyString(dhId_).c_str(), streamId);
94         streamIds_.erase(streamId);
95         auto producerIter = producers_.find(streamId);
96         if (producerIter == producers_.end()) {
97             continue;
98         }
99         producerIter->second->Stop();
100         producers_.erase(streamId);
101     }
102 }
103 
StartCapture(std::shared_ptr<DCameraStreamConfig> & srcConfig,std::set<int32_t> & streamIds)104 void DCameraStreamDataProcess::StartCapture(std::shared_ptr<DCameraStreamConfig>& srcConfig,
105     std::set<int32_t>& streamIds)
106 {
107     for (auto iter = streamIds.begin(); iter != streamIds.end(); iter++) {
108         DHLOGI("StartCapture devId %{public}s dhId %{public}s streamType: %{public}d streamId: %{public}d, "
109             "srcConfig: width: %{public}d, height: %{public}d, format: %{public}d, dataspace: %{public}d, "
110             "streamType: %{public}d, encodeType: %{public}d",
111             GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, *iter,
112             srcConfig->width_,  srcConfig->height_, srcConfig->format_, srcConfig->dataspace_,
113             srcConfig->type_, srcConfig->encodeType_);
114     }
115     srcConfig_ = srcConfig;
116     if (streamType_ == CONTINUOUS_FRAME) {
117         CreatePipeline();
118     }
119     {
120         std::lock_guard<std::mutex> autoLock(producerMutex_);
121         for (auto iter = streamIds_.begin(); iter != streamIds_.end(); iter++) {
122             uint32_t streamId = *iter;
123             DHLOGI("StartCapture streamId: %{public}d", streamId);
124             if (streamIds.find(streamId) == streamIds.end()) {
125                 continue;
126             }
127 
128             DHLOGI("StartCapture findProducer devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
129                 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
130             auto producerIter = producers_.find(streamId);
131             if (producerIter != producers_.end()) {
132                 continue;
133             }
134             DHLOGI("StartCapture CreateProducer devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
135                 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
136             producers_[streamId] =
137                 std::make_shared<DCameraStreamDataProcessProducer>(devId_, dhId_, streamId, streamType_);
138             producers_[streamId]->Start();
139         }
140     }
141 }
142 
StopCapture(std::set<int32_t> & streamIds)143 void DCameraStreamDataProcess::StopCapture(std::set<int32_t>& streamIds)
144 {
145     for (auto iter = streamIds.begin(); iter != streamIds.end(); iter++) {
146         DHLOGI("StopCapture devId %{public}s dhId %{public}s streamType: %{public}d streamId: %{public}d",
147             GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, *iter);
148     }
149     {
150         std::lock_guard<std::mutex> autoLock(producerMutex_);
151         for (auto iter = streamIds_.begin(); iter != streamIds_.end(); iter++) {
152             uint32_t streamId = *iter;
153             DHLOGI("StopCapture streamId: %{public}d", streamId);
154             if (streamIds.find(streamId) == streamIds.end()) {
155                 continue;
156             }
157 
158             DHLOGI("StopCapture findProducer devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
159                 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
160             auto producerIter = producers_.find(streamId);
161             if (producerIter == producers_.end()) {
162                 DHLOGE("StopCapture no producer, devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
163                     "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
164                 continue;
165             }
166             DHLOGI("StopCapture stop producer, devId %{public}s dhId %{public}s streamType: %{public}d streamId: "
167                 "%{public}d", GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, streamId);
168             producerIter->second->Stop();
169             producerIter = producers_.erase(producerIter);
170         }
171     }
172 }
173 
GetAllStreamIds(std::set<int32_t> & streamIds)174 void DCameraStreamDataProcess::GetAllStreamIds(std::set<int32_t>& streamIds)
175 {
176     streamIds = streamIds_;
177 }
178 
GetProducerSize()179 int32_t DCameraStreamDataProcess::GetProducerSize()
180 {
181     DHLOGI("DCameraStreamDataProcess GetProducerSize devId %{public}s dhId %{public}s",
182         GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str());
183     std::lock_guard<std::mutex> autoLock(producerMutex_);
184     return producers_.size();
185 }
186 
FeedStreamToSnapShot(const std::shared_ptr<DataBuffer> & buffer)187 void DCameraStreamDataProcess::FeedStreamToSnapShot(const std::shared_ptr<DataBuffer>& buffer)
188 {
189     uint64_t buffersSize = static_cast<uint64_t>(buffer->Size());
190     DHLOGD("DCameraStreamDataProcess FeedStreamToSnapShot devId %{public}s dhId %{public}s streamType %{public}d "
191         "streamSize: %{public}" PRIu64, GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(),
192         streamType_, buffersSize);
193     std::lock_guard<std::mutex> autoLock(producerMutex_);
194     for (auto iter = producers_.begin(); iter != producers_.end(); iter++) {
195         iter->second->FeedStream(buffer);
196     }
197 }
198 
FeedStreamToContinue(const std::shared_ptr<DataBuffer> & buffer)199 void DCameraStreamDataProcess::FeedStreamToContinue(const std::shared_ptr<DataBuffer>& buffer)
200 {
201     uint64_t buffersSize = static_cast<uint64_t>(buffer->Size());
202     DHLOGD("DCameraStreamDataProcess FeedStreamToContinue devId %{public}s dhId %{public}s streamType %{public}d "
203         "streamSize: %{public}" PRIu64, GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(),
204         streamType_, buffersSize);
205     std::lock_guard<std::mutex> autoLock(pipelineMutex_);
206     std::vector<std::shared_ptr<DataBuffer>> buffers;
207     buffers.push_back(buffer);
208     if (pipeline_ == nullptr) {
209         buffersSize = static_cast<uint64_t>(buffer->Size());
210         DHLOGE("pipeline null devId %{public}s dhId %{public}s type: %{public}d streamSize: %{public}" PRIu64,
211             GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(), streamType_, buffersSize);
212         return;
213     }
214     int32_t ret = pipeline_->ProcessData(buffers);
215     if (ret != DCAMERA_OK) {
216         DHLOGE("pipeline ProcessData failed, ret: %{public}d", ret);
217     }
218 }
219 
OnProcessedVideoBuffer(const std::shared_ptr<DataBuffer> & videoResult)220 void DCameraStreamDataProcess::OnProcessedVideoBuffer(const std::shared_ptr<DataBuffer>& videoResult)
221 {
222     uint64_t resultSize = static_cast<uint64_t>(videoResult->Size());
223     DHLOGI("DCameraStreamDataProcess OnProcessedVideoBuffer devId %{public}s dhId %{public}s streamType: %{public}d "
224         "streamSize: %{public}" PRIu64, GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str(),
225         streamType_, resultSize);
226     std::lock_guard<std::mutex> autoLock(producerMutex_);
227     for (auto iter = producers_.begin(); iter != producers_.end(); iter++) {
228         iter->second->FeedStream(videoResult);
229     }
230 }
231 
OnError(const DataProcessErrorType errorType)232 void DCameraStreamDataProcess::OnError(const DataProcessErrorType errorType)
233 {
234     DHLOGE("DCameraStreamDataProcess OnError pipeline errorType: %{public}d", errorType);
235 }
236 
CreatePipeline()237 void DCameraStreamDataProcess::CreatePipeline()
238 {
239     DHLOGI("DCameraStreamDataProcess CreatePipeline devId %{public}s dhId %{public}s",
240         GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str());
241     std::lock_guard<std::mutex> autoLock(pipelineMutex_);
242     if (pipeline_ != nullptr) {
243         DHLOGI("DCameraStreamDataProcess CreatePipeline already exist, devId %{public}s dhId %{public}s",
244             GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str());
245         return;
246     }
247     pipeline_ = std::make_shared<DCameraPipelineSource>();
248     auto process = std::shared_ptr<DCameraStreamDataProcess>(shared_from_this());
249     listener_ = std::make_shared<DCameraStreamDataProcessPipelineListener>(process);
250     VideoConfigParams srcParams(GetPipelineCodecType(srcConfig_->encodeType_), GetPipelineFormat(srcConfig_->format_),
251         DCAMERA_PRODUCER_FPS_DEFAULT, srcConfig_->width_, srcConfig_->height_);
252     VideoConfigParams dstParams(GetPipelineCodecType(dstConfig_->encodeType_), GetPipelineFormat(dstConfig_->format_),
253         DCAMERA_PRODUCER_FPS_DEFAULT, dstConfig_->width_, dstConfig_->height_);
254     int32_t ret = pipeline_->CreateDataProcessPipeline(PipelineType::VIDEO, srcParams, dstParams, listener_);
255     if (ret != DCAMERA_OK) {
256         DHLOGE("DCameraStreamDataProcess CreateDataProcessPipeline type: %{public}d failed, ret: %{public}d",
257             PipelineType::VIDEO, ret);
258     }
259 }
260 
DestroyPipeline()261 void DCameraStreamDataProcess::DestroyPipeline()
262 {
263     DHLOGI("DCameraStreamDataProcess DestroyPipeline devId %{public}s dhId %{public}s",
264         GetAnonyString(devId_).c_str(), GetAnonyString(dhId_).c_str());
265     std::lock_guard<std::mutex> autoLock(pipelineMutex_);
266     if (pipeline_ == nullptr) {
267         return;
268     }
269     pipeline_->DestroyDataProcessPipeline();
270     pipeline_ = nullptr;
271 }
272 
GetPipelineCodecType(DCEncodeType encodeType)273 VideoCodecType DCameraStreamDataProcess::GetPipelineCodecType(DCEncodeType encodeType)
274 {
275     VideoCodecType codecType;
276     switch (encodeType) {
277         case ENCODE_TYPE_H264:
278             codecType = VideoCodecType::CODEC_H264;
279             break;
280         case ENCODE_TYPE_H265:
281             codecType = VideoCodecType::CODEC_H265;
282             break;
283         case ENCODE_TYPE_MPEG4_ES:
284             codecType = VideoCodecType::CODEC_MPEG4_ES;
285             break;
286         default:
287             codecType = VideoCodecType::NO_CODEC;
288             break;
289     }
290     return codecType;
291 }
292 
GetPipelineFormat(int32_t format)293 Videoformat DCameraStreamDataProcess::GetPipelineFormat(int32_t format)
294 {
295     Videoformat videoFormat;
296     switch (format) {
297         case OHOS_CAMERA_FORMAT_RGBA_8888:
298             videoFormat = Videoformat::RGBA_8888;
299             break;
300         default:
301             videoFormat = Videoformat::NV21;
302             break;
303     }
304     return videoFormat;
305 }
306 } // namespace DistributedHardware
307 } // namespace OHOS
308