1 /*
2  * Copyright (C) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "fillp_flow_control.h"
17 #include "res.h"
18 #include "spunge_stack.h"
19 #include "fillp_algorithm.h"
20 #include "fillp_common.h"
21 #include "fillp_output.h"
22 #include "fillp_dfx.h"
23 
24 #ifdef __cplusplus
25 extern "C" {
26 #endif
27 
28 #define FILLP_RECV_RATE_INDEX_FIRST 0
29 #define FILLP_RECV_RATE_INDEX_NEXT 1
30 #define FILLP_RECV_RATE_INDEX_THIRD 2
31 
FillpAlg2GetRedunCount(void * argPcb,void * argItem)32 FILLP_UINT16 FillpAlg2GetRedunCount(void *argPcb, void *argItem)
33 {
34     (void)argPcb;
35     (void)argItem;
36     return 1;
37 }
38 
FillpAlg1GetRedunCount(void * argPcb,void * argItem)39 FILLP_UINT16 FillpAlg1GetRedunCount(void *argPcb, void *argItem)
40 {
41     (void)argPcb;
42     (void)argItem;
43     return 1;
44 }
45 
FillpRecvRateIsBigger(struct FillpRateSample * rateSample,FILLP_UINT32 maxCnt,FILLP_UINT32 indexK)46 static FILLP_BOOL FillpRecvRateIsBigger(struct FillpRateSample *rateSample, FILLP_UINT32 maxCnt,
47     FILLP_UINT32 indexK)
48 {
49     FILLP_UINT32 indexN = indexK - 1;
50     if (rateSample[indexK].v > rateSample[indexN].v) {
51         struct FillpRateSample tmp = rateSample[indexK];
52         rateSample[indexK] = rateSample[indexN];
53         rateSample[indexN] = tmp;
54         return FILLP_TRUE;
55     }
56     FILLP_UNUSED_PARA(maxCnt);
57 
58     return FILLP_FALSE;
59 }
60 
FillpUpdateRecvRateSample(struct FillpMaxRateSample * maxRateSample,FILLP_UINT32 rateValue,FILLP_UINT8 rateI)61 void FillpUpdateRecvRateSample(struct FillpMaxRateSample *maxRateSample, FILLP_UINT32 rateValue,
62     FILLP_UINT8 rateI)
63 {
64     FILLP_UINT32 index;
65     FILLP_UINT32 indexK;
66     struct FillpRateSample val;
67     struct FillpRateSample *rateSample = maxRateSample->rateSample;
68 
69     val.i = rateI;
70     val.v = rateValue;
71 
72     // m->s[] stores few numbers of maximal rates, when do update, if the index is already in s->m[], just resort
73     // or , we need to insert a new one
74     for (index = 0; index < maxRateSample->maxCnt; index++) {
75         if (rateSample[index].i == val.i) {
76             break;
77         }
78     }
79     // Now m->s[index] not include pack_index, means we need to update the m->s[]
80     if (index >= maxRateSample->maxCnt) {
81         for (indexK = 0; indexK < maxRateSample->maxCnt; indexK++) {
82             if (val.v > rateSample[indexK].v) {
83                 struct FillpRateSample tmp = rateSample[indexK];
84                 rateSample[indexK] = val;
85                 val = tmp;
86             }
87         }
88         return;
89     }
90 
91     // Don't need to re-sort the whole list, because the list always be sorted already
92     // such as if val.v > m->s[index].v, then just need to update the upper ones
93     if (rateSample[index].v > val.v) { // The new value is lighter , then float up
94         rateSample[index].v = val.v;
95 
96         for (indexK = index; indexK < maxRateSample->maxCnt - 1; indexK++) {
97             FILLP_UINT32 indexN = indexK + 1;
98             if (rateSample[indexK].v < rateSample[indexN].v) {
99                 struct FillpRateSample tmp = rateSample[indexK];
100                 rateSample[indexK] = rateSample[indexN];
101                 rateSample[indexN] = tmp;
102                 continue;
103             }
104 
105             break;
106         }
107     } else { // The new value is bigger, then sink down
108         rateSample[index].v = val.v;
109 
110         for (indexK = index; indexK > 0; indexK--) {
111             if (FillpRecvRateIsBigger(rateSample, maxRateSample->maxCnt - 1, indexK)) {
112                 continue;
113             }
114 
115             break;
116         }
117     }
118 
119     FILLP_LOGDBG("max expired pack_index %u the max is %u the 2ed max is %u the 3th max is %u,recv rate is %u", rateI,
120         rateSample[FILLP_RECV_RATE_INDEX_FIRST].v, rateSample[FILLP_RECV_RATE_INDEX_NEXT].v,
121         rateSample[FILLP_RECV_RATE_INDEX_THIRD].v, rateValue);
122 }
123 
FillpAppLimitedStatus(struct FillpPcb * pcb,FILLP_UINT32 beginPktNum,FILLP_UINT32 endPktNum)124 FILLP_BOOL FillpAppLimitedStatus(struct FillpPcb *pcb, FILLP_UINT32 beginPktNum, FILLP_UINT32 endPktNum)
125 {
126     struct FillpHashLlist *mapList = &pcb->send.pktSeqMap;
127     struct Hlist *list = FILLP_NULL_PTR;
128     struct HlistNode *pos = FILLP_NULL_PTR;
129     struct FillpPcbItem *item = FILLP_NULL_PTR;
130     FILLP_UINT32 i, j;
131     FILLP_UINT32 mapLevel;
132     FILLP_BOOL appLimited = FILLP_FALSE;
133 
134     if (!FillpNumIsbigger(endPktNum, beginPktNum)) {
135         return appLimited;
136     }
137 
138     for (i = beginPktNum, j = 0; !FillpNumIsbigger(i, endPktNum) && j < mapList->count; i++, j++) {
139         mapLevel = (FILLP_UINT32)(i & mapList->hashModSize);
140         list = &mapList->hashMap[mapLevel];
141         pos = HLIST_FIRST(list);
142         while (pos != FILLP_NULL_PTR) {
143             item = FillpPcbPktSeqMapNodeEntry(pos);
144             if (FillpNumIsbigger(item->pktNum, endPktNum)) {
145                 break;
146             }
147 
148             if ((!FillpNumIsbigger(beginPktNum, item->pktNum)) &&
149                 UTILS_FLAGS_CHECK(item->flags, FILLP_ITEM_FLAGS_APP_LIMITED)) {
150                 appLimited = FILLP_TRUE;
151                 break;
152             }
153             pos = pos->next;
154         }
155 
156         if (appLimited == FILLP_TRUE) {
157             break;
158         }
159     }
160 
161     return appLimited;
162 }
163 
FillpCalSendInterval(struct FillpPcb * pcb)164 void FillpCalSendInterval(struct FillpPcb *pcb)
165 {
166     struct FillpFlowControl *flowControl = &pcb->send.flowControl;
167     struct FtSocket *sock = FILLP_GET_SOCKET(pcb);
168 
169     if (sock->resConf.flowControl.constRateEnbale) {
170         flowControl->sendRate = sock->resConf.flowControl.maxRate;
171     }
172 
173     if (flowControl->sendRate == 0) {
174         flowControl->sendInterval = FILLP_NULL;
175         return;
176     }
177 
178     /* The rate is calculated based on Kbps, hence multiplied by 8 and 1000 */
179     flowControl->sendInterval = (FILLP_LLONG)(pcb->pktSize * FILLP_FC_IN_KBPS * FILLP_FC_IN_BIT);
180     /* need round up to avoid sendInterval is smaller */
181     flowControl->sendInterval = FILLP_DIV_ROUND_UP(flowControl->sendInterval, (FILLP_LLONG)flowControl->sendRate);
182     if (flowControl->sendInterval < FILLP_NULL) {
183         flowControl->sendInterval = FILLP_NULL;
184     }
185 
186     pcb->sendTimerNode.interval = (FILLP_UINT32)(flowControl->sendInterval / FILLP_FC_IN_BIT);
187     FILLP_LOGDBG("Send interval %lld, timer_interval:%u", flowControl->sendInterval, pcb->sendTimerNode.interval);
188 }
189 
FillpFcTailProtected(struct FillpPcb * pcb,struct FillpPktPack * pack)190 void FillpFcTailProtected(struct FillpPcb *pcb, struct FillpPktPack *pack)
191 {
192     struct FillpTailLostProtected *tailProtect = FILLP_NULL_PTR;
193     FILLP_LLONG deltaUs;
194     FILLP_BOOL isDataWaitedEmpty;
195     FILLP_UINT32 infBytes = 0;
196     FILLP_UINT32 infCap = 0;
197 
198     struct FillpPktHead *pktHdr = (struct FillpPktHead *)pack->head;
199     FILLP_UINT32 ackSeqNum = pktHdr->seqNum;
200     FILLP_UINT32 lostSeqNum = pack->lostSeq;
201 
202     FILLP_UINT32 unackNum = pcb->send.unackList.count;
203     FILLP_UINT32 unsendSize =
204         pcb->send.unrecvList.nodeNum + pcb->send.itemWaitTokenLists.nodeNum + pcb->send.redunList.nodeNum;
205     isDataWaitedEmpty = (unsendSize == 0);
206 
207     unsendSize += pcb->send.unSendList.size;
208     isDataWaitedEmpty = (unsendSize == 0) && (SpungeConnCheckUnsendBoxEmpty(FILLP_GET_CONN(pcb)) == FILLP_TRUE);
209 
210     deltaUs = pcb->pcbInst->curTime - pcb->send.lastSendTs;
211 
212     /* ackSeqNum equal to lostSeqNum, peer doesn't recv valid packet which can be give to app */
213     tailProtect = &pcb->send.tailProtect;
214     if ((ackSeqNum == lostSeqNum) && (ackSeqNum == tailProtect->lastPackSeq) && (unackNum != 0) &&
215         (pack->rate == 0) && isDataWaitedEmpty && (pcb->statistics.debugPcb.curPackDeltaUs != 0) &&
216         (deltaUs >= pcb->statistics.pack.packIntervalBackup)) {
217         tailProtect->samePackCount++;
218         if (tailProtect->samePackCount >= tailProtect->judgeThreshold) {
219             FILLP_LOGDTL("fillp_sock_id:%d tail protection active,Threshold:%u,infBytes:%u,"
220                          "infCap:%u,unSendList:%u,unackList:%u, ackSeqNum%u",
221                          FILLP_GET_SOCKET(pcb)->index, tailProtect->judgeThreshold, infBytes, infCap,
222                          pcb->send.unSendList.size, pcb->send.unackList.count, ackSeqNum);
223             FillpMoveUnackToUnrecv(ackSeqNum, pcb->send.seqNum, pcb, FILLP_FALSE);
224             tailProtect->judgeThreshold = tailProtect->maxJudgeThreshold;
225             tailProtect->samePackCount = FILLP_NULL;
226         }
227     } else {
228         pcb->send.tailProtect.judgeThreshold = tailProtect->minJudgeThreshold;
229         pcb->send.tailProtect.samePackCount = FILLP_NULL;
230         pcb->send.tailProtect.lastPackSeq = ackSeqNum;
231     }
232 }
233 
FillpFcPackInput(struct FillpPcb * pcb,struct FillpPktPack * pack)234 void FillpFcPackInput(struct FillpPcb *pcb, struct FillpPktPack *pack)
235 {
236     if (pcb->algFuncs.analysisPack != FILLP_NULL_PTR) {
237         pcb->algFuncs.analysisPack(pcb, (void *)pack);
238     }
239 
240     if (!(pack->flag & FILLP_PACK_FLAG_REQURE_RTT)) {
241         FillpFcTailProtected(pcb, pack);
242     }
243 }
244 
FillpFcNackInput(struct FillpPcb * pcb,struct FillpPktNack * nack)245 void FillpFcNackInput(struct FillpPcb *pcb, struct FillpPktNack *nack)
246 {
247     if (pcb->algFuncs.analysisNack != FILLP_NULL_PTR) {
248         pcb->algFuncs.analysisNack(pcb, (void *)nack);
249     }
250 }
251 
FillpGetAlgFun(struct FillpPcb * pcb)252 static int FillpGetAlgFun(struct FillpPcb *pcb)
253 {
254     switch (pcb->fcAlg) {
255         case FILLP_SUPPORT_ALG_BASE:
256             pcb->algFuncs = g_fillpAlg0;
257             break;
258         case FILLP_SUPPORT_ALG_3:
259             pcb->algFuncs = g_fillpAlg0;
260             break;
261         default:
262             FILLP_LOGERR("flow control not set");
263             return -1;
264     }
265     return 0;
266 }
267 
FillpFcInit(struct FillpPcb * pcb)268 FILLP_INT FillpFcInit(struct FillpPcb *pcb)
269 {
270     FILLP_INT ret = ERR_OK;
271 
272     if (pcb == FILLP_NULL_PTR) {
273         FILLP_LOGERR("pcb null");
274         return -1;
275     }
276 
277     if (pcb->send.slowStart) {
278         /* Sender interval, be used to control the sending rate kbits/s */
279         pcb->send.flowControl.sendRate = FILLP_INITIAL_RATE;
280         FILLP_LOGDBG("slowStart:%u init_rate:%u", pcb->send.slowStart, pcb->send.flowControl.sendRate);
281     } else {
282         /* The maxRate configured by the user is in Mbps, hence multiplied by
283               100 to get the value in Kbps */
284         pcb->send.flowControl.sendRate = g_resource.flowControl.maxRate;
285         FILLP_LOGDBG("slowStart not enabled, init_rate:%u", pcb->send.flowControl.sendRate);
286     }
287 
288     pcb->send.flowControl.sendTime = 0;
289     pcb->send.flowControl.sendRateLimit = 0;
290     pcb->send.flowControl.remainBytes = 0;
291     pcb->send.flowControl.lastCycleNoEnoughData = FILLP_FALSE;
292     pcb->send.flowControl.sendOneNoData = FILLP_TRUE;
293 
294     pcb->send.tailProtect.lastPackSeq = 0;
295     pcb->send.tailProtect.samePackCount = 0;
296 
297     pcb->statistics.keepAlive.lastRecvTime = pcb->pcbInst->curTime;
298     pcb->statistics.keepAlive.lastDataRecvTime = pcb->pcbInst->curTime;
299 
300     pcb->send.flowControl.fcAlg = FILLP_NULL_PTR;
301     FILLP_LOGERR("fillp_sock_id:%d, fc alg:%xh, characters:%xh, peer_alg:%xh, peerCharacters:%xh",
302         FILLP_GET_SOCKET(pcb)->index, pcb->fcAlg, pcb->characters, FILLP_GET_CONN(pcb)->peerFcAlgs,
303         FILLP_GET_CONN(pcb)->peerCharacters);
304     if (FillpGetAlgFun(pcb) != 0) {
305         return -1;
306     }
307 
308     FillpAdjustFcParamsByRtt(pcb);
309 
310     if (pcb->algFuncs.fcInit != FILLP_NULL_PTR) {
311         ret = pcb->algFuncs.fcInit(pcb);
312     }
313 
314     return ret;
315 }
316 
FillpFcDeinit(struct FillpPcb * pcb)317 void FillpFcDeinit(struct FillpPcb *pcb)
318 {
319     if (pcb->algFuncs.fcDeinit != FILLP_NULL_PTR) {
320         pcb->algFuncs.fcDeinit(pcb);
321     }
322     pcb->send.flowControl.fcAlg = FILLP_NULL_PTR;
323 }
324 
325 /* recv a data packet  */
FillpFcDataInput(struct FillpPcb * pcb,FILLP_CONST struct FillpPktHead * pkt)326 void FillpFcDataInput(struct FillpPcb *pcb, FILLP_CONST struct FillpPktHead *pkt)
327 {
328     pcb->statistics.traffic.totalRecved++;
329 
330     if (pcb->statistics.traffic.totalRecved == 1) {
331         FILLP_LOGDBG("fillp_sock_id:%d "
332                      "First data receiving time =%lld, recv seq num = %u, recv pkt num = %u \r\n",
333             FILLP_GET_SOCKET(pcb)->index, pcb->pcbInst->curTime, pcb->recv.seqNum, pcb->recv.pktNum);
334     }
335 
336     pcb->statistics.traffic.totalRecvedBytes += ((FILLP_UINT32)pkt->dataLen);
337     pcb->statistics.pack.periodRecvedOnes++;
338     pcb->statistics.pack.periodRecvBits += FILLP_FC_VAL_IN_BITS((FILLP_ULLONG)pkt->dataLen);
339 }
340 
341 /* discard a data packet */
FillpFcRecvDropOne(struct FillpPcb * pcb)342 void FillpFcRecvDropOne(struct FillpPcb *pcb)
343 {
344     pcb->statistics.pack.periodDroped++;
345     pcb->statistics.traffic.totalDroped++;
346 }
347 
348 /* recv an packet outof order */
FillpFcRecvOutOfOrder(struct FillpPcb * pcb)349 void FillpFcRecvOutOfOrder(struct FillpPcb *pcb)
350 {
351     pcb->statistics.traffic.totalOutOfOrder++;
352 }
353 
354 /* calculate the lost packets on recv side */
FillpFcRecvLost(struct FillpPcb * pcb,FILLP_UINT32 ones)355 void FillpFcRecvLost(struct FillpPcb *pcb, FILLP_UINT32 ones)
356 {
357     pcb->statistics.traffic.totalRecvLost += ones;
358 }
359 
FillpFcCycle(void * arg)360 void FillpFcCycle(void *arg)
361 {
362     struct FillpPcb *pcb = (struct FillpPcb *)arg;
363     /* The unit of the time returned here is micro seconds */
364     FILLP_LLONG detaTime;
365     struct FtNetconn *netconn = FILLP_GET_CONN(pcb);
366     struct FtSocket *sock;
367 
368     sock = (struct FtSocket *)netconn->sock;
369 
370     if (sock->isListenSock) {
371         FILLP_LOGERR("Listen socket should not hit here!!!");
372         return;
373     }
374 
375     detaTime = pcb->pcbInst->curTime - pcb->statistics.keepAlive.lastRecvTime;
376 
377     if (detaTime >= (FILLP_LLONG)FILLP_UTILS_MS2US((FILLP_LLONG)sock->resConf.common.keepAliveTime)) {
378         FILLP_LOGERR("Keep alive timeout, fillp_sock_id:%d,detaTime:%lld,keepAliveTime:%u(ms)",
379             sock->index, detaTime, sock->resConf.common.keepAliveTime);
380 
381         FillpDfxSockLinkAndQosNotify(sock, FILLP_DFX_LINK_KEEPALIVE_TIMEOUT);
382         SpungeShutdownSock(sock, SPUNGE_SHUT_RDWR);
383         sock->errEvent |= SPUNGE_EPOLLERR;
384         SpungeEpollEventCallback(sock, (FILLP_INT)SPUNGE_EPOLLIN | (FILLP_INT)SPUNGE_EPOLLERR, 1);
385         SpungeConnClosed(FILLP_GET_CONN(pcb));
386         return;
387     }
388 
389     pcb->keepAliveTimerNode.interval =
390         (FILLP_UINT32)(FILLP_UTILS_MS2US((FILLP_LLONG)sock->resConf.common.keepAliveTime) - detaTime);
391     FILLP_LOGDTL("update the keep alive interval to %u, fillp_sock_id:%d, detaTime:%lld, keepAliveTime:%u(ms)",
392         pcb->keepAliveTimerNode.interval, sock->index, detaTime, sock->resConf.common.keepAliveTime);
393     FillpEnableKeepAliveTimer(pcb);
394 }
395 
396 #ifdef __cplusplus
397 }
398 #endif
399