1 /*
2 * Copyright (c) 2020 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "queue_adapter.h"
16 #include <ohos_errno.h>
17 #include <pthread.h>
18 #include "memory_adapter.h"
19 #include "lock_free_queue.h"
20
21 typedef struct LockFreeBlockQueue LockFreeBlockQueue;
22 struct LockFreeBlockQueue {
23 pthread_mutex_t wMutex;
24 pthread_mutex_t rMutex;
25 pthread_cond_t cond;
26 LockFreeQueue *queue;
27 };
28
QUEUE_Create(const char * name,int size,int count)29 MQueueId QUEUE_Create(const char *name, int size, int count)
30 {
31 LockFreeBlockQueue *queue = (LockFreeBlockQueue *)SAMGR_Malloc(sizeof(LockFreeBlockQueue));
32 if (queue == NULL) {
33 return NULL;
34 }
35 queue->queue = LFQUE_Create(size, count);
36 if (queue->queue == NULL) {
37 SAMGR_Free(queue);
38 return NULL;
39 }
40 pthread_mutex_init(&queue->wMutex, NULL);
41 pthread_mutex_init(&queue->rMutex, NULL);
42 pthread_cond_init(&queue->cond, NULL);
43 return (MQueueId)queue;
44 }
45
QUEUE_Put(MQueueId queueId,const void * element,uint8 pri,int timeout)46 int QUEUE_Put(MQueueId queueId, const void *element, uint8 pri, int timeout)
47 {
48 if (queueId == NULL || element == NULL || timeout > 0) {
49 return EC_INVALID;
50 }
51 LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId;
52 pthread_mutex_lock(&queue->wMutex);
53 int ret = LFQUE_Push(queue->queue, element, pri);
54 pthread_mutex_unlock(&queue->wMutex);
55 pthread_mutex_lock(&queue->rMutex);
56 pthread_cond_broadcast(&queue->cond);
57 pthread_mutex_unlock(&queue->rMutex);
58 return ret;
59 }
60
QUEUE_Pop(MQueueId queueId,void * element,uint8 * pri,int timeout)61 int QUEUE_Pop(MQueueId queueId, void *element, uint8 *pri, int timeout)
62 {
63 if (queueId == NULL || element == NULL || timeout > 0) {
64 return EC_INVALID;
65 }
66
67 LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId;
68 pthread_mutex_lock(&queue->rMutex);
69 while (LFQUE_Pop(queue->queue, element, pri) != EC_SUCCESS) {
70 pthread_cond_wait(&queue->cond, &queue->rMutex);
71 }
72 pthread_mutex_unlock(&queue->rMutex);
73 return EC_SUCCESS;
74 }
75
QUEUE_Destroy(MQueueId queueId)76 int QUEUE_Destroy(MQueueId queueId)
77 {
78 if (queueId == NULL) {
79 return EC_INVALID;
80 }
81
82 LockFreeBlockQueue *queue = (LockFreeBlockQueue *)queueId;
83 pthread_mutex_destroy(&queue->wMutex);
84 pthread_mutex_destroy(&queue->rMutex);
85 pthread_cond_destroy(&queue->cond);
86 SAMGR_Free(queue->queue);
87 SAMGR_Free(queue);
88 return EC_SUCCESS;
89 }
90