/*
 * Copyright (C) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "log.h"
#include "fillp_function.h"
#include "lf_ring.h"

#ifdef __cplusplus
extern "C" {
#endif

/*
 * Function    : FillpLfRingCalMemSize
 *
 * Description : This function will be invoked to calculate the memory size
 * required for the lock-free queue, based on its size.
 *
 * Input       : size - number of items in the lock-free queue
 *
 * Output      : None
 *
 * Return      : Memory size of the lock-free queue, or 0 if size is zero or
 * the calculation overflows
 */
size_t FillpLfRingCalMemSize(size_t size)
{
    size_t memSize = size * sizeof(void *);
    if ((memSize == 0) || (memSize / sizeof(void *) != size)) {
        return 0;
    }

    memSize = memSize + sizeof(struct FillpLfRing);
    if (memSize < sizeof(struct FillpLfRing)) {
        return 0;
    }
    return memSize;
}

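/*
 * Illustrative usage sketch (not part of this module): the ring is intended
 * to live in a single allocation sized by FillpLfRingCalMemSize() and then be
 * initialized with FillpLfRingInit() below. The allocator (malloc here) and
 * the item count are placeholders, not requirements of this module.
 *
 *     size_t memSize = FillpLfRingCalMemSize(itemCount);
 *     struct FillpLfRing *ring = FILLP_NULL_PTR;
 *     if (memSize != 0) {
 *         ring = (struct FillpLfRing *)malloc(memSize);
 *     }
 *     if (ring != FILLP_NULL_PTR) {
 *         FillpLfRingInit(ring, FILLP_NULL_PTR, itemCount);
 *     }
 */
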
/*
 * Function    : FillpLfRingInit
 *
 * Description : This function will be invoked to init the lock-free queue.
 *
 * Input       : ring - lock-free queue to be initialized
 * name - name to be given to the lock-free queue (currently unused)
 * size - size of the lock-free queue
 *
 * Output      : ring - initialized lock-free queue
 *
 * Return      : None
 */
void FillpLfRingInit(struct FillpLfRing *ring, char *name, size_t size)
{
    FILLP_UNUSED_PARA(name);
    if (ring == FILLP_NULL_PTR) {
        return;
    }

    if (size == 0) {
        return;
    }

    ring->size = (FILLP_ULONG)size;

    ring->cons.head = 0;
    ring->cons.tail = 0;

    ring->prod.head = 0;
    ring->prod.tail = 0;
    ring->consSafe = FILLP_TRUE;
    ring->prodSafe = FILLP_TRUE;

    (void)memset_s(ring->name, sizeof(ring->name), '\0', sizeof(ring->name));
}

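/*
 * Function    : FillpLfRingSetProdSafe
 *
 * Description : Enables or disables the CAS protection on the producer side.
 * With safe set to FILLP_FALSE the ring assumes a single producer and skips
 * the CAS loop when reserving slots.
 *
 * Input       : ring - lock-free queue
 * safe - FILLP_TRUE for multi-producer use, FILLP_FALSE for a single producer
 *
 * Output      : None
 *
 * Return      : None
 */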
void FillpLfRingSetProdSafe(struct FillpLfRing *ring, FILLP_BOOL safe)
{
    ring->prodSafe = safe;
}

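/*
 * Function    : FillpLfRingSetConsSafe
 *
 * Description : Enables or disables the CAS protection on the consumer side.
 * With safe set to FILLP_FALSE the ring assumes a single consumer and skips
 * the CAS loop when reserving entries.
 *
 * Input       : ring - lock-free queue
 * safe - FILLP_TRUE for multi-consumer use, FILLP_FALSE for a single consumer
 *
 * Output      : None
 *
 * Return      : None
 */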
void FillpLfRingSetConsSafe(struct FillpLfRing *ring, FILLP_BOOL safe)
{
    ring->consSafe = safe;
}

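/*
 * Function    : FillpLfRingMpEnqueueWait
 *
 * Description : Reserves up to count slots on the producer side by advancing
 * prod.head, using CAS when prodSafe is set so that concurrent producers each
 * obtain a disjoint range.
 *
 * Input       : ring - lock-free queue
 * count - number of slots requested
 *
 * Output      : prodHead - producer head before the reservation
 * prodNext - producer head after the reservation
 *
 * Return      : Number of slots actually reserved, or 0 if the ring is full
 */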
static FILLP_ULONG FillpLfRingMpEnqueueWait(struct FillpLfRing *ring, FILLP_UINT count,
    FILLP_ULONG *prodHead, FILLP_ULONG *prodNext)
{
    FILLP_ULONG consTail;
    FILLP_ULONG freeEntries;
    FILLP_ULONG ret;
    do {
        *prodHead = ring->prod.head;
        consTail = ring->cons.tail;

        sys_arch_compiler_barrier();
        /* The subtraction is done between two unsigned values (the result
         * wraps around even if prod.head > cons.tail), so 'freeEntries' is
         * always between 0 and the ring size. */
        freeEntries = (ring->size + consTail - *prodHead);

        /* check that we have enough room in the ring */
        if (freeEntries == 0) {
            return 0;
        }
        ret = ((freeEntries > count) ? count : freeEntries);
        *prodNext = *prodHead + ret;

        if (!ring->prodSafe) {
            ring->prod.head = *prodNext;
            break;
        }
    } while (unlikely(!CAS(&ring->prod.head, *prodHead, *prodNext)));

    return ret;
}

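/*
 * Function    : FillpLfRingMpEnqueue
 *
 * Description : Multi-producer enqueue. Reserves slots, copies the pointers
 * from dataTable into the ring, then waits for earlier producers to finish
 * before publishing prod.tail.
 *
 * Input       : ring - lock-free queue
 * dataTable - array of pointers to enqueue
 * count - number of pointers in dataTable
 *
 * Output      : None
 *
 * Return      : Number of pointers enqueued, or ERR_PARAM/ERR_NOBUFS on failure
 */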
FillpErrorType FillpLfRingMpEnqueue(struct FillpLfRing *ring, void **dataTable, FILLP_UINT count)
{
    FILLP_ULONG prodHead = 0;
    FILLP_ULONG prodNext = 0;
    FILLP_ULONG i;
    FILLP_ULONG j;
    FILLP_ULONG rep = 0;
    FILLP_ULONG ret;

    if ((ring == FILLP_NULL_PTR) || (dataTable == FILLP_NULL_PTR) || (count == 0)) {
        return ERR_PARAM;
    }
    /* move prod.head atomically */
    ret = FillpLfRingMpEnqueueWait(ring, count, &prodHead, &prodNext);
    if (ret == 0) {
        return ERR_NOBUFS;
    }

    /* write entries in the ring */
    for (i = 0, j = 1; i < (FILLP_UINT)ret; i++, j++) {
        ring->ringCache[(prodHead + j) % ring->size] = dataTable[i];
    }

    sys_arch_compiler_barrier();

    /*
     * If there are other enqueues in progress that preceded us,
     * we need to wait for them to complete
     */
    while (unlikely(ring->prod.tail != prodHead)) {
        FILLP_RTE_PAUSE();

        /* LF_RING_PAUSE_REP_COUNT bounds how long we spin waiting for the
         * other threads to finish. It gives a pre-empted thread a chance
         * to proceed and finish its ring enqueue operation. */
#if LF_RING_PAUSE_REP_COUNT
        if (++rep == LF_RING_PAUSE_REP_COUNT) {
            rep = 0;
            (void)SYS_ARCH_SCHED_YIELD();
        }
#endif
    }

    ring->prod.tail = prodNext;

    FILLP_UNUSED_PARA(rep);

    return (FillpErrorType)ret;
}

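/*
 * Function    : FillpLfRingMcDequeueWait
 *
 * Description : Reserves up to count entries on the consumer side by advancing
 * cons.head, using CAS when consSafe is set so that concurrent consumers each
 * obtain a disjoint range.
 *
 * Input       : ring - lock-free queue
 * count - number of entries requested
 *
 * Output      : consHead - consumer head before the reservation
 * consNext - consumer head after the reservation
 *
 * Return      : Number of entries actually reserved, or 0 if the ring is empty
 */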
static FILLP_ULONG FillpLfRingMcDequeueWait(struct FillpLfRing *ring, FILLP_UINT count, FILLP_ULONG *consHead,
    FILLP_ULONG *consNext)
{
    FILLP_ULONG prodTail;
    FILLP_ULONG entries;
    FILLP_ULONG ret;

    do {
        *consHead = ring->cons.head;
        prodTail = ring->prod.tail;
        sys_arch_compiler_barrier();
        /* The subtraction is done between two unsigned values (the result
         * wraps around even if cons.head > prod.tail), so 'entries' is
         * always between 0 and the ring size. */
        entries = (prodTail - *consHead);

        /* Set the actual number of entries to dequeue */
        if (entries == 0) {
            return 0;
        }
        ret = ((entries > count) ? count : entries);
        *consNext = *consHead + ret;

        if (!ring->consSafe) {
            ring->cons.head = *consNext;
            break;
        }
    } while (unlikely(!CAS(&ring->cons.head, *consHead, *consNext)));

    return ret;
}

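/*
 * Function    : FillpLfRingMcDequeue
 *
 * Description : Multi-consumer dequeue. Reserves entries, copies the pointers
 * from the ring into dataTable, then waits for earlier consumers to finish
 * before publishing cons.tail.
 *
 * Input       : ring - lock-free queue
 * dataTable - array that receives the dequeued pointers
 * count - maximum number of pointers to dequeue
 *
 * Output      : dataTable - filled with the dequeued pointers
 *
 * Return      : Number of pointers dequeued, or ERR_PARAM/ERR_NOBUFS on failure
 */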
FILLP_INT FillpLfRingMcDequeue(struct FillpLfRing *ring, void **dataTable, FILLP_UINT count)
{
    FILLP_ULONG consHead;
    FILLP_ULONG consNext;
    FILLP_ULONG rep = 0;
    FILLP_ULONG i;
    FILLP_ULONG j;
    FILLP_ULONG ret;

    if ((ring == FILLP_NULL_PTR) || (dataTable == FILLP_NULL_PTR) || (count == 0)) {
        return ERR_PARAM;
    }
    /* move cons.head atomically */
    ret = FillpLfRingMcDequeueWait(ring, count, &consHead, &consNext);
    if (ret == 0) {
        return ERR_NOBUFS;
    }

    /* copy entries out into the table */
    for (i = 0, j = 1; i < ret; i++, j++) {
        dataTable[i] = ring->ringCache[(consHead + j) % ring->size];
    }

    sys_arch_compiler_barrier();

    /*
     * If there are other dequeues in progress that preceded us,
     * we need to wait for them to complete
     */
    while (unlikely(ring->cons.tail != consHead)) {
        FILLP_RTE_PAUSE();

        /* LF_RING_PAUSE_REP_COUNT bounds how long we spin waiting for the
         * other threads to finish. It gives a pre-empted thread a chance
         * to proceed and finish its ring dequeue operation. */
#if LF_RING_PAUSE_REP_COUNT
        if (++rep == LF_RING_PAUSE_REP_COUNT) {
            rep = 0;
            (void)SYS_ARCH_SCHED_YIELD();
        }
#endif
    }

    ring->cons.tail = consNext;

    FILLP_UNUSED_PARA(rep);

    return (FILLP_INT)ret;
}

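/*
 * Illustrative usage sketch (not part of this module): one thread pushes a
 * batch of pointers while another drains them. "item", "obj" and the batch
 * size of 8 are placeholders.
 *
 *     void *item = obj;
 *     if (FillpLfRingMpEnqueue(ring, &item, 1) == ERR_NOBUFS) {
 *         // ring full: retry later or drop
 *     }
 *
 *     void *out[8];
 *     FILLP_INT got = FillpLfRingMcDequeue(ring, out, 8);
 *     // on success, 'got' pointers are available in out[0..got-1]
 */

/*
 * Function    : FillpRingEmpty
 *
 * Description : Checks whether the ring currently holds no published entries.
 *
 * Input       : r - lock-free queue
 *
 * Output      : None
 *
 * Return      : 1 if the ring is empty, 0 otherwise
 */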
FILLP_INT FillpRingEmpty(const struct FillpLfRing *r)
{
    FILLP_ULONG prodTail = r->prod.tail;
    FILLP_ULONG consTail = r->cons.tail;
    return (consTail == prodTail);
}

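/*
 * Function    : FillpRingFreeEntries
 *
 * Description : Returns an estimate of how many free slots the ring has,
 * based on the current producer head and consumer tail.
 *
 * Input       : r - lock-free queue
 *
 * Output      : None
 *
 * Return      : Number of free entries, or 0 if r is NULL or the computed
 * count is negative
 */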
FILLP_INT FillpRingFreeEntries(const struct FillpLfRing *r)
{
    FILLP_ULONG consTail;
    FILLP_ULONG prodHead;
    FILLP_ULONG remain;
    FILLP_INT cnt;

    if (r == FILLP_NULL_PTR) {
        FILLP_LOGERR("ring is NULL pointer");
        return 0;
    }

    consTail = r->cons.tail;
    prodHead = r->prod.head;

    remain = (r->size + consTail - prodHead);
    cnt = (int)remain;
    if (cnt < 0) {
        FILLP_LOGERR("cnt is %d, real size:%lu", cnt, remain);
        cnt = 0;
    }

    return cnt;
}

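/*
 * Function    : FillpRingValidOnes
 *
 * Description : Returns an estimate of how many published entries are waiting
 * to be dequeued, based on the current producer tail and consumer head.
 *
 * Input       : r - lock-free queue
 *
 * Output      : None
 *
 * Return      : Number of valid entries; r->size if the snapshot is
 * transiently inconsistent; 0 if r is NULL
 */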
FILLP_ULONG FillpRingValidOnes(struct FillpLfRing *r)
{
    FILLP_ULONG prodTail;
    FILLP_ULONG consHead;
    FILLP_ULONG ret;
    if (r == FILLP_NULL_PTR) {
        return 0;
    }

    prodTail = r->prod.tail;
    consHead = r->cons.head;

    ret = prodTail - consHead;
    if (((FILLP_SLONG)ret) < 0) {
        ret = r->size;
    }
    return ret;
}

#ifdef __cplusplus
}
#endif