1 /*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "fillp_flow_control.h"
17 #include "res.h"
18 #include "spunge_stack.h"
19 #include "fillp_algorithm.h"
20 #include "fillp_common.h"
21 #include "fillp_output.h"
22 #include "fillp_dfx.h"
23
24 #ifdef __cplusplus
25 extern "C" {
26 #endif
27
/*
 * Positions in a rate-sample array that FillpUpdateRecvRateSample prints in its
 * debug log as "the max", "the 2ed max" and "the 3th max".
 */
#define FILLP_RECV_RATE_INDEX_FIRST 0
#define FILLP_RECV_RATE_INDEX_NEXT 1
#define FILLP_RECV_RATE_INDEX_THIRD 2
31
FillpAlg2GetRedunCount(void * argPcb,void * argItem)32 FILLP_UINT16 FillpAlg2GetRedunCount(void *argPcb, void *argItem)
33 {
34 (void)argPcb;
35 (void)argItem;
36 return 1;
37 }
38
FillpAlg1GetRedunCount(void * argPcb,void * argItem)39 FILLP_UINT16 FillpAlg1GetRedunCount(void *argPcb, void *argItem)
40 {
41 (void)argPcb;
42 (void)argItem;
43 return 1;
44 }
45
FillpRecvRateIsBigger(struct FillpRateSample * rateSample,FILLP_UINT32 maxCnt,FILLP_UINT32 indexK)46 static FILLP_BOOL FillpRecvRateIsBigger(struct FillpRateSample *rateSample, FILLP_UINT32 maxCnt,
47 FILLP_UINT32 indexK)
48 {
49 FILLP_UINT32 indexN = indexK - 1;
50 if (rateSample[indexK].v > rateSample[indexN].v) {
51 struct FillpRateSample tmp = rateSample[indexK];
52 rateSample[indexK] = rateSample[indexN];
53 rateSample[indexN] = tmp;
54 return FILLP_TRUE;
55 }
56 FILLP_UNUSED_PARA(maxCnt);
57
58 return FILLP_FALSE;
59 }
60
/*
 * Record rateValue for sample id rateI in maxRateSample->rateSample[], which is
 * kept ordered from the largest value (slot 0) downwards.
 *
 * - If no entry carries rateI, the new sample is inserted at the first slot whose
 *   value it exceeds; each displaced entry is pushed one slot further and the
 *   last one drops out. The function returns without logging in this case.
 * - If an entry with rateI exists, only its value is overwritten and the entry is
 *   then moved by neighbour swaps until the ordering holds again.
 */
void FillpUpdateRecvRateSample(struct FillpMaxRateSample *maxRateSample, FILLP_UINT32 rateValue,
    FILLP_UINT8 rateI)
{
    struct FillpRateSample *samples = maxRateSample->rateSample;
    struct FillpRateSample incoming;
    FILLP_UINT32 slot = 0;
    FILLP_UINT32 pos;
    FILLP_BOOL valueDropped;

    incoming.i = rateI;
    incoming.v = rateValue;

    /* Search for an entry that already belongs to this sample id */
    while ((slot < maxRateSample->maxCnt) && (samples[slot].i != incoming.i)) {
        slot++;
    }

    if (slot >= maxRateSample->maxCnt) {
        /* Not tracked yet: ripple the new sample in, carrying the displaced entry forward */
        for (pos = 0; pos < maxRateSample->maxCnt; pos++) {
            if (incoming.v > samples[pos].v) {
                struct FillpRateSample displaced = samples[pos];
                samples[pos] = incoming;
                incoming = displaced;
            }
        }
        return;
    }

    /* The list was ordered before this update, so only the touched entry has to move */
    valueDropped = (samples[slot].v > incoming.v) ? FILLP_TRUE : FILLP_FALSE;
    samples[slot].v = incoming.v;

    if (valueDropped) {
        /* Smaller than before: move towards the end while the following entry is larger */
        for (pos = slot; pos < maxRateSample->maxCnt - 1; pos++) {
            struct FillpRateSample *here = &samples[pos];
            struct FillpRateSample *after = &samples[pos + 1];
            struct FillpRateSample moved;

            if (!(here->v < after->v)) {
                break;
            }

            moved = *here;
            *here = *after;
            *after = moved;
        }
    } else {
        /* Same or larger than before: move towards slot 0 while it beats its predecessor */
        pos = slot;
        while ((pos > 0) && FillpRecvRateIsBigger(samples, maxRateSample->maxCnt - 1, pos)) {
            pos--;
        }
    }

    FILLP_LOGDBG("max expired pack_index %u the max is %u the 2ed max is %u the 3th max is %u,recv rate is %u", rateI,
        samples[FILLP_RECV_RATE_INDEX_FIRST].v, samples[FILLP_RECV_RATE_INDEX_NEXT].v,
        samples[FILLP_RECV_RATE_INDEX_THIRD].v, rateValue);
}
123
FillpAppLimitedStatus(struct FillpPcb * pcb,FILLP_UINT32 beginPktNum,FILLP_UINT32 endPktNum)124 FILLP_BOOL FillpAppLimitedStatus(struct FillpPcb *pcb, FILLP_UINT32 beginPktNum, FILLP_UINT32 endPktNum)
125 {
126 struct FillpHashLlist *mapList = &pcb->send.pktSeqMap;
127 struct Hlist *list = FILLP_NULL_PTR;
128 struct HlistNode *pos = FILLP_NULL_PTR;
129 struct FillpPcbItem *item = FILLP_NULL_PTR;
130 FILLP_UINT32 i, j;
131 FILLP_UINT32 mapLevel;
132 FILLP_BOOL appLimited = FILLP_FALSE;
133
134 if (!FillpNumIsbigger(endPktNum, beginPktNum)) {
135 return appLimited;
136 }
137
138 for (i = beginPktNum, j = 0; !FillpNumIsbigger(i, endPktNum) && j < mapList->count; i++, j++) {
139 mapLevel = (FILLP_UINT32)(i & mapList->hashModSize);
140 list = &mapList->hashMap[mapLevel];
141 pos = HLIST_FIRST(list);
142 while (pos != FILLP_NULL_PTR) {
143 item = FillpPcbPktSeqMapNodeEntry(pos);
144 if (FillpNumIsbigger(item->pktNum, endPktNum)) {
145 break;
146 }
147
148 if ((!FillpNumIsbigger(beginPktNum, item->pktNum)) &&
149 UTILS_FLAGS_CHECK(item->flags, FILLP_ITEM_FLAGS_APP_LIMITED)) {
150 appLimited = FILLP_TRUE;
151 break;
152 }
153 pos = pos->next;
154 }
155
156 if (appLimited == FILLP_TRUE) {
157 break;
158 }
159 }
160
161 return appLimited;
162 }
163
FillpCalSendInterval(struct FillpPcb * pcb)164 void FillpCalSendInterval(struct FillpPcb *pcb)
165 {
166 struct FillpFlowControl *flowControl = &pcb->send.flowControl;
167 struct FtSocket *sock = FILLP_GET_SOCKET(pcb);
168
169 if (sock->resConf.flowControl.constRateEnbale) {
170 flowControl->sendRate = sock->resConf.flowControl.maxRate;
171 }
172
173 if (flowControl->sendRate == 0) {
174 flowControl->sendInterval = FILLP_NULL;
175 return;
176 }
177
178 /* The rate is calculated based on Kbps, hence multiplied by 8 and 1000 */
179 flowControl->sendInterval = (FILLP_LLONG)(pcb->pktSize * FILLP_FC_IN_KBPS * FILLP_FC_IN_BIT);
180 /* need round up to avoid sendInterval is smaller */
181 flowControl->sendInterval = FILLP_DIV_ROUND_UP(flowControl->sendInterval, (FILLP_LLONG)flowControl->sendRate);
182 if (flowControl->sendInterval < FILLP_NULL) {
183 flowControl->sendInterval = FILLP_NULL;
184 }
185
186 pcb->sendTimerNode.interval = (FILLP_UINT32)(flowControl->sendInterval / FILLP_FC_IN_BIT);
187 FILLP_LOGDBG("Send interval %lld, timer_interval:%u", flowControl->sendInterval, pcb->sendTimerNode.interval);
188 }
189
FillpFcTailProtected(struct FillpPcb * pcb,struct FillpPktPack * pack)190 void FillpFcTailProtected(struct FillpPcb *pcb, struct FillpPktPack *pack)
191 {
192 struct FillpTailLostProtected *tailProtect = FILLP_NULL_PTR;
193 FILLP_LLONG deltaUs;
194 FILLP_BOOL isDataWaitedEmpty;
195 FILLP_UINT32 infBytes = 0;
196 FILLP_UINT32 infCap = 0;
197
198 struct FillpPktHead *pktHdr = (struct FillpPktHead *)pack->head;
199 FILLP_UINT32 ackSeqNum = pktHdr->seqNum;
200 FILLP_UINT32 lostSeqNum = pack->lostSeq;
201
202 FILLP_UINT32 unackNum = pcb->send.unackList.count;
203 FILLP_UINT32 unsendSize =
204 pcb->send.unrecvList.nodeNum + pcb->send.itemWaitTokenLists.nodeNum + pcb->send.redunList.nodeNum;
205 isDataWaitedEmpty = (unsendSize == 0);
206
207 unsendSize += pcb->send.unSendList.size;
208 isDataWaitedEmpty = (unsendSize == 0) && (SpungeConnCheckUnsendBoxEmpty(FILLP_GET_CONN(pcb)) == FILLP_TRUE);
209
210 deltaUs = pcb->pcbInst->curTime - pcb->send.lastSendTs;
211
212 /* ackSeqNum equal to lostSeqNum, peer doesn't recv valid packet which can be give to app */
213 tailProtect = &pcb->send.tailProtect;
214 if ((ackSeqNum == lostSeqNum) && (ackSeqNum == tailProtect->lastPackSeq) && (unackNum != 0) &&
215 (pack->rate == 0) && isDataWaitedEmpty && (pcb->statistics.debugPcb.curPackDeltaUs != 0) &&
216 (deltaUs >= pcb->statistics.pack.packIntervalBackup)) {
217 tailProtect->samePackCount++;
218 if (tailProtect->samePackCount >= tailProtect->judgeThreshold) {
219 FILLP_LOGDTL("fillp_sock_id:%d tail protection active,Threshold:%u,infBytes:%u,"
220 "infCap:%u,unSendList:%u,unackList:%u, ackSeqNum%u",
221 FILLP_GET_SOCKET(pcb)->index, tailProtect->judgeThreshold, infBytes, infCap,
222 pcb->send.unSendList.size, pcb->send.unackList.count, ackSeqNum);
223 FillpMoveUnackToUnrecv(ackSeqNum, pcb->send.seqNum, pcb, FILLP_FALSE);
224 tailProtect->judgeThreshold = tailProtect->maxJudgeThreshold;
225 tailProtect->samePackCount = FILLP_NULL;
226 }
227 } else {
228 pcb->send.tailProtect.judgeThreshold = tailProtect->minJudgeThreshold;
229 pcb->send.tailProtect.samePackCount = FILLP_NULL;
230 pcb->send.tailProtect.lastPackSeq = ackSeqNum;
231 }
232 }
233
FillpFcPackInput(struct FillpPcb * pcb,struct FillpPktPack * pack)234 void FillpFcPackInput(struct FillpPcb *pcb, struct FillpPktPack *pack)
235 {
236 if (pcb->algFuncs.analysisPack != FILLP_NULL_PTR) {
237 pcb->algFuncs.analysisPack(pcb, (void *)pack);
238 }
239
240 if (!(pack->flag & FILLP_PACK_FLAG_REQURE_RTT)) {
241 FillpFcTailProtected(pcb, pack);
242 }
243 }
244
FillpFcNackInput(struct FillpPcb * pcb,struct FillpPktNack * nack)245 void FillpFcNackInput(struct FillpPcb *pcb, struct FillpPktNack *nack)
246 {
247 if (pcb->algFuncs.analysisNack != FILLP_NULL_PTR) {
248 pcb->algFuncs.analysisNack(pcb, (void *)nack);
249 }
250 }
251
FillpGetAlgFun(struct FillpPcb * pcb)252 static int FillpGetAlgFun(struct FillpPcb *pcb)
253 {
254 switch (pcb->fcAlg) {
255 case FILLP_SUPPORT_ALG_BASE:
256 pcb->algFuncs = g_fillpAlg0;
257 break;
258 case FILLP_SUPPORT_ALG_3:
259 pcb->algFuncs = g_fillpAlg0;
260 break;
261 default:
262 FILLP_LOGERR("flow control not set");
263 return -1;
264 }
265 return 0;
266 }
267
FillpFcInit(struct FillpPcb * pcb)268 FILLP_INT FillpFcInit(struct FillpPcb *pcb)
269 {
270 FILLP_INT ret = ERR_OK;
271
272 if (pcb == FILLP_NULL_PTR) {
273 FILLP_LOGERR("pcb null");
274 return -1;
275 }
276
277 if (pcb->send.slowStart) {
278 /* Sender interval, be used to control the sending rate kbits/s */
279 pcb->send.flowControl.sendRate = FILLP_INITIAL_RATE;
280 FILLP_LOGDBG("slowStart:%u init_rate:%u", pcb->send.slowStart, pcb->send.flowControl.sendRate);
281 } else {
282 /* The maxRate configured by the user is in Mbps, hence multiplied by
283 100 to get the value in Kbps */
284 pcb->send.flowControl.sendRate = g_resource.flowControl.maxRate;
285 FILLP_LOGDBG("slowStart not enabled, init_rate:%u", pcb->send.flowControl.sendRate);
286 }
287
288 pcb->send.flowControl.sendTime = 0;
289 pcb->send.flowControl.sendRateLimit = 0;
290 pcb->send.flowControl.remainBytes = 0;
291 pcb->send.flowControl.lastCycleNoEnoughData = FILLP_FALSE;
292 pcb->send.flowControl.sendOneNoData = FILLP_TRUE;
293
294 pcb->send.tailProtect.lastPackSeq = 0;
295 pcb->send.tailProtect.samePackCount = 0;
296
297 pcb->statistics.keepAlive.lastRecvTime = pcb->pcbInst->curTime;
298 pcb->statistics.keepAlive.lastDataRecvTime = pcb->pcbInst->curTime;
299
300 pcb->send.flowControl.fcAlg = FILLP_NULL_PTR;
301 FILLP_LOGERR("fillp_sock_id:%d, fc alg:%xh, characters:%xh, peer_alg:%xh, peerCharacters:%xh",
302 FILLP_GET_SOCKET(pcb)->index, pcb->fcAlg, pcb->characters, FILLP_GET_CONN(pcb)->peerFcAlgs,
303 FILLP_GET_CONN(pcb)->peerCharacters);
304 if (FillpGetAlgFun(pcb) != 0) {
305 return -1;
306 }
307
308 FillpAdjustFcParamsByRtt(pcb);
309
310 if (pcb->algFuncs.fcInit != FILLP_NULL_PTR) {
311 ret = pcb->algFuncs.fcInit(pcb);
312 }
313
314 return ret;
315 }
316
FillpFcDeinit(struct FillpPcb * pcb)317 void FillpFcDeinit(struct FillpPcb *pcb)
318 {
319 if (pcb->algFuncs.fcDeinit != FILLP_NULL_PTR) {
320 pcb->algFuncs.fcDeinit(pcb);
321 }
322 pcb->send.flowControl.fcAlg = FILLP_NULL_PTR;
323 }
324
325 /* recv a data packet */
FillpFcDataInput(struct FillpPcb * pcb,FILLP_CONST struct FillpPktHead * pkt)326 void FillpFcDataInput(struct FillpPcb *pcb, FILLP_CONST struct FillpPktHead *pkt)
327 {
328 pcb->statistics.traffic.totalRecved++;
329
330 if (pcb->statistics.traffic.totalRecved == 1) {
331 FILLP_LOGDBG("fillp_sock_id:%d "
332 "First data receiving time =%lld, recv seq num = %u, recv pkt num = %u \r\n",
333 FILLP_GET_SOCKET(pcb)->index, pcb->pcbInst->curTime, pcb->recv.seqNum, pcb->recv.pktNum);
334 }
335
336 pcb->statistics.traffic.totalRecvedBytes += ((FILLP_UINT32)pkt->dataLen);
337 pcb->statistics.pack.periodRecvedOnes++;
338 pcb->statistics.pack.periodRecvBits += FILLP_FC_VAL_IN_BITS((FILLP_ULLONG)pkt->dataLen);
339 }
340
341 /* discard a data packet */
FillpFcRecvDropOne(struct FillpPcb * pcb)342 void FillpFcRecvDropOne(struct FillpPcb *pcb)
343 {
344 pcb->statistics.pack.periodDroped++;
345 pcb->statistics.traffic.totalDroped++;
346 }
347
348 /* recv an packet outof order */
FillpFcRecvOutOfOrder(struct FillpPcb * pcb)349 void FillpFcRecvOutOfOrder(struct FillpPcb *pcb)
350 {
351 pcb->statistics.traffic.totalOutOfOrder++;
352 }
353
354 /* calculate the lost packets on recv side */
FillpFcRecvLost(struct FillpPcb * pcb,FILLP_UINT32 ones)355 void FillpFcRecvLost(struct FillpPcb *pcb, FILLP_UINT32 ones)
356 {
357 pcb->statistics.traffic.totalRecvLost += ones;
358 }
359
FillpFcCycle(void * arg)360 void FillpFcCycle(void *arg)
361 {
362 struct FillpPcb *pcb = (struct FillpPcb *)arg;
363 /* The unit of the time returned here is micro seconds */
364 FILLP_LLONG detaTime;
365 struct FtNetconn *netconn = FILLP_GET_CONN(pcb);
366 struct FtSocket *sock;
367
368 sock = (struct FtSocket *)netconn->sock;
369
370 if (sock->isListenSock) {
371 FILLP_LOGERR("Listen socket should not hit here!!!");
372 return;
373 }
374
375 detaTime = pcb->pcbInst->curTime - pcb->statistics.keepAlive.lastRecvTime;
376
377 if (detaTime >= (FILLP_LLONG)FILLP_UTILS_MS2US((FILLP_LLONG)sock->resConf.common.keepAliveTime)) {
378 FILLP_LOGERR("Keep alive timeout, fillp_sock_id:%d,detaTime:%lld,keepAliveTime:%u(ms)",
379 sock->index, detaTime, sock->resConf.common.keepAliveTime);
380
381 FillpDfxSockLinkAndQosNotify(sock, FILLP_DFX_LINK_KEEPALIVE_TIMEOUT);
382 SpungeShutdownSock(sock, SPUNGE_SHUT_RDWR);
383 sock->errEvent |= SPUNGE_EPOLLERR;
384 SpungeEpollEventCallback(sock, (FILLP_INT)SPUNGE_EPOLLIN | (FILLP_INT)SPUNGE_EPOLLERR, 1);
385 SpungeConnClosed(FILLP_GET_CONN(pcb));
386 return;
387 }
388
389 pcb->keepAliveTimerNode.interval =
390 (FILLP_UINT32)(FILLP_UTILS_MS2US((FILLP_LLONG)sock->resConf.common.keepAliveTime) - detaTime);
391 FILLP_LOGDTL("update the keep alive interval to %u, fillp_sock_id:%d, detaTime:%lld, keepAliveTime:%u(ms)",
392 pcb->keepAliveTimerNode.interval, sock->index, detaTime, sock->resConf.common.keepAliveTime);
393 FillpEnableKeepAliveTimer(pcb);
394 }
395
396 #ifdef __cplusplus
397 }
398 #endif
399