1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "vtp_stream_socket.h"
17 
18 #include <chrono>
19 #include <ifaddrs.h>
20 #include <memory>
21 #include <netinet/in.h>
22 #include <securec.h>
23 #include <sys/socket.h>
24 #include <sys/time.h>
25 #include <thread>
26 
27 #include "fillpinc.h"
28 #include "raw_stream_data.h"
29 #include "session.h"
30 #include "softbus_adapter_crypto.h"
31 #include "softbus_adapter_socket.h"
32 #include "softbus_adapter_timer.h"
33 #include "softbus_errcode.h"
34 #include "softbus_trans_def.h"
35 #include "stream_common_data.h"
36 #include "stream_depacketizer.h"
37 #include "stream_packetizer.h"
38 #include "vtp_stream_opt.h"
39 
40 namespace Communication {
41 namespace SoftBus {
/* Namespace-scope logging switch, initialised to off. NOTE(review): no reader is visible in this part of the file. */
bool g_logOn = false;
const int32_t FEED_BACK_PERIOD = 1;  /* feedback period of fillp stream traffic statistics is 1s */
const int32_t MS_PER_SECOND = 1000;  /* milliseconds per second, used to convert a timestamp to ms */
const int32_t US_PER_MS = 1000;  /* microseconds per millisecond, used to convert a timestamp to ms */
46 
47 namespace {
PrintOptionInfo(int32_t type,const StreamAttr & value)48 void PrintOptionInfo(int32_t type, const StreamAttr &value)
49 {
50     switch (value.GetType()) {
51         case INT_TYPE:
52             TRANS_LOGI(TRANS_STREAM,
53                 "Int option: type=%{public}d, value=%{public}d", type, value.GetIntValue());
54             break;
55         case BOOL_TYPE:
56             TRANS_LOGI(TRANS_STREAM,
57                 "Bool option: type=%{public}d, value=%{public}d", type, value.GetBoolValue());
58             break;
59         case STRING_TYPE:
60             TRANS_LOGD(TRANS_STREAM,
61                 "String option: type=%{public}d, value=%{public}s", type, value.GetStrValue().c_str());
62             break;
63         default:
64             TRANS_LOGE(TRANS_STREAM, "Wrong StreamAttr!");
65             (void)type;
66     }
67 }
68 } // namespace
/* VTP instance shared by all VtpStreamSocket objects; obtained from VtpInstance::GetVtpInstance(). */
std::shared_ptr<VtpInstance> VtpStreamSocket::vtpInstance_ = VtpInstance::GetVtpInstance();

/* fd -> reference to the mutex registered via AddStreamSocketLock(); accessed under streamSocketLockMapLock_. */
std::map<int32_t, std::mutex &> VtpStreamSocket::g_streamSocketLockMap;
std::mutex VtpStreamSocket::streamSocketLockMapLock_;
/* fd -> socket registered via AddStreamSocketListener(); accessed under streamSocketMapLock_. */
std::map<int32_t, std::shared_ptr<VtpStreamSocket>> VtpStreamSocket::g_streamSocketMap;
std::mutex VtpStreamSocket::streamSocketMapLock_;
75 
ConvertStreamFrameInfo2FrameInfo(FrameInfo * frameInfo,const Communication::SoftBus::StreamFrameInfo * streamFrameInfo)76 static inline void ConvertStreamFrameInfo2FrameInfo(FrameInfo *frameInfo,
77     const Communication::SoftBus::StreamFrameInfo *streamFrameInfo)
78 {
79     frameInfo->frameType = (FILLP_INT)(streamFrameInfo->frameType);
80     frameInfo->seqNum = (FILLP_INT)(streamFrameInfo->seqNum);
81     frameInfo->subSeqNum = (FILLP_INT)(streamFrameInfo->seqSubNum);
82     frameInfo->level = (FILLP_INT)(streamFrameInfo->level);
83     frameInfo->timestamp = (FILLP_SLONG)streamFrameInfo->timeStamp;
84     frameInfo->bitMap = (FILLP_UINT32)streamFrameInfo->bitMap;
85 }
86 
AddStreamSocketLock(int32_t fd,std::mutex & streamsocketlock)87 void VtpStreamSocket::AddStreamSocketLock(int32_t fd, std::mutex &streamsocketlock)
88 {
89     std::lock_guard<std::mutex> guard(streamSocketLockMapLock_);
90     if (!g_streamSocketLockMap.empty() && g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
91         TRANS_LOGE(TRANS_STREAM, "streamsocketlock for the fd already exists. fd=%{public}d", fd);
92         return;
93     }
94 
95     g_streamSocketLockMap.emplace(std::pair<int32_t, std::mutex &>(fd, streamsocketlock));
96 }
97 
AddStreamSocketListener(int32_t fd,std::shared_ptr<VtpStreamSocket> streamreceiver)98 void VtpStreamSocket::AddStreamSocketListener(int32_t fd, std::shared_ptr<VtpStreamSocket> streamreceiver)
99 {
100     std::lock_guard<std::mutex> guard(streamSocketMapLock_);
101     if (!g_streamSocketMap.empty() && g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
102         TRANS_LOGE(TRANS_STREAM, "streamreceiver for the fd already exists. fd=%{public}d", fd);
103         return;
104     }
105 
106     g_streamSocketMap.emplace(std::pair<int32_t, std::shared_ptr<VtpStreamSocket>>(fd, streamreceiver));
107 }
108 
RemoveStreamSocketLock(int32_t fd)109 void VtpStreamSocket::RemoveStreamSocketLock(int32_t fd)
110 {
111     std::lock_guard<std::mutex> guard(streamSocketLockMapLock_);
112     if (g_streamSocketLockMap.find(fd) != g_streamSocketLockMap.end()) {
113         g_streamSocketLockMap.erase(fd);
114         TRANS_LOGI(TRANS_STREAM, "Remove streamsocketlock for the fd success. fd=%{public}d", fd);
115     } else {
116         TRANS_LOGE(TRANS_STREAM,
117             "Streamsocketlock for the fd not exist in the map. fd=%{public}d", fd);
118     }
119 }
120 
RemoveStreamSocketListener(int32_t fd)121 void VtpStreamSocket::RemoveStreamSocketListener(int32_t fd)
122 {
123     std::lock_guard<std::mutex> guard(streamSocketMapLock_);
124     if (g_streamSocketMap.find(fd) != g_streamSocketMap.end()) {
125         g_streamSocketMap.erase(fd);
126         TRANS_LOGI(TRANS_STREAM, "Remove streamreceiver for the fd success. fd=%{public}d", fd);
127     } else {
128         TRANS_LOGE(TRANS_STREAM, "Streamreceiver for the fd not exist in the map. fd=%{public}d", fd);
129     }
130 }
131 
InsertElementToFuncMap(int32_t type,ValueType valueType,MySetFunc set,MyGetFunc get)132 void VtpStreamSocket::InsertElementToFuncMap(int32_t type, ValueType valueType, MySetFunc set, MyGetFunc get)
133 {
134     OptionFunc fun = {
135         valueType, set, get
136     };
137     optFuncMap_.insert(std::pair<int32_t, OptionFunc>(type, fun));
138 }
139 
VtpStreamSocket()140 VtpStreamSocket::VtpStreamSocket()
141 {
142     InsertElementToFuncMap(TOS, INT_TYPE, &VtpStreamSocket::SetIpTos, &VtpStreamSocket::GetIpTos);
143     InsertElementToFuncMap(FD, INT_TYPE, nullptr, &VtpStreamSocket::GetStreamSocketFd);
144     InsertElementToFuncMap(SERVER_FD, INT_TYPE, nullptr, &VtpStreamSocket::GetListenSocketFd);
145     InsertElementToFuncMap(LOCAL_IP, INT_TYPE, nullptr, &VtpStreamSocket::GetIp);
146     InsertElementToFuncMap(LOCAL_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
147     InsertElementToFuncMap(REMOTE_IP, STRING_TYPE, nullptr, &VtpStreamSocket::GetIp);
148     InsertElementToFuncMap(REMOTE_PORT, INT_TYPE, nullptr, &VtpStreamSocket::GetPort);
149     InsertElementToFuncMap(BOUND_INTERFACE_IP, STRING_TYPE, &VtpStreamSocket::SetSocketBindToDevices, nullptr);
150     InsertElementToFuncMap(IP_TYPE, STRING_TYPE, nullptr, &VtpStreamSocket::GetIpType);
151     InsertElementToFuncMap(REMOTE_SCOPE_ID, INT_TYPE, nullptr, &VtpStreamSocket::GetRemoteScopeId);
152     InsertElementToFuncMap(NON_BLOCK, BOOL_TYPE, &VtpStreamSocket::SetNonBlockMode, &VtpStreamSocket::GetNonBlockMode);
153     InsertElementToFuncMap(KEEP_ALIVE_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig, nullptr);
154     InsertElementToFuncMap(SEND_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
155         &VtpStreamSocket::GetVtpStackConfig);
156     InsertElementToFuncMap(RECV_CACHE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
157         &VtpStreamSocket::GetVtpStackConfig);
158     InsertElementToFuncMap(SEND_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
159         &VtpStreamSocket::GetVtpStackConfig);
160     InsertElementToFuncMap(RECV_BUF_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
161         &VtpStreamSocket::GetVtpStackConfig);
162     InsertElementToFuncMap(PACKET_SIZE, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
163         &VtpStreamSocket::GetVtpStackConfig);
164     InsertElementToFuncMap(MAX_VTP_SOCKET_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
165         &VtpStreamSocket::GetVtpStackConfig);
166     InsertElementToFuncMap(MAX_VTP_CONNECT_NUM, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
167         &VtpStreamSocket::GetVtpStackConfig);
168     InsertElementToFuncMap(REDUNANCY_SWITCH, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
169         &VtpStreamSocket::GetVtpStackConfig);
170     InsertElementToFuncMap(REDUNANCY_LEVEL, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
171         &VtpStreamSocket::GetVtpStackConfig);
172     InsertElementToFuncMap(NACK_DELAY, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
173         &VtpStreamSocket::GetVtpStackConfig);
174     InsertElementToFuncMap(NACK_DELAY_TIMEOUT, INT_TYPE, &VtpStreamSocket::SetVtpStackConfig,
175         &VtpStreamSocket::GetVtpStackConfig);
176     InsertElementToFuncMap(PACK_INTERVAL_ENLARGE, BOOL_TYPE, &VtpStreamSocket::SetVtpStackConfig,
177         &VtpStreamSocket::GetVtpStackConfig);
178     InsertElementToFuncMap(STREAM_TYPE_INT, INT_TYPE, &VtpStreamSocket::SetStreamType, &VtpStreamSocket::GetStreamType);
179     InsertElementToFuncMap(IS_SERVER, INT_TYPE, nullptr, &VtpStreamSocket::IsServer);
180     InsertElementToFuncMap(SCENE, INT_TYPE, &VtpStreamSocket::SetStreamScene, nullptr);
181     InsertElementToFuncMap(STREAM_HEADER_SIZE, INT_TYPE, &VtpStreamSocket::SetStreamHeaderSize, nullptr);
182 
183     scene_ = UNKNOWN_SCENE;
184 }
185 
~VtpStreamSocket()186 VtpStreamSocket::~VtpStreamSocket()
187 {
188     TRANS_LOGW(TRANS_STREAM, "~VtpStreamSocket");
189 }
190 
GetSelf()191 std::shared_ptr<VtpStreamSocket> VtpStreamSocket::GetSelf()
192 {
193     return shared_from_this();
194 }
195 
HandleFillpFrameStats(int32_t fd,const FtEventCbkInfo * info)196 int32_t VtpStreamSocket::HandleFillpFrameStats(int32_t fd, const FtEventCbkInfo *info)
197 {
198     if (info == nullptr) {
199         TRANS_LOGE(TRANS_STREAM, "stats info is nullptr");
200         return SOFTBUS_INVALID_PARAM;
201     }
202     StreamSendStats stats = {};
203     if (memcpy_s(&stats, sizeof(StreamSendStats), &info->info.frameSendStats,
204         sizeof(info->info.frameSendStats)) != EOK) {
205         TRANS_LOGE(TRANS_STREAM, "streamStats info memcpy fail");
206         return SOFTBUS_MEM_ERR;
207     }
208 
209     std::lock_guard<std::mutex> guard(streamSocketMapLock_);
210     auto itListener = g_streamSocketMap.find(fd);
211     if (itListener != g_streamSocketMap.end()) {
212         if (itListener->second->streamReceiver_ != nullptr) {
213             TRANS_LOGD(TRANS_STREAM, "OnFrameStats enter");
214             itListener->second->streamReceiver_->OnFrameStats(&stats);
215         } else {
216             TRANS_LOGE(TRANS_STREAM, "streamReceiver_ is nullptr");
217         }
218     } else {
219         TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the fd is empty in the map. fd=%{public}d", fd);
220     }
221     return SOFTBUS_OK;
222 }
223 
HandleRipplePolicy(int32_t fd,const FtEventCbkInfo * info)224 int32_t VtpStreamSocket::HandleRipplePolicy(int32_t fd, const FtEventCbkInfo *info)
225 {
226     if (info == nullptr) {
227         TRANS_LOGE(TRANS_STREAM, "policy info is nullptr");
228         return SOFTBUS_INVALID_PARAM;
229     }
230     TrafficStats stats;
231     (void)memset_s(&stats, sizeof(TrafficStats), 0, sizeof(TrafficStats));
232     if (memcpy_s(&stats.stats, sizeof(stats.stats), info->info.trafficData.stats,
233         sizeof(info->info.trafficData.stats)) != EOK) {
234         TRANS_LOGE(TRANS_STREAM, "RipplePolicy info memcpy fail");
235         return SOFTBUS_MEM_ERR;
236     }
237     std::lock_guard<std::mutex> guard(streamSocketMapLock_);
238     auto itListener = g_streamSocketMap.find(fd);
239     if (itListener != g_streamSocketMap.end()) {
240         if (itListener->second->streamReceiver_ != nullptr) {
241             TRANS_LOGI(TRANS_STREAM, "OnRippleStats enter");
242             itListener->second->streamReceiver_->OnRippleStats(&stats);
243         } else {
244             TRANS_LOGE(TRANS_STREAM, "OnRippleStats streamReceiver_ is nullptr");
245         }
246     } else {
247         TRANS_LOGE(TRANS_STREAM,
248             "OnRippleStats streamReceiver for the fd is empty in the map. fd=%{public}d", fd);
249     }
250     return SOFTBUS_OK;
251 }
252 
HandleFillpFrameEvt(int32_t fd,const FtEventCbkInfo * info)253 int32_t VtpStreamSocket::HandleFillpFrameEvt(int32_t fd, const FtEventCbkInfo *info)
254 {
255     if (info == nullptr) {
256         TRANS_LOGE(TRANS_STREAM, "fd is %{public}d, info is nullptr", fd);
257         return SOFTBUS_INVALID_PARAM;
258     }
259     std::lock_guard<std::mutex> guard(streamSocketMapLock_);
260     auto itListener = g_streamSocketMap.find(fd);
261     if (itListener != g_streamSocketMap.end()) {
262         return itListener->second->HandleFillpFrameEvtInner(fd, info);
263     } else {
264         TRANS_LOGE(TRANS_STREAM, "OnFillpFrameEvt for the fd is empty in the map. fd=%{public}d", fd);
265     }
266     return SOFTBUS_OK;
267 }
268 
HandleFillpFrameEvtInner(int32_t fd,const FtEventCbkInfo * info)269 int32_t VtpStreamSocket::HandleFillpFrameEvtInner(int32_t fd, const FtEventCbkInfo *info)
270 {
271     if (onStreamEvtCb_ != nullptr) {
272         TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ enter");
273         return HandleVtpFrameEvt(fd, onStreamEvtCb_, info);
274     } else {
275         TRANS_LOGD(TRANS_STREAM, "onStreamEvtCb_ is nullptr");
276     }
277     return SOFTBUS_OK;
278 }
279 
280 #ifdef FILLP_SUPPORT_BW_DET
FillSupportDet(int32_t fd,const FtEventCbkInfo * info,QosTv * metricList)281 void VtpStreamSocket::FillSupportDet(int32_t fd, const FtEventCbkInfo *info, QosTv *metricList)
282 {
283     if (info == nullptr || metricList == nullptr) {
284         TRANS_LOGE(TRANS_STREAM, "info or metricList is nullptr");
285         return;
286     }
287     if (info->evt == FT_EVT_BW_DET) {
288         TRANS_LOGI(TRANS_STREAM,
289             "[Metric Return]: Fillp bandwidth information of socket fd=%{public}d is returned", fd);
290         TRANS_LOGI(TRANS_STREAM,
291             "[Metric Return]: Changed amount of current available bandwidth=%{public}d", info->info.bwInfo.bwStat);
292         TRANS_LOGI(TRANS_STREAM,
293             "[Metric Return]: Current bandwidth for receiving data rate=%{public}d kbps", info->info.bwInfo.rate);
294         metricList->type = BANDWIDTH_ESTIMATE_VALUE;
295         metricList->info.bandwidthInfo.trend = info->info.bwInfo.bwStat;
296         metricList->info.bandwidthInfo.rate = info->info.bwInfo.rate;
297     }
298     if (info->evt == FT_EVT_JITTER_DET) {
299         TRANS_LOGI(TRANS_STREAM,
300             "[Metric Return]: Fillp connection quality information of socket fd=%{public}d is returned", fd);
301         TRANS_LOGI(TRANS_STREAM,
302             "[Metric Return]: Predeicted network condition jitterLevel=%{public}d", info->info.jitterInfo.jitterLevel);
303         TRANS_LOGI(TRANS_STREAM,
304             "[Metric Return]: Current available receiving buffer time=%{public}d ms", info->info.jitterInfo.bufferTime);
305         metricList->type = JITTER_DETECTION_VALUE;
306         metricList->info.jitterInfo.jitterLevel = info->info.jitterInfo.jitterLevel;
307         metricList->info.jitterInfo.bufferTime = info->info.jitterInfo.bufferTime;
308     }
309 }
310 #endif
311 
312 /* This function is used to prompt the metrics returned by FtApiRegEventCallbackFunc() function */
FillpStatistics(int32_t fd,const FtEventCbkInfo * info)313 int32_t VtpStreamSocket::FillpStatistics(int32_t fd, const FtEventCbkInfo *info)
314 {
315     if (info == nullptr || fd < 0) {
316         TRANS_LOGE(TRANS_STREAM, "param invalid fd is %{public}d", fd);
317         return SOFTBUS_INVALID_PARAM;
318     }
319     if (info->evt == FT_EVT_FRAME_STATS) {
320         TRANS_LOGI(TRANS_STREAM, "recv fillp frame stats");
321         return HandleFillpFrameStats(fd, info);
322     } else if (info->evt == FT_EVT_TRAFFIC_DATA) {
323         TRANS_LOGI(TRANS_STREAM, "recv fillp traffic data");
324         return HandleRipplePolicy(fd, info);
325     } else if (IsVtpFrameSentEvt(info)) {
326         TRANS_LOGI(TRANS_STREAM, "fd %{public}d recv fillp frame send evt", fd);
327         return HandleFillpFrameEvt(fd, info);
328     }
329 #ifdef FILLP_SUPPORT_BW_DET
330     if (info->evt == FT_EVT_BW_DET || info->evt == FT_EVT_JITTER_DET) {
331         int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
332         int16_t tvCount = 1;
333         QosTv metricList = {};
334 
335         FillSupportDet(fd, info, &metricList);
336         metricList.info.wifiChannelInfo = {};
337         metricList.info.frameStatusInfo = {};
338 
339         std::lock_guard<std::mutex> guard(streamSocketLockMapLock_);
340         auto itLock = g_streamSocketLockMap.find(fd);
341         if (itLock != g_streamSocketLockMap.end()) {
342             std::lock_guard<std::mutex> guard(streamSocketMapLock_);
343             auto itListener = g_streamSocketMap.find(fd);
344             if (itListener != g_streamSocketMap.end()) {
345                 std::thread([itListener, eventId, tvCount, metricList, &itLock]() {
346                     const std::string threadName = "OS_qosEvent";
347                     pthread_setname_np(pthread_self(), threadName.c_str());
348                     std::lock_guard<std::mutex> guard(itLock->second);
349                     itListener->second->OnQosEvent(eventId, tvCount, &metricList);
350                 }).detach();
351             } else {
352                 TRANS_LOGE(TRANS_STREAM, "StreamReceiver for fd=%{public}d is empty in the map", fd);
353             }
354         } else {
355             TRANS_LOGE(TRANS_STREAM, "StreamSocketLock for fd=%{public}d is empty in the map", fd);
356         }
357     } else {
358         TRANS_LOGE(TRANS_STREAM,
359             "[Metric Return]: Fail to retrieve bandwidth and connection quality information");
360         return -1;
361     }
362 #endif
363     return SOFTBUS_OK;
364 }
365 
FillpAppStatistics()366 void VtpStreamSocket::FillpAppStatistics()
367 {
368     int32_t eventId = TRANS_STREAM_QUALITY_EVENT;
369     int16_t tvCount = 1;
370     QosTv metricList = {};
371     FillpStatisticsPcb fillpPcbStats = {};
372     SoftBusSysTime fillpStatsGetTime = {0};
373 
374     int32_t getStatisticsRet = FtFillpStatsGet(streamFd_, &fillpPcbStats);
375     SoftBusGetTime(&fillpStatsGetTime);
376     if (getStatisticsRet == 0) {
377         metricList.type = STREAM_TRAFFIC_STASTICS;
378         metricList.info.appStatistics.statisticsGotTime = static_cast<uint64_t>((fillpStatsGetTime.sec *
379             MS_PER_SECOND + fillpStatsGetTime.usec / US_PER_MS)); /* ms */
380         metricList.info.appStatistics.periodRecvBits =
381             static_cast<uint64_t>(fillpPcbStats.appFcStastics.periodRecvBits);
382         metricList.info.appStatistics.pktNum = fillpPcbStats.appFcStastics.pktNum;
383         metricList.info.appStatistics.periodRecvPkts = fillpPcbStats.appFcStastics.periodRecvPkts;
384         metricList.info.appStatistics.periodRecvPktLoss = fillpPcbStats.appFcStastics.periodRecvPktLoss;
385         metricList.info.appStatistics.periodRecvRate = fillpPcbStats.appFcStastics.periodRecvRate;
386         metricList.info.appStatistics.periodRecvRateBps = fillpPcbStats.appFcStastics.periodRecvRateBps;
387         metricList.info.appStatistics.periodRtt = fillpPcbStats.appFcStastics.periodRtt;
388         metricList.info.appStatistics.periodRecvPktLossHighPrecision =
389             fillpPcbStats.appFcStastics.periodRecvPktLossHighPrecision;
390         metricList.info.appStatistics.periodSendLostPkts = fillpPcbStats.appFcStastics.periodSendLostPkts;
391         metricList.info.appStatistics.periodSendPkts = fillpPcbStats.appFcStastics.periodSendPkts;
392         metricList.info.appStatistics.periodSendPktLossHighPrecision =
393             fillpPcbStats.appFcStastics.periodSendPktLossHighPrecision;
394         metricList.info.appStatistics.periodSendBits = fillpPcbStats.appFcStastics.periodSendBits;
395         metricList.info.appStatistics.periodSendRateBps = fillpPcbStats.appFcStastics.periodSendRateBps;
396 
397         TRANS_LOGD(TRANS_STREAM,
398             "Succeed to get fillp statistics information for streamfd=%{public}d", streamFd_);
399         TRANS_LOGD(TRANS_STREAM,
400             "[Metric Return]: periodRtt=%{public}d", fillpPcbStats.appFcStastics.periodRtt);
401 
402         std::lock_guard<std::mutex> guard(streamSocketLock_);
403 
404         if (streamReceiver_ != nullptr) {
405             TRANS_LOGD(TRANS_STREAM,
406                 "[Metric Notify]: Fillp traffic statistics information of socket is notified. streamfd=%{public}d",
407                 streamFd_);
408             streamReceiver_->OnQosEvent(eventId, tvCount, &metricList);
409         } else {
410             TRANS_LOGE(TRANS_STREAM, "StreamReceiver for the streamFd is empty. streamFd=%{public}d", streamFd_);
411         }
412     } else {
413         TRANS_LOGE(TRANS_STREAM,
414             "Fail to get fillp statistics information for the streamfd. streamfd=%{public}d, errno=%{public}d",
415             streamFd_, FtGetErrno());
416     }
417 }
418 
CreateClient(IpAndPort & local,int32_t streamType,std::pair<uint8_t *,uint32_t> sessionKey)419 bool VtpStreamSocket::CreateClient(IpAndPort &local, int32_t streamType, std::pair<uint8_t *, uint32_t> sessionKey)
420 {
421     int32_t fd = CreateAndBindSocket(local, false);
422     if (fd == -1) {
423         TRANS_LOGE(TRANS_STREAM, "CreateAndBindSocket failed, errno=%{public}d", FtGetErrno());
424         DestroyStreamSocket();
425         return false;
426     }
427 
428     sessionKey_.second = sessionKey.second;
429     if (sessionKey_.first == nullptr) {
430         sessionKey_.first = new uint8_t[sessionKey_.second];
431     }
432     if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
433         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
434         return false;
435     }
436 
437     streamType_ = streamType;
438     std::lock_guard<std::mutex> guard(streamSocketLock_);
439     streamFd_ = fd;
440     configCv_.notify_all();
441 
442     TRANS_LOGI(TRANS_STREAM,
443         "Success to create a client socket. fd=%{public}d, streamType=%{public}d", fd, streamType);
444     return true;
445 }
446 
CreateClient(IpAndPort & local,const IpAndPort & remote,int32_t streamType,std::pair<uint8_t *,uint32_t> sessionKey)447 bool VtpStreamSocket::CreateClient(IpAndPort &local, const IpAndPort &remote, int32_t streamType,
448     std::pair<uint8_t *, uint32_t> sessionKey)
449 {
450     if (!CreateClient(local, streamType, sessionKey)) {
451         return false;
452     }
453     /* enable the bandwidth and CQE estimation algorithms by FtSetSockOpt() for current ftsocket */
454 #ifdef FILLP_SUPPORT_BW_DET
455     bool isServer = false;
456     EnableBwEstimationAlgo(streamFd_, isServer);
457 #endif
458 
459     bool connectRet = Connect(remote);
460     if (connectRet) {
461         bool isServer = false;
462         RegisterMetricCallback(isServer); /* register the callback function */
463     }
464     return connectRet;
465 }
466 
CreateServer(IpAndPort & local,int32_t streamType,std::pair<uint8_t *,uint32_t> sessionKey)467 bool VtpStreamSocket::CreateServer(IpAndPort &local, int32_t streamType, std::pair<uint8_t *, uint32_t> sessionKey)
468 {
469     TRANS_LOGD(TRANS_STREAM, "enter.");
470     listenFd_ = CreateAndBindSocket(local, true);
471     if (listenFd_ == -1) {
472         TRANS_LOGE(TRANS_STREAM, "create listenFd failed, errno=%{public}d", FtGetErrno());
473         DestroyStreamSocket();
474         return false;
475     }
476 
477     bool ret = FtListen(listenFd_, MAX_CONNECTION_VALUE);
478     if (ret) {
479         TRANS_LOGE(TRANS_STREAM, "FtListen failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
480         DestroyStreamSocket();
481         return false;
482     }
483 
484     epollFd_ = FtEpollCreate();
485     if (epollFd_ < 0) {
486         TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
487         DestroyStreamSocket();
488         return false;
489     }
490     isStreamRecv_ = true;
491     streamType_ = streamType;
492     sessionKey_.second = sessionKey.second;
493     if (sessionKey_.first == nullptr) {
494         sessionKey_.first = new uint8_t[sessionKey_.second];
495     }
496     if (memcpy_s(sessionKey_.first, sessionKey_.second, sessionKey.first, sessionKey.second) != EOK) {
497         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
498         return false;
499     }
500 
501     CreateServerProcessThread();
502     TRANS_LOGI(TRANS_STREAM,
503         "CreateServer end, listenFd=%{public}d, epollFd=%{public}d, streamType=%{public}d", listenFd_, epollFd_,
504         streamType_);
505     return true;
506 }
507 
/*
 * Releases everything this socket object holds: the listen, stream and epoll
 * fds, the map registrations of the stream fd, the stream receiver and the
 * stream buffer. The whole teardown runs under streamSocketLock_ and is
 * executed at most once; later calls return early because isDestroyed_ is set.
 */
void VtpStreamSocket::DestroyStreamSocket()
{
    TRANS_LOGD(TRANS_STREAM, "enter.");
    std::lock_guard<std::mutex> guard(streamSocketLock_);
    if (isDestroyed_) {
        TRANS_LOGI(TRANS_STREAM, "StreamSocket is already destroyed");
        return;
    }
    if (listenFd_ != -1) {
        TRANS_LOGI(TRANS_STREAM, "listenFd_ enter FtClose");
        FtClose(listenFd_);
        listenFd_ = -1;
    }

    if (streamFd_ != -1) {
        /* Unregister before closing, while streamFd_ still holds the fd used as the map key. */
        RemoveStreamSocketLock(streamFd_); /* remove the socket lock from the map */
        RemoveStreamSocketListener(streamFd_); /* remove the socket listener from the map */
        TRANS_LOGI(TRANS_STREAM, "streamFd_ enter FtClose");
        FtClose(streamFd_);
        streamFd_ = -1;
    }

    if (epollFd_ != -1) {
        TRANS_LOGI(TRANS_STREAM, "epollFd_ enter FtClose");
        FtClose(epollFd_);
        epollFd_ = -1;
    }

    if (streamReceiver_ != nullptr) {
        TRANS_LOGI(TRANS_STREAM, "DestroyStreamSocket receiver delete");
        /* Tell the receiver the stream is closed, then drop this object's reference to it. */
        streamReceiver_->OnStreamStatus(STREAM_CLOSED);
        streamReceiver_.reset();
    }

    QuitStreamBuffer();
    vtpInstance_->UpdateSocketStreamCount(false);
    isDestroyed_ = true;
    TRANS_LOGD(TRANS_STREAM, "ok");
}
547 
Connect(const IpAndPort & remote)548 bool VtpStreamSocket::Connect(const IpAndPort &remote)
549 {
550     if (remote.ip.empty()) {
551         TRANS_LOGE(TRANS_STREAM, "remote addr  error, ip is nullptr");
552         DestroyStreamSocket();
553         return false;
554     }
555 
556     TRANS_LOGD(TRANS_STREAM,
557         "Connect to server remotePort=%{public}d", remote.port);
558     remoteIpPort_ = remote;
559 
560     struct sockaddr_in remoteSockAddr;
561     remoteSockAddr.sin_family = AF_INET;
562     remoteSockAddr.sin_port = htons(static_cast<short>(remote.port));
563     remoteSockAddr.sin_addr.s_addr = inet_addr(remote.ip.c_str());
564 
565     int32_t ret = FtConnect(streamFd_, reinterpret_cast<struct sockaddr *>(&remoteSockAddr), sizeof(remoteSockAddr));
566     if (ret != SOFTBUS_OK) {
567         TRANS_LOGE(TRANS_STREAM, "FtConnect failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
568         DestroyStreamSocket();
569         return false;
570     }
571 
572     epollFd_ = FtEpollCreate();
573     if (epollFd_ < 0) {
574         TRANS_LOGE(TRANS_STREAM, "Failed to create epoll fd errno=%{public}d", FtGetErrno());
575         DestroyStreamSocket();
576         return false;
577     }
578 
579     if (SetSocketEpollMode(streamFd_) != ERR_OK) {
580         TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, streamFd=%{public}d", streamFd_);
581         DestroyStreamSocket();
582         return false;
583     }
584     isStreamRecv_ = true;
585     TRANS_LOGI(TRANS_STREAM, "Success to connect remote, and create a thread to recv data.");
586 
587     CreateClientProcessThread();
588     return true;
589 }
590 
EncryptStreamPacket(std::unique_ptr<IStream> stream,std::unique_ptr<char[]> & data,ssize_t & len)591 bool VtpStreamSocket::EncryptStreamPacket(std::unique_ptr<IStream> stream, std::unique_ptr<char[]> &data, ssize_t &len)
592 {
593     StreamPacketizer packet(streamType_, std::move(stream));
594     auto plainData = packet.PacketizeStream();
595     if (plainData == nullptr) {
596         TRANS_LOGE(TRANS_STREAM, "PacketizeStream failed");
597         return false;
598     }
599     len = packet.GetPacketLen() + GetEncryptOverhead();
600     TRANS_LOGD(TRANS_STREAM, "packetLen=%{public}zd, encryptOverhead=%{public}zd",
601         packet.GetPacketLen(), GetEncryptOverhead());
602     data = std::make_unique<char[]>(len + FRAME_HEADER_LEN);
603     ssize_t encLen = Encrypt(plainData.get(), packet.GetPacketLen(), data.get() + FRAME_HEADER_LEN, len);
604     if (encLen != len) {
605         TRANS_LOGE(TRANS_STREAM, "encrypted failed, dataLen=%{public}zd, encLen=%{public}zd", len, encLen);
606         return false;
607     }
608     InsertBufferLength(len, FRAME_HEADER_LEN, reinterpret_cast<uint8_t *>(data.get()));
609     len += FRAME_HEADER_LEN;
610 
611     return true;
612 }
613 
Send(std::unique_ptr<IStream> stream)614 bool VtpStreamSocket::Send(std::unique_ptr<IStream> stream)
615 {
616     TRANS_LOGD(TRANS_STREAM, "send in... streamType=%{public}d, dataSize=%{public}zd, extSize=%{public}zd",
617         streamType_, stream->GetBufferLen(), stream->GetExtBufferLen());
618 
619     if (!isBlocked_) {
620         isBlocked_ = true;
621         if (!SetNonBlockMode(streamFd_, StreamAttr(false))) {
622             TRANS_LOGE(TRANS_STREAM, "set non block mode fail");
623             return false;
624         }
625     }
626 
627     int32_t ret = -1;
628     std::unique_ptr<char[]> data = nullptr;
629     ssize_t len = 0;
630 
631     if (streamType_ == RAW_STREAM) {
632         data = stream->GetBuffer();
633         len = stream->GetBufferLen();
634 
635         ret = FtSend(streamFd_, data.get(), len, 0);
636     } else if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
637         const Communication::SoftBus::StreamFrameInfo *streamFrameInfo = stream->GetStreamFrameInfo();
638         if (streamFrameInfo == nullptr) {
639             TRANS_LOGE(TRANS_STREAM, "streamFrameInfo is null");
640             return false;
641         }
642         if (!EncryptStreamPacket(std::move(stream), data, len)) {
643             return false;
644         }
645 
646         FrameInfo frameInfo;
647         ConvertStreamFrameInfo2FrameInfo(&frameInfo, streamFrameInfo);
648         ret = FtSendFrame(streamFd_, data.get(), len, 0, &frameInfo);
649     }
650 
651     if (ret == -1) {
652         TRANS_LOGE(TRANS_STREAM, "send failed, errno=%{public}d", FtGetErrno());
653         return false;
654     }
655 
656     TRANS_LOGD(TRANS_STREAM, "send out..., streamType=%{public}d, len=%{public}zd", streamType_, len);
657     return true;
658 }
659 
SetOption(int32_t type,const StreamAttr & value)660 bool VtpStreamSocket::SetOption(int32_t type, const StreamAttr &value)
661 {
662     PrintOptionInfo(type, value);
663     auto it = optFuncMap_.find(type);
664     if (it == optFuncMap_.end()) {
665         TRANS_LOGW(TRANS_STREAM, "not found type=%{public}d", type);
666         return false;
667     }
668 
669     if (value.GetType() != it->second.valueType) {
670         TRANS_LOGW(TRANS_STREAM,
671             "type=%{public}d, valueType=%{public}d", value.GetType(), it->second.valueType);
672         return false;
673     }
674 
675     MySetFunc set = it->second.set;
676     if (set == nullptr) {
677         TRANS_LOGW(TRANS_STREAM, "set is nullptr");
678         return false;
679     }
680 
681     if (type == NON_BLOCK || type == TOS) {
682         return (this->*set)(static_cast<int32_t>(streamFd_), value);
683     }
684 
685     auto outerIt = FILLP_TYPE_MAP.find(type);
686     if (outerIt != FILLP_TYPE_MAP.end()) {
687         return (this->*set)(outerIt->second, value);
688     }
689 
690     auto innerIt = INNER_FILLP_TYPE_MAP.find(type);
691     if (innerIt != INNER_FILLP_TYPE_MAP.end()) {
692         return (this->*set)(innerIt->second, value);
693     }
694 
695     return (this->*set)(static_cast<int32_t>(type), value);
696 }
697 
GetOption(int32_t type) const698 StreamAttr VtpStreamSocket::GetOption(int32_t type) const
699 {
700     StreamAttr attr {};
701     auto it = optFuncMap_.find(type);
702     if (it != optFuncMap_.end()) {
703         MyGetFunc get = it->second.get;
704         if (get == nullptr) {
705             TRANS_LOGE(TRANS_STREAM, "Can not get option type=%{public}d", type);
706             return std::move(StreamAttr());
707         }
708         if (type == NON_BLOCK) {
709             attr = (this->*get)(static_cast<int32_t>(streamFd_));
710         } else {
711             attr = (this->*get)(static_cast<int32_t>(type));
712         }
713     }
714 
715     PrintOptionInfo(type, attr);
716     return attr;
717 }
718 
SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)719 bool VtpStreamSocket::SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver)
720 {
721     if (receiver == nullptr) {
722         TRANS_LOGW(TRANS_STREAM, "receiver is nullptr");
723         return false;
724     }
725 
726     std::lock_guard<std::mutex> guard(streamSocketLock_);
727     streamReceiver_ = receiver;
728     TRANS_LOGI(TRANS_STREAM, "set receiver success");
729     return true;
730 }
731 
InitVtpInstance(const std::string & pkgName)732 bool VtpStreamSocket::InitVtpInstance(const std::string &pkgName)
733 {
734     return vtpInstance_->InitVtp(pkgName);
735 }
736 
DestroyVtpInstance(const std::string & pkgName)737 void VtpStreamSocket::DestroyVtpInstance(const std::string &pkgName)
738 {
739     TRANS_LOGD(TRANS_STREAM, "enter.");
740     vtpInstance_->DestroyVtp(pkgName);
741     TRANS_LOGD(TRANS_STREAM, "ok");
742 }
743 
CreateAndBindSocket(IpAndPort & local,bool isServer)744 int32_t VtpStreamSocket::CreateAndBindSocket(IpAndPort &local, bool isServer)
745 {
746     localIpPort_ = local;
747     vtpInstance_->UpdateSocketStreamCount(true);
748     if (local.ip.empty()) {
749         TRANS_LOGE(TRANS_STREAM, "ip is empty");
750         return -1;
751     }
752 
753     int32_t sockFd = FtSocket(AF_INET, SOCK_STREAM, IPPROTO_FILLP);
754     if (sockFd == -1) {
755         TRANS_LOGE(TRANS_STREAM, "FtSocket failed, errno=%{public}d", FtGetErrno());
756         return -1;
757     }
758     if (!isServer) {
759         TRANS_LOGI(TRANS_STREAM, "FtSocket set client, errno=%{public}d", FtGetErrno());
760         streamFd_ = sockFd;
761     }
762     SetDefaultConfig(sockFd);
763 
764     // bind
765     sockaddr_in localSockAddr = { 0 };
766     char host[ADDR_MAX_SIZE];
767     localSockAddr.sin_family = AF_INET;
768     localSockAddr.sin_port = htons((short)local.port);
769     localSockAddr.sin_addr.s_addr = inet_addr(local.ip.c_str());
770     localIpPort_.ip = SoftBusInetNtoP(AF_INET, &(localSockAddr.sin_addr), host, ADDR_MAX_SIZE);
771     if (!SetSocketBoundInner(sockFd, localIpPort_.ip)) {
772         TRANS_LOGE(TRANS_STREAM, "SetSocketBoundInner failed, errno=%{public}d", FtGetErrno());
773     }
774 
775     socklen_t localAddrLen = sizeof(localSockAddr);
776     int32_t ret = FtBind(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), localAddrLen);
777     if (ret == -1) {
778         FtClose(sockFd);
779         TRANS_LOGE(TRANS_STREAM, "FtBind failed, errno=%{public}d", FtGetErrno());
780         return -1;
781     }
782 
783     // 获取port
784     ret = FtGetSockName(sockFd, reinterpret_cast<sockaddr *>(&localSockAddr), &localAddrLen);
785     if (ret != ERR_OK) {
786         TRANS_LOGE(TRANS_STREAM, "getsockname error ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
787         FtClose(sockFd);
788         return -1;
789     }
790 
791     localIpPort_.port = static_cast<int32_t>(ntohs(localSockAddr.sin_port));
792     local.port = localIpPort_.port;
793 
794     return sockFd;
795 }
796 
797 
EnableBwEstimationAlgo(int32_t streamFd,bool isServer) const798 bool VtpStreamSocket::EnableBwEstimationAlgo(int32_t streamFd, bool isServer) const
799 {
800 #ifdef FILLP_SUPPORT_BW_DET
801     int32_t errBwDet;
802     if (isServer) {
803         int32_t enableBwDet = FILLP_BW_DET_RX_ENABLE;
804         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
805             &enableBwDet, sizeof(enableBwDet));
806     } else {
807         int32_t enableBwDet = FILLP_BW_DET_TX_ENABLE;
808         errBwDet = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_BW_DET_ALGO,
809             &enableBwDet, sizeof(enableBwDet));
810     }
811     if (errBwDet < 0) {
812         TRANS_LOGE(TRANS_STREAM,
813             "Fail to enable bandwidth estimation algorithm for streamFd=%{public}d, errno%{public}d",
814             streamFd, FtGetErrno());
815         return true;
816     } else {
817         TRANS_LOGE(TRANS_STREAM,
818             "Success to enable bandwidth estimation algorithm for stream=Fd%{public}d", streamFd);
819         return false;
820     }
821 #else
822     return true;
823 #endif
824 }
825 
EnableJitterDetectionAlgo(int32_t streamFd) const826 bool VtpStreamSocket::EnableJitterDetectionAlgo(int32_t streamFd) const
827 {
828 #ifdef FILLP_SUPPORT_CQE
829     int32_t  enableCQE = FILLP_CQE_ENABLE;
830     auto errCQE = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_CQE_ALGO, &enableCQE, sizeof(enableCQE));
831     if (errCQE < 0) {
832         TRANS_LOGE(TRANS_STREAM,
833             "Fail to enable CQE algorithm for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
834         return true;
835     } else {
836         TRANS_LOGE(TRANS_STREAM,
837             "Success to enable CQE algorithm for streamFd=%{public}d", streamFd);
838         return false;
839     }
840 #else
841     return true;
842 #endif
843 }
844 
EnableDirectlySend(int32_t streamFd) const845 bool VtpStreamSocket::EnableDirectlySend(int32_t streamFd) const
846 {
847     int32_t enable = 1;
848     FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SOCK_DIRECTLY_SEND, &enable, sizeof(enable));
849     if (ret < 0) {
850         TRANS_LOGE(TRANS_STREAM,
851             "Fail to enable direct send for streamFd=%{public}d, rrno=%{public}d", streamFd, FtGetErrno());
852         return false;
853     }
854     TRANS_LOGI(TRANS_STREAM, "Success to enable direct send for streamFd=%{public}d", streamFd);
855     return true;
856 }
857 
EnableSemiReliable(int32_t streamFd) const858 bool VtpStreamSocket::EnableSemiReliable(int32_t streamFd) const
859 {
860     int32_t enable = 1;
861     FILLP_INT ret = FtSetSockOpt(streamFd, IPPROTO_FILLP, FILLP_SEMI_RELIABLE, &enable, sizeof(enable));
862     if (ret < 0) {
863         TRANS_LOGE(TRANS_STREAM,
864             "Fail to enable direct send for streamFd=%{public}d, errno=%{public}d", streamFd, FtGetErrno());
865         return false;
866     }
867     TRANS_LOGI(TRANS_STREAM, "Success to enable semi reliable for streamFd=%{public}d", streamFd);
868     return true;
869 }
870 
RegisterMetricCallback(bool isServer)871 void VtpStreamSocket::RegisterMetricCallback(bool isServer)
872 {
873     VtpStreamSocket::AddStreamSocketLock(streamFd_, streamSocketLock_);
874     auto self = this->GetSelf();
875     VtpStreamSocket::AddStreamSocketListener(streamFd_, self);
876     int32_t regStatisticsRet = FtApiRegEventCallbackFunc(FILLP_CONFIG_ALL_SOCKET, FillpStatistics);
877     int32_t value = 1;
878     auto err = FtSetSockOpt(streamFd_, IPPROTO_FILLP, FILLP_SOCK_TRAFFIC, &value, sizeof(value));
879     if (err < 0) {
880         TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
881         return;
882     }
883     TRANS_LOGD(TRANS_STREAM, "FtSetSockOpt start success");
884     if (isServer) {
885         if (regStatisticsRet == 0) {
886             TRANS_LOGI(TRANS_STREAM,
887                 "Success to register the stream callback function at server side. streamFd=%{public}d", streamFd_);
888         } else {
889             TRANS_LOGE(TRANS_STREAM,
890                 "Fail to register the stream callback function at server side. streamFd=%{public}d, errno=%{public}d",
891                 streamFd_, regStatisticsRet);
892         }
893     } else {
894         if (regStatisticsRet == 0) {
895             TRANS_LOGI(TRANS_STREAM,
896                 "Success to register the stream callback function at client side. streamFd=%{public}d", streamFd_);
897         } else {
898             TRANS_LOGE(TRANS_STREAM,
899                 "Fail to register the stream callback function at client side. streamFd=%{public}d, errno=%{public}d",
900                 streamFd_, regStatisticsRet);
901         }
902     }
903 }
904 
Accept()905 bool VtpStreamSocket::Accept()
906 {
907     TRANS_LOGD(TRANS_STREAM, "enter.");
908     auto fd = FtAccept(listenFd_, nullptr, nullptr);
909     TRANS_LOGI(TRANS_STREAM, "accept streamFd=%{public}d", fd);
910     if (fd == -1) {
911         TRANS_LOGE(TRANS_STREAM, "errno=%{public}d", FtGetErrno());
912         return false;
913     }
914 
915     sockaddr remoteAddr {};
916     socklen_t remoteAddrLen = sizeof(remoteAddr);
917     auto ret = FtGetPeerName(fd, &remoteAddr, &remoteAddrLen);
918     if (ret != ERR_OK) {
919         TRANS_LOGE(TRANS_STREAM, "get name failed, fd=%{public}d", fd);
920         FtClose(fd);
921         return false;
922     }
923 
924     char host[ADDR_MAX_SIZE];
925     if (remoteAddr.sa_family == AF_INET) {
926         auto v4Addr = reinterpret_cast<const sockaddr_in *>(&remoteAddr);
927         remoteIpPort_.ip = SoftBusInetNtoP(AF_INET, &(v4Addr->sin_addr), host, ADDR_MAX_SIZE);
928         remoteIpPort_.port = v4Addr->sin_port;
929     } else {
930         auto v6Addr = reinterpret_cast<const sockaddr_in6 *>(&remoteAddr);
931         remoteIpPort_.ip = SoftBusInetNtoP(AF_INET6, &(v6Addr->sin6_addr), host, ADDR_MAX_SIZE);
932         remoteIpPort_.port = v6Addr->sin6_port;
933     }
934 
935     TRANS_LOGD(TRANS_STREAM, "Accept a client remotePort=%{public}d", remoteIpPort_.port);
936 
937     if (SetSocketEpollMode(fd) != ERR_OK) {
938         TRANS_LOGE(TRANS_STREAM, "SetSocketEpollMode failed, fd=%{public}d", fd);
939         FtClose(fd);
940         return false;
941     }
942 
943     std::lock_guard<std::mutex> guard(streamSocketLock_);
944     streamFd_ = fd;
945     configCv_.notify_all();
946 
947     if (streamReceiver_ != nullptr) {
948         TRANS_LOGI(TRANS_STREAM, "notify stream connected!");
949         streamReceiver_->OnStreamStatus(STREAM_CONNECTED);
950     }
951 
952     bool isServer = true;
953     RegisterMetricCallback(isServer); /* register the callback function */
954     /* enable the bandwidth and CQE estimation algorithms for current ftsocket */
955 #ifdef FILLP_SUPPORT_BW_DET
956     EnableBwEstimationAlgo(streamFd_, isServer);
957 #endif
958 #ifdef FILLP_SUPPORT_CQE
959     EnableJitterDetectionAlgo(streamFd_);
960 #endif
961 
962     TRANS_LOGI(TRANS_STREAM, "accept success!");
963     return true;
964 }
965 
EpollTimeout(int32_t fd,int32_t timeout)966 int32_t VtpStreamSocket::EpollTimeout(int32_t fd, int32_t timeout)
967 {
968     struct SpungeEpollEvent events[MAX_EPOLL_NUM];
969     (void)memset_s(events, sizeof(events), 0, sizeof(events));
970     while (true) {
971         FILLP_INT fdNum = FtEpollWait(epollFd_, events, MAX_EPOLL_NUM, timeout);
972         if (fdNum <= 0) {
973             TRANS_LOGE(TRANS_STREAM,
974                 "FtEpollWait failed, ret=%{public}d, errno=%{public}d", fdNum, FtGetErrno());
975             return -FtGetErrno();
976         }
977 
978         for (FILLP_INT i = 0; i < fdNum; i++) {
979             if (events[i].data.fd != fd) {
980                 continue;
981             }
982 
983             if (events[i].events & (SPUNGE_EPOLLHUP | SPUNGE_EPOLLERR)) {
984                 TRANS_LOGE(TRANS_STREAM,
985                     "EpollTimeout, something may be wrong in this socket, fd=%{public}d, events=%{public}u", fd,
986                     (unsigned int)events[i].events);
987                 return -1;
988             }
989 
990             if (events[i].events & SPUNGE_EPOLLIN) {
991                 return SOFTBUS_OK;
992             }
993         }
994     }
995 }
996 
SetSocketEpollMode(int32_t fd)997 int32_t VtpStreamSocket::SetSocketEpollMode(int32_t fd)
998 {
999     if (!SetNonBlockMode(fd, StreamAttr(true))) {
1000         TRANS_LOGE(TRANS_STREAM, "SetNonBlockMode failed, errno=%{public}d", FtGetErrno());
1001         return -1;
1002     }
1003 
1004     struct SpungeEpollEvent event = {0};
1005     event.events = SPUNGE_EPOLLIN;
1006     event.data.fd = fd;
1007 
1008     auto ret = FtEpollCtl(epollFd_, SPUNGE_EPOLL_CTL_ADD, fd, &event);
1009     if (ret != ERR_OK) {
1010         TRANS_LOGE(TRANS_STREAM, "FtEpollCtl failed, ret=%{public}d, errno=%{public}d", ret, FtGetErrno());
1011         return ret;
1012     }
1013 
1014     TRANS_LOGD(TRANS_STREAM, "SetNonBlockMode success");
1015     return SOFTBUS_OK;
1016 }
1017 
InsertBufferLength(int32_t num,int32_t length,uint8_t * output) const1018 void VtpStreamSocket::InsertBufferLength(int32_t num, int32_t length, uint8_t *output) const
1019 {
1020     for (int32_t i = 0; i < length; i++) {
1021         output[length - 1 - i] = static_cast<unsigned int>(
1022             ((static_cast<unsigned int>(num) >> static_cast<unsigned int>(BYTE_TO_BIT * i))) & INT_TO_BYTE);
1023     }
1024 }
1025 
MakeStreamData(StreamData & data,const StreamFrameInfo & info) const1026 std::unique_ptr<IStream> VtpStreamSocket::MakeStreamData(StreamData &data, const StreamFrameInfo &info) const
1027 {
1028     std::unique_ptr<IStream> stream = nullptr;
1029     switch (streamType_) {
1030         case VIDEO_SLICE_STREAM:
1031             TRANS_LOGD(TRANS_STREAM, "do not support VIDEO_SLICE_STREAM streamType=%{public}d", streamType_);
1032             break;
1033         case COMMON_VIDEO_STREAM:
1034         case COMMON_AUDIO_STREAM:
1035             TRANS_LOGD(TRANS_STREAM,
1036                 "streamType=%{public}d, seqNum=%{public}d, streamId=%{public}d",
1037                 streamType_, info.seqNum, info.streamId);
1038             stream = IStream::MakeCommonStream(data, info);
1039             break;
1040         case RAW_STREAM:
1041             TRANS_LOGD(TRANS_STREAM, "streamType=%{public}d", streamType_);
1042             stream = IStream::MakeRawStream(data, info);
1043             break;
1044         default:
1045             TRANS_LOGE(TRANS_STREAM, "do not support streamType=%{public}d", streamType_);
1046             break;
1047     }
1048     if (stream == nullptr) {
1049         TRANS_LOGE(TRANS_STREAM, "IStream construct error");
1050         return nullptr;
1051     }
1052 
1053     return stream;
1054 }
1055 
RecvStreamLen()1056 int32_t VtpStreamSocket::RecvStreamLen()
1057 {
1058     int32_t hdrSize = FRAME_HEADER_LEN;
1059     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1060         hdrSize = streamHdrSize_;
1061     }
1062 
1063     int32_t len = -1;
1064     int32_t timeout = -1;
1065     auto buffer = std::make_unique<char[]>(hdrSize);
1066     if (EpollTimeout(streamFd_, timeout) == 0) {
1067         do {
1068             len = FtRecv(streamFd_, buffer.get(), hdrSize, 0);
1069         } while (len <= 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1070     }
1071     TRANS_LOGD(TRANS_STREAM, "recv frame header, len=%{public}d, scene=%{public}d", len, scene_);
1072 
1073     if (len <= 0) {
1074         TRANS_LOGE(TRANS_STREAM, "len invalid, len=%{public}d", len);
1075         return -1;
1076     }
1077 
1078     if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1079         std::lock_guard<std::mutex> guard(streamSocketLock_);
1080         if (streamReceiver_ != nullptr) {
1081             return streamReceiver_->OnStreamHdrReceived(std::move(buffer), hdrSize);
1082         }
1083     }
1084 
1085     return ntohl(*reinterpret_cast<int32_t *>(buffer.get()));
1086 }
1087 
ProcessCommonDataStream(std::unique_ptr<char[]> & dataBuffer,int32_t & dataLength,std::unique_ptr<char[]> & extBuffer,int32_t & extLen,StreamFrameInfo & info)1088 bool VtpStreamSocket::ProcessCommonDataStream(std::unique_ptr<char[]> &dataBuffer,
1089     int32_t &dataLength, std::unique_ptr<char[]> &extBuffer, int32_t &extLen, StreamFrameInfo &info)
1090 {
1091     TRANS_LOGD(TRANS_STREAM, "recv common stream");
1092     int32_t decryptedLength = dataLength;
1093     auto decryptedBuffer = std::move(dataBuffer);
1094 
1095     int32_t plainDataLength = decryptedLength - GetEncryptOverhead();
1096     if (plainDataLength <= 0) {
1097         TRANS_LOGE(TRANS_STREAM, "Decrypt failed, invalid decryptedLen=%{public}d", decryptedLength);
1098         return false;
1099     }
1100     std::unique_ptr<char[]> plainData = std::make_unique<char[]>(plainDataLength);
1101     ssize_t decLen = Decrypt(decryptedBuffer.get(), decryptedLength, plainData.get(), plainDataLength);
1102     if (decLen != plainDataLength) {
1103         TRANS_LOGE(TRANS_STREAM,
1104             "Decrypt failed, dataLen=%{public}d, decryptedLen=%{public}zd", plainDataLength, decLen);
1105         return false;
1106     }
1107     auto header = plainData.get();
1108     StreamDepacketizer decode(streamType_);
1109     if (plainDataLength < static_cast<int32_t>(sizeof(CommonHeader))) {
1110         TRANS_LOGE(TRANS_STREAM,
1111             "failed, plainDataLen=%{public}d, CommonHeader=%{public}zu", plainDataLength, sizeof(CommonHeader));
1112         return false;
1113     }
1114     decode.DepacketizeHeader(header);
1115 
1116     auto buffer = plainData.get() + sizeof(CommonHeader);
1117     decode.DepacketizeBuffer(buffer, plainDataLength - sizeof(CommonHeader));
1118 
1119     extBuffer = decode.GetUserExt();
1120     extLen = decode.GetUserExtSize();
1121     info = decode.GetFrameInfo();
1122     dataBuffer = decode.GetData();
1123     dataLength = decode.GetDataLength();
1124     if (dataLength <= 0) {
1125         TRANS_LOGE(TRANS_STREAM, "common depacketize error, dataLen=%{public}d", dataLength);
1126         return false;
1127     }
1128     return true;
1129 }
1130 
DoStreamRecv()1131 void VtpStreamSocket::DoStreamRecv()
1132 {
1133     while (isStreamRecv_) {
1134         std::unique_ptr<char[]> dataBuffer = nullptr;
1135         std::unique_ptr<char[]> extBuffer = nullptr;
1136         int32_t extLen = 0;
1137         StreamFrameInfo info = {};
1138         TRANS_LOGD(TRANS_STREAM, "recv stream");
1139         int32_t dataLength = VtpStreamSocket::RecvStreamLen();
1140         if (dataLength <= 0 || dataLength > MAX_STREAM_LEN) {
1141             TRANS_LOGE(TRANS_STREAM, "read frame length error, dataLength=%{public}d", dataLength);
1142             break;
1143         }
1144         TRANS_LOGD(TRANS_STREAM,
1145             "recv a new frame, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1146         dataBuffer = VtpStreamSocket::RecvStream(dataLength);
1147 
1148         if (streamType_ == COMMON_VIDEO_STREAM || streamType_ == COMMON_AUDIO_STREAM) {
1149             if (!ProcessCommonDataStream(dataBuffer, dataLength, extBuffer, extLen, info)) {
1150                 break;
1151             }
1152         }
1153 
1154         StreamData data = { std::move(dataBuffer), dataLength, std::move(extBuffer), extLen };
1155         std::unique_ptr<IStream> stream = MakeStreamData(data, info);
1156         if (stream == nullptr) {
1157             TRANS_LOGE(TRANS_STREAM, "MakeStreamData failed, stream is null");
1158             break;
1159         }
1160 
1161         TRANS_LOGD(TRANS_STREAM,
1162             "recv frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1163 
1164         if (streamType_ == RAW_STREAM && scene_ == COMPATIBLE_SCENE) {
1165             std::lock_guard<std::mutex> guard(streamSocketLock_);
1166             if (streamReceiver_ != nullptr) {
1167                 streamReceiver_->OnStreamReceived(std::move(stream));
1168                 continue;
1169             }
1170         }
1171 
1172         PutStream(std::move(stream));
1173         TRANS_LOGD(TRANS_STREAM, "put frame done, dataLen=%{public}d, streamType=%{public}d", dataLength, streamType_);
1174     }
1175     TRANS_LOGI(TRANS_STREAM, "recv thread exit");
1176 }
1177 
RecvStream(int32_t dataLength)1178 std::unique_ptr<char[]> VtpStreamSocket::RecvStream(int32_t dataLength)
1179 {
1180     auto buffer = std::make_unique<char[]>(dataLength);
1181     int32_t recvLen = 0;
1182     while (recvLen < dataLength) {
1183         int32_t ret = -1;
1184         int32_t timeout = -1;
1185 
1186         if (EpollTimeout(streamFd_, timeout) == 0) {
1187             do {
1188                 ret = FtRecv(streamFd_, (buffer.get() + recvLen), dataLength - recvLen, 0);
1189             } while (ret < 0 && (FtGetErrno() == EINTR || FtGetErrno() == FILLP_EAGAIN));
1190         }
1191 
1192         if (ret == -1) {
1193             TRANS_LOGE(TRANS_STREAM, "read frame failed, errno=%{public}d", FtGetErrno());
1194             return nullptr;
1195         }
1196 
1197         recvLen += ret;
1198     }
1199     return std::unique_ptr<char[]>(buffer.release());
1200 }
1201 
SetDefaultConfig(int32_t fd)1202 void VtpStreamSocket::SetDefaultConfig(int32_t fd)
1203 {
1204     if (!SetIpTos(fd, StreamAttr(static_cast<int32_t>(IPTOS_LOWDELAY)))) {
1205         TRANS_LOGW(TRANS_STREAM, "SetIpTos failed");
1206     }
1207     // Set Fillp direct sending
1208     if (!EnableDirectlySend(fd)) {
1209         TRANS_LOGW(TRANS_STREAM, "EnableDirectlySend failed");
1210     }
1211 
1212     if (!EnableSemiReliable(fd)) {
1213         TRANS_LOGW(TRANS_STREAM, "EnableSemiReliable failed");
1214     }
1215     // Set Fillp Differentiated Transmission
1216     FILLP_BOOL enable = 1;
1217     if (!FtConfigSet(FT_CONF_APP_DIFFER_TRANSMIT, &enable, &fd)) {
1218         TRANS_LOGW(TRANS_STREAM, "Set differ transmit failed");
1219     }
1220 
1221     if (!SetOption(RECV_BUF_SIZE, StreamAttr(static_cast<int32_t>(DEFAULT_UDP_BUFFER_RCV_SIZE)))) {
1222         TRANS_LOGW(TRANS_STREAM, "Set recv buff failed");
1223     }
1224 
1225     if (!SetOption(SEND_BUF_SIZE, StreamAttr(static_cast<int32_t>(DEFAULT_UDP_BUFFER_SIZE)))) {
1226         TRANS_LOGW(TRANS_STREAM, "Set send buff failed");
1227     }
1228 
1229     if (!SetOption(InnerStreamOptionType::RECV_CACHE, StreamAttr(static_cast<int32_t>(FILLP_VTP_RECV_CACHE_SIZE)))) {
1230         TRANS_LOGW(TRANS_STREAM, "Set recv cache failed");
1231     }
1232 
1233     if (!SetOption(InnerStreamOptionType::SEND_CACHE, StreamAttr(static_cast<int32_t>(FILLP_VTP_SEND_CACHE_SIZE)))) {
1234         TRANS_LOGW(TRANS_STREAM, "Set send cache failed");
1235     }
1236 }
1237 
SetIpTos(int32_t fd,const StreamAttr & tos)1238 bool VtpStreamSocket::SetIpTos(int32_t fd, const StreamAttr &tos)
1239 {
1240     auto tmp = tos.GetIntValue();
1241     if (FtSetSockOpt(fd, IPPROTO_IP, IP_TOS, &tmp, sizeof(tmp)) != ERR_OK) {
1242         TRANS_LOGE(TRANS_STREAM, "SetIpTos wrong! fd=%{public}d, errno=%{public}d", fd, FtGetErrno());
1243         return false;
1244     }
1245 
1246     TRANS_LOGD(TRANS_STREAM, "Success to set ip tos: fd=%{public}d, tos=%{public}d", fd, tmp);
1247     return true;
1248 }
1249 
GetIpTos(int32_t type) const1250 StreamAttr VtpStreamSocket::GetIpTos(int32_t type) const
1251 {
1252     static_cast<void>(type);
1253     int32_t tos;
1254     int32_t size = sizeof(tos);
1255 
1256     if (FtGetSockOpt(streamFd_, IPPROTO_IP, IP_TOS, &tos, &size) != ERR_OK) {
1257         TRANS_LOGE(TRANS_STREAM, "FtGetSockOpt errno=%{public}d", FtGetErrno());
1258         return std::move(StreamAttr());
1259     }
1260 
1261     return std::move(StreamAttr(tos));
1262 }
1263 
GetStreamSocketFd(int32_t type) const1264 StreamAttr VtpStreamSocket::GetStreamSocketFd(int32_t type) const
1265 {
1266     static_cast<void>(type);
1267     return std::move(StreamAttr(streamFd_));
1268 }
1269 
GetListenSocketFd(int32_t type) const1270 StreamAttr VtpStreamSocket::GetListenSocketFd(int32_t type) const
1271 {
1272     static_cast<void>(type);
1273     return std::move(StreamAttr(listenFd_));
1274 }
1275 
SetSocketBoundInner(int32_t fd,std::string ip) const1276 bool VtpStreamSocket::SetSocketBoundInner(int32_t fd, std::string ip) const
1277 {
1278     auto boundIp = (ip.empty()) ? localIpPort_.ip : ip;
1279     struct ifaddrs *ifList = nullptr;
1280     if (getifaddrs(&ifList) < 0) {
1281         TRANS_LOGE(TRANS_STREAM,
1282             "get interface address return errno=%{public}d, strerror=%{public}s", errno, strerror(errno));
1283         return false;
1284     }
1285 
1286     struct ifaddrs *ifa = nullptr;
1287     for (ifa = ifList; ifa != nullptr; ifa = ifa->ifa_next) {
1288         if (ifa->ifa_addr == nullptr) {
1289             continue;
1290         }
1291         if (ifa->ifa_addr->sa_family != AF_INET) {
1292             continue;
1293         }
1294 
1295         char host[ADDR_MAX_SIZE];
1296         std::string devName(ifa->ifa_name);
1297         if (strcmp(boundIp.c_str(), SoftBusInetNtoP(AF_INET, &(((struct sockaddr_in *)ifa->ifa_addr)->sin_addr),
1298             host, ADDR_MAX_SIZE)) == 0) {
1299             TRANS_LOGI(TRANS_STREAM, "current use interface to bind to socket. ifName=%{public}s", ifa->ifa_name);
1300             auto err = FtSetSockOpt(fd, SOL_SOCKET, SO_BINDTODEVICE, devName.c_str(), devName.size());
1301             if (err < 0) {
1302                 TRANS_LOGE(TRANS_STREAM, "fail to set socket binding to device");
1303                 freeifaddrs(ifList);
1304                 return false;
1305             }
1306             break;
1307         }
1308     }
1309     freeifaddrs(ifList);
1310 
1311     return true;
1312 }
1313 
SetSocketBindToDevices(int32_t type,const StreamAttr & ip)1314 bool VtpStreamSocket::SetSocketBindToDevices(int32_t type, const StreamAttr &ip)
1315 {
1316     static_cast<void>(type);
1317     auto tmp = ip.GetStrValue();
1318     auto boundIp = (tmp.empty()) ? localIpPort_.ip : tmp;
1319     return SetSocketBoundInner(streamFd_, boundIp);
1320 }
1321 
SetVtpStackConfigDelayed(int32_t type,const StreamAttr & value)1322 bool VtpStreamSocket::SetVtpStackConfigDelayed(int32_t type, const StreamAttr &value)
1323 {
1324     std::unique_lock<std::mutex> lock(streamSocketLock_);
1325     if (streamFd_ == -1) {
1326         configCv_.wait(lock, [this] { return streamFd_ != -1; });
1327     }
1328     TRANS_LOGD(TRANS_STREAM, "set vtp stack config, streamFd=%{public}d", streamFd_);
1329     return SetVtpStackConfig(type, value);
1330 }
1331 
SetVtpStackConfig(int32_t type,const StreamAttr & value)1332 bool VtpStreamSocket::SetVtpStackConfig(int32_t type, const StreamAttr &value)
1333 {
1334     if (streamFd_ == -1) {
1335         TRANS_LOGI(TRANS_STREAM, "set vtp stack config when streamFd is legal, type=%{public}d", type);
1336         auto self = GetSelf();
1337         std::thread([self, type, value]() {
1338             const std::string threadName = "OS_setVtpCfg";
1339             pthread_setname_np(pthread_self(), threadName.c_str());
1340             self->SetVtpStackConfigDelayed(type, value);
1341             }).detach();
1342         return true;
1343     }
1344 
1345     if (value.GetType() == INT_TYPE) {
1346         int32_t intVal = value.GetIntValue();
1347         int32_t ret = FtConfigSet(type, &intVal, &streamFd_);
1348         if (ret != SOFTBUS_OK) {
1349             TRANS_LOGE(TRANS_STREAM,
1350                 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1351             return false;
1352         }
1353 
1354         TRANS_LOGI(TRANS_STREAM,
1355             "setVtpConfig success, type=%{public}d, streamFd=%{public}d, value=%{public}d", type, streamFd_, intVal);
1356         return true;
1357     }
1358 
1359     if (value.GetType() == BOOL_TYPE) {
1360         bool flag = value.GetBoolValue();
1361         int32_t ret = FtConfigSet(type, &flag, &streamFd_);
1362         if (ret != SOFTBUS_OK) {
1363             TRANS_LOGE(TRANS_STREAM,
1364                 "FtConfigSet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1365             return false;
1366         }
1367 
1368         TRANS_LOGI(TRANS_STREAM,
1369             "setVtpConfig success, streamFd=%{public}d, flag=%{public}d, type=%{public}d", type, streamFd_, flag);
1370         return true;
1371     }
1372 
1373     TRANS_LOGE(TRANS_STREAM, "UNKNOWN TYPE!");
1374     return false;
1375 }
1376 
GetVtpStackConfig(int32_t type) const1377 StreamAttr VtpStreamSocket::GetVtpStackConfig(int32_t type) const
1378 {
1379     int32_t intVal = -1;
1380     int32_t configFd = (streamFd_ == -1) ? FILLP_CONFIG_ALL_SOCKET : streamFd_;
1381     int32_t ret = FtConfigGet(type, &intVal, &configFd);
1382     if (ret != SOFTBUS_OK) {
1383         TRANS_LOGE(TRANS_STREAM,
1384             "FtConfigGet failed, type=%{public}d, errno=%{public}d", type, FtGetErrno());
1385         return std::move(StreamAttr());
1386     }
1387 
1388     int32_t valType = ValueType::UNKNOWN;
1389     for (auto it = FILLP_TYPE_MAP.begin(); it != FILLP_TYPE_MAP.end(); it++) {
1390         if (it->second != type) {
1391             continue;
1392         }
1393 
1394         valType = optFuncMap_.at(it->first).valueType;
1395         break;
1396     }
1397 
1398     if (valType != ValueType::UNKNOWN) {
1399         for (auto it = INNER_FILLP_TYPE_MAP.begin(); it != INNER_FILLP_TYPE_MAP.end(); it++) {
1400             if (it->second != type) {
1401                 continue;
1402             }
1403 
1404             valType = optFuncMap_.at(it->first).valueType;
1405             break;
1406         }
1407     }
1408 
1409     if (valType == BOOL_TYPE) {
1410         return std::move(StreamAttr(!!intVal));
1411     }
1412 
1413     return std::move(StreamAttr(intVal));
1414 }
1415 
SetNonBlockMode(int32_t fd,const StreamAttr & value)1416 bool VtpStreamSocket::SetNonBlockMode(int32_t fd, const StreamAttr &value)
1417 {
1418     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1419     if (flags < 0) {
1420         TRANS_LOGE(TRANS_STREAM, "failed to get FtFcntl, flags=%{public}d", flags);
1421         flags = 0;
1422     }
1423     bool nonBlock = value.GetBoolValue();
1424 
1425     flags = nonBlock ? static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) | O_NONBLOCK)) :
1426         static_cast<FILLP_INT>((static_cast<FILLP_UINT>(flags) & ~O_NONBLOCK));
1427 
1428     FILLP_INT res = FtFcntl(fd, F_SETFL, flags);
1429     if (res < 0) {
1430         TRANS_LOGE(TRANS_STREAM, "failed to set FtFcntl, res=%{public}d", res);
1431         return false;
1432     }
1433 
1434     TRANS_LOGI(TRANS_STREAM, "Successfully to set fd=%{public}d, nonBlock=%{public}d", fd, nonBlock);
1435     return true;
1436 }
1437 
GetNonBlockMode(int32_t fd) const1438 StreamAttr VtpStreamSocket::GetNonBlockMode(int32_t fd) const
1439 {
1440     FILLP_INT flags = FtFcntl(fd, F_GETFL, 0);
1441     if (static_cast<unsigned int>(flags) & O_NONBLOCK) {
1442         return std::move(StreamAttr(true));
1443     }
1444 
1445     return std::move(StreamAttr(false));
1446 }
1447 
GetIp(int32_t type) const1448 StreamAttr VtpStreamSocket::GetIp(int32_t type) const
1449 {
1450     if (type == LOCAL_IP) {
1451         return std::move(StreamAttr(localIpPort_.ip));
1452     }
1453 
1454     return std::move(StreamAttr(remoteIpPort_.ip));
1455 }
1456 
GetPort(int32_t type) const1457 StreamAttr VtpStreamSocket::GetPort(int32_t type) const
1458 {
1459     if (type == LOCAL_PORT) {
1460         return std::move(StreamAttr(localIpPort_.port));
1461     }
1462     return std::move(StreamAttr(remoteIpPort_.port));
1463 }
1464 
SetStreamType(int32_t type,const StreamAttr & value)1465 bool VtpStreamSocket::SetStreamType(int32_t type, const StreamAttr &value)
1466 {
1467     if (type != STREAM_TYPE_INT) {
1468         return false;
1469     }
1470 
1471     streamType_ = value.GetIntValue();
1472     return true;
1473 }
1474 
GetStreamType(int32_t type) const1475 StreamAttr VtpStreamSocket::GetStreamType(int32_t type) const
1476 {
1477     if (type != STREAM_TYPE_INT) {
1478         return std::move(StreamAttr());
1479     }
1480 
1481     return std::move(StreamAttr(streamType_));
1482 }
1483 
SetStreamScene(int32_t type,const StreamAttr & value)1484 bool VtpStreamSocket::SetStreamScene(int32_t type, const StreamAttr &value)
1485 {
1486     static_cast<void>(type);
1487     if (value.GetType() != INT_TYPE) {
1488         TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1489         return false;
1490     }
1491     scene_ = value.GetIntValue();
1492     TRANS_LOGI(TRANS_STREAM, "Set scene=%{public}d", scene_);
1493     return true;
1494 }
1495 
SetStreamHeaderSize(int32_t type,const StreamAttr & value)1496 bool VtpStreamSocket::SetStreamHeaderSize(int32_t type, const StreamAttr &value)
1497 {
1498     static_cast<void>(type);
1499     if (value.GetType() != INT_TYPE) {
1500         TRANS_LOGE(TRANS_STREAM, "value.GetType=%{public}d", value.GetType());
1501         return false;
1502     }
1503     streamHdrSize_ = value.GetIntValue();
1504     TRANS_LOGI(TRANS_STREAM, "Set headerSize=%{public}d", streamHdrSize_);
1505     return true;
1506 }
1507 
NotifyStreamListener()1508 void VtpStreamSocket::NotifyStreamListener()
1509 {
1510     while (isStreamRecv_) {
1511         int32_t streamNum = GetStreamNum();
1512         if (streamNum >= STREAM_BUFFER_THRESHOLD) {
1513             TRANS_LOGW(TRANS_STREAM, "Too many data in receiver, streamNum=%{public}d", streamNum);
1514         }
1515 
1516         auto stream = TakeStream();
1517         if (stream == nullptr) {
1518             TRANS_LOGE(TRANS_STREAM, "Pop stream failed");
1519             break;
1520         }
1521 
1522         std::lock_guard<std::mutex> guard(streamSocketLock_);
1523         if (streamReceiver_ != nullptr) {
1524             TRANS_LOGD(TRANS_STREAM, "notify listener");
1525             streamReceiver_->OnStreamReceived(std::move(stream));
1526             TRANS_LOGD(TRANS_STREAM, "notify listener done.");
1527         }
1528     }
1529     TRANS_LOGI(TRANS_STREAM, "notify thread exit");
1530 }
1531 
GetEncryptOverhead() const1532 ssize_t VtpStreamSocket::GetEncryptOverhead() const
1533 {
1534     return OVERHEAD_LEN;
1535 }
1536 
Encrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1537 ssize_t VtpStreamSocket::Encrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1538 {
1539     if (in == nullptr || out == nullptr) {
1540         TRANS_LOGE(TRANS_STREAM, "param invalid");
1541         return SOFTBUS_INVALID_PARAM;
1542     }
1543     AesGcmCipherKey cipherKey = {0};
1544 
1545     if (inLen - OVERHEAD_LEN > outLen) {
1546         TRANS_LOGE(TRANS_STREAM, "Encrypt invalid para.");
1547         return SOFTBUS_INVALID_PARAM;
1548     }
1549 
1550     cipherKey.keyLen = SESSION_KEY_LENGTH;
1551     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1552         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1553         return SOFTBUS_MEM_ERR;
1554     }
1555 
1556     int ret = SoftBusEncryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1557     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1558     if (ret != SOFTBUS_OK || outLen != inLen + OVERHEAD_LEN) {
1559         TRANS_LOGE(TRANS_STREAM, "Encrypt Data fail. ret=%{public}d", ret);
1560         return SOFTBUS_ENCRYPT_ERR;
1561     }
1562     return outLen;
1563 }
1564 
Decrypt(const void * in,ssize_t inLen,void * out,ssize_t outLen) const1565 ssize_t VtpStreamSocket::Decrypt(const void *in, ssize_t inLen, void *out, ssize_t outLen) const
1566 {
1567     if (in == nullptr || out == nullptr) {
1568         TRANS_LOGE(TRANS_STREAM, "param invalid");
1569         return SOFTBUS_INVALID_PARAM;
1570     }
1571     AesGcmCipherKey cipherKey = {0};
1572 
1573     if (inLen - OVERHEAD_LEN > outLen) {
1574         TRANS_LOGE(TRANS_STREAM, "Decrypt invalid para.");
1575         return SOFTBUS_INVALID_PARAM;
1576     }
1577 
1578     cipherKey.keyLen = SESSION_KEY_LENGTH; // 256 bit encryption
1579     if (memcpy_s(cipherKey.key, SESSION_KEY_LENGTH, sessionKey_.first, sessionKey_.second) != EOK) {
1580         TRANS_LOGE(TRANS_STREAM, "memcpy key error.");
1581         return SOFTBUS_MEM_ERR;
1582     }
1583     int ret = SoftBusDecryptData(&cipherKey, (unsigned char *)in, inLen, (unsigned char *)out, (unsigned int *)&outLen);
1584     (void)memset_s(&cipherKey, sizeof(AesGcmCipherKey), 0, sizeof(AesGcmCipherKey));
1585     if (ret != SOFTBUS_OK) {
1586         TRANS_LOGE(TRANS_STREAM, "Decrypt Data fail. ret=%{public}d ", ret);
1587         return SOFTBUS_DECRYPT_ERR;
1588     }
1589 
1590     return outLen;
1591 }
1592 
SetMultiLayer(const void * para)1593 int32_t VtpStreamSocket::SetMultiLayer(const void *para)
1594 {
1595     int32_t fd = GetStreamSocketFd(FD).GetIntValue();
1596     return VtpSetSocketMultiLayer(fd, &onStreamEvtCb_, para);
1597 }
1598 
CreateServerProcessThread()1599 void VtpStreamSocket::CreateServerProcessThread()
1600 {
1601     auto self = this->GetSelf();
1602     std::thread([self]() {
1603         const std::string threadName = "OS_sntfStmLsn";
1604         pthread_setname_np(pthread_self(), threadName.c_str());
1605         self->NotifyStreamListener();
1606         }).detach();
1607 
1608     std::thread([self]() {
1609         const std::string threadName = "OS_sdstyStmSkt";
1610         pthread_setname_np(pthread_self(), threadName.c_str());
1611         if (!self->Accept()) {
1612             self->DestroyStreamSocket();
1613             return;
1614         }
1615         self->DoStreamRecv();
1616         self->DestroyStreamSocket();
1617         }).detach();
1618 
1619     bool &isDestroyed = isDestroyed_;
1620     std::thread([self, &isDestroyed]() {
1621         const std::string threadName = "OS_sfillStatic";
1622         pthread_setname_np(pthread_self(), threadName.c_str());
1623         while (!isDestroyed) {
1624             self->FillpAppStatistics();
1625             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1626         }
1627         }).detach();
1628 }
1629 
CreateClientProcessThread()1630 void VtpStreamSocket::CreateClientProcessThread()
1631 {
1632     auto self = this->GetSelf();
1633     std::thread([self]() {
1634         const std::string threadName = "OS_cntfStmLsn";
1635         pthread_setname_np(pthread_self(), threadName.c_str());
1636         self->NotifyStreamListener();
1637         }).detach();
1638 
1639     std::thread([self]() {
1640         const std::string threadName = "OS_cdstyStmSkt";
1641         pthread_setname_np(pthread_self(), threadName.c_str());
1642         self->DoStreamRecv();
1643         self->DestroyStreamSocket();
1644         }).detach();
1645 
1646     bool &isDestroyed = isDestroyed_;
1647     std::thread([self, &isDestroyed]() {
1648         const std::string threadName = "OS_cfillStatic";
1649         pthread_setname_np(pthread_self(), threadName.c_str());
1650         while (!isDestroyed) {
1651             self->FillpAppStatistics();
1652             std::this_thread::sleep_for(std::chrono::seconds(FEED_BACK_PERIOD));
1653         }
1654         }).detach();
1655 }
1656 } // namespace SoftBus
1657 } // namespace Communication
1658