1 /*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "vtp_stream_socket.h"
17
18 #include <chrono>
19 #include <ifaddrs.h>
20 #include <memory>
21 #include <netinet/in.h>
22 #include <securec.h>
23 #include <sys/socket.h>
24 #include <sys/time.h>
25 #include <thread>
26
27 #include "fillpinc.h"
28 #include "raw_stream_data.h"
29 #include "session.h"
30 #include "softbus_adapter_crypto.h"
31 #include "softbus_adapter_socket.h"
32 #include "softbus_adapter_timer.h"
33 #include "softbus_errcode.h"
34 #include "softbus_trans_def.h"
35 #include "stream_common_data.h"
36 #include "stream_depacketizer.h"
37 #include "stream_packetizer.h"
38 #include "vtp_stream_opt.h"
39
40 namespace Communication {
41 namespace SoftBus {
42 bool g_logOn = false;
43 const int32_t FEED_BACK_PERIOD = 1; /* feedback period of fillp stream traffic statistics is 1s */
44 const int32_t MS_PER_SECOND = 1000;
45 const int32_t US_PER_MS = 1000;
46
47 namespace {
PrintOptionInfo(int32_t type,const StreamAttr & value)48 void PrintOptionInfo(int32_t type, const StreamAttr &value)
49 {
50 switch (value.GetType()) {
51 case INT_TYPE:
52 TRANS_LOGI(TRANS_STREAM,
53 "Int option: type=%{public}d, value=%{public}d", type, value.GetIntValue());
54 break;
55 case BOOL_TYPE:
56 TRANS_LOGI(TRANS_STREAM,
57 "Bool option: type=%{public}d, value=%{public}d", type, value.GetBoolValue());
58 break;
59 case STRING_TYPE:
60 TRANS_LOGD(TRANS_STREAM,
61 "String option: type=%{public}d, value=%{public}s", type, value.GetStrValue().c_str());
62 break;
63 default:
64 TRANS_LOGE(TRANS_STREAM, "Wrong StreamAttr!");
65 (void)type;
66 }
67 }
68 } // namespace
69 std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();
70
71 std::map<int32_t, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
72 std::mutex VtpStreamSocket::streamSocketLockMapLock_;
73 std::map<int32_t, std::shared_ptr<VtpStreamSocket>> VtpStreamSocket::g_streamSocketMap;
74 std::mutex VtpStreamSocket::streamSocketMapLock_;
75
ConvertStreamFrameInfo2FrameInfo(FrameInfo * frameInfo,const Communication::SoftBus::StreamFrameInfo * streamFrameInfo)76 static inline void ConvertStreamFrameInfo2FrameInfo(FrameInfo *frameInfo,
77 const Communication::SoftBus::StreamFrameInfo *streamFrameInfo)
78 {
79 frameInfo->frameType = (FILLP_INT)(streamFrameInfo->frameType);
80 frameInfo->seqNum = (FILLP_INT)(streamFrameInfo->seqNum);
81 frameInfo->subSeqNum = (FILLP_INT)(streamFrameInfo->seqSubNum);
82 frameInfo->level = (FILLP_INT)(streamFrameInfo->level);
83 frameInfo->timestamp = (FILLP_SLONG)streamFrameInfo->timeStamp;
84 frameInfo->bitMap = (FILLP_UINT32)streamFrameInfo->bitMap;
85 }
86
AddStreamSocketLock(int32_t fd,std::mutex & streamsocketlock)87 void VtpStreamSocket::AddStreamSocketLock(int32_t fd, std::mutex &streamsocketlock)
88 {
89 std::lock_guard<std::mutex> guard(streamSocketLockMapLock_);
90 if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
91 TRANS_LOGE(TRANS_STREAM, "streamsocketlock for the fd already exists. fd=%{public}d", fd);
92 return;
93 }
94
95 g_streamSocketLockMap.emplace(std::pair<int32_t, std::mutex &>(fd, streamsocketlock));
96 }
97
AddStreamSocketListener(int32_t fd,std::shared_ptr<VtpStreamSocket> streamreceiver)98 void VtpStreamSocket::AddStreamSocketListener(int32_t fd, std::shared_ptr<VtpStreamSocket> streamreceiver)
99 {
100 std::lock_guard<std::mutex> guard(streamSocketMapLock_);
101 if (!g_streamSocketMap.empty() && g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
102 TRANS_LOGE(TRANS_STREAM, "streamreceiver for the fd already exists. fd=%{public}d", fd);
103 return;
104 }
105
106 g_streamSocketMap.emplace(std::pair<int32_t, std::shared_ptr<VtpStreamSocket>>(fd, streamreceiver));
107 }
108
RemoveStreamSocketLock(int32_t fd)109 void VtpStreamSocket::RemoveStreamSocketLock(int32_t fd)
110 {
111 std::lock_guard<std::mutex> guard(streamSocketLockMapLock_);
112 if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
113 g_streamSocketLockMap.erase(fd);
114 TRANS_LOGI(TRANS_STREAM, "Remove streamsocketlock for the fd success. fd=%{public}d", fd);
115 } else {
116 TRANS_LOGE(TRANS_STREAM,
117 "Streamsocketlock for the fd not exist in the map. fd=%{public}d", fd);
118 }
119 }
120
RemoveStreamSocketListener(int32_t fd)121 void VtpStreamSocket::RemoveStreamSocketListener(int32_t fd)
122 {
123 std::lock_guard<std::mutex> guard(streamSocketMapLock_);
124 if (g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
125 g_streamSocketMap.erase(fd);
126 TRANS_LOGI(TRANS_STREAM, "Remove streamreceiver for the fd success. fd=%{public}d", fd);
127 } else {
128 TRANS_LOGE(TRANS_STREAM, "Streamreceiver for the fd not exist in the map. fd=%{public}d", fd);
129 }
130 }
131
InsertElementToFuncMap(int32_t type,ValueType valueType,MySetFunc set,MyGetFunc get)132 void VtpStreamSocket::InsertElementToFuncMap(int32_t type, ValueType valueType, MySetFunc set, MyGetFunc get)
133 {
134 OptionFunc fun = {
135 valueType, set, get
136 };
137 optFuncMap_.insert(std::pair<int32_t, OptionFunc>(type, fun));
138 }
139
VtpStreamSocket()140 VtpStreamSocket::VtpStreamSocket()
141 {
142 InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
143 InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
144 InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
145 InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
146 InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
147 InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
148 InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
149 InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
150 InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
151 InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
152 InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
153 InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
154 InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
155 &VtpStreamSocket::GetVtpStackConfig);
156 InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
157 &VtpStreamSocket::GetVtpStackConfig);
158 InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
159 &VtpStreamSocket::GetVtpStackConfig);
160 InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
161 &VtpStreamSocket::GetVtpStackConfig);
162 InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
163 &VtpStreamSocket::GetVtpStackConfig);
164 InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
165 &VtpStreamSocket::GetVtpStackConfig);
166 InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
167 &VtpStreamSocket::GetVtpStackConfig);
168 InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
169 &VtpStreamSocket::GetVtpStackConfig);
170 InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
171 &VtpStreamSocket::GetVtpStackConfig);
172 InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
173 &VtpStreamSocket::GetVtpStackConfig);
174 InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
175 &VtpStreamSocket::GetVtpStackConfig);
176 InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
177 &VtpStreamSocket::GetVtpStackConfig);
178 InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
179 InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
180 InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
181 InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
182
183 scene_ = UNKNOWN_SCENE;
184 }
185
~VtpStreamSocket()186 VtpStreamSocket::~VtpStreamSocket()
187 {
188 TRANS_LOGW(TRANS_STREAM, "~VtpStreamSocket");
189 }
190
GetSelf()191 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
192 {
193 return shared_from_this();
194 }
195
HandleFillpFrameStats(int32_t fd,const FtEventCbkInfo * info)196 int32_t VtpStreamSocket::HandleFillpFrameStats(int32_t fd, const FtEventCbkInfo *info)
197 {
198 if (info == nullptr) {
199 TRANS_LOGE(TRANS_STREAM, "stats info is nullptr");
200 return SOFTBUS_INVALID_PARAM;
201 }
202 StreamSendStats stats = {};
203 if (memcpy_s(&stats, sizeof(StreamSendStats), &info->info.frameSendStats,
204 sizeof(info->info.frameSendStats)) != EOK) {
205 TRANS_LOGE(TRANS_STREAM, "streamStats info memcpy fail");
206 return SOFTBUS_MEM_ERR;
207 }
208
209 std::lock_guard<std::mutex> guard(streamSocketMapLock_);
210 auto itListener = g_streamSocketMap.find(fd);
211 if (itListener != g_streamSocketMap.end()) {
212 if (itListener->second->streamReceiver_ != nullptr) {
213 TRANS_LOGD(TRANS_STREAM, "OnFrameStats enter");
214 itListener->second->streamReceiver_->OnFrameStats(&stats);
215 } else {
216 TRANS_LOGE(TRANS_STREAM, "streamReceiver_ is nullptr");
217 }
218 } else {
219 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the fd is empty in the map. fd=%{public}d", fd);
220 }
221 return SOFTBUS_OK;
222 }
223
HandleRipplePolicy(int32_t fd,const FtEventCbkInfo * info)224 int32_t VtpStreamSocket::HandleRipplePolicy(int32_t fd, const FtEventCbkInfo *info)
225 {
226 if (info == nullptr) {
227 TRANS_LOGE(TRANS_STREAM, "policy info is nullptr");
228 return SOFTBUS_INVALID_PARAM;
229 }
230 TrafficStats stats;
231 (void)memset_s(&stats, sizeof(TrafficStats), 0, sizeof(TrafficStats));
232 if (memcpy_s(&stats.stats, sizeof(stats.stats), info->info.trafficData.stats,
233 sizeof(info->info.trafficData.stats)) != EOK) {
234 TRANS_LOGE(TRANS_STREAM, "RipplePolicy info memcpy fail");
235 return SOFTBUS_MEM_ERR;
236 }
237 std::lock_guard<std::mutex> guard(streamSocketMapLock_);
238 auto itListener = g_streamSocketMap.find(fd);
239 if (itListener != g_streamSocketMap.end()) {
240 if (itListener->second->streamReceiver_ != nullptr) {
241 TRANS_LOGI(TRANS_STREAM, "OnRippleStats enter");
242 itListener->second->streamReceiver_->OnRippleStats(&stats);
243 } else {
244 TRANS_LOGE(TRANS_STREAM, "OnRippleStats streamReceiver_ is nullptr");
245 }
246 } else {
247 TRANS_LOGE(TRANS_STREAM,
248 "OnRippleStats streamReceiver for the fd is empty in the map. fd=%{public}d", fd);
249 }
250 return SOFTBUS_OK;
251 }
252
HandleFillpFrameEvt(int32_t fd,const FtEventCbkInfo * info)253 int32_t VtpStreamSocket::HandleFillpFrameEvt(int32_t fd, const FtEventCbkInfo *info)
254 {
255 if (info == nullptr) {
256 TRANS_LOGE(TRANS_STREAM, "fd is %{public}d, info is nullptr", fd);
257 return SOFTBUS_INVALID_PARAM;
258 }
259 std::lock_guard<std::mutex> guard(streamSocketMapLock_);
260 auto itListener = g_streamSocketMap.find(fd);
261 if (itListener != g_streamSocketMap.end()) {
262 return itListener->second->HandleFillpFrameEvtInner(fd, info);
263 } else {
264 TRANS_LOGE(TRANS_STREAM, "OnFillpFrameEvt for the fd is empty in the map. fd=%{public}d", fd);
265 }
266 return SOFTBUS_OK;
267 }
268
HandleFillpFrameEvtInner(int32_t fd,const FtEventCbkInfo * info)269 int32_t VtpStreamSocket::HandleFillpFrameEvtInner(int32_t fd, const FtEventCbkInfo *info)
270 {
271 if (onStreamEvtCb_ != nullptr) {
272 TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ enter");
273 return HandleVtpFrameEvt(fd, onStreamEvtCb_, info);
274 } else {
275 TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ is nullptr");
276 }
277 return SOFTBUS_OK;
278 }
279
280 #ifdef FILLP_SUPPORT_BW_DET
FillSupportDet(int32_t fd,const FtEventCbkInfo * info,QosTv * metricList)281 void VtpStreamSocket::FillSupportDet(int32_t fd, const FtEventCbkInfo *info, QosTv *metricList)
282 {
283 if (info == nullptr || metricList == nullptr) {
284 TRANS_LOGE(TRANS_STREAM, "info or metricList is nullptr");
285 return;
286 }
287 if (info->evt == FT_EVT_BW_DET) {
288 TRANS_LOGI(TRANS_STREAM,
289 "[Metric Return]: Fillp bandwidth information of socket fd=%{public}d is returned", fd);
290 TRANS_LOGI(TRANS_STREAM,
291 "[Metric Return]: Changed amount of current available bandwidth=%{public}d", info->info.bwInfo.bwStat);
292 TRANS_LOGI(TRANS_STREAM,
293 "[Metric Return]: Current bandwidth for receiving data rate=%{public}d kbps", info->info.bwInfo.rate);
294 metricList->type = BANDWIDTH_ESTIMATE_VALUE;
295 metricList->info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
296 metricList->info.bandwidthInfo.rate = info->info.bwInfo.rate;
297 }
298 if (info->evt == FT_EVT_JITTER_DET) {
299 TRANS_LOGI(TRANS_STREAM,
300 "[Metric Return]: Fillp connection quality information of socket fd=%{public}d is returned", fd);
301 TRANS_LOGI(TRANS_STREAM,
302 "[Metric Return]: Predeicted network condition jitterLevel=%{public}d", info->info.jitterInfo.jitterLevel);
303 TRANS_LOGI(TRANS_STREAM,
304 "[Metric Return]: Current available receiving buffer time=%{public}d ms", info->info.jitterInfo.bufferTime);
305 metricList->type = JITTER_DETECTION_VALUE;
306 metricList->info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
307 metricList->info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
308 }
309 }
310 #endif
311
312 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpStatistics(int32_t fd,const FtEventCbkInfo * info)313 int32_t VtpStreamSocket::FillpStatistics(int32_t fd, const FtEventCbkInfo *info)
314 {
315 if (info == nullptr || fd < 0) {
316 TRANS_LOGE(TRANS_STREAM, "param invalid fd is %{public}d", fd);
317 return SOFTBUS_INVALID_PARAM;
318 }
319 if (info->evt == FT_EVT_FRAME_STATS) {
320 TRANS_LOGI(TRANS_STREAM, "recv fillp frame stats");
321 return HandleFillpFrameStats(fd, info);
322 } else if (info->evt == FT_EVT_TRAFFIC_DATA) {
323 TRANS_LOGI(TRANS_STREAM, "recv fillp traffic data");
324 return HandleRipplePolicy(fd, info);
325 } else if (IsVtpFrameSentEvt(info)) {
326 TRANS_LOGI(TRANS_STREAM, "fd %{public}d recv fillp frame send evt", fd);
327 return HandleFillpFrameEvt(fd, info);
328 }
329 #ifdef FILLP_SUPPORT_BW_DET
330 if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
331 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
332 int16_t tvCount = 1;
333 QosTv metricList = {};
334
335 FillSupportDet(fd, info, &metricList);
336 metricList.info.wifiChannelInfo = {};
337 metricList.info.frameStatusInfo = {};
338
339 std::lock_guard<std::mutex> guard(streamSocketLockMapLock_);
340 auto itLock = g_streamSocketLockMap.find(fd);
341 if (itLock != g_streamSocketLockMap.end()) {
342 std::lock_guard<std::mutex> guard(streamSocketMapLock_);
343 auto itListener = g_streamSocketMap.find(fd);
344 if (itListener != g_streamSocketMap.end()) {
345 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
346 const std::string threadName = "OS_qosEvent";
347 pthread_setname_np(pthread_self(), threadName.c_str());
348 std::lock_guard<std::mutex> guard(itLock->second);
349 itListener->second->OnQosEvent(eventId, tvCount, &metricList);
350 }).detach();
351 } else {
352 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for fd=%{public}d is empty in the map", fd);
353 }
354 } else {
355 TRANS_LOGE(TRANS_STREAM, "StreamSocketLock for fd=%{public}d is empty in the map", fd);
356 }
357 } else {
358 TRANS_LOGE(TRANS_STREAM,
359 "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
360 return -1;
361 }
362 #endif
363 return SOFTBUS_OK;
364 }
365
FillpAppStatistics()366 void VtpStreamSocket::FillpAppStatistics()
367 {
368 int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
369 int16_t tvCount = 1;
370 QosTv metricList = {};
371 FillpStatisticsPcb fillpPcbStats = {};
372 SoftBusSysTime fillpStatsGetTime = {0};
373
374 int32_t getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
375 SoftBusGetTime(&fillpStatsGetTime);
376 if (getStatisticsRet == 0) {
377 metricList.type = STREAM_TRAFFIC_STASTICS;
378 metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.sec *
379 MS_PER_SECOND + fillpStatsGetTime.usec / US_PER_MS)); /* ms */
380 metricList.info.appStatistics.periodRecvBits =
381 static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
382 metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
383 metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
384 metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
385 metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
386 metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
387 metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
388 metricList.info.appStatistics.periodRecvPktLossHighPrecision =
389 fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
390 metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
391 metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
392 metricList.info.appStatistics.periodSendPktLossHighPrecision =
393 fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
394 metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
395 metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
396
397 TRANS_LOGD(TRANS_STREAM,
398 "Succeed to get fillp statistics information for streamfd=%{public}d", streamFd_);
399 TRANS_LOGD(TRANS_STREAM,
400 "[Metric Return]: periodRtt=%{public}d", fillpPcbStats.appFcStastics.periodRtt);
401
402 std::lock_guard<std::mutex> guard(streamSocketLock_);
403
404 if (streamReceiver_ != nullptr) {
405 TRANS_LOGD(TRANS_STREAM,
406 "[Metric Notify]: Fillp traffic statistics information of socket is notified. streamfd=%{public}d",
407 streamFd_);
408 streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
409 } else {
410 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the streamFd is empty. streamFd=%{public}d", streamFd_);
411 }
412 } else {
413 TRANS_LOGE(TRANS_STREAM,
414 "Fail to get fillp statistics information for the streamfd. streamfd=%{public}d, errno=%{public}d",
415 streamFd_, FtGetErrno());
416 }
417 }
418
CreateClient(IpAndPort & local,int32_t streamType,std::pair<uint8_t *,uint32_t> sessionKey)419 bool VtpStreamSocket::CreateClient(IpAndPort &local, int32_t streamType, std::pair<uint8_t *, uint32_t> sessionKey)
420 {
421 int32_t fd = CreateAndBindSocket(local, false);
422 if (fd == -1) {
423 TRANS_LOGE(TRANS_STREAM, "CreateAndBindSocket failed, errno=%{public}d", FtGetErrno());
424 DestroyStreamSocket();
425 return false;
426 }
427
428 sessionKey_.second = sessionKey.second;
429 if (sessionKey_.first == nullptr) {
430 sessionKey_.first = new uint8_t[sessionKey_.second];
431 }
432 if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
433 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
434 return false;
435 }
436
437 streamType_ = streamType;
438 std::lock_guard<std::mutex> guard(streamSocketLock_);
439 streamFd_ = fd;
440 configCv_.notify_all();
441
442 TRANS_LOGI(TRANS_STREAM,
443 "Success to create a client socket. fd=%{public}d, streamType=%{public}d", fd, streamType);
444 return true;
445 }
446
CreateClient(IpAndPort & local,const IpAndPort & remote,int32_t streamType,std::pair<uint8_t *,uint32_t> sessionKey)447 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int32_t streamType,
448 std::pair<uint8_t *, uint32_t> sessionKey)
449 {
450 if (!CreateClient(local, streamType, sessionKey)) {
451 return false;
452 }
453 /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
454 #ifdef FILLP_SUPPORT_BW_DET
455 bool isServer = false;
456 EnableBwEstimationAlgo(streamFd_, isServer);
457 #endif
458
459 bool connectRet = Connect(remote);
460 if (connectRet) {
461 bool isServer = false;
462 RegisterMetricCallback(isServer); /* register the callback function */
463 }
464 return connectRet;
465 }
466
CreateServer(IpAndPort & local,int32_t streamType,std::pair<uint8_t *,uint32_t> sessionKey)467 bool VtpStreamSocket::CreateServer(IpAndPort &local, int32_t streamType, std::pair<uint8_t *, uint32_t> sessionKey)
468 {
469 TRANS_LOGD(TRANS_STREAM, "enter.");
470 listenFd_ = CreateAndBindSocket(local, true);
471 if (listenFd_ == -1) {
472 TRANS_LOGE(TRANS_STREAM, "create listenFd failed, errno=%{public}d", FtGetErrno());
473 DestroyStreamSocket();
474 return false;
475 }
476
477 bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
478 if (ret) {
479 TRANS_LOGE(TRANS_STREAM, "FtListen failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
480 DestroyStreamSocket();
481 return false;
482 }
483
484 epollFd_ = FtEpollCreate();
485 if (epollFd_ < 0) {
486 TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
487 DestroyStreamSocket();
488 return false;
489 }
490 isStreamRecv_ = true;
491 streamType_ = streamType;
492 sessionKey_.second = sessionKey.second;
493 if (sessionKey_.first == nullptr) {
494 sessionKey_.first = new uint8_t[sessionKey_.second];
495 }
496 if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
497 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
498 return false;
499 }
500
501 CreateServerProcessThread();
502 TRANS_LOGI(TRANS_STREAM,
503 "CreateServer end, listenFd=%{public}d, epollFd=%{public}d, streamType=%{public}d", listenFd_, epollFd_,
504 streamType_);
505 return true;
506 }
507
DestroyStreamSocket()508 void VtpStreamSocket::DestroyStreamSocket()
509 {
510 TRANS_LOGD(TRANS_STREAM, "enter.");
511 std::lock_guard<std::mutex> guard(streamSocketLock_);
512 if (isDestroyed_) {
513 TRANS_LOGI(TRANS_STREAM, "StreamSocket is already destroyed");
514 return;
515 }
516 if (listenFd_ != -1) {
517 TRANS_LOGI(TRANS_STREAM, "listenFd_ enter FtClose");
518 FtClose(listenFd_);
519 listenFd_ = -1;
520 }
521
522 if (streamFd_ != -1) {
523 RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
524 RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
525 TRANS_LOGI(TRANS_STREAM, "streamFd_ enter FtClose");
526 FtClose(streamFd_);
527 streamFd_ = -1;
528 }
529
530 if (epollFd_ != -1) {
531 TRANS_LOGI(TRANS_STREAM, "epollFd_ enter FtClose");
532 FtClose(epollFd_);
533 epollFd_ = -1;
534 }
535
536 if (streamReceiver_ != nullptr) {
537 TRANS_LOGI(TRANS_STREAM, "DestroyStreamSocket receiver delete");
538 streamReceiver_->OnStreamStatus(STREAM_CLOSED);
539 streamReceiver_.reset();
540 }
541
542 QuitStreamBuffer();
543 vtpInstance_->UpdateSocketStreamCount(false);
544 isDestroyed_ = true;
545 TRANS_LOGD(TRANS_STREAM, "ok");
546 }
547
Connect(const IpAndPort & remote)548 bool VtpStreamSocket::Connect(const IpAndPort &remote)
549 {
550 if (remote.ip.empty()) {
551 TRANS_LOGE(TRANS_STREAM, "remote addr error, ip is nullptr");
552 DestroyStreamSocket();
553 return false;
554 }
555
556 TRANS_LOGD(TRANS_STREAM,
557 "Connect to server remotePort=%{public}d", remote.port);
558 remoteIpPort_ = remote;
559
560 struct sockaddr_in remoteSockAddr;
561 remoteSockAddr.sin_family = AF_INET;
562 remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
563 remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
564
565 int32_t ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
566 if (ret != SOFTBUS_OK) {
567 TRANS_LOGE(TRANS_STREAM, "FtConnect failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
568 DestroyStreamSocket();
569 return false;
570 }
571
572 epollFd_ = FtEpollCreate();
573 if (epollFd_ < 0) {
574 TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
575 DestroyStreamSocket();
576 return false;
577 }
578
579 if (SetSocketEpollMode(streamFd_) != ERR_OK) {
580 TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, streamFd=%{public}d", streamFd_);
581 DestroyStreamSocket();
582 return false;
583 }
584 isStreamRecv_ = true;
585 TRANS_LOGI(TRANS_STREAM, "Success to connect remote, and create a thread to recv data.");
586
587 CreateClientProcessThread();
588 return true;
589 }
590
EncryptStreamPacket(std::unique_ptr<IStream> stream,std::unique_ptr<char[]> & data,ssize_t & len)591 bool VtpStreamSocket::EncryptStreamPacket(std::unique_ptr<IStream> stream, std::unique_ptr<char[]> &data, ssize_t &len)
592 {
593 StreamPacketizer packet(streamType_, std::move(stream));
594 auto plainData = packet.PacketizeStream();
595 if (plainData == nullptr) {
596 TRANS_LOGE(TRANS_STREAM, "PacketizeStream failed");
597 return false;
598 }
599 len = packet.GetPacketLen() + GetEncryptOverhead();
600 TRANS_LOGD(TRANS_STREAM, "packetLen=%{public}zd, encryptOverhead=%{public}zd",
601 packet.GetPacketLen(), GetEncryptOverhead());
602 data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
603 ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(), data.get() + FRAME_HEADER_LEN, len);
604 if (encLen != len) {
605 TRANS_LOGE(TRANS_STREAM, "encrypted failed, dataLen=%{public}zd, encLen=%{public}zd", len, encLen);
606 return false;
607 }
608 InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
609 len += FRAME_HEADER_LEN;
610
611 return true;
612 }
613
Send(std::unique_ptr<IStream> stream)614 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
615 {
616 TRANS_LOGD(TRANS_STREAM, "send in... streamType=%{public}d, dataSize=%{public}zd, extSize=%{public}zd",
617 streamType_, stream->GetBufferLen(), stream->GetExtBufferLen());
618
619 if (!isBlocked_) {
620 isBlocked_ = true;
621 if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
622 TRANS_LOGE(TRANS_STREAM, "set non block mode fail");
623 return false;
624 }
625 }
626
627 int32_t ret = -1;
628 std::unique_ptr<char[]> data = nullptr;
629 ssize_t len = 0;
630
631 if (streamType_ == RAW_STREAM) {
632 data = stream->GetBuffer();
633 len = stream->GetBufferLen();
634
635 ret = FtSend(streamFd_, data.get(), len, 0);
636 } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
637 const Communication::SoftBus::StreamFrameInfo *streamFrameInfo = stream->GetStreamFrameInfo();
638 if (streamFrameInfo == nullptr) {
639 TRANS_LOGE(TRANS_STREAM, "streamFrameInfo is null");
640 return false;
641 }
642 if (!EncryptStreamPacket(std::move(stream), data, len)) {
643 return false;
644 }
645
646 FrameInfo frameInfo;
647 ConvertStreamFrameInfo2FrameInfo(&frameInfo, streamFrameInfo);
648 ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
649 }
650
651 if (ret == -1) {
652 TRANS_LOGE(TRANS_STREAM, "send failed, errno=%{public}d", FtGetErrno());
653 return false;
654 }
655
656 TRANS_LOGD(TRANS_STREAM, "send out..., streamType=%{public}d, len=%{public}zd", streamType_, len);
657 return true;
658 }
659
SetOption(int32_t type,const StreamAttr & value)660 bool VtpStreamSocket::SetOption(int32_t type, const StreamAttr &value)
661 {
662 PrintOptionInfo(type, value);
663 auto it = optFuncMap_.find(type);
664 if (it == optFuncMap_.end()) {
665 TRANS_LOGW(TRANS_STREAM, "not found type=%{public}d", type);
666 return false;
667 }
668
669 if (value.GetType() != it->second.valueType) {
670 TRANS_LOGW(TRANS_STREAM,
671 "type=%{public}d, valueType=%{public}d", value.GetType(), it->second.valueType);
672 return false;
673 }
674
675 MySetFunc set = it->second.set;
676 if (set == nullptr) {
677 TRANS_LOGW(TRANS_STREAM, "set is nullptr");
678 return false;
679 }
680
681 if (type == NON_BLOCK || type == TOS) {
682 return (this->*set)(static_cast<int32_t>(streamFd_), value);
683 }
684
685 auto outerIt = FILLP_TYPE_MAP.find(type);
686 if (outerIt != FILLP_TYPE_MAP.end()) {
687 return (this->*set)(outerIt->second, value);
688 }
689
690 auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
691 if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
692 return (this->*set)(innerIt->second, value);
693 }
694
695 return (this->*set)(static_cast<int32_t>(type), value);
696 }
697
GetOption(int32_t type) const698 StreamAttr VtpStreamSocket::GetOption(int32_t type) const
699 {
700 StreamAttr attr {};
701 auto it = optFuncMap_.find(type);
702 if (it != optFuncMap_.end()) {
703 MyGetFunc get = it->second.get;
704 if (get == nullptr) {
705 TRANS_LOGE(TRANS_STREAM, "Can not get option type=%{public}d", type);
706 return std::move(StreamAttr());
707 }
708 if (type == NON_BLOCK) {
709 attr = (this->*get)(static_cast<int32_t>(streamFd_));
710 } else {
711 attr = (this->*get)(static_cast<int32_t>(type));
712 }
713 }
714
715 PrintOptionInfo(type, attr);
716 return attr;
717 }
718
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)719 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
720 {
721 if (receiver == nullptr) {
722 TRANS_LOGW(TRANS_STREAM, "receiver is nullptr");
723 return false;
724 }
725
726 std::lock_guard<std::mutex> guard(streamSocketLock_);
727 streamReceiver_ = receiver;
728 TRANS_LOGI(TRANS_STREAM, "set receiver success");
729 return true;
730 }
731
InitVtpInstance(const std::string & pkgName)732 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
733 {
734 return vtpInstance_->InitVtp(pkgName);
735 }
736
DestroyVtpInstance(const std::string & pkgName)737 void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
738 {
739 TRANS_LOGD(TRANS_STREAM, "enter.");
740 vtpInstance_->DestroyVtp(pkgName);
741 TRANS_LOGD(TRANS_STREAM, "ok");
742 }
743
CreateAndBindSocket(IpAndPort & local,bool isServer)744 int32_t VtpStreamSocket::CreateAndBindSocket(IpAndPort &local, bool isServer)
745 {
746 localIpPort_ = local;
747 vtpInstance_->UpdateSocketStreamCount(true);
748 if (local.ip.empty()) {
749 TRANS_LOGE(TRANS_STREAM, "ip is empty");
750 return -1;
751 }
752
753 int32_t sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
754 if (sockFd == -1) {
755 TRANS_LOGE(TRANS_STREAM, "FtSocket failed, errno=%{public}d", FtGetErrno());
756 return -1;
757 }
758 if (!isServer) {
759 TRANS_LOGI(TRANS_STREAM, "FtSocket set client, errno=%{public}d", FtGetErrno());
760 streamFd_ = sockFd;
761 }
762 SetDefaultConfig(sockFd);
763
764 // bind
765 sockaddr_in localSockAddr = { 0 };
766 char host[ADDR_MAX_SIZE];
767 localSockAddr.sin_family = AF_INET;
768 localSockAddr.sin_port = htons((short)local.port);
769 localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
770 localIpPort_.ip = SoftBusInetNtoP(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
771 if (!SetSocketBoundInner(sockFd, localIpPort_.ip)) {
772 TRANS_LOGE(TRANS_STREAM, "SetSocketBoundInner failed, errno=%{public}d", FtGetErrno());
773 }
774
775 socklen_t localAddrLen = sizeof(localSockAddr);
776 int32_t ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
777 if (ret == -1) {
778 FtClose(sockFd);
779 TRANS_LOGE(TRANS_STREAM, "FtBind failed, errno=%{public}d", FtGetErrno());
780 return -1;
781 }
782
783 // 获取port
784 ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
785 if (ret != ERR_OK) {
786 TRANS_LOGE(TRANS_STREAM, "getsockname error ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
787 FtClose(sockFd);
788 return -1;
789 }
790
791 localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
792 local.port = localIpPort_.port;
793
794 return sockFd;
795 }
796
797
EnableBwEstimationAlgo(int32_t streamFd,bool isServer) const798 bool VtpStreamSocket::EnableBwEstimationAlgo(int32_t streamFd, bool isServer) const
799 {
800 #ifdef FILLP_SUPPORT_BW_DET
801 int32_t errBwDet;
802 if (isServer) {
803 int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
804 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
805 &enableBwDet, sizeof(enableBwDet));
806 } else {
807 int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
808 errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
809 &enableBwDet, sizeof(enableBwDet));
810 }
811 if (errBwDet < 0) {
812 TRANS_LOGE(TRANS_STREAM,
813 "Fail to enable bandwidth estimation algorithm for streamFd=%{public}d, errno%{public}d",
814 streamFd, FtGetErrno());
815 return true;
816 } else {
817 TRANS_LOGE(TRANS_STREAM,
818 "Success to enable bandwidth estimation algorithm for stream=Fd%{public}d", streamFd);
819 return false;
820 }
821 #else
822 return true;
823 #endif
824 }
825
EnableJitterDetectionAlgo(int32_t streamFd) const826 bool VtpStreamSocket::EnableJitterDetectionAlgo(int32_t streamFd) const
827 {
828 #ifdef FILLP_SUPPORT_CQE
829 int32_t enableCQE = FILLP_CQE_ENABLE;
830 auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
831 if (errCQE < 0) {
832 TRANS_LOGE(TRANS_STREAM,
833 "Fail to enable CQE algorithm for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
834 return true;
835 } else {
836 TRANS_LOGE(TRANS_STREAM,
837 "Success to enable CQE algorithm for streamFd=%{public}d", streamFd);
838 return false;
839 }
840 #else
841 return true;
842 #endif
843 }
844
EnableDirectlySend(int32_t streamFd) const845 bool VtpStreamSocket::EnableDirectlySend(int32_t streamFd) const
846 {
847 int32_t enable = 1;
848 FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_DIRECTLY_SEND, &enable, sizeof(enable));
849 if (ret < 0) {
850 TRANS_LOGE(TRANS_STREAM,
851 "Fail to enable direct send for streamFd=%{public}d, rrno=%{public}d", streamFd, FtGetErrno());
852 return false;
853 }
854 TRANS_LOGI(TRANS_STREAM, "Success to enable direct send for streamFd=%{public}d", streamFd);
855 return true;
856 }
857
EnableSemiReliable(int32_t streamFd) const858 bool VtpStreamSocket::EnableSemiReliable(int32_t streamFd) const
859 {
860 int32_t enable = 1;
861 FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SEMI_RELIABLE, &enable, sizeof(enable));
862 if (ret < 0) {
863 TRANS_LOGE(TRANS_STREAM,
864 "Fail to enable direct send for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
865 return false;
866 }
867 TRANS_LOGI(TRANS_STREAM, "Success to enable semi reliable for streamFd=%{public}d", streamFd);
868 return true;
869 }
870
RegisterMetricCallback(bool isServer)871 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
872 {
873 VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
874 auto self = this->GetSelf();
875 VtpStreamSocket::AddStreamSocketListener(streamFd_, self);
876 int32_t regStatisticsRet = FtApiRegEventCallbackFunc(FILLP_CONFIG_ALL_SOCKET, FillpStatistics);
877 int32_t value = 1;
878 auto err = FtSetSockOpt(streamFd_, IPPROTO_FILLP, FILLP_SOCK_TRAFFIC, &value, sizeof(value));
879 if (err < 0) {
880 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
881 return;
882 }
883 TRANS_LOGD(TRANS_STREAM, "FtSetSockOpt start success");
884 if (isServer) {
885 if (regStatisticsRet == 0) {
886 TRANS_LOGI(TRANS_STREAM,
887 "Success to register the stream callback function at server side. streamFd=%{public}d", streamFd_);
888 } else {
889 TRANS_LOGE(TRANS_STREAM,
890 "Fail to register the stream callback function at server side. streamFd=%{public}d, errno=%{public}d",
891 streamFd_, regStatisticsRet);
892 }
893 } else {
894 if (regStatisticsRet == 0) {
895 TRANS_LOGI(TRANS_STREAM,
896 "Success to register the stream callback function at client side. streamFd=%{public}d", streamFd_);
897 } else {
898 TRANS_LOGE(TRANS_STREAM,
899 "Fail to register the stream callback function at client side. streamFd=%{public}d, errno=%{public}d",
900 streamFd_, regStatisticsRet);
901 }
902 }
903 }
904
Accept()905 bool VtpStreamSocket::Accept()
906 {
907 TRANS_LOGD(TRANS_STREAM, "enter.");
908 auto fd = FtAccept(listenFd_, nullptr, nullptr);
909 TRANS_LOGI(TRANS_STREAM, "accept streamFd=%{public}d", fd);
910 if (fd == -1) {
911 TRANS_LOGE(TRANS_STREAM, "errno=%{public}d", FtGetErrno());
912 return false;
913 }
914
915 sockaddr remoteAddr {};
916 socklen_t remoteAddrLen = sizeof(remoteAddr);
917 auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
918 if (ret != ERR_OK) {
919 TRANS_LOGE(TRANS_STREAM, "get name failed, fd=%{public}d", fd);
920 FtClose(fd);
921 return false;
922 }
923
924 char host[ADDR_MAX_SIZE];
925 if (remoteAddr.sa_family == AF_INET) {
926 auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
927 remoteIpPort_.ip = SoftBusInetNtoP(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
928 remoteIpPort_.port = v4Addr->sin_port;
929 } else {
930 auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
931 remoteIpPort_.ip = SoftBusInetNtoP(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
932 remoteIpPort_.port = v6Addr->sin6_port;
933 }
934
935 TRANS_LOGD(TRANS_STREAM, "Accept a client remotePort=%{public}d", remoteIpPort_.port);
936
937 if (SetSocketEpollMode(fd) != ERR_OK) {
938 TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, fd=%{public}d", fd);
939 FtClose(fd);
940 return false;
941 }
942
943 std::lock_guard<std::mutex> guard(streamSocketLock_);
944 streamFd_ = fd;
945 configCv_.notify_all();
946
947 if (streamReceiver_ != nullptr) {
948 TRANS_LOGI(TRANS_STREAM, "notify stream connected!");
949 streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
950 }
951
952 bool isServer = true;
953 RegisterMetricCallback(isServer); /* register the callback function */
954 /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
955 #ifdef FILLP_SUPPORT_BW_DET
956 EnableBwEstimationAlgo(streamFd_, isServer);
957 #endif
958 #ifdef FILLP_SUPPORT_CQE
959 EnableJitterDetectionAlgo(streamFd_);
960 #endif
961
962 TRANS_LOGI(TRANS_STREAM, "accept success!");
963 return true;
964 }
965
EpollTimeout(int32_t fd,int32_t timeout)966 int32_t VtpStreamSocket::EpollTimeout(int32_t fd, int32_t timeout)
967 {
968 struct SpungeEpollEvent events[MAX_EPOLL_NUM];
969 (void)memset_s(events, sizeof(events), 0, sizeof(events));
970 while (true) {
971 FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
972 if (fdNum <= 0) {
973 TRANS_LOGE(TRANS_STREAM,
974 "FtEpollWait failed, ret=%{public}d, errno=%{public}d", fdNum, FtGetErrno());
975 return -FtGetErrno();
976 }
977
978 for (FILLP_INT i = 0; i < fdNum; i++) {
979 if (events[i].data.fd != fd) {
980 continue;
981 }
982
983 if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
984 TRANS_LOGE(TRANS_STREAM,
985 "EpollTimeout, something may be wrong in this socket, fd=%{public}d, events=%{public}u", fd,
986 (unsigned int)events[i].events);
987 return -1;
988 }
989
990 if (events[i].events & SPUNGE_EPOLLIN) {
991 return SOFTBUS_OK;
992 }
993 }
994 }
995 }
996
SetSocketEpollMode(int32_t fd)997 int32_t VtpStreamSocket::SetSocketEpollMode(int32_t fd)
998 {
999 if (!SetNonBlockMode(fd, StreamAttr(true))) {
1000 TRANS_LOGE(TRANS_STREAM, "SetNonBlockMode failed, errno=%{public}d", FtGetErrno());
1001 return -1;
1002 }
1003
1004 struct SpungeEpollEvent event = {0};
1005 event.events = SPUNGE_EPOLLIN;
1006 event.data.fd = fd;
1007
1008 auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
1009 if (ret != ERR_OK) {
1010 TRANS_LOGE(TRANS_STREAM, "FtEpollCtl failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
1011 return ret;
1012 }
1013
1014 TRANS_LOGD(TRANS_STREAM, "SetNonBlockMode success");
1015 return SOFTBUS_OK;
1016 }
1017
InsertBufferLength(int32_t num,int32_t length,uint8_t * output) const1018 void VtpStreamSocket::InsertBufferLength(int32_t num, int32_t length, uint8_t *output) const
1019 {
1020 for (int32_t i = 0; i < length; i++) {
1021 output[length - 1 - i] = static_cast<unsigned int>(
1022 ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
1023 }
1024 }
1025
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const1026 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
1027 {
1028 std::unique_ptr<IStream> stream = nullptr;
1029 switch (streamType_) {
1030 case VIDEO_SLICE_STREAM:
1031 TRANS_LOGD(TRANS_STREAM, "do not support VIDEO_SLICE_STREAM streamType=%{public}d", streamType_);
1032 break;
1033 case COMMON_VIDEO_STREAM:
1034 case COMMON_AUDIO_STREAM:
1035 TRANS_LOGD(TRANS_STREAM,
1036 "streamType=%{public}d, seqNum=%{public}d, streamId=%{public}d",
1037 streamType_, info.seqNum, info.streamId);
1038 stream = IStream::MakeCommonStream(data, info);
1039 break;
1040 case RAW_STREAM:
1041 TRANS_LOGD(TRANS_STREAM, "streamType=%{public}d", streamType_);
1042 stream = IStream::MakeRawStream(data, info);
1043 break;
1044 default:
1045 TRANS_LOGE(TRANS_STREAM, "do not support streamType=%{public}d", streamType_);
1046 break;
1047 }
1048 if (stream == nullptr) {
1049 TRANS_LOGE(TRANS_STREAM, "IStream construct error");
1050 return nullptr;
1051 }
1052
1053 return stream;
1054 }
1055
RecvStreamLen()1056 int32_t VtpStreamSocket::RecvStreamLen()
1057 {
1058 int32_t hdrSize = FRAME_HEADER_LEN;
1059 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1060 hdrSize = streamHdrSize_;
1061 }
1062
1063 int32_t len = -1;
1064 int32_t timeout = -1;
1065 auto buffer = std::make_unique<char[]>(hdrSize);
1066 if (EpollTimeout(streamFd_, timeout) == 0) {
1067 do {
1068 len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
1069 } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1070 }
1071 TRANS_LOGD(TRANS_STREAM, "recv frame header, len=%{public}d, scene=%{public}d", len, scene_);
1072
1073 if (len <= 0) {
1074 TRANS_LOGE(TRANS_STREAM, "len invalid, len=%{public}d", len);
1075 return -1;
1076 }
1077
1078 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1079 std::lock_guard<std::mutex> guard(streamSocketLock_);
1080 if (streamReceiver_ != nullptr) {
1081 return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
1082 }
1083 }
1084
1085 return ntohl(*reinterpret_cast<int32_t *>(buffer.get()));
1086 }
1087
ProcessCommonDataStream(std::unique_ptr<char[]> & dataBuffer,int32_t & dataLength,std::unique_ptr<char[]> & extBuffer,int32_t & extLen,StreamFrameInfo & info)1088 bool VtpStreamSocket::ProcessCommonDataStream(std::unique_ptr<char[]> &dataBuffer,
1089 int32_t &dataLength, std::unique_ptr<char[]> &extBuffer, int32_t &extLen, StreamFrameInfo &info)
1090 {
1091 TRANS_LOGD(TRANS_STREAM, "recv common stream");
1092 int32_t decryptedLength = dataLength;
1093 auto decryptedBuffer = std::move(dataBuffer);
1094
1095 int32_t plainDataLength = decryptedLength - GetEncryptOverhead();
1096 if (plainDataLength <= 0) {
1097 TRANS_LOGE(TRANS_STREAM, "Decrypt failed, invalid decryptedLen=%{public}d", decryptedLength);
1098 return false;
1099 }
1100 std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
1101 ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
1102 if (decLen != plainDataLength) {
1103 TRANS_LOGE(TRANS_STREAM,
1104 "Decrypt failed, dataLen=%{public}d, decryptedLen=%{public}zd", plainDataLength, decLen);
1105 return false;
1106 }
1107 auto header = plainData.get();
1108 StreamDepacketizer decode(streamType_);
1109 if (plainDataLength < static_cast<int32_t>(sizeof(CommonHeader))) {
1110 TRANS_LOGE(TRANS_STREAM,
1111 "failed, plainDataLen=%{public}d, CommonHeader=%{public}zu", plainDataLength, sizeof(CommonHeader));
1112 return false;
1113 }
1114 decode.DepacketizeHeader(header);
1115
1116 auto buffer = plainData.get() + sizeof(CommonHeader);
1117 decode.DepacketizeBuffer(buffer, plainDataLength - sizeof(CommonHeader));
1118
1119 extBuffer = decode.GetUserExt();
1120 extLen = decode.GetUserExtSize();
1121 info = decode.GetFrameInfo();
1122 dataBuffer = decode.GetData();
1123 dataLength = decode.GetDataLength();
1124 if (dataLength <= 0) {
1125 TRANS_LOGE(TRANS_STREAM, "common depacketize error, dataLen=%{public}d", dataLength);
1126 return false;
1127 }
1128 return true;
1129 }
1130
DoStreamRecv()1131 void VtpStreamSocket::DoStreamRecv()
1132 {
1133 while (isStreamRecv_) {
1134 std::unique_ptr<char[]> dataBuffer = nullptr;
1135 std::unique_ptr<char[]> extBuffer = nullptr;
1136 int32_t extLen = 0;
1137 StreamFrameInfo info = {};
1138 TRANS_LOGD(TRANS_STREAM, "recv stream");
1139 int32_t dataLength = VtpStreamSocket::RecvStreamLen();
1140 if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
1141 TRANS_LOGE(TRANS_STREAM, "read frame length error, dataLength=%{public}d", dataLength);
1142 break;
1143 }
1144 TRANS_LOGD(TRANS_STREAM,
1145 "recv a new frame, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1146 dataBuffer = VtpStreamSocket::RecvStream(dataLength);
1147
1148 if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
1149 if (!ProcessCommonDataStream(dataBuffer, dataLength, extBuffer, extLen, info)) {
1150 break;
1151 }
1152 }
1153
1154 StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
1155 std::unique_ptr<IStream> stream = MakeStreamData(data, info);
1156 if (stream == nullptr) {
1157 TRANS_LOGE(TRANS_STREAM, "MakeStreamData failed, stream is null");
1158 break;
1159 }
1160
1161 TRANS_LOGD(TRANS_STREAM,
1162 "recv frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1163
1164 if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1165 std::lock_guard<std::mutex> guard(streamSocketLock_);
1166 if (streamReceiver_ != nullptr) {
1167 streamReceiver_->OnStreamReceived(std::move(stream));
1168 continue;
1169 }
1170 }
1171
1172 PutStream(std::move(stream));
1173 TRANS_LOGD(TRANS_STREAM, "put frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1174 }
1175 TRANS_LOGI(TRANS_STREAM, "recv thread exit");
1176 }
1177
RecvStream(int32_t dataLength)1178 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int32_t dataLength)
1179 {
1180 auto buffer = std::make_unique<char[]>(dataLength);
1181 int32_t recvLen = 0;
1182 while (recvLen < dataLength) {
1183 int32_t ret = -1;
1184 int32_t timeout = -1;
1185
1186 if (EpollTimeout(streamFd_, timeout) == 0) {
1187 do {
1188 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
1189 } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1190 }
1191
1192 if (ret == -1) {
1193 TRANS_LOGE(TRANS_STREAM, "read frame failed, errno=%{public}d", FtGetErrno());
1194 return nullptr;
1195 }
1196
1197 recvLen += ret;
1198 }
1199 return std::unique_ptr<char[]>(buffer.release());
1200 }
1201
SetDefaultConfig(int32_t fd)1202 void VtpStreamSocket::SetDefaultConfig(int32_t fd)
1203 {
1204 if (!SetIpTos(fd, StreamAttr(static_cast<int32_t>(IPTOS_LOWDELAY)))) {
1205 TRANS_LOGW(TRANS_STREAM, "SetIpTos failed");
1206 }
1207 // Set Fillp direct sending
1208 if (!EnableDirectlySend(fd)) {
1209 TRANS_LOGW(TRANS_STREAM, "EnableDirectlySend failed");
1210 }
1211
1212 if (!EnableSemiReliable(fd)) {
1213 TRANS_LOGW(TRANS_STREAM, "EnableSemiReliable failed");
1214 }
1215 // Set Fillp Differentiated Transmission
1216 FILLP_BOOL enable = 1;
1217 if (!FtConfigSet(FT_CONF_APP_DIFFER_TRANSMIT, &enable, &fd)) {
1218 TRANS_LOGW(TRANS_STREAM, "Set differ transmit failed");
1219 }
1220
1221 if (!SetOption(RECV_BUF_SIZE, StreamAttr(static_cast<int32_t>(DEFAULT_UDP_BUFFER_RCV_SIZE)))) {
1222 TRANS_LOGW(TRANS_STREAM, "Set recv buff failed");
1223 }
1224
1225 if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int32_t>(DEFAULT_UDP_BUFFER_SIZE)))) {
1226 TRANS_LOGW(TRANS_STREAM, "Set send buff failed");
1227 }
1228
1229 if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int32_t>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1230 TRANS_LOGW(TRANS_STREAM, "Set recv cache failed");
1231 }
1232
1233 if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int32_t>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1234 TRANS_LOGW(TRANS_STREAM, "Set send cache failed");
1235 }
1236 }
1237
SetIpTos(int32_t fd,const StreamAttr & tos)1238 bool VtpStreamSocket::SetIpTos(int32_t fd, const StreamAttr &tos)
1239 {
1240 auto tmp = tos.GetIntValue();
1241 if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1242 TRANS_LOGE(TRANS_STREAM, "SetIpTos wrong! fd=%{public}d, errno=%{public}d", fd, FtGetErrno());
1243 return false;
1244 }
1245
1246 TRANS_LOGD(TRANS_STREAM, "Success to set ip tos: fd=%{public}d, tos=%{public}d", fd, tmp);
1247 return true;
1248 }
1249
GetIpTos(int32_t type) const1250 StreamAttr VtpStreamSocket::GetIpTos(int32_t type) const
1251 {
1252 static_cast<void>(type);
1253 int32_t tos;
1254 int32_t size = sizeof(tos);
1255
1256 if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1257 TRANS_LOGE(TRANS_STREAM, "FtGetSockOpt errno=%{public}d", FtGetErrno());
1258 return std::move(StreamAttr());
1259 }
1260
1261 return std::move(StreamAttr(tos));
1262 }
1263
GetStreamSocketFd(int32_t type) const1264 StreamAttr VtpStreamSocket::GetStreamSocketFd(int32_t type) const
1265 {
1266 static_cast<void>(type);
1267 return std::move(StreamAttr(streamFd_));
1268 }
1269
GetListenSocketFd(int32_t type) const1270 StreamAttr VtpStreamSocket::GetListenSocketFd(int32_t type) const
1271 {
1272 static_cast<void>(type);
1273 return std::move(StreamAttr(listenFd_));
1274 }
1275
SetSocketBoundInner(int32_t fd,std::string ip) const1276 bool VtpStreamSocket::SetSocketBoundInner(int32_t fd, std::string ip) const
1277 {
1278 auto boundIp = (ip.empty()) ? localIpPort_.ip : ip;
1279 struct ifaddrs *ifList = nullptr;
1280 if (getifaddrs(&ifList) < 0) {
1281 TRANS_LOGE(TRANS_STREAM,
1282 "get interface address return errno=%{public}d, strerror=%{public}s", errno, strerror(errno));
1283 return false;
1284 }
1285
1286 struct ifaddrs *ifa = nullptr;
1287 for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1288 if (ifa->ifa_addr == nullptr) {
1289 continue;
1290 }
1291 if (ifa->ifa_addr->sa_family != AF_INET) {
1292 continue;
1293 }
1294
1295 char host[ADDR_MAX_SIZE];
1296 std::string devName(ifa->ifa_name);
1297 if (strcmp(boundIp.c_str(), SoftBusInetNtoP(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1298 host, ADDR_MAX_SIZE)) == 0) {
1299 TRANS_LOGI(TRANS_STREAM, "current use interface to bind to socket. ifName=%{public}s", ifa->ifa_name);
1300 auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1301 if (err < 0) {
1302 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
1303 freeifaddrs(ifList);
1304 return false;
1305 }
1306 break;
1307 }
1308 }
1309 freeifaddrs(ifList);
1310
1311 return true;
1312 }
1313
SetSocketBindToDevices(int32_t type,const StreamAttr & ip)1314 bool VtpStreamSocket::SetSocketBindToDevices(int32_t type, const StreamAttr &ip)
1315 {
1316 static_cast<void>(type);
1317 auto tmp = ip.GetStrValue();
1318 auto boundIp = (tmp.empty()) ? localIpPort_.ip : tmp;
1319 return SetSocketBoundInner(streamFd_, boundIp);
1320 }
1321
SetVtpStackConfigDelayed(int32_t type,const StreamAttr & value)1322 bool VtpStreamSocket::SetVtpStackConfigDelayed(int32_t type, const StreamAttr &value)
1323 {
1324 std::unique_lock<std::mutex> lock(streamSocketLock_);
1325 if (streamFd_ == -1) {
1326 configCv_.wait(lock, [this] { return streamFd_ != -1; });
1327 }
1328 TRANS_LOGD(TRANS_STREAM, "set vtp stack config, streamFd=%{public}d", streamFd_);
1329 return SetVtpStackConfig(type, value);
1330 }
1331
SetVtpStackConfig(int32_t type,const StreamAttr & value)1332 bool VtpStreamSocket::SetVtpStackConfig(int32_t type, const StreamAttr &value)
1333 {
1334 if (streamFd_ == -1) {
1335 TRANS_LOGI(TRANS_STREAM, "set vtp stack config when streamFd is legal, type=%{public}d", type);
1336 auto self = GetSelf();
1337 std::thread([self, type, value]() {
1338 const std::string threadName = "OS_setVtpCfg";
1339 pthread_setname_np(pthread_self(), threadName.c_str());
1340 self->SetVtpStackConfigDelayed(type, value);
1341 }).detach();
1342 return true;
1343 }
1344
1345 if (value.GetType() == INT_TYPE) {
1346 int32_t intVal = value.GetIntValue();
1347 int32_t ret = FtConfigSet(type, &intVal, &streamFd_);
1348 if (ret != SOFTBUS_OK) {
1349 TRANS_LOGE(TRANS_STREAM,
1350 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1351 return false;
1352 }
1353
1354 TRANS_LOGI(TRANS_STREAM,
1355 "setVtpConfig success, type=%{public}d, streamFd=%{public}d, value=%{public}d", type, streamFd_, intVal);
1356 return true;
1357 }
1358
1359 if (value.GetType() == BOOL_TYPE) {
1360 bool flag = value.GetBoolValue();
1361 int32_t ret = FtConfigSet(type, &flag, &streamFd_);
1362 if (ret != SOFTBUS_OK) {
1363 TRANS_LOGE(TRANS_STREAM,
1364 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1365 return false;
1366 }
1367
1368 TRANS_LOGI(TRANS_STREAM,
1369 "setVtpConfig success, streamFd=%{public}d, flag=%{public}d, type=%{public}d", type, streamFd_, flag);
1370 return true;
1371 }
1372
1373 TRANS_LOGE(TRANS_STREAM, "UNKNOWN TYPE!");
1374 return false;
1375 }
1376
GetVtpStackConfig(int32_t type) const1377 StreamAttr VtpStreamSocket::GetVtpStackConfig(int32_t type) const
1378 {
1379 int32_t intVal = -1;
1380 int32_t configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1381 int32_t ret = FtConfigGet(type, &intVal, &configFd);
1382 if (ret != SOFTBUS_OK) {
1383 TRANS_LOGE(TRANS_STREAM,
1384 "FtConfigGet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1385 return std::move(StreamAttr());
1386 }
1387
1388 int32_t valType = ValueType::UNKNOWN;
1389 for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1390 if (it->second != type) {
1391 continue;
1392 }
1393
1394 valType = optFuncMap_.at(it->first).valueType;
1395 break;
1396 }
1397
1398 if (valType != ValueType::UNKNOWN) {
1399 for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1400 if (it->second != type) {
1401 continue;
1402 }
1403
1404 valType = optFuncMap_.at(it->first).valueType;
1405 break;
1406 }
1407 }
1408
1409 if (valType == BOOL_TYPE) {
1410 return std::move(StreamAttr(!!intVal));
1411 }
1412
1413 return std::move(StreamAttr(intVal));
1414 }
1415
SetNonBlockMode(int32_t fd,const StreamAttr & value)1416 bool VtpStreamSocket::SetNonBlockMode(int32_t fd, const StreamAttr &value)
1417 {
1418 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1419 if (flags < 0) {
1420 TRANS_LOGE(TRANS_STREAM, "failed to get FtFcntl, flags=%{public}d", flags);
1421 flags = 0;
1422 }
1423 bool nonBlock = value.GetBoolValue();
1424
1425 flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1426 static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1427
1428 FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1429 if (res < 0) {
1430 TRANS_LOGE(TRANS_STREAM, "failed to set FtFcntl, res=%{public}d", res);
1431 return false;
1432 }
1433
1434 TRANS_LOGI(TRANS_STREAM, "Successfully to set fd=%{public}d, nonBlock=%{public}d", fd, nonBlock);
1435 return true;
1436 }
1437
GetNonBlockMode(int32_t fd) const1438 StreamAttr VtpStreamSocket::GetNonBlockMode(int32_t fd) const
1439 {
1440 FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1441 if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1442 return std::move(StreamAttr(true));
1443 }
1444
1445 return std::move(StreamAttr(false));
1446 }
1447
GetIp(int32_t type) const1448 StreamAttr VtpStreamSocket::GetIp(int32_t type) const
1449 {
1450 if (type == LOCAL_IP) {
1451 return std::move(StreamAttr(localIpPort_.ip));
1452 }
1453
1454 return std::move(StreamAttr(remoteIpPort_.ip));
1455 }
1456
GetPort(int32_t type) const1457 StreamAttr VtpStreamSocket::GetPort(int32_t type) const
1458 {
1459 if (type == LOCAL_PORT) {
1460 return std::move(StreamAttr(localIpPort_.port));
1461 }
1462 return std::move(StreamAttr(remoteIpPort_.port));
1463 }
1464
SetStreamType(int32_t type,const StreamAttr & value)1465 bool VtpStreamSocket::SetStreamType(int32_t type, const StreamAttr &value)
1466 {
1467 if (type != STREAM_TYPE_INT) {
1468 return false;
1469 }
1470
1471 streamType_ = value.GetIntValue();
1472 return true;
1473 }
1474
GetStreamType(int32_t type) const1475 StreamAttr VtpStreamSocket::GetStreamType(int32_t type) const
1476 {
1477 if (type != STREAM_TYPE_INT) {
1478 return std::move(StreamAttr());
1479 }
1480
1481 return std::move(StreamAttr(streamType_));
1482 }
1483
SetStreamScene(int32_t type,const StreamAttr & value)1484 bool VtpStreamSocket::SetStreamScene(int32_t type, const StreamAttr &value)
1485 {
1486 static_cast<void>(type);
1487 if (value.GetType() != INT_TYPE) {
1488 TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1489 return false;
1490 }
1491 scene_ = value.GetIntValue();
1492 TRANS_LOGI(TRANS_STREAM, "Set scene=%{public}d", scene_);
1493 return true;
1494 }
1495
SetStreamHeaderSize(int32_t type,const StreamAttr & value)1496 bool VtpStreamSocket::SetStreamHeaderSize(int32_t type, const StreamAttr &value)
1497 {
1498 static_cast<void>(type);
1499 if (value.GetType() != INT_TYPE) {
1500 TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1501 return false;
1502 }
1503 streamHdrSize_ = value.GetIntValue();
1504 TRANS_LOGI(TRANS_STREAM, "Set headerSize=%{public}d", streamHdrSize_);
1505 return true;
1506 }
1507
NotifyStreamListener()1508 void VtpStreamSocket::NotifyStreamListener()
1509 {
1510 while (isStreamRecv_) {
1511 int32_t streamNum = GetStreamNum();
1512 if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1513 TRANS_LOGW(TRANS_STREAM, "Too many data in receiver, streamNum=%{public}d", streamNum);
1514 }
1515
1516 auto stream = TakeStream();
1517 if (stream == nullptr) {
1518 TRANS_LOGE(TRANS_STREAM, "Pop stream failed");
1519 break;
1520 }
1521
1522 std::lock_guard<std::mutex> guard(streamSocketLock_);
1523 if (streamReceiver_ != nullptr) {
1524 TRANS_LOGD(TRANS_STREAM, "notify listener");
1525 streamReceiver_->OnStreamReceived(std::move(stream));
1526 TRANS_LOGD(TRANS_STREAM, "notify listener done.");
1527 }
1528 }
1529 TRANS_LOGI(TRANS_STREAM, "notify thread exit");
1530 }
1531
GetEncryptOverhead() const1532 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1533 {
1534 return OVERHEAD_LEN;
1535 }
1536
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1537 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1538 {
1539 if (in == nullptr || out == nullptr) {
1540 TRANS_LOGE(TRANS_STREAM, "param invalid");
1541 return SOFTBUS_INVALID_PARAM;
1542 }
1543 AesGcmCipherKey cipherKey = {0};
1544
1545 if (inLen - OVERHEAD_LEN > outLen) {
1546 TRANS_LOGE(TRANS_STREAM, "Encrypt invalid para.");
1547 return SOFTBUS_INVALID_PARAM;
1548 }
1549
1550 cipherKey.keyLen = SESSION_KEY_LENGTH;
1551 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1552 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1553 return SOFTBUS_MEM_ERR;
1554 }
1555
1556 int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1557 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1558 if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1559 TRANS_LOGE(TRANS_STREAM, "Encrypt Data fail. ret=%{public}d", ret);
1560 return SOFTBUS_ENCRYPT_ERR;
1561 }
1562 return outLen;
1563 }
1564
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1565 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1566 {
1567 if (in == nullptr || out == nullptr) {
1568 TRANS_LOGE(TRANS_STREAM, "param invalid");
1569 return SOFTBUS_INVALID_PARAM;
1570 }
1571 AesGcmCipherKey cipherKey = {0};
1572
1573 if (inLen - OVERHEAD_LEN > outLen) {
1574 TRANS_LOGE(TRANS_STREAM, "Decrypt invalid para.");
1575 return SOFTBUS_INVALID_PARAM;
1576 }
1577
1578 cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1579 if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1580 TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1581 return SOFTBUS_MEM_ERR;
1582 }
1583 int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1584 (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1585 if (ret != SOFTBUS_OK) {
1586 TRANS_LOGE(TRANS_STREAM, "Decrypt Data fail. ret=%{public}d ", ret);
1587 return SOFTBUS_DECRYPT_ERR;
1588 }
1589
1590 return outLen;
1591 }
1592
SetMultiLayer(const void * para)1593 int32_t VtpStreamSocket::SetMultiLayer(const void *para)
1594 {
1595 int32_t fd = GetStreamSocketFd(FD).GetIntValue();
1596 return VtpSetSocketMultiLayer(fd, &onStreamEvtCb_, para);
1597 }
1598
CreateServerProcessThread()1599 void VtpStreamSocket::CreateServerProcessThread()
1600 {
1601 auto self = this->GetSelf();
1602 std::thread([self]() {
1603 const std::string threadName = "OS_sntfStmLsn";
1604 pthread_setname_np(pthread_self(), threadName.c_str());
1605 self->NotifyStreamListener();
1606 }).detach();
1607
1608 std::thread([self]() {
1609 const std::string threadName = "OS_sdstyStmSkt";
1610 pthread_setname_np(pthread_self(), threadName.c_str());
1611 if (!self->Accept()) {
1612 self->DestroyStreamSocket();
1613 return;
1614 }
1615 self->DoStreamRecv();
1616 self->DestroyStreamSocket();
1617 }).detach();
1618
1619 bool &isDestroyed = isDestroyed_;
1620 std::thread([self, &isDestroyed]() {
1621 const std::string threadName = "OS_sfillStatic";
1622 pthread_setname_np(pthread_self(), threadName.c_str());
1623 while (!isDestroyed) {
1624 self->FillpAppStatistics();
1625 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1626 }
1627 }).detach();
1628 }
1629
CreateClientProcessThread()1630 void VtpStreamSocket::CreateClientProcessThread()
1631 {
1632 auto self = this->GetSelf();
1633 std::thread([self]() {
1634 const std::string threadName = "OS_cntfStmLsn";
1635 pthread_setname_np(pthread_self(), threadName.c_str());
1636 self->NotifyStreamListener();
1637 }).detach();
1638
1639 std::thread([self]() {
1640 const std::string threadName = "OS_cdstyStmSkt";
1641 pthread_setname_np(pthread_self(), threadName.c_str());
1642 self->DoStreamRecv();
1643 self->DestroyStreamSocket();
1644 }).detach();
1645
1646 bool &isDestroyed = isDestroyed_;
1647 std::thread([self, &isDestroyed]() {
1648 const std::string threadName = "OS_cfillStatic";
1649 pthread_setname_np(pthread_self(), threadName.c_str());
1650 while (!isDestroyed) {
1651 self->FillpAppStatistics();
1652 std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1653 }
1654 }).detach();
1655 }
1656 } // namespace SoftBus
1657 } // namespace Communication
1658