1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "nstackx_dfile_session.h"
17
18 #include "nstackx_dfile_log.h"
19 #include "nstackx_socket.h"
20
21 #define TAG "nStackXDFile"
22
23 #define WAIT_DATA_FRAME_WAIT_US 5 /* Spend 5us to read one file data frame. */
24
25 #define MAX_NR_IOVCNT 20
26 #define MAX_UDP_PAYLOAD 65507
27 #define MAX_SEND_COUNT 1
28
GetIovListSize(void)29 static inline uint32_t GetIovListSize(void)
30 {
31 // updated this value from 40 to 20 now
32 return MAX_NR_IOVCNT;
33 }
34
AllocIovList(List * head)35 static int32_t AllocIovList(List *head)
36 {
37 uint32_t size = GetIovListSize();
38 IovList *ptr = malloc(sizeof(IovList) * size);
39 if (ptr == NULL) {
40 return NSTACKX_ENOMEM;
41 }
42 for (uint32_t i = 0; i < size; i++) {
43 ptr[i].addr = NULL;
44 ptr[i].len = 0;
45 ListInsertTail(head, &ptr[i].entry);
46 }
47 return NSTACKX_EOK;
48 }
49
50 #ifndef BUILD_FOR_WINDOWS
51 __attribute__((unused))
52 #endif
GetFreeIovList(DFileSession * s,int32_t tid)53 static IovList *GetFreeIovList(DFileSession *s, int32_t tid)
54 {
55 List *p = &s->freeIovList[tid];
56 List *q = NULL;
57
58 if (ListIsEmpty(p)) {
59 int32_t err = AllocIovList(p);
60 if (err != NSTACKX_EOK) {
61 return NULL;
62 }
63 }
64
65 q = ListPopFront(p);
66 return (IovList *)q;
67 }
68
DestroyIovList(const List * head,DFileSession * s,uint32_t tid)69 void DestroyIovList(const List *head, DFileSession *s, uint32_t tid)
70 {
71 List *p = NULL;
72 List *n = NULL;
73 BlockFrame *block = NULL;
74
75 (void)s;
76 (void)tid;
77 LIST_FOR_EACH_SAFE(p, n, head) {
78 block = (BlockFrame *)p;
79 ListRemoveNode(p);
80 free(block->fileDataFrame);
81 free(block);
82 }
83 }
84
TcpSendFileDataFrame(Socket * socket,PeerInfo * peerInfo,List * p,BlockFrame * block,uint16_t len)85 static int32_t TcpSendFileDataFrame(Socket *socket, PeerInfo *peerInfo, List *p, BlockFrame *block, uint16_t len)
86 {
87 int32_t ret;
88 DFileSession *session = peerInfo->session;
89
90 ret = SocketSend(socket, (uint8_t *)block->fileDataFrame + block->sendLen, len - block->sendLen);
91 if (ret > 0 && ret == (int32_t)(len - block->sendLen)) {
92 block->sendLen = 0;
93 ListRemoveNode(p);
94 free(block->fileDataFrame);
95 free(block);
96 NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
97 NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
98 NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
99 } else if (ret > 0) {
100 NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
101 block->sendLen = block->sendLen + (uint32_t)ret;
102 ret = NSTACKX_EAGAIN;
103 } else if (errno == EAGAIN) {
104 NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
105 ret = NSTACKX_EAGAIN;
106 } else {
107 DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno);
108 ret = NSTACKX_EFAILED;
109 }
110
111 return ret;
112 }
113
UdpSendFileDataSuccess(DFileSession * session,PeerInfo * peerInfo,List * p,FileDataFrameZS * f,BlockFrame * block)114 static void UdpSendFileDataSuccess(DFileSession *session, PeerInfo *peerInfo, List *p, FileDataFrameZS *f,
115 BlockFrame *block)
116 {
117 ListRemoveNode(p);
118 free(f);
119 free(block);
120 NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
121 NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
122 NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
123 }
124
SendFileDataFrame(DFileSession * session,PeerInfo * peerInfo,List * head,uint32_t tid)125 static int32_t SendFileDataFrame(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
126 {
127 List *p = NULL;
128 List *n = NULL;
129 BlockFrame *block = NULL;
130 FileDataFrameZS *f = NULL;
131 int32_t ret;
132 uint16_t len;
133 Socket *socket = session->socket[0];
134
135 if (CapsTcp(session) && (session->sessionType == DFILE_SESSION_TYPE_SERVER)) {
136 socket = session->acceptSocket;
137 }
138
139 LIST_FOR_EACH_SAFE(p, n, head) {
140 block = (BlockFrame *)p;
141 f = (FileDataFrameZS *)(void *)block->fileDataFrame;
142 len = ntohs(f->header.length) + DFILE_FRAME_HEADER_LEN;
143 if (CapsTcp(session)) {
144 ret = TcpSendFileDataFrame(socket, peerInfo, p, block, len);
145 if (ret == NSTACKX_EFAILED) {
146 break;
147 } else if (ret == NSTACKX_EAGAIN) {
148 return ret;
149 }
150 } else {
151 ret = SocketSend(session->socket[peerInfo->socketIndex], (void *)f, len);
152 if (ret > 0) {
153 UdpSendFileDataSuccess(session, peerInfo, p, f, block);
154 } else if (ret == NSTACKX_EAGAIN) {
155 NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
156 return ret;
157 } else {
158 DFILE_LOGE(TAG, "socket sendto failed");
159 break;
160 }
161 }
162 }
163
164 DestroyIovList(head, session, tid);
165
166 return ret;
167 }
168
SendFileDataFrameEx(DFileSession * session,PeerInfo * peerInfo,List * head,uint32_t tid)169 static int32_t SendFileDataFrameEx(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
170 {
171 return SendFileDataFrame(session, peerInfo, head, tid);
172 }
173
CheckUnsentList(List * unsent,List * head,int32_t maxCount)174 static int32_t CheckUnsentList(List *unsent, List *head, int32_t maxCount)
175 {
176 int32_t cnt = 0;
177
178 ListInitHead(head);
179 while (cnt < maxCount && !ListIsEmpty(unsent)) {
180 List *p = ListPopFront(unsent);
181 if (p == NULL) {
182 break;
183 }
184 ListInsertTail(head, p);
185 cnt++;
186 }
187
188 return cnt;
189 }
190
GetMaxSendCount(void)191 static int32_t GetMaxSendCount(void)
192 {
193 return MAX_SEND_COUNT;
194 }
195
DoSendDataFrame(DFileSession * session,List * head,int32_t count,uint32_t tid,uint8_t socketIndex)196 static int32_t DoSendDataFrame(DFileSession *session, List *head, int32_t count, uint32_t tid, uint8_t socketIndex)
197 {
198 BlockFrame *block = NULL;
199 int32_t ret;
200 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
201 if (!peerInfo) {
202 return NSTACKX_EFAILED;
203 }
204 int32_t maxCount = GetMaxSendCount();
205 int32_t flag;
206 do {
207 while (count < maxCount && FileManagerHasPendingData(session->fileManager)) {
208 ret = FileManagerFileRead(session->fileManager, tid, &block, maxCount - count);
209 if (ret < 0) {
210 DFILE_LOGE(TAG, "FileManagerFileRead failed %d", ret);
211 break;
212 }
213 if (ret == 0) {
214 NSTACKX_ATOM_FETCH_INC(&session->sendBlockListEmptyTimes);
215 (void)usleep(WAIT_DATA_FRAME_WAIT_US);
216 continue;
217 }
218 while (block) {
219 List *next = block->list.next;
220 ListInsertTail(head, &block->list);
221 block = (BlockFrame *)(void *)next;
222 }
223 count += ret;
224 }
225
226 if (count == 0) {
227 NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
228 ret = NSTACKX_EOK;
229 break;
230 }
231 ret = SendFileDataFrameEx(session, peerInfo, head, tid);
232 if (ret <= 0) {
233 break;
234 }
235
236 count = 0;
237 maxCount = GetMaxSendCount();
238 flag = CapsTcp(session) ? (session->sendRemain ? 0 : 1) :
239 (peerInfo->intervalSendCount < (uint16_t)peerInfo->amendSendRate && !session->closeFlag);
240 } while (flag && (session->stopSendCnt[tid] == 0));
241 return ret;
242 }
243
244
245 /*
246 * * if backpress frame count is not zero then sleep one ack interval and update stopSendCnt
247 * * if backpress frame count is zero then send packet normally
248 * */
CheckSendByBackPress(DFileSession * session,uint32_t tid,uint8_t socketIndex)249 static void CheckSendByBackPress(DFileSession *session, uint32_t tid, uint8_t socketIndex)
250 {
251 uint32_t fileProcessCnt;
252 uint32_t sleepTime;
253 uint32_t stopCnt;
254 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
255 if (peerInfo == NULL) {
256 return;
257 }
258
259 if (session->stopSendCnt[tid] != 0) {
260 if (PthreadMutexLock(&session->backPressLock) != 0) {
261 DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
262 return;
263 }
264
265 stopCnt = session->stopSendCnt[tid];
266 if (stopCnt == 0) {
267 if (PthreadMutexUnlock(&session->backPressLock) != 0) {
268 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
269 }
270 return;
271 }
272
273 /* fileProcessCnt corresponds to trans one-to-one, one ack interval recv fileProcessCnt backpress frame */
274 fileProcessCnt = session->fileListProcessingCnt + session->smallListProcessingCnt;
275
276 session->stopSendCnt[tid] = (session->stopSendCnt[tid] > fileProcessCnt) ? (session->stopSendCnt[tid] -
277 fileProcessCnt) : 0;
278
279 if (PthreadMutexUnlock(&session->backPressLock) != 0) {
280 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
281 return;
282 }
283
284 sleepTime = CapsTcp(session) ? NSTACKX_INIT_RATE_STAT_INTERVAL : peerInfo->rateStateInterval;
285
286 #ifndef NSTACKX_WITH_LITEOS
287 DFILE_LOGI(TAG, "tid %u sleep %u us fileProCnt %u Interval %u lastStopCnt %u stopSendCnt %u", tid, sleepTime,
288 fileProcessCnt, peerInfo->rateStateInterval, stopCnt, session->stopSendCnt[tid]);
289 #endif
290 (void)usleep(sleepTime);
291 }
292 }
293
SendDataFrame(DFileSession * session,List * unsent,uint32_t tid,uint8_t socketIndex)294 int32_t SendDataFrame(DFileSession *session, List *unsent, uint32_t tid, uint8_t socketIndex)
295 {
296 int32_t ret = NSTACKX_EOK;
297 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
298 List tmpq;
299
300 if (peerInfo == NULL) {
301 return NSTACKX_EFAILED;
302 }
303 if (peerInfo->amendSendRate == 0) {
304 return ret;
305 }
306
307 CheckSendByBackPress(session, tid, socketIndex);
308
309 int32_t maxCount = GetMaxSendCount();
310 int32_t count = CheckUnsentList(unsent, &tmpq, maxCount);
311 ret = DoSendDataFrame(session, &tmpq, count, tid, socketIndex);
312 if (ret == NSTACKX_EAGAIN) {
313 ListMove(&tmpq, unsent);
314 }
315 return ret;
316 }
317
SendControlFrame(DFileSession * session,QueueNode * queueNode)318 int32_t SendControlFrame(DFileSession *session, QueueNode *queueNode)
319 {
320 int32_t ret;
321 Socket *socket = NULL;
322
323 if (CapsTcp(session)) {
324 socket = (session->sessionType == DFILE_SESSION_TYPE_SERVER) ? session->acceptSocket : session->socket[0];
325 ret = SocketSend(socket, queueNode->frame + queueNode->sendLen, queueNode->length - queueNode->sendLen);
326 if (ret > 0 && ret == (int32_t)(queueNode->length - queueNode->sendLen)) {
327 queueNode->sendLen = 0;
328 } else if (ret > 0) {
329 queueNode->sendLen = queueNode->sendLen + (uint32_t)ret;
330 ret = NSTACKX_EAGAIN;
331 } else if (errno == EAGAIN) {
332 ret = NSTACKX_EAGAIN;
333 } else {
334 DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno);
335 ret = NSTACKX_EFAILED;
336 }
337 return ret;
338 }
339
340 uint8_t socketIndex = queueNode->socketIndex;
341 ret = SocketSend(session->socket[socketIndex], queueNode->frame, queueNode->length);
342 if (ret <= 0) {
343 if (ret != NSTACKX_EAGAIN) {
344 DFILE_LOGE(TAG, "MpEscape. socket:%u send failed. Errno:%d", socketIndex, errno);
345 ret = NSTACKX_EFAILED;
346 }
347 }
348
349 return ret;
350 }
351
SendOutboundFrame(DFileSession * session,QueueNode ** preQueueNode)352 int32_t SendOutboundFrame(DFileSession *session, QueueNode **preQueueNode)
353 {
354 QueueNode *queueNode = *preQueueNode;
355 int32_t ret;
356
357 do {
358 if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
359 DFILE_LOGE(TAG, "Pthread mutex lock failed");
360 ret = NSTACKX_EFAILED;
361 break;
362 }
363 if (queueNode == NULL && session->outboundQueueSize) {
364 queueNode = (QueueNode *)ListPopFront(&session->outboundQueue);
365 session->outboundQueueSize--;
366 }
367 if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
368 DFILE_LOGE(TAG, "Pthread mutex unlock failed");
369 ret = NSTACKX_EFAILED;
370 break;
371 }
372 if (queueNode == NULL) {
373 ret = NSTACKX_EOK;
374 break;
375 }
376
377 uint32_t socketIndex = queueNode->socketIndex;
378 if (session->socket[socketIndex]->protocol == NSTACKX_PROTOCOL_UDP &&
379 session->socket[socketIndex]->isServer == NSTACKX_TRUE) {
380 session->socket[socketIndex]->dstAddr = queueNode->peerAddr;
381 }
382
383 ret = SendControlFrame(session, queueNode);
384 if (ret <= 0) {
385 break;
386 }
387 /* Send ok, try to get next frame. */
388 DestroyQueueNode(queueNode);
389 queueNode = NULL;
390 NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
391 } while (!session->closeFlag);
392
393 if (ret == NSTACKX_EAGAIN) {
394 *preQueueNode = queueNode;
395 } else {
396 *preQueueNode = NULL;
397 DestroyQueueNode(queueNode);
398 queueNode = NULL;
399 }
400 return ret;
401 }
402
TcpSocketRecv(DFileSession * session,uint8_t * buffer,size_t length,struct sockaddr_in * srcAddr,const socklen_t * addrLen)403 int32_t TcpSocketRecv(DFileSession *session, uint8_t *buffer, size_t length, struct sockaddr_in *srcAddr,
404 const socklen_t *addrLen)
405 {
406 int32_t ret;
407 int recvLen = 0;
408
409 Socket *socket = session->socket[0];
410
411 if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
412 socket = session->acceptSocket;
413 }
414
415 while (recvLen < (int32_t)length) {
416 ret = SocketRecv(socket, buffer + session->recvLen, length - (size_t)recvLen, srcAddr, addrLen);
417 if (ret == 0) {
418 return NSTACKX_PEER_CLOSE;
419 }
420 if (ret < 0) {
421 if (errno != EAGAIN) {
422 ret = NSTACKX_EFAILED;
423 return ret;
424 } else {
425 return NSTACKX_EAGAIN;
426 }
427 }
428 recvLen = recvLen + ret;
429 session->recvLen = session->recvLen + (uint32_t)ret;
430 }
431
432 return recvLen;
433 }
434
SocketRecvForTcp(DFileSession * session,uint8_t * buffer,struct sockaddr_in * srcAddr,const socklen_t * addrLen)435 int32_t SocketRecvForTcp(DFileSession *session, uint8_t *buffer, struct sockaddr_in *srcAddr,
436 const socklen_t *addrLen)
437 {
438 int32_t ret;
439 uint16_t payloadLen;
440 DFileFrameHeader *frameHeader = NULL;
441 size_t length = sizeof(DFileFrameHeader);
442 if (session->recvLen < length) {
443 ret = TcpSocketRecv(session, buffer, length - session->recvLen, srcAddr, addrLen);
444 if (ret <= 0) {
445 return ret;
446 }
447 }
448
449 frameHeader = (DFileFrameHeader *)(session->recvBuffer);
450 payloadLen = ntohs(frameHeader->length);
451 if (payloadLen >= NSTACKX_RECV_BUFFER_LEN) {
452 DFILE_LOGI(TAG, "header length is %u recv length is %u payloadLen is %u type %u", length,
453 session->recvLen, payloadLen, frameHeader->type);
454 return NSTACKX_EFAILED;
455 }
456
457 if ((session->recvLen - length) < payloadLen) {
458 ret = TcpSocketRecv(session, buffer, payloadLen - (session->recvLen - length), srcAddr, addrLen);
459 if (ret <= 0) {
460 return ret;
461 }
462 }
463
464 return (int32_t)(session->recvLen);
465 }
466