1 /*
2  * Copyright (c) 2020 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "queue_adapter.h"
16 #include <ohos_errno.h>
17 #include <pthread.h>
18 #include "memory_adapter.h"
19 #include "lock_free_queue.h"
20 
21 typedef struct LockFreeBlockQueue LockFreeBlockQueue;
22 struct LockFreeBlockQueue {
23     pthread_mutex_t wMutex;
24     pthread_mutex_t rMutex;
25     pthread_cond_t cond;
26     LockFreeQueue *queue;
27 };
28 
QUEUE_Create(const char * name,int size,int count)29 MQueueId QUEUE_Create(const char *name, int size, int count)
30 {
31     LockFreeBlockQueue *queue = (LockFreeBlockQueue *)SAMGR_Malloc(sizeof(LockFreeBlockQueue));
32     if (queue == NULL) {
33         return NULL;
34     }
35     queue->queue = LFQUE_Create(size, count);
36     if (queue->queue == NULL) {
37         SAMGR_Free(queue);
38         return NULL;
39     }
40     pthread_mutex_init(&queue->wMutex, NULL);
41     pthread_mutex_init(&queue->rMutex, NULL);
42     pthread_cond_init(&queue->cond, NULL);
43     return (MQueueId)queue;
44 }
45 
/*
 * Pushes one element into the queue and then wakes every thread waiting in QUEUE_Pop.
 *
 * queueId  Handle returned by QUEUE_Create; must not be NULL.
 * element  Element to push; must not be NULL. Forwarded to LFQUE_Push.
 * pri      Priority value, forwarded unchanged to LFQUE_Push.
 * timeout  Must not be positive; a positive value is rejected.
 *
 * Returns EC_INVALID for a rejected argument, otherwise the result of LFQUE_Push.
 */
int QUEUE_Put(MQueueId queueId, const void *element, uint8 pri, int timeout)
{
    if (timeout > 0 || element == NULL || queueId == NULL) {
        return EC_INVALID;
    }

    LockFreeBlockQueue *blockQueue = (LockFreeBlockQueue *)queueId;
    int pushResult;

    /* Writers are serialized by wMutex. */
    pthread_mutex_lock(&blockQueue->wMutex);
    pushResult = LFQUE_Push(blockQueue->queue, element, pri);
    pthread_mutex_unlock(&blockQueue->wMutex);

    /* The broadcast is issued whatever the push result was, while holding the mutex that
     * QUEUE_Pop pairs with the condition variable. */
    pthread_mutex_lock(&blockQueue->rMutex);
    pthread_cond_broadcast(&blockQueue->cond);
    pthread_mutex_unlock(&blockQueue->rMutex);

    return pushResult;
}
60 
/*
 * Pops one element from the queue, blocking on the condition variable until LFQUE_Pop succeeds.
 *
 * queueId  Handle returned by QUEUE_Create; must not be NULL.
 * element  Destination passed to LFQUE_Pop; must not be NULL.
 * pri      Passed unchanged to LFQUE_Pop (not checked here).
 * timeout  Must not be positive; a positive value is rejected.
 *
 * Returns EC_INVALID for a rejected argument, otherwise EC_SUCCESS once an element was popped.
 */
int QUEUE_Pop(MQueueId queueId, void *element, uint8 *pri, int timeout)
{
    if (timeout > 0 || element == NULL || queueId == NULL) {
        return EC_INVALID;
    }

    LockFreeBlockQueue *blockQueue = (LockFreeBlockQueue *)queueId;

    pthread_mutex_lock(&blockQueue->rMutex);
    for (;;) {
        if (LFQUE_Pop(blockQueue->queue, element, pri) == EC_SUCCESS) {
            break;
        }
        /* Nothing could be popped: sleep until QUEUE_Put broadcasts, then try again. */
        pthread_cond_wait(&blockQueue->cond, &blockQueue->rMutex);
    }
    pthread_mutex_unlock(&blockQueue->rMutex);

    return EC_SUCCESS;
}
75 
/*
 * Destroys a queue created by QUEUE_Create: tears down its condition variable and both
 * mutexes, then frees the underlying queue and the wrapper itself.
 *
 * queueId  Handle returned by QUEUE_Create.
 *
 * Returns EC_INVALID if queueId is NULL, otherwise EC_SUCCESS.
 */
int QUEUE_Destroy(MQueueId queueId)
{
    LockFreeBlockQueue *blockQueue = (LockFreeBlockQueue *)queueId;

    if (blockQueue == NULL) {
        return EC_INVALID;
    }

    /* Synchronization objects are torn down in the reverse order of their creation. */
    pthread_cond_destroy(&blockQueue->cond);
    pthread_mutex_destroy(&blockQueue->rMutex);
    pthread_mutex_destroy(&blockQueue->wMutex);

    /* The inner queue has to go first; the wrapper holds the only pointer to it. */
    SAMGR_Free(blockQueue->queue);
    SAMGR_Free(blockQueue);
    return EC_SUCCESS;
}
90