1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "nstackx_dfile_session.h"
17 
18 #include "nstackx_dfile_log.h"
19 #include "nstackx_socket.h"
20 
/* Log tag passed as the first argument to the DFILE_LOGE/DFILE_LOGI calls in this file. */
#define TAG "nStackXDFile"

#define WAIT_DATA_FRAME_WAIT_US            5 /* Spend 5us to read one file data frame. */

/* Number of IovList entries allocated per batch; returned by GetIovListSize(). */
#define MAX_NR_IOVCNT       20
/* NOTE(review): presumably the largest UDP payload over IPv4 (65535 - 8 - 20); not referenced in the code shown here. */
#define MAX_UDP_PAYLOAD     65507
/* Upper bound on blocks gathered per send round; returned by GetMaxSendCount(). */
#define MAX_SEND_COUNT      1
28 
GetIovListSize(void)29 static inline uint32_t GetIovListSize(void)
30 {
31     // updated this value from 40 to 20 now
32     return  MAX_NR_IOVCNT;
33 }
34 
/*
 * Allocate one batch of GetIovListSize() IovList entries and append each of them to @head.
 *
 * All entries of a batch live in a single malloc() array, so an individual entry must not be
 * passed to free() on its own.
 *
 * Returns NSTACKX_EOK on success, NSTACKX_ENOMEM when the allocation fails (@head is untouched).
 */
static int32_t AllocIovList(List *head)
{
    const uint32_t count = GetIovListSize();
    IovList *batch = malloc(sizeof(IovList) * count);

    if (batch == NULL) {
        return NSTACKX_ENOMEM;
    }

    IovList *const end = batch + count;
    for (IovList *iov = batch; iov != end; iov++) {
        iov->addr = NULL;
        iov->len = 0;
        ListInsertTail(head, &iov->entry);
    }
    return NSTACKX_EOK;
}
49 
50 #ifndef BUILD_FOR_WINDOWS
51 __attribute__((unused))
52 #endif
GetFreeIovList(DFileSession * s,int32_t tid)53 static IovList *GetFreeIovList(DFileSession *s, int32_t tid)
54 {
55     List *p = &s->freeIovList[tid];
56     List *q = NULL;
57 
58     if (ListIsEmpty(p)) {
59         int32_t err = AllocIovList(p);
60         if (err != NSTACKX_EOK) {
61             return NULL;
62         }
63     }
64 
65     q = ListPopFront(p);
66     return (IovList *)q;
67 }
68 
/*
 * Unlink and free every node on @head.
 *
 * Despite the name, each node is treated as a BlockFrame: its fileDataFrame buffer is freed,
 * then the BlockFrame itself. @s and @tid are accepted but not used.
 */
void DestroyIovList(const List *head, DFileSession *s, uint32_t tid)
{
    List *cur = NULL;
    List *next = NULL;

    (void)s;
    (void)tid;

    LIST_FOR_EACH_SAFE(cur, next, head) {
        BlockFrame *frame = (BlockFrame *)cur;

        /* Unlink before releasing the memory that holds the list node. */
        ListRemoveNode(cur);
        free(frame->fileDataFrame);
        free(frame);
    }
}
84 
/*
 * Send the not-yet-sent remainder of one block (bytes [block->sendLen, len)) with SocketSend().
 *
 * Returns:
 *   > 0              the whole remainder was sent; the block is unlinked from its list and freed,
 *                    and the peer/session send counters are incremented.
 *   NSTACKX_EAGAIN   only part was sent (block->sendLen is advanced), or SocketSend() returned
 *                    <= 0 with errno == EAGAIN. The block stays on the list for a later retry.
 *   NSTACKX_EFAILED  any other SocketSend() failure.
 */
static int32_t TcpSendFileDataFrame(Socket *socket, PeerInfo *peerInfo, List *p, BlockFrame *block, uint16_t len)
{
    DFileSession *session = peerInfo->session;
    int32_t sent = SocketSend(socket, (uint8_t *)block->fileDataFrame + block->sendLen, len - block->sendLen);

    if (sent > 0 && sent == (int32_t)(len - block->sendLen)) {
        /* Complete: release the block and account for it. */
        block->sendLen = 0;
        ListRemoveNode(p);
        free(block->fileDataFrame);
        free(block);
        NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
        NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
        NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
        return sent;
    }

    if (sent > 0) {
        /* Short write: remember the progress so the next call resumes after it. */
        NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
        block->sendLen = block->sendLen + (uint32_t)sent;
        return NSTACKX_EAGAIN;
    }

    if (errno == EAGAIN) {
        NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
        return NSTACKX_EAGAIN;
    }

    DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", sent, errno);
    return NSTACKX_EFAILED;
}
113 
/*
 * Bookkeeping after a block was sent successfully on the non-TCP path:
 * unlink list node @p, free the frame buffer @f and its owning @block, then increment the
 * per-peer send counters and the session-wide block counter.
 */
static void UdpSendFileDataSuccess(DFileSession *session, PeerInfo *peerInfo, List *p, FileDataFrameZS *f,
    BlockFrame *block)
{
    /* Release the block. */
    ListRemoveNode(p);
    free(f);
    free(block);

    /* Account for it. */
    NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
    NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
    NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
}
124 
/*
 * Send every block queued on @head, in order.
 *
 * With CapsTcp(session) the blocks go through TcpSendFileDataFrame() on the accepted socket
 * (server sessions) or socket[0]; otherwise each frame is sent with SocketSend() on
 * session->socket[peerInfo->socketIndex].
 *
 * Returns:
 *   NSTACKX_EAGAIN  the socket would block; the unsent blocks are left on @head so the caller
 *                   can retry them later.
 *   otherwise       the result of the last send attempt (> 0 on success, NSTACKX_EFAILED for a
 *                   TCP failure, the raw SocketSend() result for a non-TCP failure), or
 *                   NSTACKX_EOK when @head was empty. In all of these cases any block still on
 *                   @head is freed via DestroyIovList().
 */
static int32_t SendFileDataFrame(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
{
    List *p = NULL;
    List *n = NULL;
    BlockFrame *block = NULL;
    FileDataFrameZS *f = NULL;
    /* Initialized so that an empty list returns a defined value instead of an indeterminate one. */
    int32_t ret = NSTACKX_EOK;
    uint16_t len;
    Socket *socket = session->socket[0];

    if (CapsTcp(session) && (session->sessionType == DFILE_SESSION_TYPE_SERVER)) {
        socket = session->acceptSocket;
    }

    LIST_FOR_EACH_SAFE(p, n, head) {
        block = (BlockFrame *)p;
        f = (FileDataFrameZS *)(void *)block->fileDataFrame;
        /* Total bytes on the wire: payload length from the header plus the header itself. */
        len = ntohs(f->header.length) + DFILE_FRAME_HEADER_LEN;
        if (CapsTcp(session)) {
            ret = TcpSendFileDataFrame(socket, peerInfo, p, block, len);
            if (ret == NSTACKX_EFAILED) {
                break;
            } else if (ret == NSTACKX_EAGAIN) {
                /* Keep the remaining blocks on the list for the caller to retry. */
                return ret;
            }
        } else {
            ret = SocketSend(session->socket[peerInfo->socketIndex], (void *)f, len);
            if (ret > 0) {
                UdpSendFileDataSuccess(session, peerInfo, p, f, block);
            } else if (ret == NSTACKX_EAGAIN) {
                NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
                return ret;
            } else {
                DFILE_LOGE(TAG, "socket sendto failed");
                break;
            }
        }
    }

    /* Reached on success (list already empty) or on a hard failure: drop whatever is left. */
    DestroyIovList(head, session, tid);

    return ret;
}
168 
/* Thin wrapper: forwards all arguments to SendFileDataFrame() and returns its result unchanged. */
static int32_t SendFileDataFrameEx(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
{
    const int32_t result = SendFileDataFrame(session, peerInfo, head, tid);

    return result;
}
173 
/*
 * Initialize @head as an empty list, then move up to @maxCount nodes from the front of @unsent
 * to the tail of @head, preserving their order.
 *
 * Returns the number of nodes moved.
 */
static int32_t CheckUnsentList(List *unsent, List *head, int32_t maxCount)
{
    int32_t moved;

    ListInitHead(head);
    for (moved = 0; moved < maxCount; moved++) {
        if (ListIsEmpty(unsent)) {
            break;
        }

        List *node = ListPopFront(unsent);
        if (node == NULL) {
            break;
        }
        ListInsertTail(head, node);
    }

    return moved;
}
190 
GetMaxSendCount(void)191 static int32_t GetMaxSendCount(void)
192 {
193     return MAX_SEND_COUNT;
194 }
195 
/*
 * Repeatedly top up @head with blocks read from the file manager and send them.
 *
 * @head already holds @count blocks on entry. Each round reads more blocks until the batch limit
 * (GetMaxSendCount()) is reached or the file manager has no pending data, then sends the batch
 * with SendFileDataFrameEx(). Rounds continue while:
 *   - TCP: session->sendRemain is zero;
 *   - otherwise: the peer's intervalSendCount is below its amendSendRate and the session is not
 *     closing;
 * and, in both cases, session->stopSendCnt[tid] is zero.
 *
 * Returns NSTACKX_EFAILED when no peer exists for @socketIndex, NSTACKX_EOK when there was
 * nothing to send, otherwise the result of the last SendFileDataFrameEx() call.
 */
static int32_t DoSendDataFrame(DFileSession *session, List *head, int32_t count, uint32_t tid, uint8_t socketIndex)
{
    BlockFrame *frame = NULL;
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (!peerInfo) {
        return NSTACKX_EFAILED;
    }

    int32_t batchLimit = GetMaxSendCount();
    for (;;) {
        while (count < batchLimit && FileManagerHasPendingData(session->fileManager)) {
            int32_t nread = FileManagerFileRead(session->fileManager, tid, &frame, batchLimit - count);
            if (nread < 0) {
                DFILE_LOGE(TAG, "FileManagerFileRead failed %d", nread);
                break;
            }
            if (nread == 0) {
                /* Data is pending but not ready yet: back off briefly and poll again. */
                NSTACKX_ATOM_FETCH_INC(&session->sendBlockListEmptyTimes);
                (void)usleep(WAIT_DATA_FRAME_WAIT_US);
                continue;
            }
            /* Splice the returned chain onto head; save next first because the insert relinks the node. */
            while (frame) {
                List *following = frame->list.next;
                ListInsertTail(head, &frame->list);
                frame = (BlockFrame *)(void *)following;
            }
            count += nread;
        }

        if (count == 0) {
            NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
            return NSTACKX_EOK;
        }

        int32_t rc = SendFileDataFrameEx(session, peerInfo, head, tid);
        if (rc <= 0) {
            return rc;
        }

        /* Batch fully sent: start a new one. */
        count = 0;
        batchLimit = GetMaxSendCount();

        int32_t keepSending;
        if (CapsTcp(session)) {
            keepSending = session->sendRemain ? 0 : 1;
        } else {
            keepSending = (peerInfo->intervalSendCount < (uint16_t)peerInfo->amendSendRate && !session->closeFlag);
        }
        if (!keepSending || session->stopSendCnt[tid] != 0) {
            return rc;
        }
    }
}
243 
244 
245 /*
246  *  * if backpress frame count is not zero then sleep one ack interval and update stopSendCnt
247  *   * if backpress frame count is zero then send packet normally
248  *    */
CheckSendByBackPress(DFileSession * session,uint32_t tid,uint8_t socketIndex)249 static void CheckSendByBackPress(DFileSession *session, uint32_t tid, uint8_t socketIndex)
250 {
251     uint32_t fileProcessCnt;
252     uint32_t sleepTime;
253     uint32_t stopCnt;
254     PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
255     if (peerInfo == NULL) {
256         return;
257     }
258 
259     if (session->stopSendCnt[tid] != 0) {
260         if (PthreadMutexLock(&session->backPressLock) != 0) {
261             DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
262             return;
263         }
264 
265         stopCnt = session->stopSendCnt[tid];
266         if (stopCnt == 0) {
267             if (PthreadMutexUnlock(&session->backPressLock) != 0) {
268                 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
269             }
270             return;
271         }
272 
273         /* fileProcessCnt corresponds to trans one-to-one, one ack interval recv fileProcessCnt backpress frame */
274         fileProcessCnt = session->fileListProcessingCnt + session->smallListProcessingCnt;
275 
276         session->stopSendCnt[tid] = (session->stopSendCnt[tid] > fileProcessCnt) ? (session->stopSendCnt[tid] -
277             fileProcessCnt) : 0;
278 
279         if (PthreadMutexUnlock(&session->backPressLock) != 0) {
280             DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
281             return;
282         }
283 
284         sleepTime = CapsTcp(session) ? NSTACKX_INIT_RATE_STAT_INTERVAL : peerInfo->rateStateInterval;
285 
286 #ifndef NSTACKX_WITH_LITEOS
287         DFILE_LOGI(TAG, "tid %u sleep %u us fileProCnt %u Interval %u lastStopCnt %u stopSendCnt %u", tid, sleepTime,
288              fileProcessCnt, peerInfo->rateStateInterval, stopCnt, session->stopSendCnt[tid]);
289 #endif
290         (void)usleep(sleepTime);
291     }
292 }
293 
/*
 * Send file data for sender thread @tid on the peer bound to @socketIndex.
 *
 * Blocks left over from an earlier attempt (@unsent) are sent first, topped up with fresh blocks
 * from the file manager. When the socket would block, the blocks that could not be sent are
 * moved back to @unsent for the next call.
 *
 * Returns NSTACKX_EFAILED when no peer exists for @socketIndex, NSTACKX_EOK when the peer's
 * amendSendRate is zero (nothing is sent), otherwise the result of DoSendDataFrame().
 */
int32_t SendDataFrame(DFileSession *session, List *unsent, uint32_t tid, uint8_t socketIndex)
{
    PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
    if (peerInfo == NULL) {
        return NSTACKX_EFAILED;
    }
    if (peerInfo->amendSendRate == 0) {
        return NSTACKX_EOK;
    }

    CheckSendByBackPress(session, tid, socketIndex);

    List pending;
    int32_t queued = CheckUnsentList(unsent, &pending, GetMaxSendCount());
    int32_t ret = DoSendDataFrame(session, &pending, queued, tid, socketIndex);
    if (ret == NSTACKX_EAGAIN) {
        ListMove(&pending, unsent);
    }
    return ret;
}
317 
/*
 * Send one queued control frame.
 *
 * TCP (CapsTcp(session)): sends the bytes after queueNode->sendLen on the accepted socket
 * (server sessions) or socket[0]. A short write advances queueNode->sendLen and yields
 * NSTACKX_EAGAIN so the caller can resume; a complete write resets sendLen to 0 and returns the
 * byte count of that final write.
 *
 * Otherwise: sends the whole frame on session->socket[queueNode->socketIndex].
 *
 * Returns > 0 on success, NSTACKX_EAGAIN when the send should be retried, NSTACKX_EFAILED on
 * any other failure.
 */
int32_t SendControlFrame(DFileSession *session, QueueNode *queueNode)
{
    int32_t sent;

    if (CapsTcp(session)) {
        Socket *socket = (session->sessionType == DFILE_SESSION_TYPE_SERVER) ?
            session->acceptSocket : session->socket[0];

        sent = SocketSend(socket, queueNode->frame + queueNode->sendLen, queueNode->length - queueNode->sendLen);
        if (sent > 0 && sent == (int32_t)(queueNode->length - queueNode->sendLen)) {
            queueNode->sendLen = 0;
            return sent;
        }
        if (sent > 0) {
            /* Short write: remember how far we got. */
            queueNode->sendLen = queueNode->sendLen + (uint32_t)sent;
            return NSTACKX_EAGAIN;
        }
        if (errno == EAGAIN) {
            return NSTACKX_EAGAIN;
        }
        DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", sent, errno);
        return NSTACKX_EFAILED;
    }

    uint8_t socketIndex = queueNode->socketIndex;
    sent = SocketSend(session->socket[socketIndex], queueNode->frame, queueNode->length);
    if (sent <= 0 && sent != NSTACKX_EAGAIN) {
        DFILE_LOGE(TAG, "MpEscape. socket:%u send failed. Errno:%d", socketIndex, errno);
        sent = NSTACKX_EFAILED;
    }

    return sent;
}
351 
/*
 * Drain the session's outbound control-frame queue.
 *
 * @preQueueNode carries a frame across calls: on entry it may hold a frame whose previous send
 * returned NSTACKX_EAGAIN (it is retried before anything is popped from the queue); on return it
 * holds the frame to retry when the result is NSTACKX_EAGAIN, and NULL otherwise.
 *
 * Frames are sent one by one until the queue is empty, a send does not succeed, a mutex
 * operation fails, or session->closeFlag is set. A frame in hand when the loop ends with any
 * result other than NSTACKX_EAGAIN is destroyed.
 *
 * Returns the last SendControlFrame() result, NSTACKX_EOK when the queue was drained, or
 * NSTACKX_EFAILED on a mutex failure.
 */
int32_t SendOutboundFrame(DFileSession *session, QueueNode **preQueueNode)
{
    QueueNode *node = *preQueueNode;
    int32_t ret;

    for (;;) {
        if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "Pthread mutex lock failed");
            ret = NSTACKX_EFAILED;
            break;
        }
        /* Only pop a new frame when there is no carried-over frame to retry. */
        if (node == NULL && session->outboundQueueSize) {
            node = (QueueNode *)ListPopFront(&session->outboundQueue);
            session->outboundQueueSize--;
        }
        if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
            DFILE_LOGE(TAG, "Pthread mutex unlock failed");
            ret = NSTACKX_EFAILED;
            break;
        }
        if (node == NULL) {
            ret = NSTACKX_EOK;
            break;
        }

        /* A UDP server socket is pointed at the peer address recorded in the frame. */
        Socket *sock = session->socket[node->socketIndex];
        if (sock->protocol == NSTACKX_PROTOCOL_UDP && sock->isServer == NSTACKX_TRUE) {
            sock->dstAddr = node->peerAddr;
        }

        ret = SendControlFrame(session, node);
        if (ret <= 0) {
            break;
        }

        /* Send ok, try to get next frame. */
        DestroyQueueNode(node);
        node = NULL;
        NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);

        if (session->closeFlag) {
            break;
        }
    }

    if (ret == NSTACKX_EAGAIN) {
        *preQueueNode = node;
        return ret;
    }

    /* NOTE(review): node may be NULL here; DestroyQueueNode() is presumably NULL-safe - confirm. */
    *preQueueNode = NULL;
    DestroyQueueNode(node);
    return ret;
}
402 
/*
 * Receive exactly @length more bytes from the TCP peer into @buffer, appending at offset
 * session->recvLen.
 *
 * session->recvLen is advanced after every successful SocketRecv() call, so bytes received
 * before an early return are kept and a later call continues after them.
 *
 * Returns the number of bytes received by this call (== @length) on completion,
 * NSTACKX_PEER_CLOSE when SocketRecv() returns 0, NSTACKX_EAGAIN when it fails with
 * errno == EAGAIN, and NSTACKX_EFAILED on any other failure.
 */
int32_t TcpSocketRecv(DFileSession *session, uint8_t *buffer, size_t length, struct sockaddr_in *srcAddr,
    const socklen_t *addrLen)
{
    int total = 0;
    Socket *socket = (session->sessionType == DFILE_SESSION_TYPE_SERVER) ?
        session->acceptSocket : session->socket[0];

    while (total < (int32_t)length) {
        int32_t got = SocketRecv(socket, buffer + session->recvLen, length - (size_t)total, srcAddr, addrLen);
        if (got == 0) {
            return NSTACKX_PEER_CLOSE;
        }
        if (got < 0) {
            return (errno != EAGAIN) ? NSTACKX_EFAILED : NSTACKX_EAGAIN;
        }
        total += got;
        session->recvLen += (uint32_t)got;
    }

    return total;
}
434 
/*
 * Receive one complete DFile frame (header plus payload) from the TCP stream into @buffer.
 *
 * Progress is tracked in session->recvLen, so the function can be called again after
 * NSTACKX_EAGAIN and continues where it stopped: first the fixed-size DFileFrameHeader is
 * completed, then the payload whose length is taken from the header.
 *
 * Returns session->recvLen once the whole frame is available; the TcpSocketRecv() result
 * (<= 0, e.g. NSTACKX_EAGAIN / NSTACKX_PEER_CLOSE / NSTACKX_EFAILED) when a receive step does
 * not complete; NSTACKX_EFAILED when the declared payload length is >= NSTACKX_RECV_BUFFER_LEN.
 * session->recvLen is not reset here.
 */
int32_t SocketRecvForTcp(DFileSession *session, uint8_t *buffer, struct sockaddr_in *srcAddr,
    const socklen_t *addrLen)
{
    int32_t ret;
    uint16_t payloadLen;
    DFileFrameHeader *frameHeader = NULL;
    size_t length = sizeof(DFileFrameHeader);

    /* Step 1: make sure the full header has been received. */
    if (session->recvLen < length) {
        ret = TcpSocketRecv(session, buffer, length - session->recvLen, srcAddr, addrLen);
        if (ret <= 0) {
            return ret;
        }
    }

    /* NOTE(review): the header is read from session->recvBuffer, not @buffer - presumably callers pass
     * session->recvBuffer as @buffer; confirm. */
    frameHeader = (DFileFrameHeader *)(session->recvBuffer);
    payloadLen = ntohs(frameHeader->length);
    /* NOTE(review): this bound ignores the header size; if the receive buffer is exactly
     * NSTACKX_RECV_BUFFER_LEN bytes, header + payload could exceed it - confirm the buffer size. */
    if (payloadLen >= NSTACKX_RECV_BUFFER_LEN) {
        /* length is a size_t; cast it so the argument matches the %u conversion. */
        DFILE_LOGI(TAG, "header length is %u recv length is %u payloadLen is %u type %u", (uint32_t)length,
             session->recvLen, payloadLen, frameHeader->type);
        return NSTACKX_EFAILED;
    }

    /* Step 2: receive whatever part of the payload is still missing. */
    if ((session->recvLen - length) < payloadLen) {
        ret = TcpSocketRecv(session, buffer, payloadLen - (session->recvLen - length), srcAddr, addrLen);
        if (ret <= 0) {
            return ret;
        }
    }

    return (int32_t)(session->recvLen);
}
466