1 /*
2  * Copyright (C) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "spunge_core.h"
17 #ifdef FILLP_LINUX
18 #include <errno.h>
19 #endif
20 #include <stdio.h>
21 #include "securec.h"
22 #include "sysio.h"
23 #include "res.h"
24 #include "socket_common.h"
25 #include "fillp_flow_control.h"
26 #include "timing_wheel.h"
27 #include "fillp_buf_item.h"
28 #include "callbacks.h"
29 #include "fillp_common.h"
30 #include "spunge.h"
31 #include "spunge_stack.h"
32 #include "spunge_message.h"
33 #include "fillp_output.h"
34 #include "fillp_input.h"
35 #include "fillp_dfx.h"
36 
37 #ifdef __cplusplus
38 extern "C" {
39 #endif
40 
41 SYS_ARCH_SEM g_resDeinitSem;
42 #define BIT_MOVE_CNT 3
43 #define RECV_RATE_PAR_LOW 0.98
44 #define RECV_RATE_PAT_HIGH 1.02
45 #define RECV_STATE_THRESHOLD 10
46 
47 void SpungeFreeInstanceResource(struct SpungeInstance *inst);
48 
49 
SpungeDoRecvCycle(struct SockOsSocket * osSock,struct SpungeInstance * inst)50 void SpungeDoRecvCycle(struct SockOsSocket *osSock, struct SpungeInstance *inst)
51 {
52     FILLP_UINT32 i;
53     struct NetBuf buf;
54     struct SpungePcb *spcb = FILLP_NULL_PTR;
55 
56     if (!OS_SOCK_OPS_FUNC_VALID(osSock, fetchPacket)) {
57         return;
58     }
59 
60     (void)memset_s(&buf, sizeof(buf), 0, sizeof(buf));
61     buf.p = inst->tmpBuf[0];
62     for (i = 0; i < g_resource.udp.rxBurst; i++) {
63         spcb = osSock->ioSock->ops->fetchPacket((void *)osSock, (void *)&buf, 0);
64         if (spcb != FILLP_NULL_PTR) {
65             FillpDoInput(&spcb->fpcb, &buf, inst);
66             continue;
67         } else {
68             break;
69         }
70     }
71 }
72 
SpungeCalExpectedBytes(FILLP_UINT32 * sendPktNum,struct SpungePcb * pcb,struct FtSocket * sock,struct FillpFlowControl * flowControl,FILLP_LLONG detaTime)73 static FILLP_UINT32 SpungeCalExpectedBytes(FILLP_UINT32 *sendPktNum, struct SpungePcb *pcb,
74     struct FtSocket *sock, struct FillpFlowControl *flowControl, FILLP_LLONG detaTime)
75 {
76     FILLP_UINT32 bytesExpected;
77     FILLP_UINT32 pktNum = sock->resConf.udp.txBurst;
78 
79     if (flowControl->sendInterval) {
80         pktNum = (FILLP_UINT32)(detaTime / flowControl->sendInterval);
81     }
82 
83     if (pktNum <= (sock->resConf.udp.txBurst)) {
84         /* sendRate is kbps */
85         FILLP_ULLONG bitsExpected = (FILLP_ULLONG)(detaTime * flowControl->sendRate / FILLP_ONE_SECOND);
86         bitsExpected >>= FILLP_TIME_PRECISION;
87         bytesExpected = (FILLP_UINT32)(FILLP_UTILS_BIT2BYTE(bitsExpected));
88         pcb->fpcb.statistics.traffic.packExpSendBytes += bytesExpected;
89         bytesExpected += pcb->fpcb.send.flowControl.remainBytes;
90     } else {
91         pktNum = sock->resConf.udp.txBurst;
92         bytesExpected = (FILLP_UINT32)(pktNum * pcb->fpcb.pktSize);
93         pcb->fpcb.statistics.traffic.packExpSendBytes += bytesExpected;
94     }
95     *sendPktNum = pktNum;
96     FILLP_LOGDBG("before_send_cycle fillp_sock_id:%d unRecvNum:%u, unAck:%u\r\n",
97         sock->index, pcb->fpcb.send.unrecvList.nodeNum, pcb->fpcb.send.unackList.count);
98     return bytesExpected;
99 }
100 
SpungeDoSendUpdate(struct SpungePcb * pcb,FILLP_UINT32 sendBytes,FILLP_UINT32 bytesExpected)101 static void SpungeDoSendUpdate(struct SpungePcb *pcb, FILLP_UINT32 sendBytes, FILLP_UINT32 bytesExpected)
102 {
103     if ((sendBytes > 0) && (bytesExpected >= (FILLP_UINT32)sendBytes)) {
104         pcb->fpcb.statistics.traffic.packSendBytes += sendBytes;
105         pcb->fpcb.send.flowControl.remainBytes = (bytesExpected - (FILLP_UINT32)sendBytes);
106     } else {
107         pcb->fpcb.send.flowControl.remainBytes = 0;
108     }
109 }
110 
SpungeDoSendCycle(struct SpungePcb * pcb,struct SpungeInstance * inst,FILLP_LLONG detaTime)111 void SpungeDoSendCycle(struct SpungePcb *pcb, struct SpungeInstance *inst, FILLP_LLONG detaTime)
112 {
113     FILLP_UINT32 sendPktNum;
114     FILLP_UINT32 sendBytes = 0;
115     FILLP_UINT32 tmpBytes = 0;
116     FILLP_UINT32 bytesExpected;
117 
118     if ((pcb == FILLP_NULL_PTR) || (pcb->conn == FILLP_NULL_PTR)) {
119         FILLP_LOGERR("NULL Pointer");
120         return;
121     }
122 
123     FILLP_SIZE_T pktSize = pcb->fpcb.pktSize;
124     struct FillpFlowControl *flowControl = &pcb->fpcb.send.flowControl;
125     struct FtNetconn *conn = (struct FtNetconn *)pcb->conn;
126     struct FtSocket *sock = (struct FtSocket *)conn->sock;
127 
128     if (sock == FILLP_NULL_PTR) {
129         FILLP_LOGERR("NULL Pointer");
130         return;
131     }
132 
133     flowControl->sendTime = inst->curTime;
134     bytesExpected = SpungeCalExpectedBytes(&sendPktNum, pcb, sock, flowControl, detaTime);
135 
136     /* flow control alg may need to change bytesExpected for the this send cycle a according to current status */
137     if (pcb->fpcb.algFuncs.updateExpectSendBytes != FILLP_NULL_PTR) {
138         pcb->fpcb.algFuncs.updateExpectSendBytes(&pcb->fpcb, &bytesExpected);
139     }
140 
141     /* If BytesExpected less than pktSize, no need to send, just store the remainBytes */
142     if (bytesExpected >= pktSize) {
143         /* Make sure that the send bytes won't more than bytesExpected */
144         tmpBytes = (FILLP_UINT32)(bytesExpected - pktSize);
145 
146         sendBytes = FillpSendOne(&pcb->fpcb, tmpBytes, sendPktNum);
147         SpungeDoSendUpdate(pcb, sendBytes, bytesExpected);
148     } else {
149         pcb->fpcb.send.flowControl.remainBytes = bytesExpected;
150     }
151 
152     FILLP_LOGDBG("after_send_cycle: fillp_sock_id:%d expected bytes:%u sentBytes:%u remain:%u \r\n",
153         sock->index, sendBytes, tmpBytes, pcb->fpcb.send.flowControl.remainBytes);
154 
155     if ((pcb->fpcb.send.flowControl.remainBytes) || (!HLIST_EMPTY(&pcb->fpcb.send.unSendList)) ||
156         (pcb->fpcb.send.redunList.nodeNum) || (pcb->fpcb.send.unrecvList.nodeNum)) {
157         FillpEnableSendTimer(&pcb->fpcb);
158     } else {
159         FillpDisableSendTimer(&pcb->fpcb);
160     }
161 
162     return;
163 }
164 
SpungeDestroySockTableSocket(struct FtSocketTable * table,int tableIndex)165 static void SpungeDestroySockTableSocket(struct FtSocketTable *table, int tableIndex)
166 {
167     struct FtSocket *sock = FILLP_NULL_PTR;
168 
169     if (table == FILLP_NULL_PTR) {
170         return;
171     }
172 
173     sock = table->sockPool[tableIndex];
174     if (sock == FILLP_NULL_PTR) {
175         return;
176     }
177     (void)SYS_ARCH_RWSEM_DESTROY(&sock->sockConnSem);
178     (void)SYS_ARCH_SEM_DESTROY(&sock->connBlockSem);
179     (void)SYS_ARCH_SEM_DESTROY(&sock->sockCloseProtect);
180     (void)SYS_ARCH_SEM_DESTROY(&sock->epollTaskListLock);
181     SpungeFree(sock, SPUNGE_ALLOC_TYPE_CALLOC);
182     table->sockPool[tableIndex] = FILLP_NULL_PTR;
183 }
184 
185 /* SFT */
SpungeCreateSockTable(FILLP_UINT maxSock)186 struct FtSocketTable *SpungeCreateSockTable(FILLP_UINT maxSock)
187 {
188     int i;
189     struct FtSocketTable *table;
190     table = (struct FtSocketTable *)SpungeAlloc(1, sizeof(struct FtSocketTable), SPUNGE_ALLOC_TYPE_CALLOC);
191     if (table == FILLP_NULL_PTR) {
192         FILLP_LOGERR("Failed to allocate memory for socket table \r\n");
193         return FILLP_NULL_PTR;
194     }
195 
196     table->freeQueqe = FillpQueueCreate("sock_free_table", (FILLP_SIZE_T)maxSock, SPUNGE_ALLOC_TYPE_CALLOC);
197 
198     if (table->freeQueqe == FILLP_NULL_PTR) {
199         FILLP_LOGERR("Fail to create socket table free queue");
200         goto ERR_FAIL;
201     }
202 
203     FillpQueueSetConsSafe(table->freeQueqe, FILLP_TRUE);
204     FillpQueueSetProdSafe(table->freeQueqe, FILLP_TRUE);
205 
206     table->sockPool =
207         (struct FtSocket **)SpungeAlloc(maxSock, (FILLP_SIZE_T)sizeof(struct FtSocket *), SPUNGE_ALLOC_TYPE_CALLOC);
208     if (table->sockPool == FILLP_NULL_PTR) {
209         FILLP_LOGERR("Failed to allocate memory for sockPool of socket table");
210         goto ERR_FAIL;
211     }
212 
213     table->size = (FILLP_INT)maxSock;
214     SYS_ARCH_ATOMIC_SET(&table->used, 0);
215     for (i = 0; i < table->size; i++) {
216         table->sockPool[i] = FILLP_NULL_PTR;
217     }
218 
219     return table;
220 
221 ERR_FAIL:
222     if (table->freeQueqe != FILLP_NULL_PTR) {
223         FillpQueueDestroy(table->freeQueqe);
224         table->freeQueqe = FILLP_NULL_PTR;
225     }
226 
227     if (table->sockPool != FILLP_NULL_PTR) {
228         SpungeFree(table->sockPool, SPUNGE_ALLOC_TYPE_CALLOC);
229         table->sockPool = FILLP_NULL_PTR;
230     }
231 
232     SpungeFree(table, SPUNGE_ALLOC_TYPE_CALLOC);
233 
234     return FILLP_NULL_PTR;
235 }
236 
237 
238 /* SFT */
SpungeDestroySockTable(struct FtSocketTable * table)239 void SpungeDestroySockTable(struct FtSocketTable *table)
240 {
241     FILLP_INT i;
242 
243     for (i = 0; i < SYS_ARCH_ATOMIC_READ(&table->used); i++) {
244         SpungeDestroySockTableSocket(table, i);
245     }
246 
247     if (table->freeQueqe != FILLP_NULL_PTR) {
248         FillpQueueDestroy(table->freeQueqe);
249         table->freeQueqe = FILLP_NULL_PTR;
250     }
251 
252     if (table->sockPool != FILLP_NULL_PTR) {
253         SpungeFree(table->sockPool, SPUNGE_ALLOC_TYPE_CALLOC);
254         table->sockPool = FILLP_NULL_PTR;
255     }
256 
257     /* NULL check for table already done at the caller, and also in the above
258     check, table is dereferenced without validating, so need to check for NULL
259     again here before freeing it */
260     SpungeFree(table, SPUNGE_ALLOC_TYPE_CALLOC);
261 }
262 
SpungeInstMsgBoxInit(struct SpungeInstance * inst)263 static FILLP_INT SpungeInstMsgBoxInit(struct SpungeInstance *inst)
264 {
265     (void)SYS_ARCH_ATOMIC_SET(&inst->msgUsingCount, 0);
266     inst->msgBox = FillpQueueCreate("spunge_msg_box", g_spunge->resConf.maxMsgItemNum, SPUNGE_ALLOC_TYPE_MALLOC);
267     if (inst->msgBox == FILLP_NULL_PTR) {
268         FILLP_LOGERR("Init inst->msgBox Fail");
269         return ERR_NORES;
270     }
271 
272     FillpQueueSetConsSafe(inst->msgBox, FILLP_TRUE);
273     FillpQueueSetProdSafe(inst->msgBox, FILLP_TRUE);
274 
275     inst->msgPool = SpungeMsgCreatePool(FILLP_MSG_ITEM_INIT_NUM, (int)g_spunge->resConf.maxMsgItemNum);
276     if (inst->msgPool == FILLP_NULL_PTR) {
277         FILLP_LOGERR("create msg pool fail");
278         return ERR_NORES;
279     }
280 
281     DympSetConsSafe(inst->msgPool, FILLP_TRUE);
282     DympSetProdSafe(inst->msgPool, FILLP_TRUE);
283     return ERR_OK;
284 }
285 
SpungeInstSendInit(struct SpungeInstance * inst)286 static FILLP_INT SpungeInstSendInit(struct SpungeInstance *inst)
287 {
288     int i;
289 
290     /* To control on client sending */
291     inst->rateControl.connectionNum = FILLP_NULL;
292 
293     inst->rateControl.recv.maxRate = g_resource.flowControl.maxRecvRate;
294 
295     /* To control on server sending */
296     inst->rateControl.send.maxRate = g_resource.flowControl.maxRate;
297 
298     inst->thresdSemInited = FILLP_FALSE;
299     int ret = SYS_ARCH_SEM_INIT(&inst->threadSem, 1);
300     if (ret != FILLP_OK) {
301         FILLP_LOGERR("SYS_ARCH_SEM_INIT fails");
302         return ERR_NORES;
303     }
304     inst->thresdSemInited = FILLP_TRUE;
305 
306     inst->unsendItem =
307         SpungeAlloc(FILLP_UNSEND_BOX_LOOP_CHECK_BURST, sizeof(struct FillpPcbItem *), SPUNGE_ALLOC_TYPE_CALLOC);
308     if (inst->unsendItem == FILLP_NULL_PTR) {
309         FILLP_LOGERR("inst->unsendItem NULL");
310         return ERR_NORES;
311     }
312 
313     for (i = 0; i < FILLP_VLEN; i++) {
314         inst->tmpBuf[i] = SpungeAlloc(1, (sizeof(FILLP_CHAR) * FILLP_MAX_PKT_SIZE), SPUNGE_ALLOC_TYPE_MALLOC);
315         if (inst->tmpBuf[i] == FILLP_NULL_PTR) {
316             FILLP_LOGERR("inst->tmpBuf[%d] is NULL", i);
317             return ERR_NORES;
318         }
319     }
320 
321     HLIST_INIT(&inst->sendPcbList);
322     for (i = 0; i < FILLP_INST_UNSEND_BOX_NUM; i++) {
323         inst->unsendBox[i] = FillpQueueCreate("socket_send_box", FILLP_INST_UNSEND_BOX_SIZE, SPUNGE_ALLOC_TYPE_MALLOC);
324         if (inst->unsendBox[i] == FILLP_NULL_PTR) {
325             FILLP_LOGERR("inst->unsendBox[%d] is NULL", i);
326             return ERR_NORES;
327         }
328 
329         FillpQueueSetConsSafe(inst->unsendBox[i], FILLP_FALSE);
330         FillpQueueSetProdSafe(inst->unsendBox[i], FILLP_TRUE);
331     }
332     return ERR_OK;
333 }
334 
SpungeInstTimerInit(struct SpungeInstance * inst)335 static void SpungeInstTimerInit(struct SpungeInstance *inst)
336 {
337     inst->curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
338 
339     (void)memset_s(&inst->macInfo, sizeof(FillpMacInfo), 0, sizeof(FillpMacInfo));
340     FillpMacTimerExpire(&inst->macInfo, inst->curTime);
341 
342     FillpTimingWheelInit(&inst->timingWheel, FILLP_TIMING_WHEEL_ACCURACY);
343 
344     /* Init the global timers */
345     FtGlobalTimerInit(inst);
346     SpungeInitTokenBucket(inst);
347 }
348 
SpungeThreadInit(struct SpungeInstance * inst)349 static FILLP_INT SpungeThreadInit(struct SpungeInstance *inst)
350 {
351     FILLP_THREAD threadId;
352 
353     inst->mainThreadParam.func = SpungeInstanceMainThread;
354     inst->mainThreadParam.param = inst;
355     inst->minSendInterval = FILLP_MAX_SEND_INTERVAL;
356 
357     inst->hasInited = FILLP_TRUE;
358 
359     (void)FILLP_SYS_START_NEWTHREAD(&inst->mainThreadParam, &threadId);
360 
361     return ERR_OK;
362 }
363 
SpungeInstInit(struct SpungeInstance * inst)364 FILLP_INT SpungeInstInit(struct SpungeInstance *inst)
365 {
366     FILLP_INT err;
367 
368     if (inst == FILLP_NULL_PTR) {
369         FILLP_LOGERR("Init inst null");
370         return ERR_NULLPTR;
371     }
372 
373     if (inst->hasInited) {
374         FILLP_LOGERR("Stack has been inited");
375         return ERR_OK;
376     }
377 
378     err = SpungeInstMsgBoxInit(inst);
379     if (err != ERR_OK) {
380         goto FAIL;
381     }
382 
383     HLIST_INIT(&inst->osSockist);
384     HLIST_INIT(&inst->pcbList.list);
385 
386     err = SpungeInstSendInit(inst);
387     if (err != ERR_OK) {
388         goto FAIL;
389     }
390 
391     SpungeInstTimerInit(inst);
392 
393     inst->cleanseDataCtr = 0;
394 
395     err = SpungeThreadInit(inst);
396     if (err != ERR_OK) {
397         goto FAIL;
398     }
399 
400     return ERR_OK;
401 
402 FAIL:
403     SpungeFreeInstanceResource(inst);
404     return err;
405 }
406 
SpungeSysCallRegisted(void)407 static FILLP_INT SpungeSysCallRegisted(void)
408 {
409     FILLP_INT ret;
410 
411     ret = FillpValidateFuncPtr(&g_fillpOsBasicLibFun, sizeof(FillpSysLibBasicCallbackFuncSt));
412     if (ret != ERR_OK) {
413         SET_ERRNO(FILLP_EINVAL);
414         FILLP_LOGERR("FillpValidateFuncPtr g_fillpOsBasicLibFun failed");
415         return ret;
416     }
417 
418     ret = FillpValidateFuncPtr(&g_fillpOsSemLibFun, sizeof(FillpSysLibSemCallbackFuncSt));
419     if (ret != ERR_OK) {
420         SET_ERRNO(FILLP_EINVAL);
421         FILLP_LOGERR("FillpValidateFuncPtr g_fillpOsSemLibFun failed");
422         return ret;
423     }
424 
425     ret = FillpValidateFuncPtr(&g_fillpOsSocketLibFun, sizeof(FillpSysLibSockCallbackFuncSt));
426     if (ret != ERR_OK) {
427         SET_ERRNO(FILLP_EINVAL);
428         FILLP_LOGERR("FillpValidateFuncPtr g_fillpOsSocketLibFun failed");
429         return ret;
430     }
431 
432     return ERR_OK;
433 }
434 
FtFreeEpollResource(void)435 static void FtFreeEpollResource(void)
436 {
437     if (g_spunge->epitemPool != FILLP_NULL_PTR) {
438         DympDestroyPool(g_spunge->epitemPool);
439         g_spunge->epitemPool = FILLP_NULL_PTR;
440     }
441 
442     if (g_spunge->eventpollPool != FILLP_NULL_PTR) {
443         DympDestroyPool(g_spunge->eventpollPool);
444         g_spunge->eventpollPool = FILLP_NULL_PTR;
445     }
446 }
447 
FtAllocateEpollResource(void)448 static FILLP_INT FtAllocateEpollResource(void)
449 {
450     DympoolItemOperaCbSt itemOperaCb = {FILLP_NULL_PTR, FILLP_NULL_PTR};
451     g_spunge->epitemPool = DympCreatePool(FILLP_EPOLL_ITEM_INIT_NUM, (int)g_spunge->resConf.maxEpollEventNum,
452         sizeof(struct EpItem), FILLP_TRUE, &itemOperaCb);
453     if (g_spunge->epitemPool == FILLP_NULL_PTR) {
454         FILLP_LOGERR("create mem pool for g_spunge->epitemPool failed");
455         return ERR_NORES;
456     }
457     DympSetConsSafe(g_spunge->epitemPool, FILLP_TRUE);
458     DympSetProdSafe(g_spunge->epitemPool, FILLP_TRUE);
459 
460     g_spunge->eventpollPool = DympCreatePool(FILLP_EPOLL_ITEM_INIT_NUM, (int)g_spunge->resConf.maxEpollEventNum,
461         sizeof(struct EventPoll), FILLP_TRUE, &itemOperaCb);
462     if (g_spunge->eventpollPool == FILLP_NULL_PTR) {
463         FtFreeEpollResource();
464         FILLP_LOGERR("create Dym pool for g_spunge->eventpollPool failed");
465         return ERR_NORES;
466     }
467     DympSetConsSafe(g_spunge->eventpollPool, FILLP_TRUE);
468     DympSetProdSafe(g_spunge->eventpollPool, FILLP_TRUE);
469     return ERR_OK;
470 }
471 
SpungeAllocInstRes(void)472 static int SpungeAllocInstRes(void)
473 {
474     FILLP_UINT i;
475     FILLP_UINT j;
476     FILLP_INT err;
477 
478     for (i = 0; i < g_spunge->insNum; i++) {
479         (void)memset_s(&g_spunge->instPool[i], sizeof(struct SpungeInstance), 0, sizeof(struct SpungeInstance));
480         g_spunge->instPool[i].instIndex = (FILLP_INT)i;
481         err = SpungeInstInit(&g_spunge->instPool[i]);
482         if (err == ERR_OK) {
483             continue;
484         }
485         FILLP_LOGERR("SpungeInstInit failed :: Instance number :: %u", i);
486 
487         /* Release instances which are created success */
488         if (i > 0) {
489             g_spunge->insNum = i;
490 
491             g_spunge->hasDeinitBlked = FILLP_TRUE;
492             for (j = 0; j < g_spunge->insNum; j++) {
493                 g_spunge->instPool[j].waitTobeCoreKilled = FILLP_TRUE;
494             }
495 
496             /* After this step g_spunge will be freed, it should not be accessed, caller has check for NULL pointer
497                 before accessing, so it will not cause problem */
498             (void)SYS_ARCH_SEM_WAIT(&g_resDeinitSem);
499         }
500         return err;
501     }
502 
503     return ERR_OK;
504 }
505 
SpungeFreeInstSendRecv(struct SpungeInstance * inst)506 static void SpungeFreeInstSendRecv(struct SpungeInstance *inst)
507 {
508     int j;
509     if (inst->thresdSemInited) {
510         (void)SYS_ARCH_SEM_DESTROY(&inst->threadSem);
511         inst->thresdSemInited = FILLP_FALSE;
512     }
513 
514     for (j = 0; j < FILLP_INST_UNSEND_BOX_NUM; j++) {
515         if (inst->unsendBox[j] != FILLP_NULL_PTR) {
516             FillpQueueDestroy(inst->unsendBox[j]);
517             inst->unsendBox[j] = FILLP_NULL_PTR;
518         } else {
519             break;
520         }
521     }
522     if (inst->unsendItem != FILLP_NULL_PTR) {
523         SpungeFree(inst->unsendItem, SPUNGE_ALLOC_TYPE_CALLOC);
524         inst->unsendItem = FILLP_NULL_PTR;
525     }
526 
527     for (j = 0; j < FILLP_VLEN; j++) {
528         if (inst->tmpBuf[j] == FILLP_NULL_PTR) {
529             break;
530         }
531         SpungeFree(inst->tmpBuf[j], SPUNGE_ALLOC_TYPE_MALLOC);
532         inst->tmpBuf[j] = FILLP_NULL_PTR;
533     }
534 }
535 
SpungeFreeInstanceResource(struct SpungeInstance * inst)536 void SpungeFreeInstanceResource(struct SpungeInstance *inst)
537 {
538     if (inst == FILLP_NULL_PTR) {
539         return;
540     }
541 
542     if (inst->msgBox != FILLP_NULL_PTR) {
543         FillpQueueDestroy(inst->msgBox);
544         inst->msgBox = FILLP_NULL_PTR;
545     }
546 
547     while (SYS_ARCH_ATOMIC_READ(&inst->msgUsingCount) > 0) {
548         FILLP_SLEEP_MS(1);
549     }
550 
551     if (inst->msgPool != FILLP_NULL_PTR) {
552         SpungeMsgPoolDestroy(inst->msgPool);
553         inst->msgPool = FILLP_NULL_PTR;
554     }
555 
556     SpungeFreeInstSendRecv(inst);
557 
558     inst->hasInited = FILLP_FALSE;
559 }
560 
FtGetSpungeRes(struct SpungeResConf * resConf)561 static void FtGetSpungeRes(struct SpungeResConf *resConf)
562 {
563     (void)memset_s(resConf, sizeof(struct SpungeResConf), 0, sizeof(struct SpungeResConf));
564 
565     resConf->maxInstNum = (FILLP_UINT)UTILS_MIN(g_resource.common.maxInstNum, MAX_SPUNGEINSTANCE_NUM);
566     resConf->maxSockNum = g_resource.common.maxSockNum;
567     resConf->maxConnNum = g_resource.common.maxConnNum;
568     resConf->maxMsgItemNum    = ((FILLP_UINT)g_resource.common.maxSockNum * FILLP_SPUNGE_EVENTG_MULT_NUM);
569     resConf->maxTimerItemNum  = ((FILLP_UINT)g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
570     resConf->maxEpollEventNum = (FILLP_UINT)(g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
571     resConf->maxEpollItemNum  = (FILLP_UINT)(g_resource.common.maxSockNum * FILLP_ITEM_MULT_NUM);
572 }
573 
FtGlobalTimerInit(struct SpungeInstance * inst)574 void FtGlobalTimerInit(struct SpungeInstance *inst)
575 {
576     /* Initialize the Fairness timer */
577     inst->fairTimerNode.cbNode.cb = SpinstLoopFairnessChecker;
578     inst->fairTimerNode.cbNode.arg = (void *)inst;
579     inst->fairTimerNode.interval = SPUNGE_WEIGHT_ADJUST_INTERVAL;
580     FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->fairTimerNode.interval),
581         &inst->fairTimerNode);
582     /* Initialize the MAC timer */
583     inst->macTimerNode.cbNode.cb = SpinstLoopMacTimerChecker;
584     inst->macTimerNode.cbNode.arg = (void *)inst;
585     inst->macTimerNode.interval = FILLP_KEY_REFRESH_TIME;
586     FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->macTimerNode.interval),
587         &inst->macTimerNode);
588 }
589 
SpungeCheckCallbacks(void)590 static FILLP_INT SpungeCheckCallbacks(void)
591 {
592     return SpungeSysCallRegisted();
593 }
594 
FtInitGlobalUdpIo(void)595 static FILLP_INT FtInitGlobalUdpIo(void)
596 {
597     g_udpIo.readSet = FILLP_FD_CREATE_FD_SET();
598     if (g_udpIo.readSet == FILLP_NULL_PTR) {
599         FILLP_LOGERR("Malloc g_udpIo.readSet failed");
600         return ERR_NORES;
601     }
602 
603     g_udpIo.readableSet = FILLP_FD_CREATE_FD_SET();
604     if (g_udpIo.readableSet == FILLP_NULL_PTR) {
605         FILLP_LOGERR("Malloc g_udpIo.readableSet failed");
606         return ERR_NORES;
607     }
608 
609     HLIST_INIT(&g_udpIo.listenPcbList);
610 
611     return ERR_OK;
612 }
613 
FtInitGlobalInstPool(void)614 static FILLP_INT FtInitGlobalInstPool(void)
615 {
616     g_spunge->insNum = g_spunge->resConf.maxInstNum;
617     g_spunge->instPool = (struct SpungeInstance *)SpungeAlloc(g_spunge->insNum, sizeof(struct SpungeInstance),
618         SPUNGE_ALLOC_TYPE_MALLOC);
619     if (g_spunge->instPool == FILLP_NULL_PTR) {
620         FILLP_LOGERR("Malloc g_spunge->instPool failed");
621         return ERR_NORES;
622     }
623 
624     return ERR_OK;
625 }
626 
FtInitGlobalSockTable(void)627 static FILLP_INT FtInitGlobalSockTable(void)
628 {
629     g_spunge->sockTable = SpungeCreateSockTable(g_spunge->resConf.maxSockNum);
630     if (g_spunge->sockTable == FILLP_NULL_PTR) {
631         FILLP_LOGERR("Malloc g_spunge->sockTable failed");
632         return ERR_NORES;
633     }
634     return ERR_OK;
635 }
636 
FtInitGlobalNetPool(void)637 static FILLP_INT FtInitGlobalNetPool(void)
638 {
639     FILLP_UINT netPoolInitSize = FILLP_CONN_ITEM_INIT_NUM;
640 
641     if (netPoolInitSize > g_spunge->resConf.maxConnNum) {
642         netPoolInitSize = g_spunge->resConf.maxConnNum;
643     }
644 
645     DympoolItemOperaCbSt itemOperaCb = {FILLP_NULL_PTR, FILLP_NULL_PTR};
646     g_spunge->netPool = DympCreatePool((FILLP_INT)netPoolInitSize, (int)g_spunge->resConf.maxConnNum,
647         sizeof(struct FtNetconn), FILLP_TRUE, &itemOperaCb);
648     if (g_spunge->netPool == FILLP_NULL_PTR) {
649         FILLP_LOGERR("Malloc g_spunge->netPool failed");
650         return ERR_NORES;
651     }
652 
653     DympSetConsSafe(g_spunge->netPool, FILLP_TRUE);
654     DympSetProdSafe(g_spunge->netPool, FILLP_FALSE);
655     return ERR_OK;
656 }
657 
FtFreeGlobalUdpIo(void)658 static void FtFreeGlobalUdpIo(void)
659 {
660     if (g_udpIo.readSet != FILLP_NULL_PTR) {
661         FILLP_FD_DESTROY_FD_SET(g_udpIo.readSet);
662         g_udpIo.readSet = FILLP_NULL_PTR;
663     }
664 
665     if (g_udpIo.readableSet != FILLP_NULL_PTR) {
666         FILLP_FD_DESTROY_FD_SET(g_udpIo.readableSet);
667         g_udpIo.readableSet = FILLP_NULL_PTR;
668     }
669 }
670 
FtFreeGlobalSpunge(void)671 static void FtFreeGlobalSpunge(void)
672 {
673     if (g_spunge == FILLP_NULL_PTR) {
674         return;
675     }
676     g_spunge->hasInited = FILLP_FALSE;
677 
678     FtFreeEpollResource();
679 
680     if (g_spunge->sockTable != FILLP_NULL_PTR) {
681         SpungeDestroySockTable(g_spunge->sockTable);
682         g_spunge->sockTable = FILLP_NULL_PTR;
683     }
684 
685     if (g_spunge->netPool != FILLP_NULL_PTR) {
686         DympDestroyPool(g_spunge->netPool);
687         g_spunge->netPool = FILLP_NULL_PTR;
688     }
689 
690     if (g_spunge->instPool != FILLP_NULL_PTR) {
691         SpungeFree(g_spunge->instPool, SPUNGE_ALLOC_TYPE_MALLOC);
692         g_spunge->instPool = FILLP_NULL_PTR;
693     }
694 
695     FtFreeGlobalUdpIo();
696 
697     SpungeFree(g_spunge, SPUNGE_ALLOC_TYPE_MALLOC);
698     g_spunge = FILLP_NULL_PTR;
699 }
700 
FtModuleInit(void)701 static FILLP_INT FtModuleInit(void)
702 {
703     FILLP_INT err;
704     int ret;
705 
706     err = FtInitGlobalUdpIo();
707     if (err != ERR_OK) {
708         return err;
709     }
710 
711     err = FtInitGlobalInstPool();
712     if (err != ERR_OK) {
713         return err;
714     }
715 
716     err = FtInitGlobalSockTable();
717     if (err != ERR_OK) {
718         return err;
719     }
720 
721     err = FtInitGlobalNetPool();
722     if (err != ERR_OK) {
723         return err;
724     }
725 
726     err = FtAllocateEpollResource();
727     if (err != ERR_OK) {
728         FILLP_LOGERR("Alloc epoll resource fail");
729         return err;
730     }
731 
732     ret = SYS_ARCH_SEM_INIT(&g_resDeinitSem, 0);
733     if (ret != FILLP_OK) {
734         FILLP_LOGERR("deinit sem init failed. ");
735         return ERR_NORES;
736     }
737 
738     err = SpungeAllocInstRes();
739     if (err != ERR_OK) {
740         FILLP_LOGERR("Spunge init instances resource fail");
741         (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
742         return err;
743     }
744     return ERR_OK;
745 }
746 
FtInit(void)747 FILLP_INT FtInit(void)
748 {
749     FILLP_INT err;
750 
751     FILLP_LOGBUTT("init stack");
752     if (g_spunge != FILLP_NULL_PTR) {
753         FILLP_LOGERR("Init already done");
754         return ERR_STACK_ALREADY_INITIALD;
755     }
756 
757     if (SpungeCheckCallbacks() != ERR_OK) {
758         FILLP_LOGERR("User has not registered system callback functions");
759         return ERR_ADP_SYS_CALLBACK_NOT_REGISTERED;
760     }
761 
762     if (SYS_ARCH_INIT() != ERR_OK) {
763         FILLP_LOGERR("SYS_ARCH_INIT ssp failed");
764         return ERR_NORES;
765     }
766 
767     g_spunge = (struct Spunge *)SpungeAlloc(1, sizeof(struct Spunge), SPUNGE_ALLOC_TYPE_MALLOC);
768     if (g_spunge == FILLP_NULL_PTR) {
769         FILLP_LOGERR("Alloc g_spunge fail");
770         return ERR_NORES;
771     }
772 
773     (void)memset_s(g_spunge, sizeof(struct Spunge), FILLP_NULL_NUM, sizeof(struct Spunge));
774 
775     FtGetSpungeRes(&g_spunge->resConf);
776 
777     err = FtModuleInit();
778     if (err != ERR_OK) {
779         goto ERR_FAIL;
780     }
781 
782     FILLP_LOGBUTT("FillP_init: Spunge mem_zone alloc finished!");
783 
784     FILLP_LOGBUTT("FillP Core init success!");
785     FILLP_LOGBUTT("version " FILLP_VERSION);
786 
787     g_spunge->traceFlag = 0;
788     g_spunge->hasInited = FILLP_TRUE;
789     FILLP_LOGBUTT("Init success");
790     return ERR_OK;
791 
792 ERR_FAIL:
793     FtFreeGlobalSpunge();
794 
795     FILLP_LOGERR("Init fail,clean up");
796     return err;
797 }
798 
799 
800 /* starts from LSB bit position, cnt starts from 0 */
801 #define SPUNGE_SET_BIT(num, pos) ((num) |= (1U << (pos)))
802 
SpungZeroInstance(void)803 static void SpungZeroInstance(void)
804 {
805     FILLP_BOOL hasDeinitBlked = g_spunge->hasDeinitBlked;
806     /* This logic can work for 32 instance in future need to change if more number of
807         instance are supported */
808     /* instance 0 is already closed so mark in bit field. */
809     FILLP_UINT32 instBitClosed = 1;
810     FILLP_UINT32 i;
811     FILLP_UINT32 instAllBit = (FILLP_UINT32)((1U << g_spunge->insNum) - 1);
812 
813     /* In case of blocking FtDestroy 0th instance should post semaphore after all instance threads are exited,
814         and all resources are release. In case on non blocking FtDestroy 0th instance should free free all
815         reasource no need to post semaphore and need to release semaphore also
816         Wait for other instance threads to release respective resource and exit thread */
817     while (instBitClosed != instAllBit) {
818         FILLP_SLEEP_MS(1);
819         for (i = 1; i < g_spunge->insNum; i++) {
820             if (g_spunge->instPool[i].hasInited == 0) {
821                 /* Mark as closed */
822                 SPUNGE_SET_BIT(instBitClosed, i);
823             }
824         }
825     }
826 
827     /* Free all global resource and reset parameters */
828     InitGlobalResourceDefault();
829     InitGlobalAppResourceDefault();
830     FtFreeGlobalSpunge();
831     FillpSysOsDeinit();
832     FillpDfxDoEvtCbSet(FILLP_NULL_PTR, FILLP_NULL_PTR);
833 
834     /* Signal or release deinit sem */
835     if (hasDeinitBlked) {
836         (void)SYS_ARCH_SEM_POST(&g_resDeinitSem);
837     } else {
838         (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
839     }
840 }
841 
SpungeDestroyInstance(struct SpungeInstance * inst)842 void SpungeDestroyInstance(struct SpungeInstance *inst)
843 {
844     FILLP_INT instIdx = inst->instIndex;
845 
846     SpungeFreeInstanceResource(inst);
847 
848     if (instIdx == 0) {
849         SpungZeroInstance();
850     }
851 
852     FILLP_LOGERR("Destroy finish index: %d", instIdx);
853 }
854 
FtDestroyInner(FILLP_INT block)855 static void FtDestroyInner(FILLP_INT block)
856 {
857     FILLP_UINT i;
858     FILLP_LOGERR("Destroy stack start, block(%d)", block);
859 
860     if ((g_spunge == FILLP_NULL_PTR) || (!g_spunge->hasInited)) {
861         return;
862     }
863 
864     g_spunge->hasDeinitBlked = (FILLP_BOOL)block;
865 
866     /*
867      * should check g_spunge again,
868      * because the g_spunge may be freed in main thread after all the inst is freed
869      */
870     for (i = 0; g_spunge != FILLP_NULL_PTR && i < g_spunge->insNum; i++) {
871         (void)SYS_ARCH_SEM_WAIT(&g_spunge->instPool[i].threadSem);
872         g_spunge->instPool[i].waitTobeCoreKilled = FILLP_TRUE;
873         (void)SYS_ARCH_SEM_POST(&g_spunge->instPool[i].threadSem);
874     }
875 
876     if ((block) && (SYS_ARCH_SEM_WAIT(&g_resDeinitSem) == 0)) {
877         (void)SYS_ARCH_SEM_DESTROY(&g_resDeinitSem);
878     }
879 
880     FILLP_LOGERR("Destroy finished");
881     return;
882 }
883 
FtDestroy(void)884 void FtDestroy(void)
885 {
886     FtDestroyInner(FILLP_TRUE);
887 }
888 
FtDestroyNonblock(void)889 void FtDestroyNonblock(void)
890 {
891     FtDestroyInner(FILLP_FALSE);
892 }
893 
SpungeHandleMsgCycle(struct SpungeInstance * inst)894 void SpungeHandleMsgCycle(struct SpungeInstance *inst)
895 {
896     struct SpungeMsg *msg = FILLP_NULL_PTR;
897     FILLP_INT ret;
898     FILLP_ULONG i;
899 
900     FILLP_ULONG boxItems = FillpQueueValidOnes(inst->msgBox);
901     if ((boxItems == 0) || (boxItems > inst->msgBox->size)) {
902         boxItems = (FILLP_ULONG)inst->msgBox->size;
903     }
904 
905     for (i = 0; i < boxItems; i++) {
906         ret = FillpQueuePop(inst->msgBox, (void *)&msg, 1);
907         if (ret <= 0) {
908             break;
909         }
910         if (msg->msgType < MSG_TYPE_END) {
911             g_msgHandler[msg->msgType](msg->value, inst);
912         }
913         if (!msg->block) {
914             DympFree(msg);
915         } else {
916             (void)SYS_ARCH_SEM_POST(&msg->syncSem);
917         }
918     }
919 }
920 
SpungeLoopCheckUnsendBox(struct SpungeInstance * inst)921 static void SpungeLoopCheckUnsendBox(struct SpungeInstance *inst)
922 {
923     int j;
924     FillpQueue *boxQueue = inst->unsendBox[0];
925     struct FillpPcbItem **item = inst->unsendItem;
926     struct FtNetconn *netconn = FILLP_NULL_PTR;
927     struct FillpPcb *fpcb = FILLP_NULL_PTR;
928     FILLP_INT count;
929 
930     count = FillpQueuePop(boxQueue, (void *)item, FILLP_UNSEND_BOX_LOOP_CHECK_BURST);
931     if (count <= 0) {
932         return;
933     }
934 
935     for (j = 0; j < count; j++) {
936         netconn = (struct FtNetconn *)item[j]->netconn;
937         if (netconn == FILLP_NULL_PTR) {
938             FillpFreeBufItem(item[j]);
939             continue;
940         }
941 
942         fpcb = &(netconn->pcb->fpcb);
943         HlistAddTail(&fpcb->send.unSendList, &item[j]->unsendNode);
944         (void)FillpFrameAddItem(&fpcb->frameHandle, item[j]);
945         FillpPcbSendFc(fpcb);
946     }
947 }
948 
SpungeDelay(struct SpungeInstance * inst,FILLP_LLONG curTime)949 static FILLP_BOOL SpungeDelay(struct SpungeInstance *inst, FILLP_LLONG curTime)
950 {
951     FILLP_LLONG timePass = curTime - inst->curTime;
952 
953     FILLP_LLONG minSendInterval = (FILLP_LLONG)((FILLP_ULLONG)inst->minSendInterval >> FILLP_TIME_PRECISION);
954     if ((timePass > minSendInterval) && (timePass > FILLP_MINIMUM_SELECT_TIME)) {
955         minSendInterval = 0;
956     } else if (minSendInterval < FILLP_MINIMUM_SELECT_TIME) {
957         minSendInterval = FILLP_MINIMUM_SELECT_TIME;
958     }
959 
960     if (SYS_ARCH_SEM_POST(&inst->threadSem)) {
961         FILLP_LOGWAR("sem wait failed");
962     }
963     if (inst->pcbList.list.size > 0) {
964         (void)SysioSelect((FILLP_INT)minSendInterval);
965     } else {
966         FILLP_SLEEP_MS((FILLP_UINT)FILLP_UTILS_US2MS(minSendInterval));
967     }
968     if (SYS_ARCH_SEM_WAIT(&inst->threadSem)) {
969         FILLP_LOGWAR("sem wait failed");
970     }
971     return FILLP_TRUE;
972 }
973 
SpungeMainDelay(struct SpungeInstance * inst)974 static FILLP_BOOL SpungeMainDelay(struct SpungeInstance *inst)
975 {
976     FILLP_BOOL isTimeout = FILLP_TRUE;
977     FILLP_LLONG curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
978 
979     if (g_resource.common.fullCpuEnable && (inst->stb.tbFpcbLists.size > 0)) {
980         (void)SysioSelect(0);
981         inst->curTime = curTime;
982         return isTimeout;
983     }
984 
985     if (curTime < inst->curTime) {
986         FILLP_LOGERR("System Time has been changed to past value");
987         return isTimeout;
988     }
989     isTimeout = SpungeDelay(inst, curTime);
990     curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
991     if (curTime < inst->curTime) {
992         FILLP_LOGERR("System Time has been changed to past value\r\n");
993         return isTimeout;
994     }
995 
996     inst->curTime = curTime;
997     inst->minSendInterval = FILLP_MAX_SEND_INTERVAL;
998     return isTimeout;
999 }
1000 
FillpServerRecvRateAdjustment(struct SpungeInstance * inst,FILLP_UINT32 calcRecvTotalRate,FILLP_INT realRecvConn,FILLP_UINT32 * connRecvCalLimit)1001 void FillpServerRecvRateAdjustment(struct SpungeInstance *inst, FILLP_UINT32 calcRecvTotalRate, FILLP_INT realRecvConn,
1002     FILLP_UINT32 *connRecvCalLimit)
1003 {
1004     static FILLP_UINT8 recvStableState = 0;
1005     static FILLP_UINT32 prevRecvTotRate = 0;
1006     static const FILLP_UINT32 maxCalcRecvRate = 0;
1007 
1008     if ((calcRecvTotalRate > (RECV_RATE_PAR_LOW * prevRecvTotRate)) &&
1009         (calcRecvTotalRate < (RECV_RATE_PAT_HIGH * prevRecvTotRate))) {
1010         if (recvStableState < RECV_STATE_THRESHOLD) {
1011             recvStableState++;
1012         }
1013     } else {
1014         if (recvStableState > 0) {
1015             recvStableState--;
1016         }
1017     }
1018 
1019     prevRecvTotRate = calcRecvTotalRate;
1020 
1021     /* Give some space for every connection to grow, since if the network
1022     conditions are varying for every connection */
1023     /* If the sum of rate of all connections is less than the historical max
1024     recv rate, then allow to grow */
1025     if (recvStableState < FILLP_FC_STABLESTATE_VAL_2) {
1026         calcRecvTotalRate = (FILLP_UINT32)(calcRecvTotalRate * FILL_FC_SEND_RATE_TOTAL_1);
1027     } else if (calcRecvTotalRate < (maxCalcRecvRate * FILLP_FC_SEND_RATE_MULTIPLE_FACTOR)) {
1028         /* Give the enough room for the client to grow the bandwidth */
1029         calcRecvTotalRate = maxCalcRecvRate;
1030     } else {
1031         /* Give 5% room for connections to grow, so that it can achieve the max
1032             network goodput */
1033         calcRecvTotalRate = (FILLP_UINT32)(calcRecvTotalRate * FILL_FC_SEND_RATE_TOTAL_2);
1034     }
1035 
1036     /* If the sum of received rate of all the connections is more than the configured
1037     rate, then limit it to configured rate.
1038     Rate should not exceed the configured value */
1039     if (calcRecvTotalRate > inst->rateControl.recv.maxRate) {
1040         calcRecvTotalRate = inst->rateControl.recv.maxRate;
1041     }
1042 
1043     if (realRecvConn > 0) {
1044         *connRecvCalLimit = (FILLP_UINT32)((double)calcRecvTotalRate / realRecvConn);
1045     } else {
1046         /* If there are no connections which are active and connected, then set
1047         the rate limit for every connection to maximum limit */
1048         *connRecvCalLimit = inst->rateControl.recv.maxRate;
1049     }
1050     /* End of rate adjustment for Data receiving at server side */
1051 }
1052 
FillpServerSendRateAdjustment(struct SpungeInstance * inst,FILLP_UINT32 calcSendTotalRate,FILLP_INT realSendConn,FILLP_UINT32 * connSendCalLimit)1053 void FillpServerSendRateAdjustment(struct SpungeInstance *inst, FILLP_UINT32 calcSendTotalRate, FILLP_INT realSendConn,
1054     FILLP_UINT32 *connSendCalLimit)
1055 {
1056     static FILLP_UINT8 sendStableState = 0;
1057     static FILLP_UINT32 prevSendTotRate = 0;
1058     static const FILLP_UINT32 maxCalcSendRate = 0;
1059 
1060     if ((calcSendTotalRate > (FILLP_FC_PREV_ADJUSTMENT_RATE_LOW_VAL * prevSendTotRate)) &&
1061         (calcSendTotalRate < (FILLP_FC_PREV_ADJUSTMENT_RATE_HIGH_VAL * prevSendTotRate))) {
1062         if (sendStableState < FILLP_FC_STABLESTATE_VAL_1) {
1063             sendStableState++;
1064         }
1065     } else {
1066         if (sendStableState > 0) {
1067             sendStableState--;
1068         }
1069     }
1070 
1071     prevSendTotRate = calcSendTotalRate;
1072 
1073     /* Give some space for every connection to grow, since if the network
1074     conditions are varying for every connection */
1075     /* If the sum of rate of all connections is less than the historical max
1076     recv rate, then allow to grow */
1077     if (sendStableState < FILLP_FC_STABLESTATE_VAL_2) {
1078         calcSendTotalRate = (FILLP_UINT32)(calcSendTotalRate * FILL_FC_SEND_RATE_TOTAL_1);
1079     } else if (calcSendTotalRate < (maxCalcSendRate * FILLP_FC_SEND_RATE_MULTIPLE_FACTOR)) {
1080         calcSendTotalRate = maxCalcSendRate;
1081     } else {
1082         /* Give 5% room for connections to grow, so that it can achieve the max
1083             network goodput */
1084         calcSendTotalRate = (FILLP_UINT32)(calcSendTotalRate * FILL_FC_SEND_RATE_TOTAL_2);
1085     }
1086 
1087     /* If the sum of sending rate as acked by PACK for all the connections is
1088     more than the configured rate, then limit it to configured rate.
1089     Rate should not exceed the configured value */
1090     if (calcSendTotalRate > inst->rateControl.send.maxRate) {
1091         calcSendTotalRate = inst->rateControl.send.maxRate;
1092     }
1093 
1094     if (realSendConn > 0) {
1095         *connSendCalLimit = (FILLP_UINT32)((double)calcSendTotalRate / realSendConn);
1096     } else {
1097         /* If there are no connections which are active and connected, then set
1098         the rate limit for every connection to maximum limit */
1099         *connSendCalLimit = inst->rateControl.send.maxRate;
1100     }
1101 
1102     /* End of rate adjustment for Data receiving at server side */
1103 }
1104 
FillpCalculateFairness(struct SpungeInstance * inst)1105 void FillpCalculateFairness(struct SpungeInstance *inst)
1106 {
1107     struct HlistNode *pcbNode = FILLP_NULL_PTR;
1108     struct SpungePcb *pcb = FILLP_NULL_PTR;
1109     FILLP_INT realSendConn = 0;
1110     FILLP_INT realRecvConn = 0;
1111     struct FtNetconn *conn = FILLP_NULL_PTR;
1112     FILLP_UINT8 connState;
1113     FILLP_UINT32 connRecvCalLimit;
1114     FILLP_UINT32 connSendCalLimit;
1115     FILLP_UINT32 calcRecvTotalRate = 0;
1116     FILLP_UINT32 calcSendTotalRate = 0;
1117 
1118     pcbNode = HLIST_FIRST(&inst->pcbList.list);
1119     while (pcbNode != FILLP_NULL_PTR) {
1120         pcb = SpungePcbListNodeEntry(pcbNode);
1121         pcbNode = pcbNode->next;
1122         conn = (struct FtNetconn *)pcb->conn;
1123 
1124         connState = NETCONN_GET_STATE(conn);
1125         if (connState > CONN_STATE_CONNECTED) {
1126             /* Connection state is greater than the connected state, so skip and continue */
1127             continue;
1128         }
1129 
1130         if (pcb->fpcb.statistics.pack.periodRecvRate > FILLP_DEFAULT_MIN_RATE) {
1131             realRecvConn++;
1132         }
1133 
1134         if (pcb->fpcb.statistics.pack.periodSendRate > FILLP_DEFAULT_MIN_RATE) {
1135             realSendConn++;
1136         }
1137 
1138         /* Calculate for Data receiving on server side */
1139         calcRecvTotalRate = calcRecvTotalRate + pcb->fpcb.statistics.pack.periodRecvRate;
1140 
1141         /* Calculate for Data sending from server side */
1142         calcSendTotalRate = calcSendTotalRate + pcb->fpcb.statistics.pack.periodAckByPackRate;
1143     }
1144 
1145     /* Calculation of rate adjustment for Data receiving at server side */
1146     FillpServerRecvRateAdjustment(inst, calcRecvTotalRate, realRecvConn, &connRecvCalLimit);
1147 
1148     /* Calculation of rate adjustment for Data Sending at server side */
1149     FillpServerSendRateAdjustment(inst, calcSendTotalRate, realSendConn, &connSendCalLimit);
1150 
1151     pcbNode = HLIST_FIRST(&inst->pcbList.list);
1152     while (pcbNode != FILLP_NULL_PTR) {
1153         pcb = SpungePcbListNodeEntry(pcbNode);
1154         pcbNode = pcbNode->next;
1155 
1156         /* The rate is set to all the connections irrespective of whether the
1157         connection is idle or not, so that, once the connection starts pumping
1158         the data, it will have enough window to start with.
1159         All this algorithm will adjust the rate of all the connections accordingly */
1160         pcb->rateControl.recv.curMaxRateLimitation = connRecvCalLimit;
1161         pcb->fpcb.recv.oppositeSetRate = pcb->rateControl.recv.curMaxRateLimitation;
1162 
1163         pcb->rateControl.send.curMaxRateLimitation = connSendCalLimit;
1164         pcb->fpcb.send.flowControl.sendRateLimit = pcb->rateControl.send.curMaxRateLimitation;
1165     }
1166 }
1167 
FillpKillCore(void)1168 FILLP_BOOL FillpKillCore(void)
1169 {
1170     FILLP_UINT16 i;
1171     for (i = 0; i < SYS_ARCH_ATOMIC_READ(&g_spunge->sockTable->used); i++) {
1172         struct FtSocket *sock = g_spunge->sockTable->sockPool[i];
1173 
1174         if ((sock->allocState != SOCK_ALLOC_STATE_FREE)) {
1175             return FILLP_FALSE;
1176         }
1177     }
1178 
1179     return FILLP_TRUE;
1180 }
1181 
FillpCheckPcbNackListToSend(void * args)1182 void FillpCheckPcbNackListToSend(void *args)
1183 {
1184     struct SpungePcb *pcb = ((struct FillpPcb *)args)->spcb;
1185     struct Hlist *nackList = FILLP_NULL_PTR;
1186     FILLP_LLONG curTime;
1187     struct HlistNode *node = FILLP_NULL_PTR;
1188     struct HlistNode *tmp = FILLP_NULL_PTR;
1189 
1190     if (pcb == FILLP_NULL_PTR) {
1191         FILLP_LOGERR("spunge_pcb is NULL");
1192         return;
1193     }
1194 
1195     nackList = &(pcb->fpcb.recv.nackList);
1196     if (nackList->size == 0) {
1197         return;
1198     }
1199 
1200     curTime = SYS_ARCH_GET_CUR_TIME_LONGLONG();
1201     node = HLIST_FIRST(nackList);
1202     while (node != FILLP_NULL_PTR) {
1203         struct FillpNackNode *nackNode = FillpNackNodeEntry(node);
1204         FILLP_LLONG timestamp = nackNode->timestamp;
1205         /*
1206         Commenting the timeout check again here, since the timing wheel
1207         will ensure that the time has elapsed before invoking this timeout
1208         function
1209         */
1210         if (curTime > timestamp) {
1211             FILLP_UINT32 startPktNum = nackNode->startPktNum;
1212             FILLP_UINT32 endPktNum = nackNode->endPktNum;
1213             FillpSendNack(&(pcb->fpcb), startPktNum, endPktNum);
1214             tmp = node;
1215             node = node->next;
1216             HlistDelete(nackList, tmp);
1217             SpungeFree(nackNode, SPUNGE_ALLOC_TYPE_CALLOC);
1218             nackNode = FILLP_NULL_PTR;
1219         } else {
1220             break;
1221         }
1222     }
1223 
1224     /* if all the delay NACKs are sent out, then stop the timer */
1225     if (nackList->size > 0) {
1226         FillpEnableDelayNackTimer((struct FillpPcb *)args);
1227     }
1228 }
1229 
SpinstLoopMacTimerChecker(void * p)1230 void SpinstLoopMacTimerChecker(void *p)
1231 {
1232     struct SpungeInstance *inst = (struct SpungeInstance *)p;
1233     /* Check server cookie Refresh */
1234     /* Duration is put as 30minutes, 1 Minute = 60,000 Milliseconds
1235      */
1236     if (((inst->curTime - (FILLP_LLONG)inst->macInfo.switchOverTime) > FILLP_KEY_REFRESH_TIME)) {
1237         FillpMacTimerExpire(&inst->macInfo, inst->curTime);
1238     }
1239     if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&inst->macTimerNode)) {
1240         FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->macTimerNode.interval),
1241             &inst->macTimerNode);
1242     }
1243 }
1244 
SpinstLoopFairnessChecker(void * p)1245 void SpinstLoopFairnessChecker(void *p)
1246 {
1247     struct SpungeInstance *inst = (struct SpungeInstance *)p;
1248 
1249     if ((g_resource.flowControl.supportFairness == FILLP_FAIRNESS_TYPE_EQUAL_WEIGHT) &&
1250         (inst->rateControl.connectionNum > 0)) {
1251         inst->rateControl.lastControlTime = inst->curTime;
1252         FillpCalculateFairness(inst);
1253     }
1254 
1255     if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&inst->fairTimerNode)) {
1256         FillpTimingWheelAddTimer(&inst->timingWheel, (SYS_ARCH_GET_CUR_TIME_LONGLONG() + inst->fairTimerNode.interval),
1257             &inst->fairTimerNode);
1258     }
1259 }
1260 
SpungeEnableTokenTimer(struct SpungeTokenBucke * stb)1261 void SpungeEnableTokenTimer(struct SpungeTokenBucke *stb)
1262 {
1263     if (!FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&stb->tockenTimerNode)) {
1264         FillpTimingWheelAddTimer(&stb->inst->timingWheel, stb->tockenTimerNode.interval + stb->inst->curTime,
1265             &stb->tockenTimerNode);
1266     }
1267 }
1268 
SpungeDisableTokenTimer(struct SpungeTokenBucke * stb)1269 void SpungeDisableTokenTimer(struct SpungeTokenBucke *stb)
1270 {
1271     if (FILLP_TIMING_WHEEL_IS_NODE_ENABLED(&stb->tockenTimerNode)) {
1272         FillpTimingWheelDelTimer(stb->tockenTimerNode.wheel, &stb->tockenTimerNode);
1273     }
1274 }
1275 
SpungeTokenTimerCb(void * p)1276 void SpungeTokenTimerCb(void *p)
1277 {
1278     struct SpungeTokenBucke *stb = (struct SpungeTokenBucke *)p;
1279     struct SpungeInstance *inst = (struct SpungeInstance *)stb->inst;
1280     FILLP_ULLONG bitAdded;
1281     FILLP_UINT32 tokens;
1282 
1283     if (stb->rate != g_resource.flowControl.limitRate) {
1284         FILLP_UINT32 rate_bck = stb->rate;
1285         stb->rate = g_resource.flowControl.limitRate;
1286         stb->tokenCount = 0;
1287 
1288         if (stb->rate != 0) {
1289             stb->tockenTimerNode.interval = (FILLP_UINT32)(
1290                 ((FILLP_ULLONG)stb->maxPktSize * (FILLP_ULLONG)FILLP_FC_IN_KBPS) / (FILLP_ULLONG)stb->rate);
1291             if (stb->tockenTimerNode.interval > SPUNGE_TOKEN_TIMER_MAX_INTERVAL) {
1292                 stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL;
1293             }
1294         } else {
1295             stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO;
1296         }
1297 
1298         FILLP_LOGINF("limite rate change from:%u to:%u, timer_interval:%u, maxPktSize:%u", rate_bck, stb->rate,
1299             stb->tockenTimerNode.interval, stb->maxPktSize);
1300     }
1301 
1302     bitAdded = (FILLP_ULLONG)(inst->curTime - stb->lastTime) * (FILLP_ULLONG)stb->rate;
1303     stb->lastTime = inst->curTime;
1304     tokens = (FILLP_UINT32)((bitAdded / (FILLP_ULLONG)FILLP_BPS_TO_KBPS) >> BIT_MOVE_CNT);
1305     if ((tokens < stb->maxPktSize) || (stb->tokenCount < stb->maxPktSize)) {
1306         stb->tokenCount += tokens;
1307     } else {
1308         stb->tokenCount = tokens;
1309     }
1310 
1311     if (stb->tockenTimerNode.interval != SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO) {
1312         SpungeEnableTokenTimer(stb);
1313     }
1314 }
1315 
SpungeItemRouteByToken(struct FillpPcbItem * item,struct FillpPcb * fpcb)1316 FILLP_INT SpungeItemRouteByToken(struct FillpPcbItem *item, struct FillpPcb *fpcb)
1317 {
1318     struct SpungeTokenBucke *stb;
1319     FILLP_INT ret = ERR_OK;
1320 
1321     stb = &fpcb->pcbInst->stb;
1322 
1323     if (stb->tockenTimerNode.interval == SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO) {
1324         SpungeTokenTimerCb(stb);
1325     }
1326     if ((stb->rate == 0) && (fpcb->send.itemWaitTokenLists.nodeNum == 0)) { /* no limit or limit -> nolimit */
1327         ret = FillpSendItem(item, fpcb);
1328     } else if ((stb->tokenCount >= (FILLP_UINT32)item->dataLen) && (fpcb->send.itemWaitTokenLists.nodeNum == 0)) {
1329         ret = FillpSendItem(item, fpcb);
1330         if (ret == ERR_OK) {
1331             stb->tokenCount -= (FILLP_UINT32)item->dataLen;
1332         }
1333     } else {
1334         if (SkipListInsert(&fpcb->send.itemWaitTokenLists, (void *)item, &item->skipListNode, FILLP_TRUE) != ERR_OK) {
1335             /* this can't be happen */
1336             FILLP_LOGERR("fillp_sock_id:%d Can't add item <%u,%u> to itemWaitTokenLists", FILLP_GET_SOCKET(fpcb)->index,
1337                 item->seqNum, item->dataLen);
1338             FillpFreeBufItem(item);
1339             (void)SYS_ARCH_ATOMIC_INC(&(FILLP_GET_SOCKET(fpcb)->sendEventCount), 1);
1340 #ifdef SOCK_SEND_SEM
1341             (void)SYS_ARCH_SEM_POST(&fpcb->send.sendSem);
1342 #endif /* SOCK_SEND_SEM */
1343         } else {
1344             stb->waitPktCount++;
1345         }
1346     }
1347 
1348     return ret;
1349 }
1350 
SpungeClearItemWaitTokenList(struct SpungeTokenBucke * stb)1351 static void SpungeClearItemWaitTokenList(struct SpungeTokenBucke *stb)
1352 {
1353     struct HlistNode *fpcbNode = HLIST_FIRST(&(stb->tbFpcbLists));
1354     struct FillpPcb *fpcb = FILLP_NULL_PTR;
1355     struct FillpPcbItem *item = FILLP_NULL_PTR;
1356 
1357     while (fpcbNode != FILLP_NULL_PTR) {
1358         fpcb = FillpPcbStbNodeEntry(fpcbNode);
1359         fpcbNode = fpcbNode->next;
1360         item = (struct FillpPcbItem *)SkipListPopValue(&(fpcb->send.itemWaitTokenLists));
1361         while (item != FILLP_NULL_PTR) {
1362             stb->waitPktCount--;
1363             /* here item should move to unrecvList, not directly send by udp,
1364                or the sendrate may be over the max send rate */
1365             if (SkipListInsert(&fpcb->send.unrecvList, (void *)item, &item->skipListNode, FILLP_TRUE) != ERR_OK) {
1366                 FillpFreeBufItem(item);
1367                 (void)SYS_ARCH_ATOMIC_INC(&(FILLP_GET_SOCKET(fpcb)->sendEventCount), 1);
1368 #ifdef SOCK_SEND_SEM
1369                 (void)SYS_ARCH_SEM_POST(&fpcb->send.sendSem);
1370 #endif /* SOCK_SEND_SEM */
1371             } else if (item->sendCount > 0) {
1372                 fpcb->send.unrecvRedunListBytes += item->dataLen;
1373             }
1374             item = (struct FillpPcbItem *)SkipListPopValue(&(fpcb->send.itemWaitTokenLists));
1375         }
1376 
1377         if (fpcb->send.unrecvList.nodeNum != 0) {
1378             FillpEnableSendTimer(fpcb);
1379         }
1380     }
1381 
1382     if (stb->waitPktCount != 0) {
1383         FILLP_LOGERR("waitPktCount %llu is not 0", stb->waitPktCount);
1384         stb->waitPktCount = 0;
1385     }
1386     stb->fpcbCur = HLIST_FIRST(&(stb->tbFpcbLists));
1387 }
1388 
SpungeCheckItemWaitTokenList(struct SpungeTokenBucke * stb)1389 void SpungeCheckItemWaitTokenList(struct SpungeTokenBucke *stb)
1390 {
1391     struct HlistNode *fpcbNode = FILLP_NULL_PTR;
1392     struct SkipListNode *node = FILLP_NULL_PTR;
1393     struct FillpPcb *fpcb = FILLP_NULL_PTR;
1394     struct FillpPcbItem *item = FILLP_NULL_PTR;
1395     FILLP_UINT32 fpcbCount = (FILLP_UINT32)stb->tbFpcbLists.size;
1396     FILLP_UINT32 waitListEmptyCount = 0;
1397     FILLP_INT err;
1398 
1399     if (stb->waitPktCount == 0) {
1400         return;
1401     }
1402 
1403     /* stb->rate change from !0 to 0, need to move all item form  itemWaitTokenLists to unSendList */
1404     if (stb->rate == 0) {
1405         SpungeClearItemWaitTokenList(stb);
1406         return;
1407     }
1408 
1409     fpcbNode = stb->fpcbCur;
1410     while ((stb->tokenCount > 0) && (stb->waitPktCount > 0) && (waitListEmptyCount < fpcbCount)) {
1411         if (fpcbNode == FILLP_NULL_PTR) {
1412             fpcbNode = HLIST_FIRST(&(stb->tbFpcbLists));
1413         }
1414 
1415         fpcb = FillpPcbStbNodeEntry(fpcbNode);
1416         node = SkipListGetPop(&(fpcb->send.itemWaitTokenLists));
1417         if (node == FILLP_NULL_PTR) {
1418             fpcbNode = fpcbNode->next;
1419             waitListEmptyCount++;
1420             continue;
1421         }
1422 
1423         item = (struct FillpPcbItem *)node->item;
1424         if (stb->tokenCount < item->dataLen) {
1425             break;
1426         }
1427 
1428         stb->waitPktCount--;
1429         (void)SkipListPopValue(&fpcb->send.itemWaitTokenLists);
1430         err = FillpSendItem(item, fpcb);
1431         if (err == ERR_OK) {
1432             stb->tokenCount -= (FILLP_UINT32)item->dataLen;
1433         }
1434         fpcbNode = fpcbNode->next;
1435         waitListEmptyCount = 0;
1436     }
1437 
1438     stb->fpcbCur = fpcbNode;
1439 }
1440 
SpungeInitTokenBucket(struct SpungeInstance * inst)1441 void SpungeInitTokenBucket(struct SpungeInstance *inst)
1442 {
1443     struct SpungeTokenBucke *stb = &inst->stb;
1444 
1445     stb->inst = inst;
1446     stb->lastTime = inst->curTime;
1447     stb->rate = g_resource.flowControl.limitRate;
1448     stb->waitPktCount = 0;
1449     stb->tokenCount = 0;
1450     stb->maxPktSize = (FILLP_UINT32)g_appResource.flowControl.pktSize;
1451 
1452     stb->fpcbCur = FILLP_NULL_PTR;
1453     HLIST_INIT(&(stb->tbFpcbLists));
1454 
1455     FILLP_TIMING_WHEEL_INIT_NODE(&stb->tockenTimerNode);
1456     stb->tockenTimerNode.cbNode.cb = SpungeTokenTimerCb;
1457     stb->tockenTimerNode.cbNode.arg = (void *)stb;
1458     if (stb->rate != 0) {
1459         stb->tockenTimerNode.interval =
1460             (FILLP_UINT32)(((FILLP_ULLONG)stb->maxPktSize * (FILLP_ULLONG)FILLP_FC_IN_KBPS) / (FILLP_ULLONG)stb->rate);
1461         if (stb->tockenTimerNode.interval > SPUNGE_TOKEN_TIMER_MAX_INTERVAL) {
1462             stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL;
1463         }
1464     } else {
1465         stb->tockenTimerNode.interval = SPUNGE_TOKEN_TIMER_MAX_INTERVAL_RATE_ZERO;
1466     }
1467 
1468     FILLP_LOGINF("limite rate:%u, timer_interval:%u, maxPktSize:%u", stb->rate, stb->tockenTimerNode.interval,
1469         stb->maxPktSize);
1470     SpungeEnableTokenTimer(stb);
1471 }
1472 
SpungeTokenBucketAddFpcb(struct FillpPcb * fpcb)1473 void SpungeTokenBucketAddFpcb(struct FillpPcb *fpcb)
1474 {
1475     struct SpungeTokenBucke *stb = FILLP_NULL_PTR;
1476 
1477     if ((fpcb == FILLP_NULL_PTR) || (fpcb->pcbInst == FILLP_NULL_PTR)) {
1478         return;
1479     }
1480 
1481     stb = &fpcb->pcbInst->stb;
1482     if (stb->maxPktSize < (FILLP_UINT32)fpcb->pktSize) {
1483         stb->maxPktSize = (FILLP_UINT32)fpcb->pktSize;
1484     }
1485 
1486     HLIST_INIT_NODE(&(fpcb->stbNode));
1487     HlistAddTail(&stb->tbFpcbLists, &(fpcb->stbNode));
1488     FILLP_LOGINF("fillp_sock_id:%d, maxPktSize:%u,"
1489         "limitRate:%u",
1490         FILLP_GET_SOCKET(fpcb)->index, stb->maxPktSize, stb->rate);
1491 }
1492 
SpungeTokenBucketDelFpcb(struct FillpPcb * fpcb)1493 void SpungeTokenBucketDelFpcb(struct FillpPcb *fpcb)
1494 {
1495     struct HlistNode *node = FILLP_NULL_PTR;
1496     struct SpungeTokenBucke *stb = FILLP_NULL_PTR;
1497 
1498     if ((fpcb == FILLP_NULL_PTR) || (fpcb->pcbInst == FILLP_NULL_PTR)) {
1499         return;
1500     }
1501 
1502     stb = &fpcb->pcbInst->stb;
1503     if ((stb->fpcbCur != FILLP_NULL_PTR) && (stb->fpcbCur == &(fpcb->stbNode))) {
1504         stb->fpcbCur = stb->fpcbCur->next;
1505     }
1506 
1507     node = HLIST_FIRST(&(stb->tbFpcbLists));
1508     while (node != FILLP_NULL_PTR) {
1509         if (&(fpcb->stbNode) == node) {
1510             stb->waitPktCount -= (FILLP_ULLONG)fpcb->send.itemWaitTokenLists.nodeNum;
1511             HlistDelete(&(stb->tbFpcbLists), node);
1512             FILLP_LOGINF("fillp_sock_id:%d, limitRate:%u", FILLP_GET_SOCKET(fpcb)->index, stb->rate);
1513             break;
1514         }
1515         node = node->next;
1516     }
1517 }
1518 
1519 /* Return 1 if still alive , or return 0 */
SpinstLoopCheckAlive(struct SpungeInstance * inst)1520 static int SpinstLoopCheckAlive(struct SpungeInstance *inst)
1521 {
1522     if (inst->waitTobeCoreKilled && FillpKillCore()) {
1523         inst->waitTobeCoreKilled = FILLP_FALSE;
1524         return 0;
1525     }
1526 
1527     return 1;
1528 }
1529 
SpinstLoopRecv(struct SpungeInstance * inst)1530 static void SpinstLoopRecv(struct SpungeInstance *inst)
1531 {
1532     struct HlistNode *osSockNode;
1533     int readable = 1;
1534     osSockNode = HLIST_FIRST(&inst->osSockist);
1535     /* Select doesn't work with sendmmsg/recvmmsg, so in that case it is always
1536     set as 1 */
1537     while (osSockNode != FILLP_NULL_PTR) {
1538         struct SockOsSocket *osSock = SockOsListEntry(osSockNode);
1539         if (!g_resource.udp.supportMmsg) {
1540             readable = SysioIsSockReadable((void *)osSock->ioSock);
1541         }
1542         osSockNode = osSockNode->next;
1543 
1544         if (readable) {
1545             SpungeDoRecvCycle(osSock, inst);
1546         }
1547     }
1548 }
1549 
1550 #if !defined(FILLP_LW_LITEOS)
SpungeSetThreadInfo(FILLP_CONST struct SpungeInstance * inst)1551 static void SpungeSetThreadInfo(FILLP_CONST struct SpungeInstance *inst)
1552 {
1553     FILLP_CHAR threadName[SPUNGE_MAX_THREAD_NAME_LENGTH] = {0};
1554     FILLP_UINT8 random = (FILLP_UINT8)(FILLP_RAND() & 0xFF);
1555     (void)inst;
1556     FILLP_INT ret = sprintf_s(threadName, sizeof(threadName), "%s_%u", "Fillp_core", (FILLP_UINT)random);
1557     if (ret < ERR_OK) {
1558         FILLP_LOGWAR("SpungeInstanceMainThread sprintf_s thread name failed(%d), random(%u)", ret, random);
1559     }
1560     (void)SysSetThreadName(threadName, sizeof(threadName));
1561 
1562 #if defined(FILLP_LINUX)
1563     {
1564         pthread_t self;
1565         self = pthread_self();
1566         FILLP_LOGINF("FillP Core threadId:%ld", self);
1567         /* thread resource will be auto recycled
1568            only this detach set if no other thread try to join it */
1569         if (pthread_detach(self)) {
1570             FILLP_LOGERR("Set Detach fail");
1571         }
1572     }
1573 #elif defined(FILLP_WIN32)
1574     FILLP_LOGBUTT("FillP Core threadId:%d", GetCurrentThreadId());
1575 #endif
1576 }
1577 #endif
1578 
SpungeInstanceMainThread(void * p)1579 void SpungeInstanceMainThread(void *p)
1580 {
1581     struct SpungeInstance *inst = FILLP_NULL_PTR;
1582     FILLP_BOOL isTimeout;
1583 
1584     if (p == FILLP_NULL_PTR) {
1585         FILLP_LOGERR("parameter p is NULL");
1586         return;
1587     }
1588 
1589     inst = (struct SpungeInstance *)p;
1590 #if !defined(FILLP_LW_LITEOS)
1591     SpungeSetThreadInfo(inst);
1592 #endif
1593 
1594     if (SYS_ARCH_SEM_WAIT(&inst->threadSem)) {
1595         FILLP_LOGWAR("sem wait failed");
1596         return;
1597     }
1598     while (inst->hasInited) {
1599         SpungeHandleMsgCycle(inst);
1600         SpungeLoopCheckUnsendBox(inst);
1601         if (!SpinstLoopCheckAlive(inst)) {
1602             break;
1603         }
1604         isTimeout = SpungeMainDelay(inst);
1605         SpinstLoopRecv(inst);
1606 
1607         if (isTimeout == FILLP_TRUE) {
1608             FillpTimingWheelLoopCheck(&inst->timingWheel, inst->curTime);
1609         }
1610 
1611         SpungeCheckItemWaitTokenList(&inst->stb);
1612     }
1613 
1614     SpungeDestroyInstance(inst);
1615 }
1616 
SpungePushRecvdDataToStack(void * arg)1617 void SpungePushRecvdDataToStack(void *arg)
1618 {
1619     struct FillpPcb *pcb = (struct FillpPcb *)arg;
1620     struct FillpPcbItem *item = SkipListPopValue(&pcb->recv.recvBoxPlaceInOrder);
1621     while (item != FILLP_NULL_PTR) {
1622         FillpDataToStack(pcb, item);
1623         item = SkipListPopValue(&pcb->recv.recvBoxPlaceInOrder);
1624     }
1625 
1626     FillpEnableDataBurstTimer(pcb);
1627 }
1628 
1629 #ifdef __cplusplus
1630 }
1631 #endif
1632