1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "btm_thread.h"
17 
18 #include <stddef.h>
19 
20 #include "btstack.h"
21 #include "platform/include/allocator.h"
22 #include "platform/include/list.h"
23 #include "platform/include/mutex.h"
24 #include "platform/include/queue.h"
25 #include "platform/include/semaphore.h"
26 #include "platform/include/thread.h"
27 #include "log.h"
28 
29 typedef struct {
30     void (*task)(void *context);
31     void *context;
32 } BtmTask;
33 
34 typedef struct {
35     uint8_t id;
36     Queue *queue;
37     ReactorItem *reactorItem;
38 } BtmProcessingQueue;
39 
40 static Thread *g_processingThread = NULL;
41 static List *g_processingQueueList = NULL;
42 static Mutex *g_processingQueueLock = NULL;
43 
AllocTask(void (* task)(void * context),void * context)44 static BtmTask *AllocTask(void (*task)(void *context), void *context)
45 {
46     BtmTask *block = MEM_MALLOC.alloc(sizeof(BtmTask));
47     if (block != NULL) {
48         block->task = task;
49         block->context = context;
50     }
51     return block;
52 }
53 
FreeTask(void * task)54 static void FreeTask(void *task)
55 {
56     MEM_MALLOC.free(task);
57 }
58 
RunTask(void * context)59 NO_SANITIZE("cfi") static void RunTask(void *context)
60 {
61     BtmTask *task = QueueTryDequeue((Queue *)context);
62     if (task != NULL) {
63         task->task(task->context);
64 
65         FreeTask(task);
66     }
67 }
68 
AllocProcessingQueue(uint8_t id,uint32_t size)69 static BtmProcessingQueue *AllocProcessingQueue(uint8_t id, uint32_t size)
70 {
71     BtmProcessingQueue *block = MEM_MALLOC.alloc(sizeof(BtmProcessingQueue));
72     if (block != NULL) {
73         block->id = id;
74         block->queue = QueueCreate(size);
75         if (block->queue != NULL) {
76             Reactor *reactor = ThreadGetReactor(g_processingThread);
77             block->reactorItem = ReactorRegister(reactor, QueueGetDequeueFd(block->queue), block->queue, RunTask, NULL);
78         }
79     }
80     return block;
81 }
82 
83 typedef struct {
84     Queue *queue;
85     Semaphore *semaphore;
86 } RunAllTaskContext;
87 
RunAllTaskInQueueTask(void * param)88 static void RunAllTaskInQueueTask(void *param)
89 {
90     RunAllTaskContext *context = (RunAllTaskContext *)param;
91 
92     BtmTask *task = QueueTryDequeue(context->queue);
93     while (task != NULL) {
94         task->task(task->context);
95 
96         FreeTask(task);
97 
98         task = QueueTryDequeue(context->queue);
99     }
100 
101     if (context->semaphore != NULL) {
102         SemaphorePost(context->semaphore);
103     }
104 }
105 
RunAllTaskInQueue(Queue * queue)106 static void RunAllTaskInQueue(Queue *queue)
107 {
108     RunAllTaskContext context = {
109         .queue = queue,
110         .semaphore = NULL,
111     };
112 
113     if (ThreadIsSelf(g_processingThread) == 0) {
114         RunAllTaskInQueueTask(&context);
115     } else {
116         context.semaphore = SemaphoreCreate(0);
117         ThreadPostTask(g_processingThread, RunAllTaskInQueueTask, &context);
118         SemaphoreWait(context.semaphore);
119         SemaphoreDelete(context.semaphore);
120     }
121 }
122 
FreeProcessingQueue(void * queue)123 static void FreeProcessingQueue(void *queue)
124 {
125     BtmProcessingQueue *block = (BtmProcessingQueue *)queue;
126     if (block->reactorItem != NULL) {
127         ReactorUnregister(block->reactorItem);
128         block->reactorItem = NULL;
129     }
130     if (block->queue != NULL) {
131         QueueDelete(block->queue, FreeTask);
132         block->queue = NULL;
133     }
134     MEM_MALLOC.free(queue);
135 }
136 
FindProcessingQueueById(uint8_t queueId)137 static BtmProcessingQueue *FindProcessingQueueById(uint8_t queueId)
138 {
139     BtmProcessingQueue *queue = NULL;
140 
141     ListNode *node = NULL;
142     node = ListGetFirstNode(g_processingQueueList);
143     while (node != NULL) {
144         queue = ListGetNodeData(node);
145         if (queue->id == queueId) {
146             break;
147         } else {
148             queue = NULL;
149         }
150         node = ListGetNextNode(node);
151     }
152 
153     return queue;
154 }
155 
BtmInitThread()156 void BtmInitThread()
157 {
158     g_processingThread = ThreadCreate("Stack");
159     g_processingQueueList = ListCreate(FreeProcessingQueue);
160     g_processingQueueLock = MutexCreate();
161 }
162 
BtmCloseThread()163 void BtmCloseThread()
164 {
165     if (g_processingQueueList != NULL) {
166         ListDelete(g_processingQueueList);
167         g_processingQueueList = NULL;
168     }
169 
170     if (g_processingThread != NULL) {
171         ThreadDelete(g_processingThread);
172         g_processingThread = NULL;
173     }
174 
175     if (g_processingQueueLock != NULL) {
176         MutexDelete(g_processingQueueLock);
177         g_processingQueueLock = NULL;
178     }
179 }
180 
BTM_GetProcessingThread()181 Thread *BTM_GetProcessingThread()
182 {
183     return g_processingThread;
184 }
185 
BTM_CreateProcessingQueue(uint8_t queueId,uint16_t size)186 int BTM_CreateProcessingQueue(uint8_t queueId, uint16_t size)
187 {
188     int result = BT_SUCCESS;
189     MutexLock(g_processingQueueLock);
190 
191     BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
192     if (queue != NULL) {
193         result = BT_BAD_STATUS;
194     } else {
195         queue = AllocProcessingQueue(queueId, size);
196         ListAddLast(g_processingQueueList, queue);
197     }
198 
199     MutexUnlock(g_processingQueueLock);
200     return result;
201 }
202 
BTM_DeleteProcessingQueue(uint8_t queueId)203 int BTM_DeleteProcessingQueue(uint8_t queueId)
204 {
205     int result = BT_SUCCESS;
206 
207     Queue *taskQueue = NULL;
208 
209     MutexLock(g_processingQueueLock);
210     BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
211     if (queue != NULL) {
212         taskQueue = queue->queue;
213         queue->queue = NULL;
214         ListRemoveNode(g_processingQueueList, queue);
215     } else {
216         result = BT_BAD_STATUS;
217     }
218     MutexUnlock(g_processingQueueLock);
219 
220     if (taskQueue != NULL) {
221         RunAllTaskInQueue(taskQueue);
222         QueueDelete(taskQueue, FreeTask);
223         taskQueue = NULL;
224     }
225 
226     return result;
227 }
228 
BTM_RunTaskInProcessingQueue(uint8_t queueId,void (* task)(void * context),void * context)229 int BTM_RunTaskInProcessingQueue(uint8_t queueId, void (*task)(void *context), void *context)
230 {
231     HILOGD("%{public}d ,start process queueId is ", queueId);
232     int result = BT_SUCCESS;
233     MutexLock(g_processingQueueLock);
234     BtmProcessingQueue *queue = FindProcessingQueueById(queueId);
235     if (queue != NULL) {
236         BtmTask *block = AllocTask(task, context);
237         if (task != NULL) {
238             QueueEnqueue(queue->queue, block);
239         } else {
240             result = BT_NO_MEMORY;
241         }
242     } else {
243         result = BT_BAD_STATUS;
244     }
245     MutexUnlock(g_processingQueueLock);
246     HILOGD("%{public}d ,end process queueId is ", queueId);
247     return result;
248 }
249