1 /*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "spunge_core.h"
17 #ifdef FILLP_LINUX
18 #include <errno.h>
19 #endif
20 #include <stdio.h>
21 #include "securec.h"
22 #include "sysio.h"
23 #include "res.h"
24 #include "socket_common.h"
25 #include "fillp_flow_control.h"
26 #include "timing_wheel.h"
27 #include "fillp_buf_item.h"
28 #include "callbacks.h"
29 #include "fillp_common.h"
30 #include "spunge.h"
31 #include "spunge_stack.h"
32 #include "spunge_message.h"
33 #include "fillp_output.h"
34 #include "fillp_input.h"
35 #include "fillp_dfx.h"
36
37 #ifdef __cplusplus
38 extern "C" {
39 #endif
40
41 SYS_ARCH_SEM g_resDeinitSem;
42 #define BIT_MOVE_CNT 3
43 #define RECV_RATE_PAR_LOW 0.98
44 #define RECV_RATE_PAT_HIGH 1.02
45 #define RECV_STATE_THRESHOLD 10
46
47 void SpungeFreeInstanceResource(struct SpungeInstance *inst);
48
49
SpungeDoRecvCycle(struct SockOsSocket * osSock,struct SpungeInstance * inst)50 void SpungeDoRecvCycle(struct SockOsSocket *osSock, struct SpungeInstance *inst)
51 {
52 FILLP_UINT32 i;
53 struct NetBuf buf;
54 struct SpungePcb *spcb = FILLP_NULL_PTR;
55
56 if (!OS_SOCK_OPS_FUNC_VALID(osSock, fetchPacket)) {
57 return;
58 }
59
60 (void)memset_s(&buf, sizeof(buf), 0, sizeof(buf));
61 buf.p = inst->tmpBuf[0];
62 for (i = 0; i < g_resource.udp.rxBurst; i++) {
63 spcb = osSock->ioSock->ops->fetchPacket((void *)osSock, (void *)&buf, 0);
64 if (spcb != FILLP_NULL_PTR) {
65 FillpDoInput(&spcb->fpcb, &buf, inst);
66 continue;
67 } else {
68 break;
69 }
70 }
71 }
72
SpungeCalExpectedBytes(FILLP_UINT32 * sendPktNum,struct SpungePcb * pcb,struct FtSocket * sock,struct FillpFlowControl * flowControl,FILLP_LLONG detaTime)73 static FILLP_UINT32 SpungeCalExpectedBytes(FILLP_UINT32 *sendPktNum, struct SpungePcb *pcb,
74 struct FtSocket *sock, struct FillpFlowControl *flowControl, FILLP_LLONG detaTime)
75 {
76 FILLP_UINT32 bytesExpected;
77 FILLP_UINT32 pktNum = sock->resConf.udp.txBurst;
78
79 if (flowControl->sendInterval) {
80 pktNum = (FILLP_UINT32)(detaTime / flowControl->sendInterval);
81 }
82
83 if (pktNum <= (sock->resConf.udp.txBurst)) {
84 /* sendRate is kbps */
85 FILLP_ULLONG bitsExpected = (FILLP_ULLONG)(detaTime * flowControl->sendRate / FILLP_ONE_SECOND);
86 bitsExpected >>= FILLP_TIME_PRECISION;
87 bytesExpected = (FILLP_UINT32)(FILLP_UTILS_BIT2BYTE(bitsExpected));
88 pcb->fpcb.statistics.traffic.packExpSendBytes += bytesExpected;
89 bytesExpected += pcb->fpcb.send.flowControl.remainBytes;
90 } else {
91 pktNum = sock->resConf.udp.txBurst;
92 bytesExpected = (FILLP_UINT32)(pktNum * pcb->fpcb.pktSize);
93 pcb->fpcb.statistics.traffic.packExpSendBytes += bytesExpected;
94 }
95 *sendPktNum = pktNum;
96 FILLP_LOGDBG("before_send_cycle fillp_sock_id:%d unRecvNum:%u, unAck:%u\r\n",
97 sock->index, pcb->fpcb.send.unrecvList.nodeNum, pcb->fpcb.send.unackList.count);
98 return bytesExpected;
99 }
100
SpungeDoSendUpdate(struct SpungePcb * pcb,FILLP_UINT32 sendBytes,FILLP_UINT32 bytesExpected)101 static void SpungeDoSendUpdate(struct SpungePcb *pcb, FILLP_UINT32 sendBytes, FILLP_UINT32 bytesExpected)
102 {
103 if ((sendBytes > 0) && (bytesExpected >= (FILLP_UINT32)sendBytes)) {
104 pcb->fpcb.statistics.traffic.packSendBytes += sendBytes;
105 pcb->fpcb.send.flowControl.remainBytes = (bytesExpected - (FILLP_UINT32)sendBytes);
106 } else {
107 pcb->fpcb.send.flowControl.remainBytes = 0;
108 }
109 }
110
SpungeDoSendCycle(struct SpungePcb * pcb,struct SpungeInstance * inst,FILLP_LLONG detaTime)111 void SpungeDoSendCycle(struct SpungePcb *pcb, struct SpungeInstance *inst, FILLP_LLONG detaTime)
112 {
113 FILLP_UINT32 sendPktNum;
114 FILLP_UINT32 sendBytes = 0;
115 FILLP_UINT32 tmpBytes = 0;
116 FILLP_UINT32 bytesExpected;
117
118 if ((pcb == FILLP_NULL_PTR) || (pcb->conn == FILLP_NULL_PTR)) {
119 FILLP_LOGERR("NULL Pointer");
120 return;
121 }
122
123 FILLP_SIZE_T pktSize = pcb->fpcb.pktSize;
124 struct FillpFlowControl *flowControl = &pcb->fpcb.send.flowControl;
125 struct FtNetconn *conn = (struct FtNetconn *)pcb->conn;
126 struct FtSocket *sock = (struct FtSocket *)conn->sock;
127
128 if (sock == FILLP_NULL_PTR) {
129 FILLP_LOGERR("NULL Pointer");
130 return;
131 }
132
133 flowControl->sendTime = inst->curTime;
134 bytesExpected = SpungeCalExpectedBytes(&sendPktNum, pcb, sock, flowControl, detaTime);
135
136 /* flow control alg may need to change bytesExpected for the this send cycle a according to current status */
137 if (pcb->fpcb.algFuncs.updateExpectSendBytes != FILLP_NULL_PTR) {
138 pcb->fpcb.algFuncs.updateExpectSendBytes(&pcb->fpcb, &bytesExpected);
139 }
140
141 /* If BytesExpected less than pktSize, no need to send, just store the remainBytes */
142 if (bytesExpected >= pktSize) {
143 /* Make sure that the send bytes won't more than bytesExpected */
144 tmpBytes = (FILLP_UINT32)(bytesExpected - pktSize);
145
146 sendBytes = FillpSendOne(&pcb->fpcb, tmpBytes, sendPktNum);
147 SpungeDoSendUpdate(pcb, sendBytes, bytesExpected);
148 } else {
149 pcb->fpcb.send.flowControl.remainBytes = bytesExpected;
150 }
151
152 FILLP_LOGDBG("after_send_cycle: fillp_sock_id:%d expected bytes:%u sentBytes:%u remain:%u \r\n",
153 sock->index, sendBytes, tmpBytes, pcb->fpcb.send.flowControl.remainBytes);
154
155 if ((pcb->fpcb.send.flowControl.remainBytes) || (!HLIST_EMPTY(&pcb->fpcb.send.unSendList)) ||
156 (pcb->fpcb.send.redunList.nodeNum) || (pcb->fpcb.send.unrecvList.nodeNum)) {
157 FillpEnableSendTimer(&pcb->fpcb);
158 } else {
159 FillpDisableSendTimer(&pcb->fpcb);
160 }
161
162 return;
163 }
164
SpungeDestroySockTableSocket(struct FtSocketTable * table,int tableIndex)165 static void SpungeDestroySockTableSocket(struct FtSocketTable *table, int tableIndex)
166 {
167 struct FtSocket *sock = FILLP_NULL_PTR;
168
169 if (table == FILLP_NULL_PTR) {
170 return;
171 }
172
173 sock = table->sockPool[tableIndex];
174 if (sock == FILLP_NULL_PTR) {
175 return;
176 }
177 (void)SYS_ARCH_RWSEM_DESTROY(&sock->sockConnSem);
178 (void)SYS_ARCH_SEM_DESTROY(&sock->connBlockSem);
179 (void)SYS_ARCH_SEM_DESTROY(&sock->sockCloseProtect);
180 (void)SYS_ARCH_SEM_DESTROY(&sock->epollTaskListLock);
181 SpungeFree(sock, SPUNGE_ALLOC_TYPE_CALLOC);
182 table->sockPool[tableIndex] = FILLP_NULL_PTR;
183 }
184
185 /* SFT */
SpungeCreateSockTable(FILLP_UINT maxSock)186 struct FtSocketTable *SpungeCreateSockTable(FILLP_UINT maxSock)
187 {
188 int i;
189 struct FtSocketTable *table;
190 table = (struct FtSocketTable *)SpungeAlloc(1, sizeof(struct FtSocketTable), SPUNGE_ALLOC_TYPE_CALLOC);
191 if (table == FILLP_NULL_PTR) {
192 FILLP_LOGERR("Failed to allocate memory for socket table \r\n");
193 return FILLP_NULL_PTR;
194 }
195
196 table->freeQueqe = FillpQueueCreate("sock_free_table", (FILLP_SIZE_T)maxSock, SPUNGE_ALLOC_TYPE_CALLOC);
197
198 if (table->freeQueqe == FILLP_NULL_PTR) {
199 FILLP_LOGERR("Fail to create socket table free queue");
200 goto ERR_FAIL;
201 }
202
203 FillpQueueSetConsSafe(table->freeQueqe, FILLP_TRUE);
204 FillpQueueSetProdSafe(table->freeQueqe, FILLP_TRUE);
205
206 table->sockPool =
207 (struct FtSocket **)SpungeAlloc(maxSock, (FILLP_SIZE_T)sizeof(struct FtSocket *), SPUNGE_ALLOC_TYPE_CALLOC);
208 if (table->sockPool == FILLP_NULL_PTR) {
209 FILLP_LOGERR("Failed to allocate memory for sockPool of socket table");
210 goto ERR_FAIL;
211 }
212
213 table->size = (FILLP_INT)maxSock;
214 SYS_ARCH_ATOMIC_SET(&table->used, 0);
215 for (i = 0; i < table->size; i++) {
216 table->sockPool[i] = FILLP_NULL_PTR;
217 }
218
219 return table;
220
221 ERR_FAIL:
222 if (table->freeQueqe != FILLP_NULL_PTR) {
223 FillpQueueDestroy(table->freeQueqe);
224 table->freeQueqe = FILLP_NULL_PTR;
225 }
226
227 if (table->sockPool != FILLP_NULL_PTR) {
228 SpungeFree(table->sockPool, SPUNGE_ALLOC_TYPE_CALLOC);
229 table->sockPool = FILLP_NULL_PTR;
230 }
231
232 SpungeFree(table, SPUNGE_ALLOC_TYPE_CALLOC);
233
234 return FILLP_NULL_PTR;
235 }
236
237
238 /* SFT */
SpungeDestroySockTable(struct FtSocketTable * table)239 void SpungeDestroySockTable(struct FtSocketTable *table)
240 {
241 FILLP_INT i;
242
243 for (i = 0; i < SYS_ARCH_ATOMIC_READ(&table->used); i++) {
244 SpungeDestroySockTableSocket(table, i);
245 }
246
247 if (table->freeQueqe != FILLP_NULL_PTR) {
248 FillpQueueDestroy(table->freeQueqe);
249 table->freeQueqe = FILLP_NULL_PTR;
250 }
251
252 if (table->sockPool != FILLP_NULL_PTR) {
253 SpungeFree(table->sockPool, SPUNGE_ALLOC_TYPE_CALLOC);
254 table->sockPool = FILLP_NULL_PTR;
255 }
256
257 /* NULL check for table already done at the caller, and also in the above
258 check, table is dereferenced without validating, so need to check for NULL
259 again here before freeing it */
260 SpungeFree(table, SPUNGE_ALLOC_TYPE_CALLOC);
261 }
262
SpungeInstMsgBoxInit(struct SpungeInstance * inst)263 static FILLP_INT SpungeInstMsgBoxInit(struct SpungeInstance *inst)
264 {
265 (void)SYS_ARCH_ATOMIC_SET(&inst->msgUsingCount, 0);
266 inst->msgBox = FillpQueueCreate("spunge_msg_box", g_spunge->resConf.maxMsgItemNum, SPUNGE_ALLOC_TYPE_MALLOC);
267 if (inst->msgBox == FILLP_NULL_PTR) {
268 FILLP_LOGERR("Init inst->msgBox Fail");
269 return ERR_NORES;
270 }
271
272 FillpQueueSetConsSafe(inst->msgBox, FILLP_TRUE);
273 FillpQueueSetProdSafe(inst->msgBox, FILLP_TRUE);
274
275 inst->msgPool = SpungeMsgCreatePool(FILLP_MSG_ITEM_INIT_NUM, (int)g_spunge->resConf.maxMsgItemNum);
276 if (inst->msgPool == FILLP_NULL_PTR) {
277 FILLP_LOGERR("create msg pool fail");
278 return ERR_NORES;
279 }
280
281 DympSetConsSafe(inst->msgPool, FILLP_TRUE);
282 DympSetProdSafe(inst->msgPool, FILLP_TRUE);
283 return ERR_OK;
284 }
285
SpungeInstSendInit(struct SpungeInstance * inst)286 static FILLP_INT SpungeInstSendInit(struct SpungeInstance *inst)
287 {
288 int i;
289
290 /* To control on client sending */
291 inst->rateControl.connectionNum = FILLP_NULL;
292
293 inst->rateControl.recv.maxRate = g_resource.flowControl.maxRecvRate;
294
295 /* To control on server sending */
296 inst->rateControl.send.maxRate = g_resource.flowControl.maxRate;
297
298 inst->thresdSemInited = FILLP_FALSE;
299 int ret = SYS_ARCH_SEM_INIT(&inst->threadSem, 1);
300 if (ret != FILLP_OK) {
301 FILLP_LOGERR("SYS_ARCH_SEM_INIT fails");
302 return ERR_NORES;
303 }
304 inst->thresdSemInited = FILLP_TRUE;
305
306 inst->unsendItem =
307 SpungeAlloc(FILLP_UNSEND_BOX_LOOP_CHECK_BURST, sizeof(struct FillpPcbItem *), SPUNGE_ALLOC_TYPE_CALLOC);
308 if (inst->unsendItem == FILLP_NULL_PTR) {
309 FILLP_LOGERR("inst->unsendItem NULL");
310 return ERR_NORES;
311 }
312
313 for (i = 0; i < FILLP_VLEN; i++) {
314 inst->tmpBuf[i] = SpungeAlloc(1, (sizeof(FILLP_CHAR) * FILLP_MAX_PKT_SIZE), SPUNGE_ALLOC_TYPE_MALLOC);
315 if (inst->tmpBuf[i] == FILLP_NULL_PTR) {
316 FILLP_LOGERR("inst->tmpBuf[%d] is NULL", i);
317 return ERR_NORES;
318 }
319 }
320
321 HLIST_INIT(&inst->sendPcbList);
322 for (i = 0; i < FILLP_INST_UNSEND_BOX_NUM; i++) {
323 inst->unsendBox[i] = FillpQueueCreate("socket_send_box", FILLP_INST_UNSEND_BOX_SIZE, SPUNGE_ALLOC_TYPE_MALLOC);
324 if (inst->unsendBox[i] == FILLP_NULL_PTR) {
325 FILLP_LOGERR("inst->unsendBox[%d] is NULL", i);
326 return ERR_NORES;
327 }
328
329 FillpQueueSetConsSafe(inst->unsendBox[i], FILLP_FALSE);
330 FillpQueueSetProdSafe(inst->unsendBox[i], FILLP_TRUE);
331 }
332 return ERR_OK;
333 }
334
SpungeInstTimerInit(struct SpungeInstance * inst)335 static void SpungeInstTimerInit(struct SpungeInstance *inst)
336 {
337 inst->curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
338
339 (void)memset_s(&inst->macInfo, sizeof(FillpMacInfo), 0, sizeof(FillpMacInfo));
340 FillpMacTimerExpire(&inst->macInfo, inst->curTime);
341
342 FillpTimingWheelInit(&inst->timingWheel, FILLP_TIMING_WHEEL_ACCURACY);
343
344 /* Init the global timers */
345 FtGlobalTimerInit(inst);
346 SpungeInitTokenBucket(inst);
347 }
348
SpungeThreadInit(struct SpungeInstance * inst)349 static FILLP_INT SpungeThreadInit(struct SpungeInstance *inst)
350 {
351 FILLP_THREAD threadId;
352
353 inst->mainThreadParam.func = SpungeInstanceMainThread;
354 inst->mainThreadParam.param = inst;
355 inst->minSendInterval = FILLP_MAX_SEND_INTERVAL;
356
357 inst->hasInited = FILLP_TRUE;
358
359 (void)FILLP_SYS_START_NEWTHREAD(&inst->mainThreadParam, &threadId);
360
361 return ERR_OK;
362 }
363
SpungeInstInit(struct SpungeInstance * inst)364 FILLP_INT SpungeInstInit(struct SpungeInstance *inst)
365 {
366 FILLP_INT err;
367
368 if (inst == FILLP_NULL_PTR) {
369 FILLP_LOGERR("Init inst null");
370 return ERR_NULLPTR;
371 }
372
373 if (inst->hasInited) {
374 FILLP_LOGERR("Stack has been inited");
375 return ERR_OK;
376 }
377
378 err = SpungeInstMsgBoxInit(inst);
379 if (err != ERR_OK) {
380 goto FAIL;
381 }
382
383 HLIST_INIT(&inst->osSockist);
384 HLIST_INIT(&inst->pcbList.list);
385
386 err = SpungeInstSendInit(inst);
387 if (err != ERR_OK) {
388 goto FAIL;
389 }
390
391 SpungeInstTimerInit(inst);
392
393 inst->cleanseDataCtr = 0;
394
395 err = SpungeThreadInit(inst);
396 if (err != ERR_OK) {
397 goto FAIL;
398 }
399
400 return ERR_OK;
401
402 FAIL:
403 SpungeFreeInstanceResource(inst);
404 return err;
405 }
406
SpungeSysCallRegisted(void)407 static FILLP_INT SpungeSysCallRegisted(void)
408 {
409 FILLP_INT ret;
410
411 ret = FillpValidateFuncPtr(&g_fillpOsBasicLibFun, sizeof(FillpSysLibBasicCallbackFuncSt));
412 if (ret != ERR_OK) {
413 SET_ERRNO(FILLP_EINVAL);
414 FILLP_LOGERR("FillpValidateFuncPtr g_fillpOsBasicLibFun failed");
415 return ret;
416 }
417
418 ret = FillpValidateFuncPtr(&g_fillpOsSemLibFun, sizeof(FillpSysLibSemCallbackFuncSt));
419 if (ret != ERR_OK) {
420 SET_ERRNO(FILLP_EINVAL);
421 FILLP_LOGERR("FillpValidateFuncPtr g_fillpOsSemLibFun failed");
422 return ret;
423 }
424
425 ret = FillpValidateFuncPtr(&g_fillpOsSocketLibFun, sizeof(FillpSysLibSockCallbackFuncSt));
426 if (ret != ERR_OK) {
427 SET_ERRNO(FILLP_EINVAL);
428 FILLP_LOGERR("FillpValidateFuncPtr g_fillpOsSocketLibFun failed");
429 return ret;
430 }
431
432 return ERR_OK;
433 }
434
FtFreeEpollResource(void)435 static void FtFreeEpollResource(void)
436 {
437 if (g_spunge->epitemPool != FILLP_NULL_PTR) {
438 DympDestroyPool(g_spunge->epitemPool);
439 g_spunge->epitemPool = FILLP_NULL_PTR;
440 }
441
442 if (g_spunge->eventpollPool != FILLP_NULL_PTR) {
443 DympDestroyPool(g_spunge->eventpollPool);
444 g_spunge->eventpollPool = FILLP_NULL_PTR;
445 }
446 }
447
FtAllocateEpollResource(void)448 static FILLP_INT FtAllocateEpollResource(void)
449 {
450 DympoolItemOperaCbSt itemOperaCb = {FILLP_NULL_PTR, FILLP_NULL_PTR};
451 g_spunge->epitemPool = DympCreatePool(FILLP_EPOLL_ITEM_INIT_NUM, (int)g_spunge->resConf.maxEpollEventNum,
452 sizeof(struct EpItem), FILLP_TRUE, &itemOperaCb);
453 if (g_spunge->epitemPool == FILLP_NULL_PTR) {
454 FILLP_LOGERR("create mem pool for g_spunge->epitemPool failed");
455 return ERR_NORES;
456 }
457 DympSetConsSafe(g_spunge->epitemPool, FILLP_TRUE);
458 DympSetProdSafe(g_spunge->epitemPool, FILLP_TRUE);
459
460 g_spunge->eventpollPool = DympCreatePool(FILLP_EPOLL_ITEM_INIT_NUM, (int)g_spunge->resConf.maxEpollEventNum,
461 sizeof(struct EventPoll), FILLP_TRUE, &itemOperaCb);
462 if (g_spunge->eventpollPool == FILLP_NULL_PTR) {
463 FtFreeEpollResource();
464 FILLP_LOGERR("create Dym pool for g_spunge->eventpollPool failed");
465 return ERR_NORES;
466 }
467 DympSetConsSafe(g_spunge->eventpollPool, FILLP_TRUE);
468 DympSetProdSafe(g_spunge->eventpollPool, FILLP_TRUE);
469 return ERR_OK;
470 }
471
SpungeAllocInstRes(void)472 static int SpungeAllocInstRes(void)
473 {
474 FILLP_UINT i;
475 FILLP_UINT j;
476 FILLP_INT err;
477
478 for (i = 0; i < g_spunge->insNum; i++) {
479 (void)memset_s(&g_spunge->instPool[i], sizeof(struct SpungeInstance), 0, sizeof(struct SpungeInstance));
480 g_spunge->instPool[i].instIndex = (FILLP_INT)i;
481 err = SpungeInstInit(&g_spunge->instPool[i]);
482 if (err == ERR_OK) {
483 continue;
484 }
485 FILLP_LOGERR("SpungeInstInit failed :: Instance number :: %u", i);
486
487 /* Release instances which are created success */
488 if (i > 0) {
489 g_spunge->insNum = i;
490
491 g_spunge->hasDeinitBlked = FILLP_TRUE;
492 for (j = 0; j < g_spunge->insNum; j++) {
493 g_spunge->instPool[j].waitTobeCoreKilled = FILLP_TRUE;
494 }
495
496 /* After this step g_spunge will be freed, it should not be accessed, caller has check for NULL pointer
497 before accessing, so it will not cause problem */
498 (void)SYS_ARCH_SEM_WAIT(&g_resDeinitSem);
499 }
500 return err;
501 }
502
503 return ERR_OK;
504 }
505
SpungeFreeInstSendRecv(struct SpungeInstance * inst)506 static void SpungeFreeInstSendRecv(struct SpungeInstance *inst)
507 {
508 int j;
509 if (inst->thresdSemInited) {
510 (void)SYS_ARCH_SEM_DESTROY(&inst->threadSem);
511 inst->thresdSemInited = FILLP_FALSE;
512 }
513
514 for (j = 0; j < FILLP_INST_UNSEND_BOX_NUM; j++) {
515 if (inst->unsendBox[j] != FILLP_NULL_PTR) {
516 FillpQueueDestroy(inst->unsendBox[j]);
517 inst->unsendBox[j] = FILLP_NULL_PTR;
518 } else {
519 break;
520 }
521 }
522 if (inst->unsendItem != FILLP_NULL_PTR) {
523 SpungeFree(inst->unsendItem, SPUNGE_ALLOC_TYPE_CALLOC);
524 inst->unsendItem = FILLP_NULL_PTR;
525 }
526
527 for (j = 0; j < FILLP_VLEN; j++) {
528 if (inst->tmpBuf[j] == FILLP_NULL_PTR) {
529 break;
530 }
531 SpungeFree(inst->tmpBuf[j], SPUNGE_ALLOC_TYPE_MALLOC);
532 inst->tmpBuf[j] = FILLP_NULL_PTR;
533 }
534 }
535
SpungeFreeInstanceResource(struct SpungeInstance * inst)536 void SpungeFreeInstanceResource(struct SpungeInstance *inst)
537 {
538 if (inst == FILLP_NULL_PTR) {
539 return;
540 }
541
542 if (inst->msgBox != FILLP_NULL_PTR) {
543 FillpQueueDestroy(inst->msgBox);
544 inst->msgBox = FILLP_NULL_PTR;
545 }
546
547 while (SYS_ARCH_ATOMIC_READ(&inst->msgUsingCount) > 0) {
548 FILLP_SLEEP_MS(1);
549 }
550
551 if (inst->msgPool != FILLP_NULL_PTR) {
552 SpungeMsgPoolDestroy(inst->msgPool);
553 inst->msgPool = FILLP_NULL_PTR;
554 }
555
556 SpungeFreeInstSendRecv(inst);
557
558 inst->hasInited = FILLP_FALSE;
559 }
560
FtGetSpungeRes(struct SpungeResConf * resConf)561 static void FtGetSpungeRes(struct SpungeResConf *resConf)
562 {
563 (void)memset_s(resConf, sizeof(struct SpungeResConf), 0, sizeof(struct SpungeResConf));
564
565 resConf->maxInstNum = (FILLP_UINT)UTILS_MIN(g_resource.common.maxInstNum, MAX_SPUNGEINSTANCE_NUM);
566 resConf->maxSockNum = g_resource.common.maxSockNum;
567 resConf->maxConnNum = g_resource.common.maxConnNum;
568 resConf->maxMsgItemNum = ((FILLP_UINT)g_resource.common.maxSockNum * FILLP_SPUNGE_EVENTG_MULT_NUM);
569 resConf->maxTimerItemNum = ((FILLP_UINT)g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
570 resConf->maxEpollEventNum = (FILLP_UINT)(g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
571 resConf->maxEpollItemNum = (FILLP_UINT)(g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
572 }
573
FtGlobalTimerInit(struct SpungeInstance * inst)574 void FtGlobalTimerInit(struct SpungeInstance *inst)
575 {
576 /* Initialize the Fairness timer */
577 inst->fairTimerNode.cbNode.cb = SpinstLoopFairnessChecker;
578 inst->fairTimerNode.cbNode.arg = (void *)inst;
579 inst->fairTimerNode.interval = SPUNGE_WEIGHT_ADJUST_INTERVAL;
580 FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->fairTimerNode.interval),
581 &inst->fairTimerNode);
582 /* Initialize the MAC timer */
583 inst->macTimerNode.cbNode.cb = SpinstLoopMacTimerChecker;
584 inst->macTimerNode.cbNode.arg = (void *)inst;
585 inst->macTimerNode.interval = FILLP_KEY_REFRESH_TIME;
586 FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->macTimerNode.interval),
587 &inst->macTimerNode);
588 }
589
SpungeCheckCallbacks(void)590 static FILLP_INT SpungeCheckCallbacks(void)
591 {
592 return SpungeSysCallRegisted();
593 }
594
FtInitGlobalUdpIo(void)595 static FILLP_INT FtInitGlobalUdpIo(void)
596 {
597 g_udpIo.readSet = FILLP_FD_CREATE_FD_SET();
598 if (g_udpIo.readSet == FILLP_NULL_PTR) {
599 FILLP_LOGERR("Malloc g_udpIo.readSet failed");
600 return ERR_NORES;
601 }
602
603 g_udpIo.readableSet = FILLP_FD_CREATE_FD_SET();
604 if (g_udpIo.readableSet == FILLP_NULL_PTR) {
605 FILLP_LOGERR("Malloc g_udpIo.readableSet failed");
606 return ERR_NORES;
607 }
608
609 HLIST_INIT(&g_udpIo.listenPcbList);
610
611 return ERR_OK;
612 }
613
FtInitGlobalInstPool(void)614 static FILLP_INT FtInitGlobalInstPool(void)
615 {
616 g_spunge->insNum = g_spunge->resConf.maxInstNum;
617 g_spunge->instPool = (struct SpungeInstance *)SpungeAlloc(g_spunge->insNum, sizeof(struct SpungeInstance),
618 SPUNGE_ALLOC_TYPE_MALLOC);
619 if (g_spunge->instPool == FILLP_NULL_PTR) {
620 FILLP_LOGERR("Malloc g_spunge->instPool failed");
621 return ERR_NORES;
622 }
623
624 return ERR_OK;
625 }
626
FtInitGlobalSockTable(void)627 static FILLP_INT FtInitGlobalSockTable(void)
628 {
629 g_spunge->sockTable = SpungeCreateSockTable(g_spunge->resConf.maxSockNum);
630 if (g_spunge->sockTable == FILLP_NULL_PTR) {
631 FILLP_LOGERR("Malloc g_spunge->sockTable failed");
632 return ERR_NORES;
633 }
634 return ERR_OK;
635 }
636
FtInitGlobalNetPool(void)637 static FILLP_INT FtInitGlobalNetPool(void)
638 {
639 FILLP_UINT netPoolInitSize = FILLP_CONN_ITEM_INIT_NUM;
640
641 if (netPoolInitSize > g_spunge->resConf.maxConnNum) {
642 netPoolInitSize = g_spunge->resConf.maxConnNum;
643 }
644
645 DympoolItemOperaCbSt itemOperaCb = {FILLP_NULL_PTR, FILLP_NULL_PTR};
646 g_spunge->netPool = DympCreatePool((FILLP_INT)netPoolInitSize, (int)g_spunge->resConf.maxConnNum,
647 sizeof(struct FtNetconn), FILLP_TRUE, &itemOperaCb);
648 if (g_spunge->netPool == FILLP_NULL_PTR) {
649 FILLP_LOGERR("Malloc g_spunge->netPool failed");
650 return ERR_NORES;
651 }
652
653 DympSetConsSafe(g_spunge->netPool, FILLP_TRUE);
654 DympSetProdSafe(g_spunge->netPool, FILLP_FALSE);
655 return ERR_OK;
656 }
657
FtFreeGlobalUdpIo(void)658 static void FtFreeGlobalUdpIo(void)
659 {
660 if (g_udpIo.readSet != FILLP_NULL_PTR) {
661 FILLP_FD_DESTROY_FD_SET(g_udpIo.readSet);
662 g_udpIo.readSet = FILLP_NULL_PTR;
663 }
664
665 if (g_udpIo.readableSet != FILLP_NULL_PTR) {
666 FILLP_FD_DESTROY_FD_SET(g_udpIo.readableSet);
667 g_udpIo.readableSet = FILLP_NULL_PTR;
668 }
669 }
670
FtFreeGlobalSpunge(void)671 static void FtFreeGlobalSpunge(void)
672 {
673 if (g_spunge == FILLP_NULL_PTR) {
674 return;
675 }
676 g_spunge->hasInited = FILLP_FALSE;
677
678 FtFreeEpollResource();
679
680 if (g_spunge->sockTable != FILLP_NULL_PTR) {
681 SpungeDestroySockTable(g_spunge->sockTable);
682 g_spunge->sockTable = FILLP_NULL_PTR;
683 }
684
685 if (g_spunge->netPool != FILLP_NULL_PTR) {
686 DympDestroyPool(g_spunge->netPool);
687 g_spunge->netPool = FILLP_NULL_PTR;
688 }
689
690 if (g_spunge->instPool != FILLP_NULL_PTR) {
691 SpungeFree(g_spunge->instPool, SPUNGE_ALLOC_TYPE_MALLOC);
692 g_spunge->instPool = FILLP_NULL_PTR;
693 }
694
695 FtFreeGlobalUdpIo();
696
697 SpungeFree(g_spunge, SPUNGE_ALLOC_TYPE_MALLOC);
698 g_spunge = FILLP_NULL_PTR;
699 }
700
FtModuleInit(void)701 static FILLP_INT FtModuleInit(void)
702 {
703 FILLP_INT err;
704 int ret;
705
706 err = FtInitGlobalUdpIo();
707 if (err != ERR_OK) {
708 return err;
709 }
710
711 err = FtInitGlobalInstPool();
712 if (err != ERR_OK) {
713 return err;
714 }
715
716 err = FtInitGlobalSockTable();
717 if (err != ERR_OK) {
718 return err;
719 }
720
721 err = FtInitGlobalNetPool();
722 if (err != ERR_OK) {
723 return err;
724 }
725
726 err = FtAllocateEpollResource();
727 if (err != ERR_OK) {
728 FILLP_LOGERR("Alloc epoll resource fail");
729 return err;
730 }
731
732 ret = SYS_ARCH_SEM_INIT(&g_resDeinitSem, 0);
733 if (ret != FILLP_OK) {
734 FILLP_LOGERR("deinit sem init failed. ");
735 return ERR_NORES;
736 }
737
738 err = SpungeAllocInstRes();
739 if (err != ERR_OK) {
740 FILLP_LOGERR("Spunge init instances resource fail");
741 (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
742 return err;
743 }
744 return ERR_OK;
745 }
746
FtInit(void)747 FILLP_INT FtInit(void)
748 {
749 FILLP_INT err;
750
751 FILLP_LOGBUTT("init stack");
752 if (g_spunge != FILLP_NULL_PTR) {
753 FILLP_LOGERR("Init already done");
754 return ERR_STACK_ALREADY_INITIALD;
755 }
756
757 if (SpungeCheckCallbacks() != ERR_OK) {
758 FILLP_LOGERR("User has not registered system callback functions");
759 return ERR_ADP_SYS_CALLBACK_NOT_REGISTERED;
760 }
761
762 if (SYS_ARCH_INIT() != ERR_OK) {
763 FILLP_LOGERR("SYS_ARCH_INIT ssp failed");
764 return ERR_NORES;
765 }
766
767 g_spunge = (struct Spunge *)SpungeAlloc(1, sizeof(struct Spunge), SPUNGE_ALLOC_TYPE_MALLOC);
768 if (g_spunge == FILLP_NULL_PTR) {
769 FILLP_LOGERR("Alloc g_spunge fail");
770 return ERR_NORES;
771 }
772
773 (void)memset_s(g_spunge, sizeof(struct Spunge), FILLP_NULL_NUM, sizeof(struct Spunge));
774
775 FtGetSpungeRes(&g_spunge->resConf);
776
777 err = FtModuleInit();
778 if (err != ERR_OK) {
779 goto ERR_FAIL;
780 }
781
782 FILLP_LOGBUTT("FillP_init: Spunge mem_zone alloc finished!");
783
784 FILLP_LOGBUTT("FillP Core init success!");
785 FILLP_LOGBUTT("version " FILLP_VERSION);
786
787 g_spunge->traceFlag = 0;
788 g_spunge->hasInited = FILLP_TRUE;
789 FILLP_LOGBUTT("Init success");
790 return ERR_OK;
791
792 ERR_FAIL:
793 FtFreeGlobalSpunge();
794
795 FILLP_LOGERR("Init fail,clean up");
796 return err;
797 }
798
799
800 /* starts from LSB bit position, cnt starts from 0 */
801 #define SPUNGE_SET_BIT(num, pos) ((num) |= (1U << (pos)))
802
SpungZeroInstance(void)803 static void SpungZeroInstance(void)
804 {
805 FILLP_BOOL hasDeinitBlked = g_spunge->hasDeinitBlked;
806 /* This logic can work for 32 instance in future need to change if more number of
807 instance are supported */
808 /* instance 0 is already closed so mark in bit field. */
809 FILLP_UINT32 instBitClosed = 1;
810 FILLP_UINT32 i;
811 FILLP_UINT32 instAllBit = (FILLP_UINT32)((1U << g_spunge->insNum) - 1);
812
813 /* In case of blocking FtDestroy 0th instance should post semaphore after all instance threads are exited,
814 and all resources are release. In case on non blocking FtDestroy 0th instance should free free all
815 reasource no need to post semaphore and need to release semaphore also
816 Wait for other instance threads to release respective resource and exit thread */
817 while (instBitClosed != instAllBit) {
818 FILLP_SLEEP_MS(1);
819 for (i = 1; i < g_spunge->insNum; i++) {
820 if (g_spunge->instPool[i].hasInited == 0) {
821 /* Mark as closed */
822 SPUNGE_SET_BIT(instBitClosed, i);
823 }
824 }
825 }
826
827 /* Free all global resource and reset parameters */
828 InitGlobalResourceDefault();
829 InitGlobalAppResourceDefault();
830 FtFreeGlobalSpunge();
831 FillpSysOsDeinit();
832 FillpDfxDoEvtCbSet(FILLP_NULL_PTR, FILLP_NULL_PTR);
833
834 /* Signal or release deinit sem */
835 if (hasDeinitBlked) {
836 (void)SYS_ARCH_SEM_POST(&g_resDeinitSem);
837 } else {
838 (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
839 }
840 }
841
SpungeDestroyInstance(struct SpungeInstance * inst)842 void SpungeDestroyInstance(struct SpungeInstance *inst)
843 {
844 FILLP_INT instIdx = inst->instIndex;
845
846 SpungeFreeInstanceResource(inst);
847
848 if (instIdx == 0) {
849 SpungZeroInstance();
850 }
851
852 FILLP_LOGERR("Destroy finish index: %d", instIdx);
853 }
854
FtDestroyInner(FILLP_INT block)855 static void FtDestroyInner(FILLP_INT block)
856 {
857 FILLP_UINT i;
858 FILLP_LOGERR("Destroy stack start, block(%d)", block);
859
860 if ((g_spunge == FILLP_NULL_PTR) || (!g_spunge->hasInited)) {
861 return;
862 }
863
864 g_spunge->hasDeinitBlked = (FILLP_BOOL)block;
865
866 /*
867 * should check g_spunge again,
868 * because the g_spunge may be freed in main thread after all the inst is freed
869 */
870 for (i = 0; g_spunge != FILLP_NULL_PTR && i < g_spunge->insNum; i++) {
871 (void)SYS_ARCH_SEM_WAIT(&g_spunge->instPool[i].threadSem);
872 g_spunge->instPool[i].waitTobeCoreKilled = FILLP_TRUE;
873 (void)SYS_ARCH_SEM_POST(&g_spunge->instPool[i].threadSem);
874 }
875
876 if ((block) && (SYS_ARCH_SEM_WAIT(&g_resDeinitSem) == 0)) {
877 (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
878 }
879
880 FILLP_LOGERR("Destroy finished");
881 return;
882 }
883
FtDestroy(void)884 void FtDestroy(void)
885 {
886 FtDestroyInner(FILLP_TRUE);
887 }
888
FtDestroyNonblock(void)889 void FtDestroyNonblock(void)
890 {
891 FtDestroyInner(FILLP_FALSE);
892 }
893
SpungeHandleMsgCycle(struct SpungeInstance * inst)894 void SpungeHandleMsgCycle(struct SpungeInstance *inst)
895 {
896 struct SpungeMsg *msg = FILLP_NULL_PTR;
897 FILLP_INT ret;
898 FILLP_ULONG i;
899
900 FILLP_ULONG boxItems = FillpQueueValidOnes(inst->msgBox);
901 if ((boxItems == 0) || (boxItems > inst->msgBox->size)) {
902 boxItems = (FILLP_ULONG)inst->msgBox->size;
903 }
904
905 for (i = 0; i < boxItems; i++) {
906 ret = FillpQueuePop(inst->msgBox, (void *)&msg, 1);
907 if (ret <= 0) {
908 break;
909 }
910 if (msg->msgType < MSG_TYPE_END) {
911 g_msgHandler[msg->msgType](msg->value, inst);
912 }
913 if (!msg->block) {
914 DympFree(msg);
915 } else {
916 (void)SYS_ARCH_SEM_POST(&msg->syncSem);
917 }
918 }
919 }
920
SpungeLoopCheckUnsendBox(struct SpungeInstance * inst)921 static void SpungeLoopCheckUnsendBox(struct SpungeInstance *inst)
922 {
923 int j;
924 FillpQueue *boxQueue = inst->unsendBox[0];
925 struct FillpPcbItem **item = inst->unsendItem;
926 struct FtNetconn *netconn = FILLP_NULL_PTR;
927 struct FillpPcb *fpcb = FILLP_NULL_PTR;
928 FILLP_INT count;
929
930 count = FillpQueuePop(boxQueue, (void *)item, FILLP_UNSEND_BOX_LOOP_CHECK_BURST);
931 if (count <= 0) {
932 return;
933 }
934
935 for (j = 0; j < count; j++) {
936 netconn = (struct FtNetconn *)item[j]->netconn;
937 if (netconn == FILLP_NULL_PTR) {
938 FillpFreeBufItem(item[j]);
939 continue;
940 }
941
942 fpcb = &(netconn->pcb->fpcb);
943 HlistAddTail(&fpcb->send.unSendList, &item[j]->unsendNode);
944 (void)FillpFrameAddItem(&fpcb->frameHandle, item[j]);
945 FillpPcbSendFc(fpcb);
946 }
947 }
948
SpungeDelay(struct SpungeInstance * inst,FILLP_LLONG curTime)949 static FILLP_BOOL SpungeDelay(struct SpungeInstance *inst, FILLP_LLONG curTime)
950 {
951 FILLP_LLONG timePass = curTime - inst->curTime;
952
953 FILLP_LLONG minSendInterval = (FILLP_LLONG)((FILLP_ULLONG)inst->minSendInterval >> FILLP_TIME_PRECISION);
954 if ((timePass > minSendInterval) && (timePass > FILLP_MINIMUM_SELECT_TIME)) {
955 minSendInterval = 0;
956 } else if (minSendInterval < FILLP_MINIMUM_SELECT_TIME) {
957 minSendInterval = FILLP_MINIMUM_SELECT_TIME;
958 }
959
960 if (SYS_ARCH_SEM_POST(&inst->threadSem)) {
961 FILLP_LOGWAR("sem wait failed");
962 }
963 if (inst->pcbList.list.size > 0) {
964 (void)SysioSelect((FILLP_INT)minSendInterval);
965 } else {
966 FILLP_SLEEP_MS((FILLP_UINT)FILLP_UTILS_US2MS(minSendInterval));
967 }
968 if (SYS_ARCH_SEM_WAIT(&inst->threadSem)) {
969 FILLP_LOGWAR("sem wait failed");
970 }
971 return FILLP_TRUE;
972 }
973
SpungeMainDelay(struct SpungeInstance * inst)974 static FILLP_BOOL SpungeMainDelay(struct SpungeInstance *inst)
975 {
976 FILLP_BOOL isTimeout = FILLP_TRUE;
977 FILLP_LLONG curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
978
979 if (g_resource.common.fullCpuEnable && (inst->stb.tbFpcbLists.size > 0)) {
980 (void)SysioSelect(0);
981 inst->curTime = curTime;
982 return isTimeout;
983 }
984
985 if (curTime < inst->curTime) {
986 FILLP_LOGERR("System Time has been changed to past value");
987 return isTimeout;
988 }
989 isTimeout = SpungeDelay(inst, curTime);
990 curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
991 if (curTime < inst->curTime) {
992 FILLP_LOGERR("System Time has been changed to past value\r\n");
993 return isTimeout;
994 }
995
996 inst->curTime = curTime;
997 inst->minSendInterval = FILLP_MAX_SEND_INTERVAL;
998 return isTimeout;
999 }
1000
FillpServerRecvRateAdjustment(struct SpungeInstance * inst,FILLP_UINT32 calcRecvTotalRate,FILLP_INT realRecvConn,FILLP_UINT32 * connRecvCalLimit)1001 void FillpServerRecvRateAdjustment(struct SpungeInstance *inst, FILLP_UINT32 calcRecvTotalRate, FILLP_INT realRecvConn,
1002 FILLP_UINT32 *connRecvCalLimit)
1003 {
1004 static FILLP_UINT8 recvStableState = 0;
1005 static FILLP_UINT32 prevRecvTotRate = 0;
1006 static const FILLP_UINT32 maxCalcRecvRate = 0;
1007
1008 if ((calcRecvTotalRate > (RECV_RATE_PAR_LOW * prevRecvTotRate)) &&
1009 (calcRecvTotalRate < (RECV_RATE_PAT_HIGH * prevRecvTotRate))) {
1010 if (recvStableState < RECV_STATE_THRESHOLD) {
1011 recvStableState++;
1012 }
1013 } else {
1014 if (recvStableState > 0) {
1015 recvStableState--;
1016 }
1017 }
1018
1019 prevRecvTotRate = calcRecvTotalRate;
1020
1021 /* Give some space for every connection to grow, since if the network
1022 conditions are varying for every connection */
1023 /* If the sum of rate of all connections is less than the historical max
1024 recv rate, then allow to grow */
1025 if (recvStableState < FILLP_FC_STABLESTATE_VAL_2) {
1026 calcRecvTotalRate = (FILLP_UINT32)(calcRecvTotalRate * FILL_FC_SEND_RATE_TOTAL_1);
1027 } else if (calcRecvTotalRate < (maxCalcRecvRate * FILLP_FC_SEND_RATE_MULTIPLE_FACTOR)) {
1028 /* Give the enough room for the client to grow the bandwidth */
1029 calcRecvTotalRate = maxCalcRecvRate;
1030 } else {
1031 /* Give 5% room for connections to grow, so that it can achieve the max
1032 network goodput */
1033 calcRecvTotalRate = (FILLP_UINT32)(calcRecvTotalRate * FILL_FC_SEND_RATE_TOTAL_2);
1034 }
1035
1036 /* If the sum of received rate of all the connections is more than the configured
1037 rate, then limit it to configured rate.
1038 Rate should not exceed the configured value */
1039 if (calcRecvTotalRate > inst->rateControl.recv.maxRate) {
1040 calcRecvTotalRate = inst->rateControl.recv.maxRate;
1041 }
1042
1043 if (realRecvConn > 0) {
1044 *connRecvCalLimit = (FILLP_UINT32)((double)calcRecvTotalRate / realRecvConn);
1045 } else {
1046 /* If there are no connections which are active and connected, then set
1047 the rate limit for every connection to maximum limit */
1048 *connRecvCalLimit = inst->rateControl.recv.maxRate;
1049 }
1050 /* End of rate adjustment for Data receiving at server side */
1051 }
1052
FillpServerSendRateAdjustment(struct SpungeInstance * inst,FILLP_UINT32 calcSendTotalRate,FILLP_INT realSendConn,FILLP_UINT32 * connSendCalLimit)1053 void FillpServerSendRateAdjustment(struct SpungeInstance *inst, FILLP_UINT32 calcSendTotalRate, FILLP_INT realSendConn,
1054 FILLP_UINT32 *connSendCalLimit)
1055 {
1056 static FILLP_UINT8 sendStableState = 0;
1057 static FILLP_UINT32 prevSendTotRate = 0;
1058 static const FILLP_UINT32 maxCalcSendRate = 0;
1059
1060 if ((calcSendTotalRate > (FILLP_FC_PREV_ADJUSTMENT_RATE_LOW_VAL * prevSendTotRate)) &&
1061 (calcSendTotalRate < (FILLP_FC_PREV_ADJUSTMENT_RATE_HIGH_VAL * prevSendTotRate))) {
1062 if (sendStableState < FILLP_FC_STABLESTATE_VAL_1) {
1063 sendStableState++;
1064 }
1065 } else {
1066 if (sendStableState > 0) {
1067 sendStableState--;
1068 }
1069 }
1070
1071 prevSendTotRate = calcSendTotalRate;
1072
1073 /* Give some space for every connection to grow, since if the network
1074 conditions are varying for every connection */
1075 /* If the sum of rate of all connections is less than the historical max
1076 recv rate, then allow to grow */
1077 if (sendStableState < FILLP_FC_STABLESTATE_VAL_2) {
1078 calcSendTotalRate = (FILLP_UINT32)(calcSendTotalRate * FILL_FC_SEND_RATE_TOTAL_1);
1079 } else if (calcSendTotalRate < (maxCalcSendRate * FILLP_FC_SEND_RATE_MULTIPLE_FACTOR)) {
1080 calcSendTotalRate = maxCalcSendRate;
1081 } else {
1082 /* Give 5% room for connections to grow, so that it can achieve the max
1083 network goodput */
1084 calcSendTotalRate = (FILLP_UINT32)(calcSendTotalRate * FILL_FC_SEND_RATE_TOTAL_2);
1085 }
1086
1087 /* If the sum of sending rate as acked by PACK for all the connections is
1088 more than the configured rate, then limit it to configured rate.
1089 Rate should not exceed the configured value */
1090 if (calcSendTotalRate > inst->rateControl.send.maxRate) {
1091 calcSendTotalRate = inst->rateControl.send.maxRate;
1092 }
1093
1094 if (realSendConn > 0) {
1095 *connSendCalLimit = (FILLP_UINT32)((double)calcSendTotalRate / realSendConn);
1096 } else {
1097 /* If there are no connections which are active and connected, then set
1098 the rate limit for every connection to maximum limit */
1099 *connSendCalLimit = inst->rateControl.send.maxRate;
1100 }
1101
1102 /* End of rate adjustment for Data receiving at server side */
1103 }
1104
FillpCalculateFairness(struct SpungeInstance * inst)1105 void FillpCalculateFairness(struct SpungeInstance *inst)
1106 {
1107 struct HlistNode *pcbNode = FILLP_NULL_PTR;
1108 struct SpungePcb *pcb = FILLP_NULL_PTR;
1109 FILLP_INT realSendConn = 0;
1110 FILLP_INT realRecvConn = 0;
1111 struct FtNetconn *conn = FILLP_NULL_PTR;
1112 FILLP_UINT8 connState;
1113 FILLP_UINT32 connRecvCalLimit;
1114 FILLP_UINT32 connSendCalLimit;
1115 FILLP_UINT32 calcRecvTotalRate = 0;
1116 FILLP_UINT32 calcSendTotalRate = 0;
1117
1118 pcbNode = HLIST_FIRST(&inst->pcbList.list);
1119 while (pcbNode != FILLP_NULL_PTR) {
1120 pcb = SpungePcbListNodeEntry(pcbNode);
1121 pcbNode = pcbNode->next;
1122 conn = (struct FtNetconn *)pcb->conn;
1123
1124 connState = NETCONN_GET_STATE(conn);
1125 if (connState > CONN_STATE_CONNECTED) {
1126 /* Connection state is greater than the connected state, so skip and continue */
1127 continue;
1128 }
1129
1130 if (pcb->fpcb.statistics.pack.periodRecvRate > FILLP_DEFAULT_MIN_RATE) {
1131 realRecvConn++;
1132 }
1133
1134 if (pcb->fpcb.statistics.pack.periodSendRate > FILLP_DEFAULT_MIN_RATE) {
1135 realSendConn++;
1136 }
1137
1138 /* Calculate for Data receiving on server side */
1139 calcRecvTotalRate = calcRecvTotalRate + pcb->fpcb.statistics.pack.periodRecvRate;
1140
1141 /* Calculate for Data sending from server side */
1142 calcSendTotalRate = calcSendTotalRate + pcb->fpcb.statistics.pack.periodAckByPackRate;
1143 }
1144
1145 /* Calculation of rate adjustment for Data receiving at server side */
1146 FillpServerRecvRateAdjustment(inst, calcRecvTotalRate, realRecvConn, &connRecvCalLimit);
1147
1148 /* Calculation of rate adjustment for Data Sending at server side */
1149 FillpServerSendRateAdjustment(inst, calcSendTotalRate, realSendConn, &connSendCalLimit);
1150
1151 pcbNode = HLIST_FIRST(&inst->pcbList.list);
1152 while (pcbNode != FILLP_NULL_PTR) {
1153 pcb = SpungePcbListNodeEntry(pcbNode);
1154 pcbNode = pcbNode->next;
1155
1156 /* The rate is set to all the connections irrespective of whether the
1157 connection is idle or not, so that, once the connection starts pumping
1158 the data, it will have enough window to start with.
1159 All this algorithm will adjust the rate of all the connections accordingly */
1160 pcb->rateControl.recv.curMaxRateLimitation = connRecvCalLimit;
1161 pcb->fpcb.recv.oppositeSetRate = pcb->rateControl.recv.curMaxRateLimitation;
1162
1163 pcb->rateControl.send.curMaxRateLimitation = connSendCalLimit;
1164 pcb->fpcb.send.flowControl.sendRateLimit = pcb->rateControl.send.curMaxRateLimitation;
1165 }
1166 }
1167
FillpKillCore(void)1168 FILLP_BOOL FillpKillCore(void)
1169 {
1170 FILLP_UINT16 i;
1171 for (i = 0; i < SYS_ARCH_ATOMIC_READ(&g_spunge->sockTable->used); i++) {
1172 struct FtSocket *sock = g_spunge->sockTable->sockPool[i];
1173
1174 if ((sock->allocState != SOCK_ALLOC_STATE_FREE)) {
1175 return FILLP_FALSE;
1176 }
1177 }
1178
1179 return FILLP_TRUE;
1180 }
1181
FillpCheckPcbNackListToSend(void * args)1182 void FillpCheckPcbNackListToSend(void *args)
1183 {
1184 struct SpungePcb *pcb = ((struct FillpPcb *)args)->spcb;
1185 struct Hlist *nackList = FILLP_NULL_PTR;
1186 FILLP_LLONG curTime;
1187 struct HlistNode *node = FILLP_NULL_PTR;
1188 struct HlistNode *tmp = FILLP_NULL_PTR;
1189
1190 if (pcb == FILLP_NULL_PTR) {
1191 FILLP_LOGERR("spunge_pcb is NULL");
1192 return;
1193 }
1194
1195 nackList = &(pcb->fpcb.recv.nackList);
1196 if (nackList->size == 0) {
1197 return;
1198 }
1199
1200 curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
1201 node = HLIST_FIRST(nackList);
1202 while (node != FILLP_NULL_PTR) {
1203 struct FillpNackNode *nackNode = FillpNackNodeEntry(node);
1204 FILLP_LLONG timestamp = nackNode->timestamp;
1205 /*
1206 Commenting the timeout check again here, since the timing wheel
1207 will ensure that the time has elapsed before invoking this timeout
1208 function
1209 */
1210 if (curTime > timestamp) {
1211 FILLP_UINT32 startPktNum = nackNode->startPktNum;
1212 FILLP_UINT32 endPktNum = nackNode->endPktNum;
1213 FillpSendNack(&(pcb->fpcb), startPktNum, endPktNum);
1214 tmp = node;
1215 node = node->next;
1216 HlistDelete(nackList, tmp);
1217 SpungeFree(nackNode, SPUNGE_ALLOC_TYPE_CALLOC);
1218 nackNode = FILLP_NULL_PTR;
1219 } else {
1220 break;
1221 }
1222 }
1223
1224 /* if all the delay NACKs are sent out, then stop the timer */
1225 if (nackList->size > 0) {
1226 FillpEnableDelayNackTimer((struct FillpPcb *)args);
1227 }
1228 }
1229
SpinstLoopMacTimerChecker(void * p)1230 void SpinstLoopMacTimerChecker(void *p)
1231 {
1232 struct SpungeInstance *inst = (struct SpungeInstance *)p;
1233 /* Check server cookie Refresh */
1234 /* Duration is put as 30minutes, 1 Minute = 60,000 Milliseconds
1235 */
1236 if (((inst->curTime - (FILLP_LLONG)inst->macInfo.switchOverTime) > FILLP_KEY_REFRESH_TIME)) {
1237 FillpMacTimerExpire(&inst->macInfo, inst->curTime);
1238 }
1239 if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&inst->macTimerNode)) {
1240 FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->macTimerNode.interval),
1241 &inst->macTimerNode);
1242 }
1243 }
1244
SpinstLoopFairnessChecker(void * p)1245 void SpinstLoopFairnessChecker(void *p)
1246 {
1247 struct SpungeInstance *inst = (struct SpungeInstance *)p;
1248
1249 if ((g_resource.flowControl.supportFairness == FILLP_FAIRNESS_TYPE_EQUAL_WEIGHT) &&
1250 (inst->rateControl.connectionNum > 0)) {
1251 inst->rateControl.lastControlTime = inst->curTime;
1252 FillpCalculateFairness(inst);
1253 }
1254
1255 if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&inst->fairTimerNode)) {
1256 FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->fairTimerNode.interval),
1257 &inst->fairTimerNode);
1258 }
1259 }
1260
SpungeEnableTokenTimer(struct SpungeTokenBucke * stb)1261 void SpungeEnableTokenTimer(struct SpungeTokenBucke *stb)
1262 {
1263 if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&stb->tockenTimerNode)) {
1264 FillpTimingWheelAddTimer(&stb->inst->timingWheel, stb->tockenTimerNode.interval + stb->inst->curTime,
1265 &stb->tockenTimerNode);
1266 }
1267 }
1268
SpungeDisableTokenTimer(struct SpungeTokenBucke * stb)1269 void SpungeDisableTokenTimer(struct SpungeTokenBucke *stb)
1270 {
1271 if (FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&stb->tockenTimerNode)) {
1272 FillpTimingWheelDelTimer(stb->tockenTimerNode.wheel, &stb->tockenTimerNode);
1273 }
1274 }
1275
SpungeTokenTimerCb(void * p)1276 void SpungeTokenTimerCb(void *p)
1277 {
1278 struct SpungeTokenBucke *stb = (struct SpungeTokenBucke *)p;
1279 struct SpungeInstance *inst = (struct SpungeInstance *)stb->inst;
1280 FILLP_ULLONG bitAdded;
1281 FILLP_UINT32 tokens;
1282
1283 if (stb->rate != g_resource.flowControl.limitRate) {
1284 FILLP_UINT32 rate_bck = stb->rate;
1285 stb->rate = g_resource.flowControl.limitRate;
1286 stb->tokenCount = 0;
1287
1288 if (stb->rate != 0) {
1289 stb->tockenTimerNode.interval = (FILLP_UINT32)(
1290 ((FILLP_ULLONG)stb->maxPktSize * (FILLP_ULLONG)FILLP_FC_IN_KBPS) / (FILLP_ULLONG)stb->rate);
1291 if (stb->tockenTimerNode.interval > SPUNGE_TOKEN_TIMER_MAX_INTERVAL) {
1292 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL;
1293 }
1294 } else {
1295 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO;
1296 }
1297
1298 FILLP_LOGINF("limite rate change from:%u to:%u, timer_interval:%u, maxPktSize:%u", rate_bck, stb->rate,
1299 stb->tockenTimerNode.interval, stb->maxPktSize);
1300 }
1301
1302 bitAdded = (FILLP_ULLONG)(inst->curTime - stb->lastTime) * (FILLP_ULLONG)stb->rate;
1303 stb->lastTime = inst->curTime;
1304 tokens = (FILLP_UINT32)((bitAdded / (FILLP_ULLONG)FILLP_BPS_TO_KBPS) >> BIT_MOVE_CNT);
1305 if ((tokens < stb->maxPktSize) || (stb->tokenCount < stb->maxPktSize)) {
1306 stb->tokenCount += tokens;
1307 } else {
1308 stb->tokenCount = tokens;
1309 }
1310
1311 if (stb->tockenTimerNode.interval != SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO) {
1312 SpungeEnableTokenTimer(stb);
1313 }
1314 }
1315
SpungeItemRouteByToken(struct FillpPcbItem * item,struct FillpPcb * fpcb)1316 FILLP_INT SpungeItemRouteByToken(struct FillpPcbItem *item, struct FillpPcb *fpcb)
1317 {
1318 struct SpungeTokenBucke *stb;
1319 FILLP_INT ret = ERR_OK;
1320
1321 stb = &fpcb->pcbInst->stb;
1322
1323 if (stb->tockenTimerNode.interval == SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO) {
1324 SpungeTokenTimerCb(stb);
1325 }
1326 if ((stb->rate == 0) && (fpcb->send.itemWaitTokenLists.nodeNum == 0)) { /* no limit or limit -> nolimit */
1327 ret = FillpSendItem(item, fpcb);
1328 } else if ((stb->tokenCount >= (FILLP_UINT32)item->dataLen) && (fpcb->send.itemWaitTokenLists.nodeNum == 0)) {
1329 ret = FillpSendItem(item, fpcb);
1330 if (ret == ERR_OK) {
1331 stb->tokenCount -= (FILLP_UINT32)item->dataLen;
1332 }
1333 } else {
1334 if (SkipListInsert(&fpcb->send.itemWaitTokenLists, (void *)item, &item->skipListNode, FILLP_TRUE) != ERR_OK) {
1335 /* this can't be happen */
1336 FILLP_LOGERR("fillp_sock_id:%d Can't add item <%u,%u> to itemWaitTokenLists", FILLP_GET_SOCKET(fpcb)->index,
1337 item->seqNum, item->dataLen);
1338 FillpFreeBufItem(item);
1339 (void)SYS_ARCH_ATOMIC_INC(&(FILLP_GET_SOCKET(fpcb)->sendEventCount), 1);
1340 #ifdef SOCK_SEND_SEM
1341 (void)SYS_ARCH_SEM_POST(&fpcb->send.sendSem);
1342 #endif /* SOCK_SEND_SEM */
1343 } else {
1344 stb->waitPktCount++;
1345 }
1346 }
1347
1348 return ret;
1349 }
1350
SpungeClearItemWaitTokenList(struct SpungeTokenBucke * stb)1351 static void SpungeClearItemWaitTokenList(struct SpungeTokenBucke *stb)
1352 {
1353 struct HlistNode *fpcbNode = HLIST_FIRST(&(stb->tbFpcbLists));
1354 struct FillpPcb *fpcb = FILLP_NULL_PTR;
1355 struct FillpPcbItem *item = FILLP_NULL_PTR;
1356
1357 while (fpcbNode != FILLP_NULL_PTR) {
1358 fpcb = FillpPcbStbNodeEntry(fpcbNode);
1359 fpcbNode = fpcbNode->next;
1360 item = (struct FillpPcbItem *)SkipListPopValue(&(fpcb->send.itemWaitTokenLists));
1361 while (item != FILLP_NULL_PTR) {
1362 stb->waitPktCount--;
1363 /* here item should move to unrecvList, not directly send by udp,
1364 or the sendrate may be over the max send rate */
1365 if (SkipListInsert(&fpcb->send.unrecvList, (void *)item, &item->skipListNode, FILLP_TRUE) != ERR_OK) {
1366 FillpFreeBufItem(item);
1367 (void)SYS_ARCH_ATOMIC_INC(&(FILLP_GET_SOCKET(fpcb)->sendEventCount), 1);
1368 #ifdef SOCK_SEND_SEM
1369 (void)SYS_ARCH_SEM_POST(&fpcb->send.sendSem);
1370 #endif /* SOCK_SEND_SEM */
1371 } else if (item->sendCount > 0) {
1372 fpcb->send.unrecvRedunListBytes += item->dataLen;
1373 }
1374 item = (struct FillpPcbItem *)SkipListPopValue(&(fpcb->send.itemWaitTokenLists));
1375 }
1376
1377 if (fpcb->send.unrecvList.nodeNum != 0) {
1378 FillpEnableSendTimer(fpcb);
1379 }
1380 }
1381
1382 if (stb->waitPktCount != 0) {
1383 FILLP_LOGERR("waitPktCount %llu is not 0", stb->waitPktCount);
1384 stb->waitPktCount = 0;
1385 }
1386 stb->fpcbCur = HLIST_FIRST(&(stb->tbFpcbLists));
1387 }
1388
SpungeCheckItemWaitTokenList(struct SpungeTokenBucke * stb)1389 void SpungeCheckItemWaitTokenList(struct SpungeTokenBucke *stb)
1390 {
1391 struct HlistNode *fpcbNode = FILLP_NULL_PTR;
1392 struct SkipListNode *node = FILLP_NULL_PTR;
1393 struct FillpPcb *fpcb = FILLP_NULL_PTR;
1394 struct FillpPcbItem *item = FILLP_NULL_PTR;
1395 FILLP_UINT32 fpcbCount = (FILLP_UINT32)stb->tbFpcbLists.size;
1396 FILLP_UINT32 waitListEmptyCount = 0;
1397 FILLP_INT err;
1398
1399 if (stb->waitPktCount == 0) {
1400 return;
1401 }
1402
1403 /* stb->rate change from !0 to 0, need to move all item form itemWaitTokenLists to unSendList */
1404 if (stb->rate == 0) {
1405 SpungeClearItemWaitTokenList(stb);
1406 return;
1407 }
1408
1409 fpcbNode = stb->fpcbCur;
1410 while ((stb->tokenCount > 0) && (stb->waitPktCount > 0) && (waitListEmptyCount < fpcbCount)) {
1411 if (fpcbNode == FILLP_NULL_PTR) {
1412 fpcbNode = HLIST_FIRST(&(stb->tbFpcbLists));
1413 }
1414
1415 fpcb = FillpPcbStbNodeEntry(fpcbNode);
1416 node = SkipListGetPop(&(fpcb->send.itemWaitTokenLists));
1417 if (node == FILLP_NULL_PTR) {
1418 fpcbNode = fpcbNode->next;
1419 waitListEmptyCount++;
1420 continue;
1421 }
1422
1423 item = (struct FillpPcbItem *)node->item;
1424 if (stb->tokenCount < item->dataLen) {
1425 break;
1426 }
1427
1428 stb->waitPktCount--;
1429 (void)SkipListPopValue(&fpcb->send.itemWaitTokenLists);
1430 err = FillpSendItem(item, fpcb);
1431 if (err == ERR_OK) {
1432 stb->tokenCount -= (FILLP_UINT32)item->dataLen;
1433 }
1434 fpcbNode = fpcbNode->next;
1435 waitListEmptyCount = 0;
1436 }
1437
1438 stb->fpcbCur = fpcbNode;
1439 }
1440
SpungeInitTokenBucket(struct SpungeInstance * inst)1441 void SpungeInitTokenBucket(struct SpungeInstance *inst)
1442 {
1443 struct SpungeTokenBucke *stb = &inst->stb;
1444
1445 stb->inst = inst;
1446 stb->lastTime = inst->curTime;
1447 stb->rate = g_resource.flowControl.limitRate;
1448 stb->waitPktCount = 0;
1449 stb->tokenCount = 0;
1450 stb->maxPktSize = (FILLP_UINT32)g_appResource.flowControl.pktSize;
1451
1452 stb->fpcbCur = FILLP_NULL_PTR;
1453 HLIST_INIT(&(stb->tbFpcbLists));
1454
1455 FILLP_TIMING_WHEEL_INIT_NODE(&stb->tockenTimerNode);
1456 stb->tockenTimerNode.cbNode.cb = SpungeTokenTimerCb;
1457 stb->tockenTimerNode.cbNode.arg = (void *)stb;
1458 if (stb->rate != 0) {
1459 stb->tockenTimerNode.interval =
1460 (FILLP_UINT32)(((FILLP_ULLONG)stb->maxPktSize * (FILLP_ULLONG)FILLP_FC_IN_KBPS) / (FILLP_ULLONG)stb->rate);
1461 if (stb->tockenTimerNode.interval > SPUNGE_TOKEN_TIMER_MAX_INTERVAL) {
1462 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL;
1463 }
1464 } else {
1465 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO;
1466 }
1467
1468 FILLP_LOGINF("limite rate:%u, timer_interval:%u, maxPktSize:%u", stb->rate, stb->tockenTimerNode.interval,
1469 stb->maxPktSize);
1470 SpungeEnableTokenTimer(stb);
1471 }
1472
SpungeTokenBucketAddFpcb(struct FillpPcb * fpcb)1473 void SpungeTokenBucketAddFpcb(struct FillpPcb *fpcb)
1474 {
1475 struct SpungeTokenBucke *stb = FILLP_NULL_PTR;
1476
1477 if ((fpcb == FILLP_NULL_PTR) || (fpcb->pcbInst == FILLP_NULL_PTR)) {
1478 return;
1479 }
1480
1481 stb = &fpcb->pcbInst->stb;
1482 if (stb->maxPktSize < (FILLP_UINT32)fpcb->pktSize) {
1483 stb->maxPktSize = (FILLP_UINT32)fpcb->pktSize;
1484 }
1485
1486 HLIST_INIT_NODE(&(fpcb->stbNode));
1487 HlistAddTail(&stb->tbFpcbLists, &(fpcb->stbNode));
1488 FILLP_LOGINF("fillp_sock_id:%d, maxPktSize:%u,"
1489 "limitRate:%u",
1490 FILLP_GET_SOCKET(fpcb)->index, stb->maxPktSize, stb->rate);
1491 }
1492
SpungeTokenBucketDelFpcb(struct FillpPcb * fpcb)1493 void SpungeTokenBucketDelFpcb(struct FillpPcb *fpcb)
1494 {
1495 struct HlistNode *node = FILLP_NULL_PTR;
1496 struct SpungeTokenBucke *stb = FILLP_NULL_PTR;
1497
1498 if ((fpcb == FILLP_NULL_PTR) || (fpcb->pcbInst == FILLP_NULL_PTR)) {
1499 return;
1500 }
1501
1502 stb = &fpcb->pcbInst->stb;
1503 if ((stb->fpcbCur != FILLP_NULL_PTR) && (stb->fpcbCur == &(fpcb->stbNode))) {
1504 stb->fpcbCur = stb->fpcbCur->next;
1505 }
1506
1507 node = HLIST_FIRST(&(stb->tbFpcbLists));
1508 while (node != FILLP_NULL_PTR) {
1509 if (&(fpcb->stbNode) == node) {
1510 stb->waitPktCount -= (FILLP_ULLONG)fpcb->send.itemWaitTokenLists.nodeNum;
1511 HlistDelete(&(stb->tbFpcbLists), node);
1512 FILLP_LOGINF("fillp_sock_id:%d, limitRate:%u", FILLP_GET_SOCKET(fpcb)->index, stb->rate);
1513 break;
1514 }
1515 node = node->next;
1516 }
1517 }
1518
1519 /* Return 1 if still alive , or return 0 */
SpinstLoopCheckAlive(struct SpungeInstance * inst)1520 static int SpinstLoopCheckAlive(struct SpungeInstance *inst)
1521 {
1522 if (inst->waitTobeCoreKilled && FillpKillCore()) {
1523 inst->waitTobeCoreKilled = FILLP_FALSE;
1524 return 0;
1525 }
1526
1527 return 1;
1528 }
1529
SpinstLoopRecv(struct SpungeInstance * inst)1530 static void SpinstLoopRecv(struct SpungeInstance *inst)
1531 {
1532 struct HlistNode *osSockNode;
1533 int readable = 1;
1534 osSockNode = HLIST_FIRST(&inst->osSockist);
1535 /* Select doesn't work with sendmmsg/recvmmsg, so in that case it is always
1536 set as 1 */
1537 while (osSockNode != FILLP_NULL_PTR) {
1538 struct SockOsSocket *osSock = SockOsListEntry(osSockNode);
1539 if (!g_resource.udp.supportMmsg) {
1540 readable = SysioIsSockReadable((void *)osSock->ioSock);
1541 }
1542 osSockNode = osSockNode->next;
1543
1544 if (readable) {
1545 SpungeDoRecvCycle(osSock, inst);
1546 }
1547 }
1548 }
1549
1550 #if !defined(FILLP_LW_LITEOS)
SpungeSetThreadInfo(FILLP_CONST struct SpungeInstance * inst)1551 static void SpungeSetThreadInfo(FILLP_CONST struct SpungeInstance *inst)
1552 {
1553 FILLP_CHAR threadName[SPUNGE_MAX_THREAD_NAME_LENGTH] = {0};
1554 FILLP_UINT8 random = (FILLP_UINT8)(FILLP_RAND() & 0xFF);
1555 (void)inst;
1556 FILLP_INT ret = sprintf_s(threadName, sizeof(threadName), "%s_%u", "Fillp_core", (FILLP_UINT)random);
1557 if (ret < ERR_OK) {
1558 FILLP_LOGWAR("SpungeInstanceMainThread sprintf_s thread name failed(%d), random(%u)", ret, random);
1559 }
1560 (void)SysSetThreadName(threadName, sizeof(threadName));
1561
1562 #if defined(FILLP_LINUX)
1563 {
1564 pthread_t self;
1565 self = pthread_self();
1566 FILLP_LOGINF("FillP Core threadId:%ld", self);
1567 /* thread resource will be auto recycled
1568 only this detach set if no other thread try to join it */
1569 if (pthread_detach(self)) {
1570 FILLP_LOGERR("Set Detach fail");
1571 }
1572 }
1573 #elif defined(FILLP_WIN32)
1574 FILLP_LOGBUTT("FillP Core threadId:%d", GetCurrentThreadId());
1575 #endif
1576 }
1577 #endif
1578
SpungeInstanceMainThread(void * p)1579 void SpungeInstanceMainThread(void *p)
1580 {
1581 struct SpungeInstance *inst = FILLP_NULL_PTR;
1582 FILLP_BOOL isTimeout;
1583
1584 if (p == FILLP_NULL_PTR) {
1585 FILLP_LOGERR("parameter p is NULL");
1586 return;
1587 }
1588
1589 inst = (struct SpungeInstance *)p;
1590 #if !defined(FILLP_LW_LITEOS)
1591 SpungeSetThreadInfo(inst);
1592 #endif
1593
1594 if (SYS_ARCH_SEM_WAIT(&inst->threadSem)) {
1595 FILLP_LOGWAR("sem wait failed");
1596 return;
1597 }
1598 while (inst->hasInited) {
1599 SpungeHandleMsgCycle(inst);
1600 SpungeLoopCheckUnsendBox(inst);
1601 if (!SpinstLoopCheckAlive(inst)) {
1602 break;
1603 }
1604 isTimeout = SpungeMainDelay(inst);
1605 SpinstLoopRecv(inst);
1606
1607 if (isTimeout == FILLP_TRUE) {
1608 FillpTimingWheelLoopCheck(&inst->timingWheel, inst->curTime);
1609 }
1610
1611 SpungeCheckItemWaitTokenList(&inst->stb);
1612 }
1613
1614 SpungeDestroyInstance(inst);
1615 }
1616
SpungePushRecvdDataToStack(void * arg)1617 void SpungePushRecvdDataToStack(void *arg)
1618 {
1619 struct FillpPcb *pcb = (struct FillpPcb *)arg;
1620 struct FillpPcbItem *item = SkipListPopValue(&pcb->recv.recvBoxPlaceInOrder);
1621 while (item != FILLP_NULL_PTR) {
1622 FillpDataToStack(pcb, item);
1623 item = SkipListPopValue(&pcb->recv.recvBoxPlaceInOrder);
1624 }
1625
1626 FillpEnableDataBurstTimer(pcb);
1627 }
1628
1629 #ifdef __cplusplus
1630 }
1631 #endif
1632