1 /*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "log.h"
17 #include "fillp_function.h"
18 #include "lf_ring.h"
19
20 #ifdef __cplusplus
21 extern "C" {
22 #endif
23
24
25 /*
26 * Function : FillpLfRingCalMemSize
27 *
28 * Description : This function will be invoked to calculate the memory size
29 * required for lock-free queue, based on size.
30 *
31 * Input : size - no of items of lock-free queue
32 *
33 * Output : None
34 *
35 * Return : Memory size of lock-free queue
36 */
FillpLfRingCalMemSize(size_t size)37 size_t FillpLfRingCalMemSize(size_t size)
38 {
39 size_t memSize = size * sizeof(void *);
40 if ((memSize == 0) || (memSize / sizeof(void *) != size)) {
41 return 0;
42 }
43
44 memSize = memSize + sizeof(struct FillpLfRing);
45 if (memSize < sizeof(struct FillpLfRing)) {
46 return 0;
47 }
48 return memSize;
49 }
50
51 /*
52 * Function : FillpLfRingInit
53 *
54 * Description : This function will be invoked to init the lock-free queue.
55 *
56 * Input : ring - lock-free queue to be initialized
57 * name - name to be added to the lock-free queue
58 * size - size of lock-free queue
59 *
60 * Output : ring - initialized lock-free queue
61 *
62 * Return : None
63 */
FillpLfRingInit(struct FillpLfRing * ring,char * name,size_t size)64 void FillpLfRingInit(struct FillpLfRing *ring, char *name, size_t size)
65 {
66 FILLP_UNUSED_PARA(name);
67 if (ring == FILLP_NULL_PTR) {
68 return;
69 }
70
71 if (size == 0) {
72 return;
73 }
74
75 ring->size = (FILLP_ULONG)size;
76
77 ring->cons.head = 0;
78 ring->cons.tail = 0;
79
80 ring->prod.head = 0;
81 ring->prod.tail = 0;
82 ring->consSafe = FILLP_TRUE;
83 ring->prodSafe = FILLP_TRUE;
84
85 (void)memset_s(ring->name, sizeof(ring->name), '\0', sizeof(ring->name));
86 }
87
/*
 * Function    : FillpLfRingSetProdSafe
 *
 * Description : Sets the prodSafe flag of the ring. The enqueue path in this
 *               file advances prod.head through CAS when the flag is set and
 *               with a plain store when it is clear.
 *
 * Input       : ring - lock-free queue to update; a NULL ring is ignored
 *               safe - new value of the prodSafe flag
 *
 * Output      : ring - prodSafe updated
 *
 * Return      : None
 */
void FillpLfRingSetProdSafe(struct FillpLfRing *ring, FILLP_BOOL safe)
{
    /* Same NULL tolerance as the other public ring functions */
    if (ring == FILLP_NULL_PTR) {
        return;
    }

    ring->prodSafe = safe;
}
92
/*
 * Function    : FillpLfRingSetConsSafe
 *
 * Description : Sets the consSafe flag of the ring. The dequeue path in this
 *               file advances cons.head through CAS when the flag is set and
 *               with a plain store when it is clear.
 *
 * Input       : ring - lock-free queue to update; a NULL ring is ignored
 *               safe - new value of the consSafe flag
 *
 * Output      : ring - consSafe updated
 *
 * Return      : None
 */
void FillpLfRingSetConsSafe(struct FillpLfRing *ring, FILLP_BOOL safe)
{
    /* Same NULL tolerance as the other public ring functions */
    if (ring == FILLP_NULL_PTR) {
        return;
    }

    ring->consSafe = safe;
}
97
/*
 * Function    : FillpLfRingMpEnqueueWait
 *
 * Description : Reserves up to count slots for a producer by advancing
 *               ring->prod.head. When ring->prodSafe is set the head is
 *               advanced through CAS and the whole reservation is retried
 *               from a fresh snapshot whenever CAS reports failure; when it
 *               is clear the new head is stored directly.
 *
 * Input       : ring  - lock-free queue (not checked for NULL here)
 *               count - maximum number of slots to reserve
 *
 * Output      : prodHead - producer head observed before the reservation
 *               prodNext - producer head after the reservation (not written
 *                          when 0 is returned)
 *
 * Return      : Number of slots reserved, or 0 when the ring has no free entry
 */
static FILLP_ULONG FillpLfRingMpEnqueueWait(struct FillpLfRing *ring, FILLP_UINT count,
    FILLP_ULONG *prodHead, FILLP_ULONG *prodNext)
{
    FILLP_ULONG reserved;

    for (;;) {
        FILLP_ULONG tailSnapshot;
        FILLP_ULONG room;

        /* Snapshot the producer head first, then the consumer tail */
        *prodHead = ring->prod.head;
        tailSnapshot = ring->cons.tail;

        sys_arch_compiler_barrier();

        /* Computed in FILLP_ULONG arithmetic; assuming that type is unsigned,
         * the result stays meaningful even after the indices have wrapped. */
        room = (ring->size + tailSnapshot - *prodHead);

        /* No free entry at all: give up without touching prodNext */
        if (room == 0) {
            return 0;
        }

        /* Take what was asked for, capped by the free room */
        reserved = room;
        if (room > count) {
            reserved = count;
        }
        *prodNext = *prodHead + reserved;

        /* prodSafe cleared: publish the new head with a plain store */
        if (!ring->prodSafe) {
            ring->prod.head = *prodNext;
            break;
        }

        /* CAS reported failure: start over with a fresh snapshot */
        if (unlikely(!CAS(&ring->prod.head, *prodHead, *prodNext))) {
            continue;
        }
        break;
    }

    return reserved;
}
130
FillpLfRingMpEnqueue(struct FillpLfRing * ring,void ** dataTable,FILLP_UINT count)131 FillpErrorType FillpLfRingMpEnqueue(struct FillpLfRing *ring, void **dataTable, FILLP_UINT count)
132 {
133 FILLP_ULONG prodHead = 0;
134 FILLP_ULONG prodNext = 0;
135 FILLP_ULONG i;
136 FILLP_ULONG j;
137 FILLP_ULONG rep = 0;
138 FILLP_ULONG ret;
139
140 if ((ring == FILLP_NULL_PTR) || (dataTable == FILLP_NULL_PTR) || (count == 0)) {
141 return ERR_PARAM;
142 }
143 /* move prod.head atomically */
144 ret = FillpLfRingMpEnqueueWait(ring, count, &prodHead, &prodNext);
145 if (ret == 0) {
146 return ERR_NOBUFS;
147 }
148
149 /* write entries in ring */
150 for (i = 0, j = 1; i < (FILLP_UINT)ret; i++, j++) {
151 ring->ringCache[(prodHead + j) % ring->size] = dataTable[i];
152 }
153
154 sys_arch_compiler_barrier();
155
156 /*
157 * If there are other enqueues in progress that preceded us,
158 * we need to wait for them to complete
159 */
160 while (unlikely(ring->prod.tail != prodHead)) {
161 FILLP_RTE_PAUSE();
162
163 /* Set FTDP_RING_PAUSE_REP_COUNT to avoid spin too long waiting
164 * for other thread finish. It gives pre-empted thread a chance
165 * to proceed and finish with ring dequeue operation. */
166 #if LF_RING_PAUSE_REP_COUNT
167 if (++rep == LF_RING_PAUSE_REP_COUNT) {
168 rep = 0;
169 (void)SYS_ARCH_SCHED_YIELD();
170 }
171 #endif
172 }
173
174 ring->prod.tail = prodNext;
175
176 FILLP_UNUSED_PARA(rep);
177
178 return (FillpErrorType)ret;
179 }
180
/*
 * Function    : FillpLfRingMcDequeueWait
 *
 * Description : Claims up to count entries for a consumer by advancing
 *               ring->cons.head. When ring->consSafe is set the head is
 *               advanced through CAS and the whole claim is retried from a
 *               fresh snapshot whenever CAS reports failure; when it is
 *               clear the new head is stored directly.
 *
 * Input       : ring  - lock-free queue (not checked for NULL here)
 *               count - maximum number of entries to claim
 *
 * Output      : consHead - consumer head observed before the claim
 *               consNext - consumer head after the claim (not written when
 *                          0 is returned)
 *
 * Return      : Number of entries claimed, or 0 when the ring holds none
 */
static FILLP_ULONG FillpLfRingMcDequeueWait(struct FillpLfRing *ring, FILLP_UINT count, FILLP_ULONG *consHead,
    FILLP_ULONG *consNext)
{
    FILLP_ULONG claimed;

    for (;;) {
        FILLP_ULONG tailSnapshot;
        FILLP_ULONG available;

        /* Snapshot the consumer head first, then the producer tail */
        *consHead = ring->cons.head;
        tailSnapshot = ring->prod.tail;
        sys_arch_compiler_barrier();

        /* Computed in FILLP_ULONG arithmetic; assuming that type is unsigned,
         * the difference stays meaningful even after the indices have wrapped. */
        available = (tailSnapshot - *consHead);

        /* Nothing published yet: give up without touching consNext */
        if (available == 0) {
            return 0;
        }

        /* Take what was asked for, capped by what is available */
        claimed = available;
        if (available > count) {
            claimed = count;
        }
        *consNext = *consHead + claimed;

        /* consSafe cleared: publish the new head with a plain store */
        if (!ring->consSafe) {
            ring->cons.head = *consNext;
            break;
        }

        /* CAS reported failure: start over with a fresh snapshot */
        if (unlikely(!CAS(&ring->cons.head, *consHead, *consNext))) {
            continue;
        }
        break;
    }

    return claimed;
}
213
FillpLfRingMcDequeue(struct FillpLfRing * ring,void ** dataTable,FILLP_UINT count)214 FILLP_INT FillpLfRingMcDequeue(struct FillpLfRing *ring, void **dataTable, FILLP_UINT count)
215 {
216 FILLP_ULONG consHead;
217 FILLP_ULONG consNext;
218 FILLP_ULONG rep = 0;
219 FILLP_ULONG i;
220 FILLP_ULONG j;
221 FILLP_ULONG ret;
222
223 if ((ring == FILLP_NULL_PTR) || (dataTable == FILLP_NULL_PTR) || (count == 0)) {
224 return ERR_PARAM;
225 }
226 /* move cons.head atomically */
227 ret = FillpLfRingMcDequeueWait(ring, count, &consHead, &consNext);
228 if (ret == 0) {
229 return ERR_NOBUFS;
230 }
231
232 /* copy in table */
233 for (i = 0, j = 1; i < ret; i++, j++) {
234 dataTable[i] = ring->ringCache[(consHead + j) % ring->size];
235 }
236
237 sys_arch_compiler_barrier();
238
239 /*
240 * If there are other dequeues in progress that preceded us,
241 * we need to wait for them to complete
242 */
243 while (unlikely(ring->cons.tail != consHead)) {
244 FILLP_RTE_PAUSE();
245
246 /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
247 * for other thread finish. It gives pre-empted thread a chance
248 * to proceed and finish with ring dequeue operation. */
249 #if LF_RING_PAUSE_REP_COUNT
250 if (++rep == LF_RING_PAUSE_REP_COUNT) {
251 rep = 0;
252 (void)SYS_ARCH_SCHED_YIELD();
253 }
254 #endif
255 }
256
257 ring->cons.tail = consNext;
258
259 FILLP_UNUSED_PARA(rep);
260
261 return (FILLP_INT)ret;
262 }
263
FillpRingEmpty(const struct FillpLfRing * r)264 FILLP_INT FillpRingEmpty(const struct FillpLfRing *r)
265 {
266 FILLP_ULONG prodTail = r->prod.tail;
267 FILLP_ULONG consTail = r->cons.tail;
268 return (consTail == prodTail);
269 }
270
271
FillpRingFreeEntries(const struct FillpLfRing * r)272 FILLP_INT FillpRingFreeEntries(const struct FillpLfRing *r)
273 {
274 FILLP_ULONG consTail;
275 FILLP_ULONG prodHead;
276 FILLP_ULONG remain;
277 FILLP_INT cnt;
278
279 if (r == FILLP_NULL_PTR) {
280 FILLP_LOGERR("ring is NULL pointer");
281 return 0;
282 }
283
284 consTail = r->cons.tail;
285 prodHead = r->prod.head;
286
287 remain = (r->size + consTail - prodHead);
288 cnt = (int)remain;
289 if (cnt < 0) {
290 FILLP_LOGERR("cnt is %d, real size:%lu", cnt, remain);
291 cnt = 0;
292 }
293
294 return cnt;
295 }
296
FillpRingValidOnes(struct FillpLfRing * r)297 FILLP_ULONG FillpRingValidOnes(struct FillpLfRing *r)
298 {
299 FILLP_ULONG prodTail;
300 FILLP_ULONG consHead;
301 FILLP_ULONG ret;
302 if (r == FILLP_NULL_PTR) {
303 return 0;
304 }
305
306 prodTail = r->prod.tail;
307 consHead = r->cons.head;
308
309 ret = prodTail - consHead;
310 if (((FILLP_SLONG)ret) < 0) {
311 ret = r->size;
312 }
313 return ret;
314 }
315
316 #ifdef __cplusplus
317 }
318 #endif
319