1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "btm_thread.h"
17
18 #include <stddef.h>
19
20 #include "btstack.h"
21 #include "platform/include/allocator.h"
22 #include "platform/include/list.h"
23 #include "platform/include/mutex.h"
24 #include "platform/include/queue.h"
25 #include "platform/include/semaphore.h"
26 #include "platform/include/thread.h"
27 #include "log.h"
28
/*
 * One unit of deferred work: a callback together with the opaque argument
 * that is passed to it when the task is executed.
 */
typedef struct {
    void (*task)(void *context); /* Callback to invoke. */
    void *context;               /* Argument handed to the callback; never interpreted by this module. */
} BtmTask;
33
34 typedef struct {
35 uint8_t id;
36 Queue *queue;
37 ReactorItem *reactorItem;
38 } BtmProcessingQueue;
39
40 static Thread *g_processingThread = NULL;
41 static List *g_processingQueueList = NULL;
42 static Mutex *g_processingQueueLock = NULL;
43
AllocTask(void (* task)(void * context),void * context)44 static BtmTask *AllocTask(void (*task)(void *context), void *context)
45 {
46 BtmTask *block = MEM_MALLOC.alloc(sizeof(BtmTask));
47 if (block != NULL) {
48 block->task = task;
49 block->context = context;
50 }
51 return block;
52 }
53
FreeTask(void * task)54 static void FreeTask(void *task)
55 {
56 MEM_MALLOC.free(task);
57 }
58
RunTask(void * context)59 NO_SANITIZE("cfi") static void RunTask(void *context)
60 {
61 BtmTask *task = QueueTryDequeue((Queue *)context);
62 if (task != NULL) {
63 task->task(task->context);
64
65 FreeTask(task);
66 }
67 }
68
AllocProcessingQueue(uint8_t id,uint32_t size)69 static BtmProcessingQueue *AllocProcessingQueue(uint8_t id, uint32_t size)
70 {
71 BtmProcessingQueue *block = MEM_MALLOC.alloc(sizeof(BtmProcessingQueue));
72 if (block != NULL) {
73 block->id = id;
74 block->queue = QueueCreate(size);
75 if (block->queue != NULL) {
76 Reactor *reactor = ThreadGetReactor(g_processingThread);
77 block->reactorItem = ReactorRegister(reactor, QueueGetDequeueFd(block->queue), block->queue, RunTask, NULL);
78 }
79 }
80 return block;
81 }
82
83 typedef struct {
84 Queue *queue;
85 Semaphore *semaphore;
86 } RunAllTaskContext;
87
RunAllTaskInQueueTask(void * param)88 static void RunAllTaskInQueueTask(void *param)
89 {
90 RunAllTaskContext *context = (RunAllTaskContext *)param;
91
92 BtmTask *task = QueueTryDequeue(context->queue);
93 while (task != NULL) {
94 task->task(task->context);
95
96 FreeTask(task);
97
98 task = QueueTryDequeue(context->queue);
99 }
100
101 if (context->semaphore != NULL) {
102 SemaphorePost(context->semaphore);
103 }
104 }
105
/*
 * Runs every task still pending in `queue` and returns only after they have
 * been executed.
 *
 * When ThreadIsSelf reports 0 for the processing thread the queue is drained
 * directly on the calling thread (presumably 0 means "the caller is that
 * thread" - confirm against thread.h). Otherwise the drain is posted to the
 * processing thread and the caller blocks on a semaphore until it is done.
 *
 * If the semaphore cannot be created nothing is posted and the tasks are
 * left in the queue for the caller to dispose of.
 */
static void RunAllTaskInQueue(Queue *queue)
{
    RunAllTaskContext context = {
        .queue = queue,
        .semaphore = NULL,
    };

    if (ThreadIsSelf(g_processingThread) == 0) {
        RunAllTaskInQueueTask(&context);
        return;
    }

    context.semaphore = SemaphoreCreate(0);
    if (context.semaphore == NULL) {
        /*
         * `context` lives on this stack frame. Without a semaphore there is
         * no way to wait for the posted job, so posting it would let the
         * job use `context` after this function has returned.
         */
        return;
    }

    ThreadPostTask(g_processingThread, RunAllTaskInQueueTask, &context);
    SemaphoreWait(context.semaphore);
    SemaphoreDelete(context.semaphore);
}
122
FreeProcessingQueue(void * queue)123 static void FreeProcessingQueue(void *queue)
124 {
125 BtmProcessingQueue *block = (BtmProcessingQueue *)queue;
126 if (block->reactorItem != NULL) {
127 ReactorUnregister(block->reactorItem);
128 block->reactorItem = NULL;
129 }
130 if (block->queue != NULL) {
131 QueueDelete(block->queue, FreeTask);
132 block->queue = NULL;
133 }
134 MEM_MALLOC.free(queue);
135 }
136
FindProcessingQueueById(uint8_t queueId)137 static BtmProcessingQueue *FindProcessingQueueById(uint8_t queueId)
138 {
139 BtmProcessingQueue *queue = NULL;
140
141 ListNode *node = NULL;
142 node = ListGetFirstNode(g_processingQueueList);
143 while (node != NULL) {
144 queue = ListGetNodeData(node);
145 if (queue->id == queueId) {
146 break;
147 } else {
148 queue = NULL;
149 }
150 node = ListGetNextNode(node);
151 }
152
153 return queue;
154 }
155
BtmInitThread()156 void BtmInitThread()
157 {
158 g_processingThread = ThreadCreate("Stack");
159 g_processingQueueList = ListCreate(FreeProcessingQueue);
160 g_processingQueueLock = MutexCreate();
161 }
162
BtmCloseThread()163 void BtmCloseThread()
164 {
165 if (g_processingQueueList != NULL) {
166 ListDelete(g_processingQueueList);
167 g_processingQueueList = NULL;
168 }
169
170 if (g_processingThread != NULL) {
171 ThreadDelete(g_processingThread);
172 g_processingThread = NULL;
173 }
174
175 if (g_processingQueueLock != NULL) {
176 MutexDelete(g_processingQueueLock);
177 g_processingQueueLock = NULL;
178 }
179 }
180
BTM_GetProcessingThread()181 Thread *BTM_GetProcessingThread()
182 {
183 return g_processingThread;
184 }
185
BTM_CreateProcessingQueue(uint8_t queueId,uint16_t size)186 int BTM_CreateProcessingQueue(uint8_t queueId, uint16_t size)
187 {
188 int result = BT_SUCCESS;
189 MutexLock(g_processingQueueLock);
190
191 BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
192 if (queue != NULL) {
193 result = BT_BAD_STATUS;
194 } else {
195 queue = AllocProcessingQueue(queueId, size);
196 ListAddLast(g_processingQueueList, queue);
197 }
198
199 MutexUnlock(g_processingQueueLock);
200 return result;
201 }
202
BTM_DeleteProcessingQueue(uint8_t queueId)203 int BTM_DeleteProcessingQueue(uint8_t queueId)
204 {
205 int result = BT_SUCCESS;
206
207 Queue *taskQueue = NULL;
208
209 MutexLock(g_processingQueueLock);
210 BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
211 if (queue != NULL) {
212 taskQueue = queue->queue;
213 queue->queue = NULL;
214 ListRemoveNode(g_processingQueueList, queue);
215 } else {
216 result = BT_BAD_STATUS;
217 }
218 MutexUnlock(g_processingQueueLock);
219
220 if (taskQueue != NULL) {
221 RunAllTaskInQueue(taskQueue);
222 QueueDelete(taskQueue, FreeTask);
223 taskQueue = NULL;
224 }
225
226 return result;
227 }
228
BTM_RunTaskInProcessingQueue(uint8_t queueId,void (* task)(void * context),void * context)229 int BTM_RunTaskInProcessingQueue(uint8_t queueId, void (*task)(void *context), void *context)
230 {
231 HILOGD("%{public}d ,start process queueId is ", queueId);
232 int result = BT_SUCCESS;
233 MutexLock(g_processingQueueLock);
234 BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
235 if (queue != NULL) {
236 BtmTask *block = AllocTask(task, context);
237 if (task != NULL) {
238 QueueEnqueue(queue->queue, block);
239 } else {
240 result = BT_NO_MEMORY;
241 }
242 } else {
243 result = BT_BAD_STATUS;
244 }
245 MutexUnlock(g_processingQueueLock);
246 HILOGD("%{public}d ,end process queueId is ", queueId);
247 return result;
248 }
249