1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "utils_work_queue.h"
17 
18 #include <pthread.h>
19 #include <stddef.h>
20 #include <sys/prctl.h>
21 
22 #include "securec.h"
23 
24 #include "utils_dslm_list.h"
25 #include "utils_log.h"
26 #include "utils_mem.h"
27 
// Lifecycle states stored in WorkQueue.state.
enum {
    RUN = 0, // the worker thread keeps taking and executing queued work
    DIE = 1, // the worker thread must stop and discard whatever is still queued
};
30 
31 typedef struct WorkQueue {
32     ListHead head;
33     pthread_mutex_t mutex;
34     pthread_cond_t cond;
35     volatile int32_t state;
36     uint32_t capacity;
37     uint32_t size;
38     pthread_t pthreadId;
39     const char *name;
40 } WorkQueue;
41 
42 typedef struct {
43     ListNode linkNode;
44     WorkProcess process; // callback func
45     uint32_t dataLen;
46     uint8_t *dataBuff; // user data ptr
47 } Worker;
48 
WorkQueueThread(void * data)49 static void *WorkQueueThread(void *data)
50 {
51     WorkQueue *queue = (WorkQueue *)data;
52     Worker *worker = NULL;
53 
54 #ifndef L0_MINI
55     prctl(PR_SET_NAME, queue->name, 0, 0, 0);
56 #endif
57 
58     (void)pthread_mutex_lock(&queue->mutex);
59     while (queue->state == RUN) {
60         while ((IsEmptyList(&queue->head)) && (queue->state == RUN)) {
61             pthread_cond_wait(&queue->cond, &queue->mutex);
62         }
63         // need to check again
64         if (queue->state != RUN) {
65             break;
66         }
67 
68         worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
69         RemoveListNode(&worker->linkNode);
70         queue->size--;
71 
72         pthread_mutex_unlock(&queue->mutex);
73         worker->process(worker->dataBuff, worker->dataLen);
74         FREE(worker);
75         (void)pthread_mutex_lock(&queue->mutex);
76     }
77 
78     // now the queue is stopped, just remove the nodes.
79     while (!IsEmptyList(&queue->head)) {
80         worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
81         RemoveListNode(&worker->linkNode);
82         queue->size--;
83         FREE(worker);
84     }
85 
86     pthread_mutex_unlock(&queue->mutex);
87     return NULL;
88 }
89 
90 #ifndef L0_MINI
CreateWorkQueue(uint32_t capacity,const char * name)91 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
92 {
93     WorkQueue *queue = MALLOC(sizeof(WorkQueue));
94     if (queue == NULL) {
95         return NULL;
96     }
97     (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
98 
99     InitListHead(&(queue->head));
100     queue->state = RUN;
101     queue->capacity = capacity;
102     queue->size = 0;
103     queue->name = name;
104 
105     int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
106     if (iRet != 0) {
107         FREE(queue);
108         return NULL;
109     }
110 
111     iRet = pthread_cond_init(&queue->cond, NULL);
112     if (iRet != 0) {
113         (void)pthread_mutex_destroy(&(queue->mutex));
114         FREE(queue);
115         return NULL;
116     }
117 
118     iRet = pthread_create(&queue->pthreadId, NULL, WorkQueueThread, queue);
119     if (iRet != 0) {
120         (void)pthread_cond_destroy(&(queue->cond));
121         (void)pthread_mutex_destroy(&(queue->mutex));
122         FREE(queue);
123         return NULL;
124     }
125 
126     return queue;
127 }
128 #else
CreateWorkQueue(uint32_t capacity,const char * name)129 WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
130 {
131     pthread_attr_t attr;
132     WorkQueue *queue = MALLOC(sizeof(WorkQueue));
133     if (queue == NULL) {
134         return NULL;
135     }
136     (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));
137 
138     InitListHead(&(queue->head));
139     queue->state = RUN;
140     queue->capacity = capacity;
141     queue->size = 0;
142     queue->name = name;
143 
144     int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
145     if (iRet != 0) {
146         FREE(queue);
147         return NULL;
148     }
149 
150     iRet = pthread_cond_init(&queue->cond, NULL);
151     if (iRet != 0) {
152         (void)pthread_mutex_destroy(&(queue->mutex));
153         FREE(queue);
154         return NULL;
155     }
156 
157     iRet = pthread_attr_init(&attr);
158     if (iRet != 0) {
159         (void)pthread_cond_destroy(&(queue->cond));
160         (void)pthread_mutex_destroy(&(queue->mutex));
161         (void)pthread_attr_destroy(&attr);
162         FREE(queue);
163         return NULL;
164     }
165 
166     iRet = pthread_attr_setstacksize(&attr, 0x10000);
167     if (iRet != 0) {
168         (void)pthread_cond_destroy(&(queue->cond));
169         (void)pthread_mutex_destroy(&(queue->mutex));
170         (void)pthread_attr_destroy(&attr);
171         FREE(queue);
172         return NULL;
173     }
174 
175     iRet = pthread_create(&queue->pthreadId, &attr, WorkQueueThread, queue);
176     if (iRet != 0) {
177         (void)pthread_cond_destroy(&(queue->cond));
178         (void)pthread_mutex_destroy(&(queue->mutex));
179         (void)pthread_attr_destroy(&attr);
180         FREE(queue);
181         return NULL;
182     }
183     (void)pthread_attr_destroy(&attr);
184 
185     return queue;
186 }
187 #endif
188 
/*
 * Stops the worker thread of a queue and releases the queue.
 *
 * Sets the state to DIE, wakes the worker thread and joins it. The thread frees any work
 * that is still pending without executing it. After a successful join the condition
 * variable and the mutex are destroyed and the queue memory is freed.
 *
 * queue: queue returned by CreateWorkQueue(); must not be used after a WORK_QUEUE_OK return.
 *
 * Returns:
 *   WORK_QUEUE_OK              - the thread was joined and the queue was released
 *   WORK_QUEUE_NULL_PTR        - queue is NULL
 *   WORK_QUEUE_THREAD_COND_ERR - waking the worker thread failed; the queue is not released
 *   WORK_QUEUE_THREAD_JOIN_ERR - joining the worker thread failed; the queue is not released
 */
uint32_t DestroyWorkQueue(WorkQueue *queue)
{
    if (queue == NULL) {
        return WORK_QUEUE_NULL_PTR;
    }

    (void)pthread_mutex_lock(&queue->mutex);
    queue->state = DIE;
    int32_t iRet = pthread_cond_broadcast(&queue->cond);
    if (iRet != 0) {
        (void)pthread_mutex_unlock(&queue->mutex);
        return WORK_QUEUE_THREAD_COND_ERR;
    }
    (void)pthread_mutex_unlock(&queue->mutex);

    iRet = pthread_join(queue->pthreadId, NULL);
    if (iRet != 0) {
        // the thread may still be using the queue, so nothing is torn down here
        return WORK_QUEUE_THREAD_JOIN_ERR;
    }

    // the worker thread has exited, so the synchronisation objects can be destroyed
    // before their storage is freed
    (void)pthread_cond_destroy(&queue->cond);
    (void)pthread_mutex_destroy(&queue->mutex);
    FREE(queue);
    return WORK_QUEUE_OK;
}
212 
/*
 * Appends a work item to the queue and wakes the worker thread.
 *
 * The state check, the capacity check and the insertion are done in one critical section,
 * so concurrent callers cannot push the queue beyond its capacity.
 *
 * queue:   queue returned by CreateWorkQueue().
 * process: callback the worker thread invokes as process(data, length).
 * data:    user data pointer; it is stored as-is (not copied) and never freed by the queue.
 * length:  length value passed to the callback together with data.
 *
 * Returns:
 *   WORK_QUEUE_OK         - the item was queued
 *   WORK_QUEUE_NULL_PTR   - queue or process is NULL
 *   WORK_QUEUE_STATE_ERR  - the queue is not in the RUN state
 *   WORK_QUEUE_FULL       - the queue already holds `capacity` pending items
 *   WORK_QUEUE_MALLOC_ERR - allocating the work item failed
 */
uint32_t QueueWork(WorkQueue *queue, WorkProcess process, uint8_t *data, uint32_t length)
{
    if ((queue == NULL) || (process == NULL)) {
        return WORK_QUEUE_NULL_PTR;
    }

    (void)pthread_mutex_lock(&queue->mutex);
    if (queue->state != RUN) {
        (void)pthread_mutex_unlock(&queue->mutex);
        return WORK_QUEUE_STATE_ERR;
    }
    if (queue->size >= queue->capacity) {
        (void)pthread_mutex_unlock(&queue->mutex);
        return WORK_QUEUE_FULL;
    }

    // allocated only after both checks passed, which keeps the original error precedence
    Worker *worker = MALLOC(sizeof(Worker));
    if (worker == NULL) {
        (void)pthread_mutex_unlock(&queue->mutex);
        return WORK_QUEUE_MALLOC_ERR;
    }
    (void)memset_s(worker, sizeof(Worker), 0, sizeof(Worker));

    InitListHead(&worker->linkNode);
    worker->dataLen = length;
    worker->dataBuff = data;
    worker->process = process;

    // inserted before the list head; the worker thread takes items from head.next
    AddListNodeBefore(&worker->linkNode, &queue->head);
    queue->size++;
    (void)pthread_mutex_unlock(&queue->mutex);

    (void)pthread_cond_broadcast(&queue->cond);
    return WORK_QUEUE_OK;
}
244