1 /*
2 * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "platform/include/queue.h"
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include "platform/include/list.h"
20 #include "platform/include/mutex.h"
21 #include "platform/include/platform_def.h"
22 #include "platform/include/semaphore.h"
23
24 typedef struct Queue {
25 uint32_t capacity;
26 Mutex *mutex;
27 Semaphore *enqueueSem;
28 Semaphore *dequeueSem;
29 List *list;
30 } QueueInternal;
31
QueueCreate(uint32_t capacity)32 Queue *QueueCreate(uint32_t capacity)
33 {
34 if (capacity == 0) {
35 LOG_WARN("[QueueCreate]queue capacity can't be 0 or less than 0");
36 return NULL;
37 }
38 Queue *queue = (Queue *)calloc(1, (sizeof(Queue)));
39 if (queue != NULL) {
40 queue->capacity = capacity;
41
42 queue->mutex = MutexCreate();
43 if (queue->mutex == NULL) {
44 goto ERROR;
45 }
46 queue->enqueueSem = SemaphoreCreate(capacity);
47 if (queue->enqueueSem == NULL) {
48 goto ERROR;
49 }
50 queue->dequeueSem = SemaphoreCreate(0);
51 if (queue->dequeueSem == NULL) {
52 goto ERROR;
53 }
54 queue->list = ListCreate(NULL);
55 if (queue->list == NULL) {
56 goto ERROR;
57 }
58 }
59 return queue;
60 ERROR:
61 if (queue != NULL) {
62 MutexDelete(queue->mutex);
63 SemaphoreDelete(queue->enqueueSem);
64 SemaphoreDelete(queue->dequeueSem);
65 ListDelete(queue->list);
66 free(queue);
67 }
68 return NULL;
69 }
70
QueueDelete(Queue * queue,NodeDataFreeCb cb)71 void QueueDelete(Queue *queue, NodeDataFreeCb cb)
72 {
73 if (queue == NULL) {
74 return;
75 }
76 if (cb) {
77 int i = 0;
78 ListNode *node = ListGetFirstNode(queue->list);
79 for (; i < ListGetSize(queue->list); i++) {
80 cb(ListGetNodeData(node));
81 node = ListGetNextNode(node);
82 }
83 }
84
85 ListDelete(queue->list);
86 SemaphoreDelete(queue->enqueueSem);
87 SemaphoreDelete(queue->dequeueSem);
88 MutexDelete(queue->mutex);
89 free(queue);
90 }
91
QueueEnqueue(Queue * queue,void * data)92 void QueueEnqueue(Queue *queue, void *data)
93 {
94 ASSERT(queue);
95 ASSERT(data);
96 SemaphoreWait(queue->enqueueSem);
97
98 MutexLock(queue->mutex);
99 ListAddLast(queue->list, data);
100 MutexUnlock(queue->mutex);
101
102 SemaphorePost(queue->dequeueSem);
103 }
104
QueueTryEnqueue(Queue * queue,void * data)105 bool QueueTryEnqueue(Queue *queue, void *data)
106 {
107 ASSERT(queue);
108 ASSERT(data);
109
110 if (SemaphoreTryWait(queue->enqueueSem) != 0) {
111 return false;
112 }
113
114 MutexLock(queue->mutex);
115 ListAddLast(queue->list, data);
116 MutexUnlock(queue->mutex);
117
118 SemaphorePost(queue->dequeueSem);
119
120 return true;
121 }
122
QueueDequeue(Queue * queue)123 void *QueueDequeue(Queue *queue)
124 {
125 ASSERT(queue);
126 void *data = NULL;
127 ListNode *listNode = NULL;
128 SemaphoreWait(queue->dequeueSem);
129
130 MutexLock(queue->mutex);
131 listNode = ListGetFirstNode(queue->list);
132 data = ListGetNodeData(listNode);
133 ListRemoveFirst(queue->list);
134 MutexUnlock(queue->mutex);
135
136 SemaphorePost(queue->enqueueSem);
137
138 return data;
139 }
140
QueueTryDequeue(Queue * queue)141 void *QueueTryDequeue(Queue *queue)
142 {
143 ASSERT(queue);
144 ListNode *listNode = NULL;
145 void *data = NULL;
146
147 if (SemaphoreTryWait(queue->dequeueSem) != 0) {
148 return NULL;
149 }
150
151 MutexLock(queue->mutex);
152 listNode = ListGetFirstNode(queue->list);
153 data = ListGetNodeData(listNode);
154 ListRemoveFirst(queue->list);
155 MutexUnlock(queue->mutex);
156
157 SemaphorePost(queue->enqueueSem);
158
159 return data;
160 }
161
QueueGetEnqueueFd(const Queue * queue)162 int32_t QueueGetEnqueueFd(const Queue *queue)
163 {
164 ASSERT(queue);
165 return SemaphoreGetfd(queue->enqueueSem);
166 }
167
QueueGetDequeueFd(const Queue * queue)168 int32_t QueueGetDequeueFd(const Queue *queue)
169 {
170 ASSERT(queue);
171 return SemaphoreGetfd(queue->dequeueSem);
172 }
173
QueueFlush(Queue * queue,NodeDataFreeCb cb)174 void QueueFlush(Queue* queue, NodeDataFreeCb cb)
175 {
176 ASSERT(queue);
177
178 while (!QueueIsEmpty(queue)) {
179 void* data = QueueTryDequeue(queue);
180 if (cb != NULL) {
181 cb(data);
182 }
183 }
184 }
185
QueueIsEmpty(Queue * queue)186 bool QueueIsEmpty(Queue* queue)
187 {
188 ASSERT(queue);
189 bool ret = false;
190 MutexLock(queue->mutex);
191 ret = ListIsEmpty(queue->list);
192 MutexUnlock(queue->mutex);
193 return ret;
194 }
195
QueueGetSize(Queue * queue)196 int32_t QueueGetSize(Queue *queue)
197 {
198 ASSERT(queue);
199 int32_t ret = 0;
200 MutexLock(queue->mutex);
201 ret = ListGetSize(queue->list);
202 MutexUnlock(queue->mutex);
203 return ret;
204 }
205