1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SOFTBUS_QUEUE_H
17 #define SOFTBUS_QUEUE_H
18 
19 #include <stdbool.h>
20 #include <stdint.h>
21 #include "securec.h"
22 
23 #include "softbus_adapter_cpu.h"
24 #include "softbus_adapter_atomic.h"
25 
26 #ifdef __cplusplus
27 #if __cplusplus
28 extern "C" {
29 #endif
30 #endif
31 
32 typedef enum {
33     QUEUE_INVAL = -10,
34     QUEUE_FULL,
35     QUEUE_EMPTY
36 } QueueError;
37 
38 /** @brief Lock free ring queue */
39 typedef struct {
40     uint32_t magic;
41     uint32_t unitNum;  /* queue node limit */
42     uint8_t pad[56];   /* cache line pad */
43 
44     /* producer status */
45     struct {
46         uint32_t size;              /* queue size */
47         uint32_t mask;              /* mask(size-1). */
48         volatile uint32_t head;     /* producer head */
49         volatile uint32_t tail;     /* producer tail */
50         uint8_t pad[48];            /* cache line pad */
51     } producer;
52 
53     /* consumer status */
54     struct {
55         uint32_t size;              /* queue size */
56         uint32_t mask;              /* mask(size-1). */
57         volatile uint32_t head;     /* consumer head */
58         volatile uint32_t tail;     /* consumer tail */
59         uint8_t pad[48];            /* cache line pad */
60     } consumer;
61 
62     uintptr_t nodes[0]; /* queue nodes */
63 } LockFreeQueue;
64 
65 extern int32_t QueueSizeCalc(uint32_t unitNum, uint32_t* queueSize);
66 
67 extern int32_t QueueCountGet(const LockFreeQueue* queue, uint32_t* count);
68 
69 extern int32_t QueueInit(LockFreeQueue* queue, uint32_t unitNum);
70 
71 extern LockFreeQueue* CreateQueue(uint32_t unitNum);
72 
QueueIsEmpty(LockFreeQueue * queue)73 static inline int32_t QueueIsEmpty(LockFreeQueue* queue)
74 {
75     uint32_t producerTail;
76     uint32_t consumerTail;
77     uint32_t mask;
78 
79     if (queue == NULL) {
80         return QUEUE_INVAL;
81     }
82 
83     producerTail = queue->producer.tail;
84     consumerTail = queue->consumer.tail;
85     mask = queue->producer.mask;
86 
87     if (((producerTail - consumerTail) & mask) == 0) {
88         return 0;
89     }
90     return -1;
91 }
92 
93 /** @brief Enqueue operation, thread unsafe */
QueueSingleProducerEnqueue(LockFreeQueue * queue,const void * node)94 static inline int32_t QueueSingleProducerEnqueue(LockFreeQueue *queue, const void *node)
95 {
96     uint32_t producerHead;
97     uint32_t producerNext;
98     uint32_t consumerTail;
99     uint32_t availableCount;
100     uint32_t mask;
101 
102     if (queue == NULL || node == NULL) {
103         return QUEUE_INVAL;
104     }
105     mask = queue->producer.mask;
106 
107     producerHead = queue->producer.head;
108     RMB();
109     consumerTail = queue->consumer.tail;
110 
111     /*
112      * 1. In normal cases, producerHead > consumerTail and producerHead < consumerTail + mask
113      * 2. If only producerHead is reversed, producerHead > consumerTail - 0xFFFFFFFF and
114      *    producerHead < consumerTail + mask - 0xFFFFFFFF
115      * The subtraction of two 32-bit integers results in 32-bit modulo.
116      * Therefore, the availableCount must be between 0 and the queue length.
117      */
118     availableCount = (mask + consumerTail) - producerHead;
119 
120     if (availableCount < 1) {
121         return QUEUE_FULL;
122     }
123 
124     producerNext = producerHead + 1;
125     queue->producer.head = producerNext;
126 
127     queue->nodes[producerHead & mask] = (uintptr_t)node;
128 
129     /*
130      * Make sure that the queue is filled with elements before updating the producer tail.
131      * Prevents problems when the producer tail is updated first:
132      * 1. The consumer thinks that the elements in this area have been queued and can be consumed,
133      *    but the consumer actually reads dirty elements.
134      * 2. The process is abnormal. In this case, elements in the memory block in the queue are dirty elements.
135      */
136     WMB();
137 
138     queue->producer.tail = producerNext;
139     return 0;
140 }
141 
142 /** @brief Enqueue operation, thread safe */
QueueMultiProducerEnqueue(LockFreeQueue * queue,const void * node)143 static inline int32_t QueueMultiProducerEnqueue(LockFreeQueue* queue, const void* node)
144 {
145     uint32_t producerHead;
146     uint32_t producerNext;
147     uint32_t consumerTail;
148     uint32_t availableCount;
149     bool success = false;
150     uint32_t mask;
151 
152     if (queue == NULL || node == NULL) {
153         return QUEUE_INVAL;
154     }
155 
156     mask = queue->producer.mask;
157     do {
158         producerHead = queue->producer.head;
159         /*
160          * Make sure the producer's head is read before the consumer's tail.
161          * If the consumer tail is read first, then the consumer consumes the queue,and then other producers
162          * produce the queue, the producer header may cross the consumer tail reversely.
163          */
164         RMB();
165         consumerTail = queue->consumer.tail;
166 
167         /*
168          * 1. In normal cases, producerHead > consumerTail and producerHead < consumerTail + mask
169          * 2. If only producerHead is reversed, producerHead > consumerTail - 0xFFFFFFFF and
170          *    producerHead < consumerTail + mask - 0xFFFFFFFF
171          * The subtraction of two 32-bit integers results in 32-bit modulo.
172          * Therefore, the availableCount must be between 0 and the queue length.
173          */
174 
175         availableCount = (mask + consumerTail) - producerHead;
176 
177         if (availableCount < 1) {
178             return QUEUE_FULL;
179         }
180 
181         producerNext = producerHead + 1;
182         success = SoftBusAtomicCmpAndSwap32(&queue->producer.head, producerHead, producerNext);
183     } while (success == false);
184 
185     queue->nodes[producerHead & mask] = (uintptr_t)node;
186 
187     /*
188      * Make sure that the queue is filled with elements before updating the producer tail.
189      * Prevents problems when the producer tail is updated first:
190      * 1. The consumer thinks that the elements in this area have been queued and can be consumed,
191      *    but the consumer actually reads dirty elements.
192      * 2. The process is abnormal. In this case, elements in the memory block in the queue are dirty elements.
193      */
194 
195     WMB();
196 
197     /* Waiting for other producers to complete enqueuing. */
198     while (queue->producer.tail != producerHead) {
199         SoftBusYieldCpu();
200     }
201 
202     queue->producer.tail += 1;
203     return 0;
204 }
205 
206 /** @brief Dequeue operation, thread unsafe */
QueueSingleConsumerDequeue(LockFreeQueue * queue,void ** node)207 static inline int32_t QueueSingleConsumerDequeue(LockFreeQueue* queue, void** node)
208 {
209     uint32_t consumerHead;
210     uint32_t producerTail;
211     uint32_t consumerNext;
212     uint32_t availableCount;
213     uint32_t mask;
214 
215     if (queue == NULL || node == NULL) {
216         return QUEUE_INVAL;
217     }
218     mask = queue->producer.mask;
219 
220     consumerHead = queue->consumer.head;
221 
222     /* Prevent producerTail from being read before consumerHead, causing queue head and tail reversal. */
223     RMB();
224 
225     producerTail = queue->producer.tail;
226 
227     /*
228      * 1. In normal cases, producerTail > consumerHead and producerTail < consumerHead + mask
229      * 2. If only producerTail is reversed, producerTail > consumerHead - 0xFFFFFFFF and
230      *    producerTail < consumerHead + mask - 0xFFFFFFFF
231      * The subtraction of two 32-bit integers results in 32-bit modulo.
232      * Therefore, the availableCount must be between 0 and the queue length.
233      */
234     availableCount = (producerTail - consumerHead);
235 
236     if (availableCount < 1) {
237         return QUEUE_EMPTY;
238     }
239 
240     consumerNext = consumerHead + 1;
241     queue->consumer.head = consumerNext;
242 
243     /* Prevent the read of queue->nodes before the read of ProdTail. */
244     RMB();
245 
246     *node = (void *)(queue->nodes[consumerHead & mask]);
247 
248     /*
249      * Ensure that the queue element is dequeued before updating the consumer's tail.
250      * After the consumer tail is updated, the producer considers that the elements in this area have been dequeued
251      * and can fill in new elements, which actually overwrites the elements that are not dequeued.
252      */
253     RMB();
254 
255     queue->consumer.tail = consumerNext;
256     return 0;
257 }
258 
259 /** @brief Dequeue operation, thread safe */
QueueMultiConsumerDequeue(LockFreeQueue * queue,void ** node)260 static inline int32_t QueueMultiConsumerDequeue(LockFreeQueue *queue, void **node)
261 {
262     bool success = false;
263     uint32_t consumerHead;
264     uint32_t producerTail;
265     uint32_t consumerNext;
266     uint32_t availableCount;
267     uint32_t mask;
268 
269     if (queue == NULL || node == NULL) {
270         return QUEUE_INVAL;
271     }
272     mask = queue->producer.mask;
273 
274     do {
275         consumerHead = queue->consumer.head;
276 
277         /*
278          * Make sure the consumer's head is read before the producer's tail.
279          * If the producer tail is read first, then other consumers consume the queue,
280          * and finally the generator produces the queue, the consumer head may cross the producer tail.
281          */
282         RMB();
283 
284         producerTail = queue->producer.tail;
285 
286         /*
287          * 1. In normal cases, producerTail > consumerHead and producerTail < consumerHead + mask
288          * 2. If only producerTail is reversed, producerTail > consumerHead - 0xFFFFFFFF and
289          *    producerTail < consumerHead + mask - 0xFFFFFFFF
290          * The subtraction of two 32-bit integers results in 32-bit modulo.
291          * Therefore, the availableCount must be between 0 and the queue length.
292          */
293 
294         availableCount = (producerTail - consumerHead);
295 
296         if (availableCount < 1) {
297             return QUEUE_EMPTY;
298         }
299 
300         consumerNext = consumerHead + 1;
301         success = SoftBusAtomicCmpAndSwap32(&queue->consumer.head, consumerHead, consumerNext);
302     } while (success == false);
303 
304     /* Prevent the read of queue->nodes before the read of ProdTail. */
305     RMB();
306 
307     *node = (void *)(queue->nodes[consumerHead & mask]);
308 
309     /*
310      * Ensure that the queue element is dequeued before updating the consumer's tail.
311      * After the consumer tail is updated, the producer considers that the elements in this area have been dequeued
312      * and can fill in new elements, which actually overwrites the elements that are not dequeued.
313      */
314     RMB();
315 
316     /* Waiting for other consumers to finish dequeuing. */
317     while (queue->consumer.tail != consumerHead) {
318         SoftBusYieldCpu();
319     }
320 
321     queue->consumer.tail += 1;
322 
323     return 0;
324 }
325 
326 #ifdef __cplusplus
327 #if __cplusplus
328 }
329 #endif /* __cplusplus */
330 #endif /* __cplusplus */
331 #endif // SOFTBUS_QUEUE_H
332