1 /* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "nstackx_dfile_session.h" 17 18 #include "nstackx_dfile_log.h" 19 #include "nstackx_socket.h" 20 21 #define TAG "nStackXDFile" 22 23 #define WAIT_DATA_FRAME_WAIT_US 5 /* Spend 5us to read one file data frame. */ 24 25 #define MAX_NR_IOVCNT 20 26 #define MAX_UDP_PAYLOAD 65507 27 #define MAX_SEND_COUNT 1 28 GetIovListSize(void)29 static inline uint32_t GetIovListSize(void) 30 { 31 // updated this value from 40 to 20 now 32 return MAX_NR_IOVCNT; 33 } 34 AllocIovList(List * head)35 static int32_t AllocIovList(List *head) 36 { 37 uint32_t size = GetIovListSize(); 38 IovList *ptr = malloc(sizeof(IovList) * size); 39 if (ptr == NULL) { 40 return NSTACKX_ENOMEM; 41 } 42 for (uint32_t i = 0; i < size; i++) { 43 ptr[i].addr = NULL; 44 ptr[i].len = 0; 45 ListInsertTail(head, &ptr[i].entry); 46 } 47 return NSTACKX_EOK; 48 } 49 50 #ifndef BUILD_FOR_WINDOWS 51 __attribute__((unused)) 52 #endif GetFreeIovList(DFileSession * s,int32_t tid)53 static IovList *GetFreeIovList(DFileSession *s, int32_t tid) 54 { 55 List *p = &s->freeIovList[tid]; 56 List *q = NULL; 57 58 if (ListIsEmpty(p)) { 59 int32_t err = AllocIovList(p); 60 if (err != NSTACKX_EOK) { 61 return NULL; 62 } 63 } 64 65 q = ListPopFront(p); 66 return (IovList *)q; 67 } 68 DestroyIovList(const List * head,DFileSession * s,uint32_t tid)69 void DestroyIovList(const List *head, DFileSession *s, uint32_t tid) 70 { 71 List *p = NULL; 72 List *n = NULL; 73 BlockFrame *block = NULL; 74 75 (void)s; 76 (void)tid; 77 LIST_FOR_EACH_SAFE(p, n, head) { 78 block = (BlockFrame *)p; 79 ListRemoveNode(p); 80 free(block->fileDataFrame); 81 free(block); 82 } 83 } 84 TcpSendFileDataFrame(Socket * socket,PeerInfo * peerInfo,List * p,BlockFrame * block,uint16_t len)85 static int32_t TcpSendFileDataFrame(Socket *socket, PeerInfo *peerInfo, List *p, BlockFrame *block, uint16_t len) 86 { 87 int32_t ret; 88 DFileSession *session = peerInfo->session; 89 90 ret = SocketSend(socket, (uint8_t *)block->fileDataFrame + block->sendLen, len - block->sendLen); 91 if (ret > 0 && ret == (int32_t)(len - block->sendLen)) { 92 block->sendLen = 0; 93 ListRemoveNode(p); 94 free(block->fileDataFrame); 95 free(block); 96 NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount); 97 NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount); 98 NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks); 99 } else if (ret > 0) { 100 NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount); 101 block->sendLen = block->sendLen + (uint32_t)ret; 102 ret = NSTACKX_EAGAIN; 103 } else if (errno == EAGAIN) { 104 NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount); 105 ret = NSTACKX_EAGAIN; 106 } else { 107 DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno); 108 ret = NSTACKX_EFAILED; 109 } 110 111 return ret; 112 } 113 UdpSendFileDataSuccess(DFileSession * session,PeerInfo * peerInfo,List * p,FileDataFrameZS * f,BlockFrame * block)114 static void UdpSendFileDataSuccess(DFileSession *session, PeerInfo *peerInfo, List *p, FileDataFrameZS *f, 115 BlockFrame *block) 116 { 117 ListRemoveNode(p); 118 free(f); 119 free(block); 120 NSTACKX_ATOM_FETCH_INC(&peerInfo->sendCount); 121 NSTACKX_ATOM_FETCH_INC(&peerInfo->intervalSendCount); 122 NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks); 123 } 124 SendFileDataFrame(DFileSession * session,PeerInfo * peerInfo,List * head,uint32_t tid)125 static int32_t SendFileDataFrame(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid) 126 { 127 List *p = NULL; 128 List *n = NULL; 129 BlockFrame *block = NULL; 130 FileDataFrameZS *f = NULL; 131 int32_t ret; 132 uint16_t len; 133 Socket *socket = session->socket[0]; 134 135 if (CapsTcp(session) && (session->sessionType == DFILE_SESSION_TYPE_SERVER)) { 136 socket = session->acceptSocket; 137 } 138 139 LIST_FOR_EACH_SAFE(p, n, head) { 140 block = (BlockFrame *)p; 141 f = (FileDataFrameZS *)(void *)block->fileDataFrame; 142 len = ntohs(f->header.length) + DFILE_FRAME_HEADER_LEN; 143 if (CapsTcp(session)) { 144 ret = TcpSendFileDataFrame(socket, peerInfo, p, block, len); 145 if (ret == NSTACKX_EFAILED) { 146 break; 147 } else if (ret == NSTACKX_EAGAIN) { 148 return ret; 149 } 150 } else { 151 ret = SocketSend(session->socket[peerInfo->socketIndex], (void *)f, len); 152 if (ret > 0) { 153 UdpSendFileDataSuccess(session, peerInfo, p, f, block); 154 } else if (ret == NSTACKX_EAGAIN) { 155 NSTACKX_ATOM_FETCH_INC(&peerInfo->eAgainCount); 156 return ret; 157 } else { 158 DFILE_LOGE(TAG, "socket sendto failed"); 159 break; 160 } 161 } 162 } 163 164 DestroyIovList(head, session, tid); 165 166 return ret; 167 } 168 SendFileDataFrameEx(DFileSession * session,PeerInfo * peerInfo,List * head,uint32_t tid)169 static int32_t SendFileDataFrameEx(DFileSession *session, PeerInfo *peerInfo, List *head, uint32_t tid) 170 { 171 return SendFileDataFrame(session, peerInfo, head, tid); 172 } 173 CheckUnsentList(List * unsent,List * head,int32_t maxCount)174 static int32_t CheckUnsentList(List *unsent, List *head, int32_t maxCount) 175 { 176 int32_t cnt = 0; 177 178 ListInitHead(head); 179 while (cnt < maxCount && !ListIsEmpty(unsent)) { 180 List *p = ListPopFront(unsent); 181 if (p == NULL) { 182 break; 183 } 184 ListInsertTail(head, p); 185 cnt++; 186 } 187 188 return cnt; 189 } 190 GetMaxSendCount(void)191 static int32_t GetMaxSendCount(void) 192 { 193 return MAX_SEND_COUNT; 194 } 195 DoSendDataFrame(DFileSession * session,List * head,int32_t count,uint32_t tid,uint8_t socketIndex)196 static int32_t DoSendDataFrame(DFileSession *session, List *head, int32_t count, uint32_t tid, uint8_t socketIndex) 197 { 198 BlockFrame *block = NULL; 199 int32_t ret; 200 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session); 201 if (!peerInfo) { 202 return NSTACKX_EFAILED; 203 } 204 int32_t maxCount = GetMaxSendCount(); 205 int32_t flag; 206 do { 207 while (count < maxCount && FileManagerHasPendingData(session->fileManager)) { 208 ret = FileManagerFileRead(session->fileManager, tid, &block, maxCount - count); 209 if (ret < 0) { 210 DFILE_LOGE(TAG, "FileManagerFileRead failed %d", ret); 211 break; 212 } 213 if (ret == 0) { 214 NSTACKX_ATOM_FETCH_INC(&session->sendBlockListEmptyTimes); 215 (void)usleep(WAIT_DATA_FRAME_WAIT_US); 216 continue; 217 } 218 while (block) { 219 List *next = block->list.next; 220 ListInsertTail(head, &block->list); 221 block = (BlockFrame *)(void *)next; 222 } 223 count += ret; 224 } 225 226 if (count == 0) { 227 NSTACKX_ATOM_FETCH_INC(&session->noPendingDataTimes); 228 ret = NSTACKX_EOK; 229 break; 230 } 231 ret = SendFileDataFrameEx(session, peerInfo, head, tid); 232 if (ret <= 0) { 233 break; 234 } 235 236 count = 0; 237 maxCount = GetMaxSendCount(); 238 flag = CapsTcp(session) ? (session->sendRemain ? 0 : 1) : 239 (peerInfo->intervalSendCount < (uint16_t)peerInfo->amendSendRate && !session->closeFlag); 240 } while (flag && (session->stopSendCnt[tid] == 0)); 241 return ret; 242 } 243 244 245 /* 246 * * if backpress frame count is not zero then sleep one ack interval and update stopSendCnt 247 * * if backpress frame count is zero then send packet normally 248 * */ CheckSendByBackPress(DFileSession * session,uint32_t tid,uint8_t socketIndex)249 static void CheckSendByBackPress(DFileSession *session, uint32_t tid, uint8_t socketIndex) 250 { 251 uint32_t fileProcessCnt; 252 uint32_t sleepTime; 253 uint32_t stopCnt; 254 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session); 255 if (peerInfo == NULL) { 256 return; 257 } 258 259 if (session->stopSendCnt[tid] != 0) { 260 if (PthreadMutexLock(&session->backPressLock) != 0) { 261 DFILE_LOGE(TAG, "pthread backPressLock mutex lock failed"); 262 return; 263 } 264 265 stopCnt = session->stopSendCnt[tid]; 266 if (stopCnt == 0) { 267 if (PthreadMutexUnlock(&session->backPressLock) != 0) { 268 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed"); 269 } 270 return; 271 } 272 273 /* fileProcessCnt corresponds to trans one-to-one, one ack interval recv fileProcessCnt backpress frame */ 274 fileProcessCnt = session->fileListProcessingCnt + session->smallListProcessingCnt; 275 276 session->stopSendCnt[tid] = (session->stopSendCnt[tid] > fileProcessCnt) ? (session->stopSendCnt[tid] - 277 fileProcessCnt) : 0; 278 279 if (PthreadMutexUnlock(&session->backPressLock) != 0) { 280 DFILE_LOGE(TAG, "pthread backPressLock mutex unlock failed"); 281 return; 282 } 283 284 sleepTime = CapsTcp(session) ? NSTACKX_INIT_RATE_STAT_INTERVAL : peerInfo->rateStateInterval; 285 286 #ifndef NSTACKX_WITH_LITEOS 287 DFILE_LOGI(TAG, "tid %u sleep %u us fileProCnt %u Interval %u lastStopCnt %u stopSendCnt %u", tid, sleepTime, 288 fileProcessCnt, peerInfo->rateStateInterval, stopCnt, session->stopSendCnt[tid]); 289 #endif 290 (void)usleep(sleepTime); 291 } 292 } 293 SendDataFrame(DFileSession * session,List * unsent,uint32_t tid,uint8_t socketIndex)294 int32_t SendDataFrame(DFileSession *session, List *unsent, uint32_t tid, uint8_t socketIndex) 295 { 296 int32_t ret = NSTACKX_EOK; 297 PeerInfo *peerInfo = ClientGetPeerInfoBySocketIndex(socketIndex, session); 298 List tmpq; 299 300 if (peerInfo == NULL) { 301 return NSTACKX_EFAILED; 302 } 303 if (peerInfo->amendSendRate == 0) { 304 return ret; 305 } 306 307 CheckSendByBackPress(session, tid, socketIndex); 308 309 int32_t maxCount = GetMaxSendCount(); 310 int32_t count = CheckUnsentList(unsent, &tmpq, maxCount); 311 ret = DoSendDataFrame(session, &tmpq, count, tid, socketIndex); 312 if (ret == NSTACKX_EAGAIN) { 313 ListMove(&tmpq, unsent); 314 } 315 return ret; 316 } 317 SendControlFrame(DFileSession * session,QueueNode * queueNode)318 int32_t SendControlFrame(DFileSession *session, QueueNode *queueNode) 319 { 320 int32_t ret; 321 Socket *socket = NULL; 322 323 if (CapsTcp(session)) { 324 socket = (session->sessionType == DFILE_SESSION_TYPE_SERVER) ? session->acceptSocket : session->socket[0]; 325 ret = SocketSend(socket, queueNode->frame + queueNode->sendLen, queueNode->length - queueNode->sendLen); 326 if (ret > 0 && ret == (int32_t)(queueNode->length - queueNode->sendLen)) { 327 queueNode->sendLen = 0; 328 } else if (ret > 0) { 329 queueNode->sendLen = queueNode->sendLen + (uint32_t)ret; 330 ret = NSTACKX_EAGAIN; 331 } else if (errno == EAGAIN) { 332 ret = NSTACKX_EAGAIN; 333 } else { 334 DFILE_LOGE(TAG, "socket send failed ret is %d errno is %d", ret, errno); 335 ret = NSTACKX_EFAILED; 336 } 337 return ret; 338 } 339 340 uint8_t socketIndex = queueNode->socketIndex; 341 ret = SocketSend(session->socket[socketIndex], queueNode->frame, queueNode->length); 342 if (ret <= 0) { 343 if (ret != NSTACKX_EAGAIN) { 344 DFILE_LOGE(TAG, "MpEscape. socket:%u send failed. Errno:%d", socketIndex, errno); 345 ret = NSTACKX_EFAILED; 346 } 347 } 348 349 return ret; 350 } 351 SendOutboundFrame(DFileSession * session,QueueNode ** preQueueNode)352 int32_t SendOutboundFrame(DFileSession *session, QueueNode **preQueueNode) 353 { 354 QueueNode *queueNode = *preQueueNode; 355 int32_t ret; 356 357 do { 358 if (PthreadMutexLock(&session->outboundQueueLock) != 0) { 359 DFILE_LOGE(TAG, "Pthread mutex lock failed"); 360 ret = NSTACKX_EFAILED; 361 break; 362 } 363 if (queueNode == NULL && session->outboundQueueSize) { 364 queueNode = (QueueNode *)ListPopFront(&session->outboundQueue); 365 session->outboundQueueSize--; 366 } 367 if (PthreadMutexUnlock(&session->outboundQueueLock) != 0) { 368 DFILE_LOGE(TAG, "Pthread mutex unlock failed"); 369 ret = NSTACKX_EFAILED; 370 break; 371 } 372 if (queueNode == NULL) { 373 ret = NSTACKX_EOK; 374 break; 375 } 376 377 uint32_t socketIndex = queueNode->socketIndex; 378 if (session->socket[socketIndex]->protocol == NSTACKX_PROTOCOL_UDP && 379 session->socket[socketIndex]->isServer == NSTACKX_TRUE) { 380 session->socket[socketIndex]->dstAddr = queueNode->peerAddr; 381 } 382 383 ret = SendControlFrame(session, queueNode); 384 if (ret <= 0) { 385 break; 386 } 387 /* Send ok, try to get next frame. */ 388 DestroyQueueNode(queueNode); 389 queueNode = NULL; 390 NSTACKX_ATOM_FETCH_INC(&session->totalSendBlocks); 391 } while (!session->closeFlag); 392 393 if (ret == NSTACKX_EAGAIN) { 394 *preQueueNode = queueNode; 395 } else { 396 *preQueueNode = NULL; 397 DestroyQueueNode(queueNode); 398 queueNode = NULL; 399 } 400 return ret; 401 } 402 TcpSocketRecv(DFileSession * session,uint8_t * buffer,size_t length,struct sockaddr_in * srcAddr,const socklen_t * addrLen)403 int32_t TcpSocketRecv(DFileSession *session, uint8_t *buffer, size_t length, struct sockaddr_in *srcAddr, 404 const socklen_t *addrLen) 405 { 406 int32_t ret; 407 int recvLen = 0; 408 409 Socket *socket = session->socket[0]; 410 411 if (session->sessionType == DFILE_SESSION_TYPE_SERVER) { 412 socket = session->acceptSocket; 413 } 414 415 while (recvLen < (int32_t)length) { 416 ret = SocketRecv(socket, buffer + session->recvLen, length - (size_t)recvLen, srcAddr, addrLen); 417 if (ret == 0) { 418 return NSTACKX_PEER_CLOSE; 419 } 420 if (ret < 0) { 421 if (errno != EAGAIN) { 422 ret = NSTACKX_EFAILED; 423 return ret; 424 } else { 425 return NSTACKX_EAGAIN; 426 } 427 } 428 recvLen = recvLen + ret; 429 session->recvLen = session->recvLen + (uint32_t)ret; 430 } 431 432 return recvLen; 433 } 434 SocketRecvForTcp(DFileSession * session,uint8_t * buffer,struct sockaddr_in * srcAddr,const socklen_t * addrLen)435 int32_t SocketRecvForTcp(DFileSession *session, uint8_t *buffer, struct sockaddr_in *srcAddr, 436 const socklen_t *addrLen) 437 { 438 int32_t ret; 439 uint16_t payloadLen; 440 DFileFrameHeader *frameHeader = NULL; 441 size_t length = sizeof(DFileFrameHeader); 442 if (session->recvLen < length) { 443 ret = TcpSocketRecv(session, buffer, length - session->recvLen, srcAddr, addrLen); 444 if (ret <= 0) { 445 return ret; 446 } 447 } 448 449 frameHeader = (DFileFrameHeader *)(session->recvBuffer); 450 payloadLen = ntohs(frameHeader->length); 451 if (payloadLen >= NSTACKX_RECV_BUFFER_LEN) { 452 DFILE_LOGI(TAG, "header length is %u recv length is %u payloadLen is %u type %u", length, 453 session->recvLen, payloadLen, frameHeader->type); 454 return NSTACKX_EFAILED; 455 } 456 457 if ((session->recvLen - length) < payloadLen) { 458 ret = TcpSocketRecv(session, buffer, payloadLen - (session->recvLen - length), srcAddr, addrLen); 459 if (ret <= 0) { 460 return ret; 461 } 462 } 463 464 return (int32_t)(session->recvLen); 465 } 466