1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "utils_work_queue.h"
17
18 #include <pthread.h>
19 #include <stddef.h>
20 #include <sys/prctl.h>
21
22 #include "securec.h"
23
24 #include "utils_dslm_list.h"
25 #include "utils_log.h"
26 #include "utils_mem.h"
27
/* Lifecycle states stored in WorkQueue.state. */
enum {
    RUN = 0, /* worker thread keeps taking items from the queue */
    DIE = 1, /* worker thread must stop and discard whatever is still queued */
};
30
31 typedef struct WorkQueue {
32 ListHead head;
33 pthread_mutex_t mutex;
34 pthread_cond_t cond;
35 volatile int32_t state;
36 uint32_t capacity;
37 uint32_t size;
38 pthread_t pthreadId;
39 const char *name;
40 } WorkQueue;
41
42 typedef struct {
43 ListNode linkNode;
44 WorkProcess process; // callback func
45 uint32_t dataLen;
46 uint8_t *dataBuff; // user data ptr
47 } Worker;
48
WorkQueueThread(void * data)49 static void *WorkQueueThread(void *data)
50 {
51 WorkQueue *queue = (WorkQueue *)data;
52 Worker *worker = NULL;
53
54 #ifndef L0_MINI
55 prctl(PR_SET_NAME, queue->name, 0, 0, 0);
56 #endif
57
58 (void)pthread_mutex_lock(&queue->mutex);
59 while (queue->state == RUN) {
60 while ((IsEmptyList(&queue->head)) && (queue->state == RUN)) {
61 pthread_cond_wait(&queue->cond, &queue->mutex);
62 }
63 // need to check again
64 if (queue->state != RUN) {
65 break;
66 }
67
68 worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
69 RemoveListNode(&worker->linkNode);
70 queue->size--;
71
72 pthread_mutex_unlock(&queue->mutex);
73 worker->process(worker->dataBuff, worker->dataLen);
74 FREE(worker);
75 (void)pthread_mutex_lock(&queue->mutex);
76 }
77
78 // now the queue is stopped, just remove the nodes.
79 while (!IsEmptyList(&queue->head)) {
80 worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
81 RemoveListNode(&worker->linkNode);
82 queue->size--;
83 FREE(worker);
84 }
85
86 pthread_mutex_unlock(&queue->mutex);
87 return NULL;
88 }
89
90 #ifndef L0_MINI
CreateWorkQueue(uint32_t capacity,const char * name)91 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
92 {
93 WorkQueue *queue = MALLOC(sizeof(WorkQueue));
94 if (queue == NULL) {
95 return NULL;
96 }
97 (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
98
99 InitListHead(&(queue->head));
100 queue->state = RUN;
101 queue->capacity = capacity;
102 queue->size = 0;
103 queue->name = name;
104
105 int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
106 if (iRet != 0) {
107 FREE(queue);
108 return NULL;
109 }
110
111 iRet = pthread_cond_init(&queue->cond, NULL);
112 if (iRet != 0) {
113 (void)pthread_mutex_destroy(&(queue->mutex));
114 FREE(queue);
115 return NULL;
116 }
117
118 iRet = pthread_create(&queue->pthreadId, NULL, WorkQueueThread, queue);
119 if (iRet != 0) {
120 (void)pthread_cond_destroy(&(queue->cond));
121 (void)pthread_mutex_destroy(&(queue->mutex));
122 FREE(queue);
123 return NULL;
124 }
125
126 return queue;
127 }
128 #else
CreateWorkQueue(uint32_t capacity,const char * name)129 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
130 {
131 pthread_attr_t attr;
132 WorkQueue *queue = MALLOC(sizeof(WorkQueue));
133 if (queue == NULL) {
134 return NULL;
135 }
136 (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
137
138 InitListHead(&(queue->head));
139 queue->state = RUN;
140 queue->capacity = capacity;
141 queue->size = 0;
142 queue->name = name;
143
144 int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
145 if (iRet != 0) {
146 FREE(queue);
147 return NULL;
148 }
149
150 iRet = pthread_cond_init(&queue->cond, NULL);
151 if (iRet != 0) {
152 (void)pthread_mutex_destroy(&(queue->mutex));
153 FREE(queue);
154 return NULL;
155 }
156
157 iRet = pthread_attr_init(&attr);
158 if (iRet != 0) {
159 (void)pthread_cond_destroy(&(queue->cond));
160 (void)pthread_mutex_destroy(&(queue->mutex));
161 (void)pthread_attr_destroy(&attr);
162 FREE(queue);
163 return NULL;
164 }
165
166 iRet = pthread_attr_setstacksize(&attr, 0x10000);
167 if (iRet != 0) {
168 (void)pthread_cond_destroy(&(queue->cond));
169 (void)pthread_mutex_destroy(&(queue->mutex));
170 (void)pthread_attr_destroy(&attr);
171 FREE(queue);
172 return NULL;
173 }
174
175 iRet = pthread_create(&queue->pthreadId, &attr, WorkQueueThread, queue);
176 if (iRet != 0) {
177 (void)pthread_cond_destroy(&(queue->cond));
178 (void)pthread_mutex_destroy(&(queue->mutex));
179 (void)pthread_attr_destroy(&attr);
180 FREE(queue);
181 return NULL;
182 }
183 (void)pthread_attr_destroy(&attr);
184
185 return queue;
186 }
187 #endif
188
/*
 * Stops a work queue, waits for its thread to exit and releases the queue.
 *
 * Items still queued at this point are freed by the worker thread without
 * their callbacks being invoked.
 *
 * queue: queue returned by CreateWorkQueue; must not be used after a
 *        WORK_QUEUE_OK return.
 *
 * Returns:
 *   WORK_QUEUE_OK              - thread joined; mutex, condition variable and queue released.
 *   WORK_QUEUE_NULL_PTR        - queue is NULL.
 *   WORK_QUEUE_THREAD_COND_ERR - waking the worker thread failed; nothing is released.
 *   WORK_QUEUE_THREAD_JOIN_ERR - joining the worker thread failed; nothing is released.
 */
uint32_t DestroyWorkQueue(WorkQueue *queue)
{
    if (queue == NULL) {
        return WORK_QUEUE_NULL_PTR;
    }

    // Change the state and wake the worker under the lock so it cannot miss the transition.
    (void)pthread_mutex_lock(&queue->mutex);
    queue->state = DIE;
    int32_t iRet = pthread_cond_broadcast(&queue->cond);
    (void)pthread_mutex_unlock(&queue->mutex);
    if (iRet != 0) {
        return WORK_QUEUE_THREAD_COND_ERR;
    }

    iRet = pthread_join(queue->pthreadId, NULL);
    if (iRet != 0) {
        // The thread may still be using the queue, so it is deliberately left intact.
        return WORK_QUEUE_THREAD_JOIN_ERR;
    }

    // The worker has exited: release the synchronisation objects created by
    // CreateWorkQueue before freeing the memory that contains them.
    (void)pthread_cond_destroy(&queue->cond);
    (void)pthread_mutex_destroy(&queue->mutex);
    FREE(queue);
    return WORK_QUEUE_OK;
}
212
/*
 * Queues a callback for execution on the queue's worker thread.
 *
 * The state and capacity checks, the insertion and the wake-up are all done
 * with the queue mutex held, so concurrent callers cannot push the queue past
 * its capacity and an item cannot be added once shutdown has been observed.
 *
 * queue:   target queue.
 * process: callback to invoke; must not be NULL.
 * data:    passed through to the callback unchanged; it is neither copied nor
 *          freed here.
 * length:  passed through to the callback as the data length.
 *
 * Returns:
 *   WORK_QUEUE_OK         - the item was queued.
 *   WORK_QUEUE_NULL_PTR   - queue or process is NULL.
 *   WORK_QUEUE_STATE_ERR  - the queue is not in the RUN state.
 *   WORK_QUEUE_FULL       - the queue already holds `capacity` items.
 *   WORK_QUEUE_MALLOC_ERR - allocating the queue item failed.
 */
uint32_t QueueWork(WorkQueue *queue, WorkProcess process, uint8_t *data, uint32_t length)
{
    if ((queue == NULL) || (process == NULL)) {
        return WORK_QUEUE_NULL_PTR;
    }

    uint32_t result = WORK_QUEUE_OK;
    (void)pthread_mutex_lock(&queue->mutex);
    do {
        if (queue->state != RUN) {
            result = WORK_QUEUE_STATE_ERR;
            break;
        }
        if (queue->size >= queue->capacity) {
            result = WORK_QUEUE_FULL;
            break;
        }

        Worker *worker = MALLOC(sizeof(Worker));
        if (worker == NULL) {
            result = WORK_QUEUE_MALLOC_ERR;
            break;
        }
        (void)memset_s(worker, sizeof(Worker), 0, sizeof(Worker));

        InitListHead(&worker->linkNode);
        worker->dataLen = length;
        worker->dataBuff = data;
        worker->process = process;

        AddListNodeBefore(&worker->linkNode, &queue->head);
        queue->size++;

        // Wake the worker while still holding the lock, so the queue is not
        // touched by this call after the mutex has been released.
        (void)pthread_cond_broadcast(&queue->cond);
    } while (0);
    (void)pthread_mutex_unlock(&queue->mutex);

    return result;
}
244