1 /*
2  * Copyright (C) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "platform/include/queue.h"
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include "platform/include/list.h"
20 #include "platform/include/mutex.h"
21 #include "platform/include/platform_def.h"
22 #include "platform/include/semaphore.h"
23 
24 typedef struct Queue {
25     uint32_t capacity;
26     Mutex *mutex;
27     Semaphore *enqueueSem;
28     Semaphore *dequeueSem;
29     List *list;
30 } QueueInternal;
31 
QueueCreate(uint32_t capacity)32 Queue *QueueCreate(uint32_t capacity)
33 {
34     if (capacity == 0) {
35         LOG_WARN("[QueueCreate]queue capacity can't be 0 or less than 0");
36         return NULL;
37     }
38     Queue *queue = (Queue *)calloc(1, (sizeof(Queue)));
39     if (queue != NULL) {
40         queue->capacity = capacity;
41 
42         queue->mutex = MutexCreate();
43         if (queue->mutex == NULL) {
44             goto ERROR;
45         }
46         queue->enqueueSem = SemaphoreCreate(capacity);
47         if (queue->enqueueSem == NULL) {
48             goto ERROR;
49         }
50         queue->dequeueSem = SemaphoreCreate(0);
51         if (queue->dequeueSem == NULL) {
52             goto ERROR;
53         }
54         queue->list = ListCreate(NULL);
55         if (queue->list == NULL) {
56             goto ERROR;
57         }
58     }
59     return queue;
60 ERROR:
61     if (queue != NULL) {
62         MutexDelete(queue->mutex);
63         SemaphoreDelete(queue->enqueueSem);
64         SemaphoreDelete(queue->dequeueSem);
65         ListDelete(queue->list);
66         free(queue);
67     }
68     return NULL;
69 }
70 
/**
 * Destroys a queue and releases its list, semaphores, mutex and storage.
 *
 * @param queue Queue to destroy; NULL is ignored.
 * @param cb    Optional callback invoked once on the data of every node still
 *              in the list before the list is deleted. May be NULL.
 *
 * The list is walked without taking the queue mutex.
 */
void QueueDelete(Queue *queue, NodeDataFreeCb cb)
{
    if (queue == NULL) {
        return;
    }

    if (cb != NULL) {
        int visited = 0;
        ListNode *cursor = ListGetFirstNode(queue->list);
        while (visited < ListGetSize(queue->list)) {
            cb(ListGetNodeData(cursor));
            cursor = ListGetNextNode(cursor);
            visited++;
        }
    }

    ListDelete(queue->list);
    SemaphoreDelete(queue->enqueueSem);
    SemaphoreDelete(queue->dequeueSem);
    MutexDelete(queue->mutex);
    free(queue);
}
91 
/**
 * Appends `data` to the tail of the queue.
 *
 * Waits on enqueueSem first, so the call only proceeds once a free slot has
 * been accounted for, then posts dequeueSem to announce the new element.
 *
 * @param queue Target queue; asserted non-NULL.
 * @param data  Element to store; asserted non-NULL.
 */
void QueueEnqueue(Queue *queue, void *data)
{
    ASSERT(queue);
    ASSERT(data);

    /* Take one free slot. */
    SemaphoreWait(queue->enqueueSem);

    /* The list is only modified while the mutex is held. */
    MutexLock(queue->mutex);
    ListAddLast(queue->list, data);
    MutexUnlock(queue->mutex);

    /* Publish one available element. */
    SemaphorePost(queue->dequeueSem);
}
104 
/**
 * Attempts to append `data` to the tail of the queue.
 *
 * @param queue Target queue; asserted non-NULL.
 * @param data  Element to store; asserted non-NULL.
 * @return true when the element was added; false when SemaphoreTryWait() on
 *         enqueueSem returned non-zero, in which case the queue is untouched.
 */
bool QueueTryEnqueue(Queue *queue, void *data)
{
    ASSERT(queue);
    ASSERT(data);

    bool slotTaken = (SemaphoreTryWait(queue->enqueueSem) == 0);
    if (slotTaken) {
        MutexLock(queue->mutex);
        ListAddLast(queue->list, data);
        MutexUnlock(queue->mutex);

        /* Publish one available element. */
        SemaphorePost(queue->dequeueSem);
    }

    return slotTaken;
}
122 
/**
 * Removes the head element of the queue and returns it.
 *
 * Waits on dequeueSem first, so the call only proceeds once an element has
 * been accounted for, then posts enqueueSem to release the freed slot.
 *
 * @param queue Source queue; asserted non-NULL.
 * @return The data pointer stored in the first list node.
 */
void *QueueDequeue(Queue *queue)
{
    ASSERT(queue);

    /* Take one available element. */
    SemaphoreWait(queue->dequeueSem);

    MutexLock(queue->mutex);
    ListNode *head = ListGetFirstNode(queue->list);
    void *payload = ListGetNodeData(head);
    ListRemoveFirst(queue->list);
    MutexUnlock(queue->mutex);

    /* Give the slot back. */
    SemaphorePost(queue->enqueueSem);

    return payload;
}
140 
/**
 * Attempts to remove the head element of the queue.
 *
 * @param queue Source queue; asserted non-NULL.
 * @return The data pointer stored in the first list node, or NULL when
 *         SemaphoreTryWait() on dequeueSem returned non-zero (nothing taken).
 */
void *QueueTryDequeue(Queue *queue)
{
    ASSERT(queue);

    void *payload = NULL;

    if (SemaphoreTryWait(queue->dequeueSem) == 0) {
        MutexLock(queue->mutex);
        ListNode *head = ListGetFirstNode(queue->list);
        payload = ListGetNodeData(head);
        ListRemoveFirst(queue->list);
        MutexUnlock(queue->mutex);

        /* Give the slot back. */
        SemaphorePost(queue->enqueueSem);
    }

    return payload;
}
161 
QueueGetEnqueueFd(const Queue * queue)162 int32_t QueueGetEnqueueFd(const Queue *queue)
163 {
164     ASSERT(queue);
165     return SemaphoreGetfd(queue->enqueueSem);
166 }
167 
QueueGetDequeueFd(const Queue * queue)168 int32_t QueueGetDequeueFd(const Queue *queue)
169 {
170     ASSERT(queue);
171     return SemaphoreGetfd(queue->dequeueSem);
172 }
173 
/**
 * Drains the queue, optionally handing each removed element to `cb`.
 *
 * Elements are removed with QueueTryDequeue() until QueueIsEmpty() reports
 * that the list is empty.
 *
 * @param queue Queue to drain; asserted non-NULL.
 * @param cb    Optional callback invoked with every element that was actually
 *              removed. May be NULL, in which case elements are just dropped.
 */
void QueueFlush(Queue* queue, NodeDataFreeCb cb)
{
    ASSERT(queue);

    while (!QueueIsEmpty(queue)) {
        void *data = QueueTryDequeue(queue);
        /*
         * QueueTryDequeue() returns NULL when dequeueSem could not be taken.
         * That can happen even though the list was seen as non-empty: an
         * enqueuer adds the node before it posts dequeueSem, and another
         * consumer may take the element first. Enqueue asserts that stored
         * data is non-NULL, so NULL here always means "nothing was removed";
         * do not pass it to the callback.
         */
        if (data != NULL && cb != NULL) {
            cb(data);
        }
    }
}
185 
/**
 * Reports whether the queue's list currently holds no elements.
 *
 * @param queue Queue to query; asserted non-NULL.
 * @return Result of ListIsEmpty() on the list, sampled while holding the mutex.
 */
bool QueueIsEmpty(Queue* queue)
{
    ASSERT(queue);

    MutexLock(queue->mutex);
    bool empty = ListIsEmpty(queue->list);
    MutexUnlock(queue->mutex);

    return empty;
}
195 
/**
 * Returns the number of elements currently stored in the queue's list.
 *
 * @param queue Queue to query; asserted non-NULL.
 * @return Result of ListGetSize() on the list, sampled while holding the mutex.
 */
int32_t QueueGetSize(Queue *queue)
{
    ASSERT(queue);

    MutexLock(queue->mutex);
    int32_t count = ListGetSize(queue->list);
    MutexUnlock(queue->mutex);

    return count;
}
205