/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SOFTBUS_QUEUE_H
#define SOFTBUS_QUEUE_H

#include <stdbool.h>
#include <stdint.h>
#include "securec.h"

#include "softbus_adapter_cpu.h"
#include "softbus_adapter_atomic.h"

#ifdef __cplusplus
#if __cplusplus
extern "C" {
#endif
#endif

typedef enum {
    QUEUE_INVAL = -10,
    QUEUE_FULL,
    QUEUE_EMPTY
} QueueError;
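/* The enqueue/dequeue operations below return 0 on success and one of the codes above on failure. */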

/** @brief Lock-free ring queue */
typedef struct {
    uint32_t magic;
    uint32_t unitNum; /* queue node limit */
    uint8_t pad[56];  /* cache line pad */

    /* producer status */
    struct {
        uint32_t size;          /* queue size */
        uint32_t mask;          /* mask (size - 1) */
        volatile uint32_t head; /* producer head */
        volatile uint32_t tail; /* producer tail */
        uint8_t pad[48];        /* cache line pad */
    } producer;

    /* consumer status */
    struct {
        uint32_t size;          /* queue size */
        uint32_t mask;          /* mask (size - 1) */
        volatile uint32_t head; /* consumer head */
        volatile uint32_t tail; /* consumer tail */
        uint8_t pad[48];        /* cache line pad */
    } consumer;

    uintptr_t nodes[0]; /* queue nodes */
} LockFreeQueue;

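/** @brief Calculate the memory size required for a queue that holds unitNum nodes; the result is written to queueSize. */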
extern int32_t QueueSizeCalc(uint32_t unitNum, uint32_t *queueSize);

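/** @brief Get the number of nodes currently in the queue; the result is written to count. */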
extern int32_t QueueCountGet(const LockFreeQueue *queue, uint32_t *count);

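/** @brief Initialize a caller-allocated queue (memory sized via QueueSizeCalc) to hold unitNum nodes. */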
extern int32_t QueueInit(LockFreeQueue *queue, uint32_t unitNum);

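/** @brief Allocate and initialize a queue that holds unitNum nodes; returns NULL on failure. */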
extern LockFreeQueue *CreateQueue(uint32_t unitNum);

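/*
 * Illustrative usage sketch (not part of the original header). The queue stores raw
 * pointers, so the caller keeps ownership of the memory each enqueued node points to;
 * how a queue returned by CreateQueue() is released is assumed to be the caller's
 * responsibility and is not specified here.
 *
 *     LockFreeQueue *queue = CreateQueue(64);
 *     if (queue != NULL) {
 *         static int32_t value = 42;
 *         if (QueueMultiProducerEnqueue(queue, &value) == 0) {
 *             void *out = NULL;
 *             if (QueueMultiConsumerDequeue(queue, &out) == 0) {
 *                 // out now points to value
 *             }
 *         }
 *     }
 */
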
/** @brief Check whether the queue is empty: returns 0 if empty, -1 if not empty, QUEUE_INVAL if queue is NULL */
static inline int32_t QueueIsEmpty(LockFreeQueue *queue)
{
    uint32_t producerTail;
    uint32_t consumerTail;
    uint32_t mask;

    if (queue == NULL) {
        return QUEUE_INVAL;
    }

    producerTail = queue->producer.tail;
    consumerTail = queue->consumer.tail;
    mask = queue->producer.mask;

    if (((producerTail - consumerTail) & mask) == 0) {
        return 0;
    }
    return -1;
}

/** @brief Enqueue operation, single producer only (not safe for concurrent producers) */
static inline int32_t QueueSingleProducerEnqueue(LockFreeQueue *queue, const void *node)
{
    uint32_t producerHead;
    uint32_t producerNext;
    uint32_t consumerTail;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    producerHead = queue->producer.head;
    RMB();
    consumerTail = queue->consumer.tail;

    /*
     * 1. In the normal case, producerHead > consumerTail and producerHead < consumerTail + mask.
     * 2. If only producerHead has wrapped around, producerHead > consumerTail - 0xFFFFFFFF and
     *    producerHead < consumerTail + mask - 0xFFFFFFFF.
     * Subtraction of two 32-bit unsigned integers wraps modulo 2^32,
     * so availableCount always lies between 0 and the queue length.
     */
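    /*
     * Worked example (illustrative numbers): with mask = 0xFF, consumerTail = 0xFFFFFFF0 and
     * a wrapped producerHead = 0x00000005, 0x15 (21) slots are in use, and
     * availableCount = (0xFF + 0xFFFFFFF0) - 0x00000005 = 0xEA = 234 = 255 - 21.
     */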
    availableCount = (mask + consumerTail) - producerHead;

    if (availableCount < 1) {
        return QUEUE_FULL;
    }

    producerNext = producerHead + 1;
    queue->producer.head = producerNext;

    queue->nodes[producerHead & mask] = (uintptr_t)node;

    /*
     * Make sure the slot is filled before the producer tail is updated.
     * If the producer tail were updated first:
     * 1. The consumer would assume the element in this slot had already been enqueued and could
     *    be consumed, and might read a dirty element.
     * 2. If the process terminates abnormally at this point, the slot holds a dirty element.
     */
    WMB();

    queue->producer.tail = producerNext;
    return 0;
}

/** @brief Enqueue operation, multi-producer (safe for concurrent producers) */
static inline int32_t QueueMultiProducerEnqueue(LockFreeQueue *queue, const void *node)
{
    uint32_t producerHead;
    uint32_t producerNext;
    uint32_t consumerTail;
    uint32_t availableCount;
    bool success = false;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }

    mask = queue->producer.mask;
    do {
        producerHead = queue->producer.head;
        /*
         * Make sure the producer head is read before the consumer tail.
         * If the consumer tail were read first, the consumer could dequeue and other producers
         * could then enqueue, so the producer head might wrap past the consumer tail.
         */
        RMB();
        consumerTail = queue->consumer.tail;

        /*
         * 1. In the normal case, producerHead > consumerTail and producerHead < consumerTail + mask.
         * 2. If only producerHead has wrapped around, producerHead > consumerTail - 0xFFFFFFFF and
         *    producerHead < consumerTail + mask - 0xFFFFFFFF.
         * Subtraction of two 32-bit unsigned integers wraps modulo 2^32,
         * so availableCount always lies between 0 and the queue length.
         */
        availableCount = (mask + consumerTail) - producerHead;

        if (availableCount < 1) {
            return QUEUE_FULL;
        }

        producerNext = producerHead + 1;
        success = SoftBusAtomicCmpAndSwap32(&queue->producer.head, producerHead, producerNext);
    } while (success == false);
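    /* The CAS above reserved slot (producerHead & mask) exclusively for this producer. */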

    queue->nodes[producerHead & mask] = (uintptr_t)node;

    /*
     * Make sure the slot is filled before the producer tail is updated.
     * If the producer tail were updated first:
     * 1. The consumer would assume the element in this slot had already been enqueued and could
     *    be consumed, and might read a dirty element.
     * 2. If the process terminates abnormally at this point, the slot holds a dirty element.
     */
    WMB();

    /* Wait for other producers to finish enqueuing. */
    while (queue->producer.tail != producerHead) {
        SoftBusYieldCpu();
    }

    queue->producer.tail += 1;
    return 0;
}

/** @brief Dequeue operation, single consumer only (not safe for concurrent consumers) */
static inline int32_t QueueSingleConsumerDequeue(LockFreeQueue *queue, void **node)
{
    uint32_t consumerHead;
    uint32_t producerTail;
    uint32_t consumerNext;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    consumerHead = queue->consumer.head;

    /* Prevent producerTail from being read before consumerHead, which could make the head and tail appear reversed. */
    RMB();

    producerTail = queue->producer.tail;

    /*
     * 1. In the normal case, producerTail > consumerHead and producerTail < consumerHead + mask.
     * 2. If only producerTail has wrapped around, producerTail > consumerHead - 0xFFFFFFFF and
     *    producerTail < consumerHead + mask - 0xFFFFFFFF.
     * Subtraction of two 32-bit unsigned integers wraps modulo 2^32,
     * so availableCount always lies between 0 and the queue length.
     */
    availableCount = (producerTail - consumerHead);

    if (availableCount < 1) {
        return QUEUE_EMPTY;
    }

    consumerNext = consumerHead + 1;
    queue->consumer.head = consumerNext;

    /* Prevent the read of queue->nodes from being reordered before the read of the producer tail. */
    RMB();

    *node = (void *)(queue->nodes[consumerHead & mask]);

    /*
     * Make sure the element is read out before the consumer tail is updated.
     * Once the consumer tail is updated, the producer treats the slot as free and may fill in
     * a new element, which would overwrite an element that has not yet been dequeued.
     */
    RMB();

    queue->consumer.tail = consumerNext;
    return 0;
}

/** @brief Dequeue operation, multi-consumer (safe for concurrent consumers) */
static inline int32_t QueueMultiConsumerDequeue(LockFreeQueue *queue, void **node)
{
    bool success = false;
    uint32_t consumerHead;
    uint32_t producerTail;
    uint32_t consumerNext;
    uint32_t availableCount;
    uint32_t mask;

    if (queue == NULL || node == NULL) {
        return QUEUE_INVAL;
    }
    mask = queue->producer.mask;

    do {
        consumerHead = queue->consumer.head;

        /*
         * Make sure the consumer head is read before the producer tail.
         * If the producer tail were read first, other consumers could dequeue and the producer
         * could then enqueue, so the consumer head might cross the producer tail.
         */
        RMB();

        producerTail = queue->producer.tail;

        /*
         * 1. In the normal case, producerTail > consumerHead and producerTail < consumerHead + mask.
         * 2. If only producerTail has wrapped around, producerTail > consumerHead - 0xFFFFFFFF and
         *    producerTail < consumerHead + mask - 0xFFFFFFFF.
         * Subtraction of two 32-bit unsigned integers wraps modulo 2^32,
         * so availableCount always lies between 0 and the queue length.
         */
        availableCount = (producerTail - consumerHead);

        if (availableCount < 1) {
            return QUEUE_EMPTY;
        }

        consumerNext = consumerHead + 1;
        success = SoftBusAtomicCmpAndSwap32(&queue->consumer.head, consumerHead, consumerNext);
    } while (success == false);
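    /* The CAS above reserved slot (consumerHead & mask) exclusively for this consumer. */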

    /* Prevent the read of queue->nodes from being reordered before the read of the producer tail. */
    RMB();

    *node = (void *)(queue->nodes[consumerHead & mask]);

    /*
     * Make sure the element is read out before the consumer tail is updated.
     * Once the consumer tail is updated, the producer treats the slot as free and may fill in
     * a new element, which would overwrite an element that has not yet been dequeued.
     */
    RMB();

    /* Wait for other consumers to finish dequeuing. */
    while (queue->consumer.tail != consumerHead) {
        SoftBusYieldCpu();
    }

    queue->consumer.tail += 1;

    return 0;
}

#ifdef __cplusplus
#if __cplusplus
}
#endif /* __cplusplus */
#endif /* __cplusplus */
#endif // SOFTBUS_QUEUE_H