1 /* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "platform/include/thread.h" 17 #include <pthread.h> 18 #include <sched.h> 19 #include <stdlib.h> 20 #include <unistd.h> 21 #include <sys/prctl.h> 22 #include <sys/resource.h> 23 #include <sys/syscall.h> 24 #include "platform/include/mutex.h" 25 #include "platform/include/platform_def.h" 26 #include "platform/include/queue.h" 27 #include "platform/include/reactor.h" 28 #include "platform/include/semaphore.h" 29 #include "securec.h" 30 31 #define THREAD_QUEUE_SIZE 128 32 const char THREAD_DEFAULT_NAME[THREAD_NAME_SIZE] = "bt-stack"; 33 34 typedef struct Thread { 35 pid_t tid; 36 bool isStopped; 37 pthread_t pthread; 38 Reactor *reactor; 39 Queue *taskQueue; 40 Mutex *apiMutex; 41 } ThreadInternal; 42 43 typedef struct { 44 Thread *thread; 45 Semaphore *sync; 46 char name[THREAD_NAME_SIZE + 1]; 47 } StartPromise; 48 49 typedef struct { 50 void (*func)(void *context); 51 void *context; 52 } TaskItem; 53 ReadyToRead(void * queue)54 static void ReadyToRead(void *queue) 55 { 56 ASSERT(queue); 57 TaskItem *item = (TaskItem *)QueueDequeue((Queue *)queue); 58 if (item == NULL) { 59 LOG_ERROR("Thread: Queue Dequeue failed."); 60 return; 61 } 62 item->func(item->context); 63 free(item); 64 } 65 ThreadStartFunc(void * promise)66 static void *ThreadStartFunc(void *promise) 67 { 68 StartPromise *startPromise = (StartPromise *)promise; 69 Thread *thread = startPromise->thread; 70 71 thread->tid = (long int)syscall(__NR_gettid); 72 prctl(PR_SET_NAME, startPromise->name); 73 ReactorSetThreadId(thread->reactor, (unsigned long)pthread_self()); 74 75 SemaphorePost(startPromise->sync); 76 77 int fd = QueueGetDequeueFd(thread->taskQueue); 78 ReactorItem *reactorItem = ReactorRegister(thread->reactor, fd, (void *)thread->taskQueue, ReadyToRead, NULL); 79 80 // Start Running reactor. 81 if (ReactorStart(thread->reactor) != 0) { 82 LOG_ERROR("ThreadStartFunc: Reactor run failed."); 83 } 84 ReactorUnregister(reactorItem); 85 86 int num = 0; 87 // Execute all remain tasks in queue after stop Reactor. 88 TaskItem *task = (TaskItem *)QueueTryDequeue(thread->taskQueue); 89 while (num <= THREAD_QUEUE_SIZE && task) { 90 task->func(task->context); 91 free(task); 92 task = (TaskItem *)QueueTryDequeue(thread->taskQueue); 93 num++; 94 } 95 96 return NULL; 97 } 98 ThreadIsSelf(const Thread * thread)99 int32_t ThreadIsSelf(const Thread *thread) 100 { 101 ASSERT(thread); 102 if (pthread_equal(thread->pthread, pthread_self()) != 0) { 103 return 0; 104 } else { 105 return -1; 106 } 107 } 108 ThreadStop(Thread * thread)109 static void ThreadStop(Thread *thread) 110 { 111 MutexLock(thread->apiMutex); 112 if (ThreadIsSelf((const Thread *)thread) == 0) { 113 LOG_ERROR("ThreadStop: Cannot stop thread by itself."); 114 } 115 if (!thread->isStopped) { 116 ReactorStop(thread->reactor); 117 pthread_join(thread->pthread, NULL); 118 thread->isStopped = true; 119 } 120 MutexUnlock(thread->apiMutex); 121 } 122 ThreadCreate(const char * name)123 Thread *ThreadCreate(const char *name) 124 { 125 Thread *thread = (Thread *)calloc(1, (sizeof(Thread))); 126 if (thread == NULL) { 127 return NULL; 128 } 129 130 thread->reactor = ReactorCreate(); 131 if (thread->reactor == NULL) { 132 goto ERROR; 133 } 134 135 thread->taskQueue = QueueCreate(THREAD_QUEUE_SIZE); 136 if (thread->taskQueue == NULL) { 137 goto ERROR; 138 } 139 140 thread->apiMutex = MutexCreate(); 141 if (thread->apiMutex == NULL) { 142 goto ERROR; 143 } 144 145 StartPromise *promise = (StartPromise *)calloc(1, sizeof(StartPromise)); 146 promise->thread = thread; 147 promise->sync = SemaphoreCreate(0); 148 if (promise->sync == NULL) { 149 free(promise); 150 goto ERROR; 151 } 152 153 if (name != NULL) { 154 (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, name, THREAD_NAME_SIZE); 155 } else { 156 (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, THREAD_DEFAULT_NAME, THREAD_NAME_SIZE); 157 } 158 159 (void)pthread_create(&thread->pthread, NULL, ThreadStartFunc, promise); 160 161 SemaphoreWait(promise->sync); 162 SemaphoreDelete(promise->sync); 163 free(promise); 164 165 return thread; 166 167 ERROR: 168 if (thread != NULL) { 169 ReactorDelete(thread->reactor); 170 QueueDelete(thread->taskQueue, free); 171 MutexDelete(thread->apiMutex); 172 free(thread); 173 } 174 return NULL; 175 } 176 ThreadDelete(Thread * thread)177 void ThreadDelete(Thread *thread) 178 { 179 if (thread == NULL) { 180 return; 181 } 182 183 ThreadStop(thread); 184 MutexDelete(thread->apiMutex); 185 QueueDelete(thread->taskQueue, free); 186 ReactorDelete(thread->reactor); 187 188 free(thread); 189 } 190 ThreadPostTask(Thread * thread,TaskFunc func,void * context)191 void ThreadPostTask(Thread *thread, TaskFunc func, void *context) 192 { 193 ASSERT(thread); 194 ASSERT(func); 195 196 TaskItem *task = (TaskItem *)malloc(sizeof(TaskItem)); 197 if (task == NULL) { 198 return; 199 } 200 task->func = func; 201 task->context = context; 202 QueueEnqueue(thread->taskQueue, task); 203 } 204 ThreadGetReactor(const Thread * thread)205 Reactor *ThreadGetReactor(const Thread *thread) 206 { 207 ASSERT(thread); 208 return thread->reactor; 209 } 210