1  /*
2   * Copyright (C) 2021 Huawei Device Co., Ltd.
3   * Licensed under the Apache License, Version 2.0 (the "License");
4   * you may not use this file except in compliance with the License.
5   * You may obtain a copy of the License at
6   *
7   *     http://www.apache.org/licenses/LICENSE-2.0
8   *
9   * Unless required by applicable law or agreed to in writing, software
10   * distributed under the License is distributed on an "AS IS" BASIS,
11   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   * See the License for the specific language governing permissions and
13   * limitations under the License.
14   */
15  
16  #include "nstackx_dfile_session.h"
17  
18  #include "nstackx_dfile_log.h"
19  #include "nstackx_socket.h"
20  
21  #define TAG "nStackXDFile"
22  
23  #define WAIT_DATA_FRAME_WAIT_US            5 /* Spend 5us to read one file data frame. */
24  
25  #define MAX_NR_IOVCNT       20
26  #define MAX_UDP_PAYLOAD     65507
27  #define MAX_SEND_COUNT      1
28  
GetIovListSize(void)29  static inline uint32_t GetIovListSize(void)
30  {
31      // updated this value from 40 to 20 now
32      return  MAX_NR_IOVCNT;
33  }
34  
AllocIovList(List * head)35  static int32_t AllocIovList(List *head)
36  {
37      uint32_t size = GetIovListSize();
38      IovList *ptr = malloc(sizeof(IovList) * size);
39      if (ptr == NULL) {
40          return NSTACKX_ENOMEM;
41      }
42      for (uint32_t i = 0; i < size; i++) {
43          ptr[i].addr = NULL;
44          ptr[i].len = 0;
45          ListInsertTail(head, &ptr[i].entry);
46      }
47      return NSTACKX_EOK;
48  }
49  
50  #ifndef BUILD_FOR_WINDOWS
51  __attribute__((unused))
52  #endif
GetFreeIovList(DFileSession * s,int32_t tid)53  static IovList *GetFreeIovList(DFileSession *s, int32_t tid)
54  {
55      List *p = &s->freeIovList[tid];
56      List *q = NULL;
57  
58      if (ListIsEmpty(p)) {
59          int32_t err = AllocIovList(p);
60          if (err != NSTACKX_EOK) {
61              return NULL;
62          }
63      }
64  
65      q = ListPopFront(p);
66      return (IovList *)q;
67  }
68  
DestroyIovList(const List * head,DFileSession * s,uint32_t tid)69  void DestroyIovList(const List *head, DFileSession *s, uint32_t tid)
70  {
71      List *p = NULL;
72      List *n = NULL;
73      BlockFrame *block = NULL;
74  
75      (void)s;
76      (void)tid;
77      LIST_FOR_EACH_SAFE(p, n, head) {
78          block = (BlockFrame *)p;
79          ListRemoveNode(p);
80          free(block->fileDataFrame);
81          free(block);
82      }
83  }
84  
TcpSendFileDataFrame(Socket * socket,PeerInfo * peerInfo,List * p,BlockFrame * block,uint16_t len)85  static int32_t TcpSendFileDataFrame(Socket *socket, PeerInfo *peerInfo, List *p, BlockFrame *block, uint16_t len)
86  {
87      int32_t ret;
88      DFileSession *session = peerInfo->session;
89  
90      ret = SocketSend(socket, (uint8_t *)block->fileDataFrame + block->sendLen, len - block->sendLen);
91      if (ret > 0 && ret == (int32_t)(len - block->sendLen)) {
92          block->sendLen = 0;
93          ListRemoveNode(p);
94          free(block->fileDataFrame);
95          free(block);
96          NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
97          NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
98          NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
99      } else if (ret > 0) {
100          NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
101          block->sendLen = block->sendLen + (uint32_t)ret;
102          ret = NSTACKX_EAGAIN;
103      } else if (errno == EAGAIN) {
104          NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
105          ret = NSTACKX_EAGAIN;
106      } else {
107          DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno);
108          ret = NSTACKX_EFAILED;
109      }
110  
111      return ret;
112  }
113  
UdpSendFileDataSuccess(DFileSession * session,PeerInfo * peerInfo,List * p,FileDataFrameZS * f,BlockFrame * block)114  static void UdpSendFileDataSuccess(DFileSession *session, PeerInfo *peerInfo, List *p, FileDataFrameZS *f,
115      BlockFrame *block)
116  {
117      ListRemoveNode(p);
118      free(f);
119      free(block);
120      NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount);
121      NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount);
122      NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
123  }
124  
SendFileDataFrame(DFileSession * session,PeerInfo * peerInfo,List * head,uint32_t tid)125  static int32_t SendFileDataFrame(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
126  {
127      List *p = NULL;
128      List *n = NULL;
129      BlockFrame *block = NULL;
130      FileDataFrameZS *f = NULL;
131      int32_t ret;
132      uint16_t len;
133      Socket *socket = session->socket[0];
134  
135      if (CapsTcp(session) && (session->sessionType == DFILE_SESSION_TYPE_SERVER)) {
136          socket = session->acceptSocket;
137      }
138  
139      LIST_FOR_EACH_SAFE(p, n, head) {
140          block = (BlockFrame *)p;
141          f = (FileDataFrameZS *)(void *)block->fileDataFrame;
142          len = ntohs(f->header.length) + DFILE_FRAME_HEADER_LEN;
143          if (CapsTcp(session)) {
144              ret = TcpSendFileDataFrame(socket, peerInfo, p, block, len);
145              if (ret == NSTACKX_EFAILED) {
146                  break;
147              } else if (ret == NSTACKX_EAGAIN) {
148                  return ret;
149              }
150          } else {
151              ret = SocketSend(session->socket[peerInfo->socketIndex], (void *)f, len);
152              if (ret > 0) {
153                  UdpSendFileDataSuccess(session, peerInfo, p, f, block);
154              } else if (ret == NSTACKX_EAGAIN) {
155                  NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount);
156                  return ret;
157              } else {
158                  DFILE_LOGE(TAG, "socket sendto failed");
159                  break;
160              }
161          }
162      }
163  
164      DestroyIovList(head, session, tid);
165  
166      return ret;
167  }
168  
SendFileDataFrameEx(DFileSession * session,PeerInfo * peerInfo,List * head,uint32_t tid)169  static int32_t SendFileDataFrameEx(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid)
170  {
171      return SendFileDataFrame(session, peerInfo, head, tid);
172  }
173  
CheckUnsentList(List * unsent,List * head,int32_t maxCount)174  static int32_t CheckUnsentList(List *unsent, List *head, int32_t maxCount)
175  {
176      int32_t cnt = 0;
177  
178      ListInitHead(head);
179      while (cnt < maxCount && !ListIsEmpty(unsent)) {
180          List *p = ListPopFront(unsent);
181          if (p == NULL) {
182              break;
183          }
184          ListInsertTail(head, p);
185          cnt++;
186      }
187  
188      return cnt;
189  }
190  
GetMaxSendCount(void)191  static int32_t GetMaxSendCount(void)
192  {
193      return MAX_SEND_COUNT;
194  }
195  
DoSendDataFrame(DFileSession * session,List * head,int32_t count,uint32_t tid,uint8_t socketIndex)196  static int32_t DoSendDataFrame(DFileSession *session, List *head, int32_t count, uint32_t tid, uint8_t socketIndex)
197  {
198      BlockFrame *block = NULL;
199      int32_t ret;
200      PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
201      if (!peerInfo) {
202          return NSTACKX_EFAILED;
203      }
204      int32_t maxCount = GetMaxSendCount();
205      int32_t flag;
206      do {
207          while (count < maxCount && FileManagerHasPendingData(session->fileManager)) {
208              ret = FileManagerFileRead(session->fileManager, tid, &block, maxCount - count);
209              if (ret < 0) {
210                  DFILE_LOGE(TAG, "FileManagerFileRead failed %d", ret);
211                  break;
212              }
213              if (ret == 0) {
214                  NSTACKX_ATOM_FETCH_INC(&session->sendBlockListEmptyTimes);
215                  (void)usleep(WAIT_DATA_FRAME_WAIT_US);
216                  continue;
217              }
218              while (block) {
219                  List *next = block->list.next;
220                  ListInsertTail(head, &block->list);
221                  block = (BlockFrame *)(void *)next;
222              }
223              count += ret;
224          }
225  
226          if (count == 0) {
227              NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes);
228              ret = NSTACKX_EOK;
229              break;
230          }
231          ret = SendFileDataFrameEx(session, peerInfo, head, tid);
232          if (ret <= 0) {
233              break;
234          }
235  
236          count = 0;
237          maxCount = GetMaxSendCount();
238          flag = CapsTcp(session) ? (session->sendRemain ? 0 : 1) :
239              (peerInfo->intervalSendCount < (uint16_t)peerInfo->amendSendRate && !session->closeFlag);
240      } while (flag && (session->stopSendCnt[tid] == 0));
241      return ret;
242  }
243  
244  
245  /*
246   *  * if backpress frame count is not zero then sleep one ack interval and update stopSendCnt
247   *   * if backpress frame count is zero then send packet normally
248   *    */
CheckSendByBackPress(DFileSession * session,uint32_t tid,uint8_t socketIndex)249  static void CheckSendByBackPress(DFileSession *session, uint32_t tid, uint8_t socketIndex)
250  {
251      uint32_t fileProcessCnt;
252      uint32_t sleepTime;
253      uint32_t stopCnt;
254      PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
255      if (peerInfo == NULL) {
256          return;
257      }
258  
259      if (session->stopSendCnt[tid] != 0) {
260          if (PthreadMutexLock(&session->backPressLock) != 0) {
261              DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed");
262              return;
263          }
264  
265          stopCnt = session->stopSendCnt[tid];
266          if (stopCnt == 0) {
267              if (PthreadMutexUnlock(&session->backPressLock) != 0) {
268                  DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
269              }
270              return;
271          }
272  
273          /* fileProcessCnt corresponds to trans one-to-one, one ack interval recv fileProcessCnt backpress frame */
274          fileProcessCnt = session->fileListProcessingCnt + session->smallListProcessingCnt;
275  
276          session->stopSendCnt[tid] = (session->stopSendCnt[tid] > fileProcessCnt) ? (session->stopSendCnt[tid] -
277              fileProcessCnt) : 0;
278  
279          if (PthreadMutexUnlock(&session->backPressLock) != 0) {
280              DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed");
281              return;
282          }
283  
284          sleepTime = CapsTcp(session) ? NSTACKX_INIT_RATE_STAT_INTERVAL : peerInfo->rateStateInterval;
285  
286  #ifndef NSTACKX_WITH_LITEOS
287          DFILE_LOGI(TAG, "tid %u sleep %u us fileProCnt %u Interval %u lastStopCnt %u stopSendCnt %u", tid, sleepTime,
288               fileProcessCnt, peerInfo->rateStateInterval, stopCnt, session->stopSendCnt[tid]);
289  #endif
290          (void)usleep(sleepTime);
291      }
292  }
293  
SendDataFrame(DFileSession * session,List * unsent,uint32_t tid,uint8_t socketIndex)294  int32_t SendDataFrame(DFileSession *session, List *unsent, uint32_t tid, uint8_t socketIndex)
295  {
296      int32_t ret = NSTACKX_EOK;
297      PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session);
298      List tmpq;
299  
300      if (peerInfo == NULL) {
301          return NSTACKX_EFAILED;
302      }
303      if (peerInfo->amendSendRate == 0) {
304          return ret;
305      }
306  
307      CheckSendByBackPress(session, tid, socketIndex);
308  
309      int32_t maxCount = GetMaxSendCount();
310      int32_t count = CheckUnsentList(unsent, &tmpq, maxCount);
311      ret = DoSendDataFrame(session, &tmpq, count, tid, socketIndex);
312      if (ret == NSTACKX_EAGAIN) {
313          ListMove(&tmpq, unsent);
314      }
315      return ret;
316  }
317  
SendControlFrame(DFileSession * session,QueueNode * queueNode)318  int32_t SendControlFrame(DFileSession *session, QueueNode *queueNode)
319  {
320      int32_t ret;
321      Socket *socket = NULL;
322  
323      if (CapsTcp(session)) {
324          socket = (session->sessionType == DFILE_SESSION_TYPE_SERVER) ? session->acceptSocket : session->socket[0];
325          ret = SocketSend(socket, queueNode->frame + queueNode->sendLen, queueNode->length - queueNode->sendLen);
326          if (ret > 0 && ret == (int32_t)(queueNode->length - queueNode->sendLen)) {
327              queueNode->sendLen = 0;
328          } else if (ret > 0) {
329              queueNode->sendLen = queueNode->sendLen + (uint32_t)ret;
330              ret = NSTACKX_EAGAIN;
331          } else if (errno == EAGAIN) {
332              ret = NSTACKX_EAGAIN;
333          } else {
334              DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno);
335              ret = NSTACKX_EFAILED;
336          }
337          return ret;
338      }
339  
340      uint8_t socketIndex = queueNode->socketIndex;
341      ret = SocketSend(session->socket[socketIndex], queueNode->frame, queueNode->length);
342      if (ret <= 0) {
343          if (ret != NSTACKX_EAGAIN) {
344              DFILE_LOGE(TAG, "MpEscape. socket:%u send failed. Errno:%d", socketIndex, errno);
345              ret = NSTACKX_EFAILED;
346          }
347      }
348  
349      return ret;
350  }
351  
SendOutboundFrame(DFileSession * session,QueueNode ** preQueueNode)352  int32_t SendOutboundFrame(DFileSession *session, QueueNode **preQueueNode)
353  {
354      QueueNode *queueNode = *preQueueNode;
355      int32_t ret;
356  
357      do {
358          if (PthreadMutexLock(&session->outboundQueueLock) != 0) {
359              DFILE_LOGE(TAG, "Pthread mutex lock failed");
360              ret = NSTACKX_EFAILED;
361              break;
362          }
363          if (queueNode == NULL && session->outboundQueueSize) {
364              queueNode = (QueueNode *)ListPopFront(&session->outboundQueue);
365              session->outboundQueueSize--;
366          }
367          if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) {
368              DFILE_LOGE(TAG, "Pthread mutex unlock failed");
369              ret = NSTACKX_EFAILED;
370              break;
371          }
372          if (queueNode == NULL) {
373              ret = NSTACKX_EOK;
374              break;
375          }
376  
377          uint32_t socketIndex = queueNode->socketIndex;
378          if (session->socket[socketIndex]->protocol == NSTACKX_PROTOCOL_UDP &&
379              session->socket[socketIndex]->isServer == NSTACKX_TRUE) {
380              session->socket[socketIndex]->dstAddr = queueNode->peerAddr;
381          }
382  
383          ret = SendControlFrame(session, queueNode);
384          if (ret <= 0) {
385              break;
386          }
387          /* Send ok, try to get next frame. */
388          DestroyQueueNode(queueNode);
389          queueNode = NULL;
390          NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks);
391      } while (!session->closeFlag);
392  
393      if (ret == NSTACKX_EAGAIN) {
394          *preQueueNode = queueNode;
395      } else {
396          *preQueueNode = NULL;
397          DestroyQueueNode(queueNode);
398          queueNode = NULL;
399      }
400      return ret;
401  }
402  
TcpSocketRecv(DFileSession * session,uint8_t * buffer,size_t length,struct sockaddr_in * srcAddr,const socklen_t * addrLen)403  int32_t TcpSocketRecv(DFileSession *session, uint8_t *buffer, size_t length, struct sockaddr_in *srcAddr,
404      const socklen_t *addrLen)
405  {
406      int32_t ret;
407      int recvLen = 0;
408  
409      Socket *socket = session->socket[0];
410  
411      if (session->sessionType == DFILE_SESSION_TYPE_SERVER) {
412          socket = session->acceptSocket;
413      }
414  
415      while (recvLen < (int32_t)length) {
416          ret = SocketRecv(socket, buffer + session->recvLen, length - (size_t)recvLen, srcAddr, addrLen);
417          if (ret == 0) {
418              return NSTACKX_PEER_CLOSE;
419          }
420          if (ret < 0) {
421              if (errno != EAGAIN) {
422                  ret = NSTACKX_EFAILED;
423                  return ret;
424              } else {
425                  return NSTACKX_EAGAIN;
426              }
427          }
428          recvLen = recvLen + ret;
429          session->recvLen = session->recvLen + (uint32_t)ret;
430      }
431  
432      return recvLen;
433  }
434  
SocketRecvForTcp(DFileSession * session,uint8_t * buffer,struct sockaddr_in * srcAddr,const socklen_t * addrLen)435  int32_t SocketRecvForTcp(DFileSession *session, uint8_t *buffer, struct sockaddr_in *srcAddr,
436      const socklen_t *addrLen)
437  {
438      int32_t ret;
439      uint16_t payloadLen;
440      DFileFrameHeader *frameHeader = NULL;
441      size_t length = sizeof(DFileFrameHeader);
442      if (session->recvLen < length) {
443          ret = TcpSocketRecv(session, buffer, length - session->recvLen, srcAddr, addrLen);
444          if (ret <= 0) {
445              return ret;
446          }
447      }
448  
449      frameHeader = (DFileFrameHeader *)(session->recvBuffer);
450      payloadLen = ntohs(frameHeader->length);
451      if (payloadLen >= NSTACKX_RECV_BUFFER_LEN) {
452          DFILE_LOGI(TAG, "header length is %u recv length is %u payloadLen is %u type %u", length,
453               session->recvLen, payloadLen, frameHeader->type);
454          return NSTACKX_EFAILED;
455      }
456  
457      if ((session->recvLen - length) < payloadLen) {
458          ret = TcpSocketRecv(session, buffer, payloadLen - (session->recvLen - length), srcAddr, addrLen);
459          if (ret <= 0) {
460              return ret;
461          }
462      }
463  
464      return (int32_t)(session->recvLen);
465  }
466