1  /*
2   * Copyright (C) 2021 Huawei Device Co., Ltd.
3   * Licensed under the Apache License, Version 2.0 (the "License");
4   * you may not use this file except in compliance with the License.
5   * You may obtain a copy of the License at
6   *
7   *     http://www.apache.org/licenses/LICENSE-2.0
8   *
9   * Unless required by applicable law or agreed to in writing, software
10   * distributed under the License is distributed on an "AS IS" BASIS,
11   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   * See the License for the specific language governing permissions and
13   * limitations under the License.
14   */
15  
16  #include "platform/include/thread.h"
17  #include <pthread.h>
18  #include <sched.h>
19  #include <stdlib.h>
20  #include <unistd.h>
21  #include <sys/prctl.h>
22  #include <sys/resource.h>
23  #include <sys/syscall.h>
24  #include "platform/include/mutex.h"
25  #include "platform/include/platform_def.h"
26  #include "platform/include/queue.h"
27  #include "platform/include/reactor.h"
28  #include "platform/include/semaphore.h"
29  #include "securec.h"
30  
31  #define THREAD_QUEUE_SIZE 128
32  const char THREAD_DEFAULT_NAME[THREAD_NAME_SIZE] = "bt-stack";
33  
34  typedef struct Thread {
35      pid_t tid;
36      bool isStopped;
37      pthread_t pthread;
38      Reactor *reactor;
39      Queue *taskQueue;
40      Mutex *apiMutex;
41  } ThreadInternal;
42  
43  typedef struct {
44      Thread *thread;
45      Semaphore *sync;
46      char name[THREAD_NAME_SIZE + 1];
47  } StartPromise;
48  
/*
 * One unit of work stored in a thread's task queue.
 */
typedef struct {
    void (*func)(void *context); /* Callback invoked on the worker thread. */
    void *context;               /* Argument handed to func when it is invoked. */
} TaskItem;
53  
ReadyToRead(void * queue)54  static void ReadyToRead(void *queue)
55  {
56      ASSERT(queue);
57      TaskItem *item = (TaskItem *)QueueDequeue((Queue *)queue);
58      if (item == NULL) {
59          LOG_ERROR("Thread: Queue Dequeue failed.");
60          return;
61      }
62      item->func(item->context);
63      free(item);
64  }
65  
ThreadStartFunc(void * promise)66  static void *ThreadStartFunc(void *promise)
67  {
68      StartPromise *startPromise = (StartPromise *)promise;
69      Thread *thread = startPromise->thread;
70  
71      thread->tid = (long int)syscall(__NR_gettid);
72      prctl(PR_SET_NAME, startPromise->name);
73      ReactorSetThreadId(thread->reactor, (unsigned long)pthread_self());
74  
75      SemaphorePost(startPromise->sync);
76  
77      int fd = QueueGetDequeueFd(thread->taskQueue);
78      ReactorItem *reactorItem = ReactorRegister(thread->reactor, fd, (void *)thread->taskQueue, ReadyToRead, NULL);
79  
80      // Start Running reactor.
81      if (ReactorStart(thread->reactor) != 0) {
82          LOG_ERROR("ThreadStartFunc: Reactor run failed.");
83      }
84      ReactorUnregister(reactorItem);
85  
86      int num = 0;
87      // Execute all remain tasks in queue after stop Reactor.
88      TaskItem *task = (TaskItem *)QueueTryDequeue(thread->taskQueue);
89      while (num <= THREAD_QUEUE_SIZE && task) {
90          task->func(task->context);
91          free(task);
92          task = (TaskItem *)QueueTryDequeue(thread->taskQueue);
93          num++;
94      }
95  
96      return NULL;
97  }
98  
ThreadIsSelf(const Thread * thread)99  int32_t ThreadIsSelf(const Thread *thread)
100  {
101      ASSERT(thread);
102      if (pthread_equal(thread->pthread, pthread_self()) != 0) {
103          return 0;
104      } else {
105          return -1;
106      }
107  }
108  
/*
 * Stop the reactor of `thread` and join the worker, under apiMutex.
 * Does nothing beyond locking when the thread is already marked stopped.
 *
 * NOTE(review): when invoked from the worker itself only an error is logged;
 * execution still continues to ReactorStop()/pthread_join(). Confirm whether
 * a self-stop should bail out instead.
 *
 * @param thread Thread to stop.
 */
static void ThreadStop(Thread *thread)
{
    MutexLock(thread->apiMutex);

    int32_t selfCheck = ThreadIsSelf((const Thread *)thread);
    if (selfCheck == 0) {
        LOG_ERROR("ThreadStop: Cannot stop thread by itself.");
    }

    if (thread->isStopped == false) {
        ReactorStop(thread->reactor);
        pthread_join(thread->pthread, NULL);
        thread->isStopped = true;
    }

    MutexUnlock(thread->apiMutex);
}
122  
ThreadCreate(const char * name)123  Thread *ThreadCreate(const char *name)
124  {
125      Thread *thread = (Thread *)calloc(1, (sizeof(Thread)));
126      if (thread == NULL) {
127          return NULL;
128      }
129  
130      thread->reactor = ReactorCreate();
131      if (thread->reactor == NULL) {
132          goto ERROR;
133      }
134  
135      thread->taskQueue = QueueCreate(THREAD_QUEUE_SIZE);
136      if (thread->taskQueue == NULL) {
137          goto ERROR;
138      }
139  
140      thread->apiMutex = MutexCreate();
141      if (thread->apiMutex == NULL) {
142          goto ERROR;
143      }
144  
145      StartPromise *promise = (StartPromise *)calloc(1, sizeof(StartPromise));
146      promise->thread = thread;
147      promise->sync = SemaphoreCreate(0);
148      if (promise->sync == NULL) {
149          free(promise);
150          goto ERROR;
151      }
152  
153      if (name != NULL) {
154          (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, name, THREAD_NAME_SIZE);
155      } else {
156          (void)strncpy_s(promise->name, THREAD_NAME_SIZE + 1, THREAD_DEFAULT_NAME, THREAD_NAME_SIZE);
157      }
158  
159      (void)pthread_create(&thread->pthread, NULL, ThreadStartFunc, promise);
160  
161      SemaphoreWait(promise->sync);
162      SemaphoreDelete(promise->sync);
163      free(promise);
164  
165      return thread;
166  
167  ERROR:
168      if (thread != NULL) {
169          ReactorDelete(thread->reactor);
170          QueueDelete(thread->taskQueue, free);
171          MutexDelete(thread->apiMutex);
172          free(thread);
173      }
174      return NULL;
175  }
176  
/*
 * Stop the worker thread and release everything owned by the Thread object.
 * Items still left in the task queue are released with free().
 *
 * @param thread Thread to destroy; NULL is accepted and ignored.
 */
void ThreadDelete(Thread *thread)
{
    if (thread != NULL) {
        // The worker is stopped first so that nothing below is torn down under it.
        ThreadStop(thread);

        MutexDelete(thread->apiMutex);
        QueueDelete(thread->taskQueue, free);
        ReactorDelete(thread->reactor);

        free(thread);
    }
}
190  
/*
 * Queue a task for execution on the worker thread.
 * The task is silently dropped when its TaskItem cannot be allocated.
 *
 * @param thread  Target thread; must not be NULL.
 * @param func    Function to run; must not be NULL.
 * @param context Opaque pointer handed to func when it runs.
 */
void ThreadPostTask(Thread *thread, TaskFunc func, void *context)
{
    ASSERT(thread);
    ASSERT(func);

    TaskItem *item = (TaskItem *)malloc(sizeof(*item));
    if (item != NULL) {
        item->context = context;
        item->func = func;
        // The item is freed by the worker after the task has run.
        QueueEnqueue(thread->taskQueue, item);
    }
}
204  
ThreadGetReactor(const Thread * thread)205  Reactor *ThreadGetReactor(const Thread *thread)
206  {
207      ASSERT(thread);
208      return thread->reactor;
209  }
210