1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "thread_manager.h"
16
17 #include <errno.h>
18 #include <pthread.h>
19 #include <stdatomic.h>
20
21 #include "appspawn_utils.h"
22 #include "list.h"
23
/* Bookkeeping for one thread slot stored in ThreadManager::threadNode. */
typedef struct {
    atomic_uint threadExit; // set to 1 by DestroyThreadMgr; the thread loops poll it to know when to stop
    uint32_t index;         // position of this slot inside ThreadManager::threadNode
    pthread_t threadId;     // INVALID_THREAD_ID until pthread_create stores the real id
} ThreadNode;
29
/* State of the thread manager: the task queues, the executor queue and the thread slots. */
typedef struct {
    pthread_mutex_t mutex; // protects the execution queues
    pthread_cond_t cond; // condition the threads wait on
    ListNode taskList; // task queue: tasks that have not been started yet
    ListNode waitingTaskQueue; // started tasks, queued and waiting to be executed
    ListNode executingTaskQueue; // tasks that are currently executing
    ListNode executorQueue; // executor nodes; holds TaskExecuteNode (linked through executeNode)
    uint32_t executorCount; // incremented when an executor is queued, decremented when one is popped
    uint32_t maxThreadCount; // value passed to CreateThreadMgr
    uint32_t currTaskId; // next task id handed out by ThreadMgrAddTask
    struct timespec lastAdjust; // not referenced by the code in this file
    uint32_t validThreadCount; // number of threads started by CheckAndCreateNewThread
    // Thread information used to control thread exit and termination. CreateThreadMgr
    // allocates maxThreadCount additional entries behind this one; slot 0 is used for
    // the manager thread.
    ThreadNode threadNode[1];
} ThreadManager;
44
/* One task: a group of executors that is started, tracked and completed together. */
typedef struct {
    uint32_t taskId; // id assigned by ThreadMgrAddTask
    ListNode node; // links the task into one of the manager's task queues
    ListNode executorList; // TaskExecuteNode entries of this task (linked through TaskExecuteNode::node)
    uint32_t totalTask; // number of executors added to this task
    atomic_uint taskFlags; // indicates whether the task was cancelled; threads check it to decide whether to stop
                           // (only initialised by the code in this file)
    atomic_uint finishTaskCount; // counter advanced by RunExecutor, compared with totalTask in CheckTaskComplete
    const ThreadContext *context; // passed to finishProcess
    TaskFinishProcessor finishProcess; // completion callback; NULL for synchronous execution
    pthread_mutex_t mutex; // protects the execution queue (original note); used together with cond
    pthread_cond_t cond; // for synchronous execution: the caller waits here for the completion signal
} TaskNode;
57
/* One unit of work (sub task) that belongs to a TaskNode. */
typedef struct {
    ListNode node; // links the sub task into its owning task (TaskNode::executorList) for easier management
    ListNode executeNode; // links the sub task into the queue of work waiting to be processed
    TaskNode *task; // owning task
    const ThreadContext *context; // argument handed to executor
    TaskExecutor executor; // function invoked by RunExecutor
} TaskExecuteNode;
65
// Process-wide manager instance; set by CreateThreadMgr and read by the thread entry points below.
static ThreadManager *g_threadManager = NULL;

// Thread entry points, defined later in this file.
static void *ManagerThreadProc(void *args); // started by CreateThreadMgr with thread slot 0
static void *ThreadExecute(void *args);     // started by CheckAndCreateNewThread
70
SetCondAttr(pthread_cond_t * cond)71 static void SetCondAttr(pthread_cond_t *cond)
72 {
73 pthread_condattr_t attr;
74 pthread_condattr_init(&attr);
75 pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
76 pthread_cond_init(cond, &attr);
77 pthread_condattr_destroy(&attr);
78 }
79
ConvertToTimespec(int time,struct timespec * tm)80 static void ConvertToTimespec(int time, struct timespec *tm)
81 {
82 struct timespec start;
83 clock_gettime(CLOCK_MONOTONIC, &start);
84 uint64_t ns = time;
85 ns *= APPSPAWN_MSEC_TO_NSEC;
86 ns += start.tv_sec * APPSPAWN_SEC_TO_NSEC + start.tv_nsec;
87 tm->tv_sec = ns / APPSPAWN_SEC_TO_NSEC;
88 tm->tv_nsec = ns % APPSPAWN_SEC_TO_NSEC;
89 }
90
PopTaskExecutor(ThreadManager * mgr)91 static TaskExecuteNode *PopTaskExecutor(ThreadManager *mgr)
92 {
93 TaskExecuteNode *executor = NULL;
94 pthread_mutex_lock(&mgr->mutex);
95 ListNode *node = mgr->executorQueue.next;
96 if (node != &mgr->executorQueue) {
97 OH_ListRemove(node);
98 OH_ListInit(node);
99 executor = ListEntry(node, TaskExecuteNode, executeNode);
100 mgr->executorCount--;
101 }
102 pthread_mutex_unlock(&mgr->mutex);
103 return executor;
104 }
105
/**
 * Queue every executor of a task for execution.
 *
 * Each executor found in task->executorList is appended to the tail of
 * mgr->executorQueue; the queue and executorCount are updated under mgr->mutex,
 * one executor at a time.
 *
 * @param mgr  thread manager
 * @param task task whose executors are queued
 * @return always 0
 */
static int AddExecutor(ThreadManager *mgr, const TaskNode *task)
{
    const ListNode *head = &task->executorList;

    for (ListNode *cur = head->next; cur != head; cur = cur->next) {
        TaskExecuteNode *item = ListEntry(cur, TaskExecuteNode, node);
        APPSPAWN_LOGV("AddExecutor task: %{public}u executorCount: %{public}u executor: %{public}u",
            task->taskId, mgr->executorCount, item->task->taskId);

        // Append at the tail so executors run in insertion order.
        pthread_mutex_lock(&mgr->mutex);
        OH_ListRemove(&item->executeNode);
        OH_ListInit(&item->executeNode);
        OH_ListAddTail(&mgr->executorQueue, &item->executeNode);
        mgr->executorCount++;
        pthread_mutex_unlock(&mgr->mutex);
    }
    return 0;
}
126
/**
 * Pop executors from the manager's queue and run them on the calling thread.
 *
 * The loop stops when the queue is empty, when the thread has been asked to
 * exit, or after maxCount executors have run (at least one executor is run
 * when one is available, even for maxCount == 0).
 *
 * @param mgr        thread manager
 * @param threadNode slot of the calling thread (exit flag, index for logging)
 * @param maxCount   upper bound of executors to run in this call
 */
static void RunExecutor(ThreadManager *mgr, ThreadNode *threadNode, uint32_t maxCount)
{
    APPSPAWN_LOGV("RunExecutor in thread: %{public}d executorCount: %{public}u ",
        threadNode->index, mgr->executorCount);
    uint32_t count = 0;
    // Check the exit flag BEFORE popping so that no executor is taken off the
    // queue and then dropped without being run.
    while (!threadNode->threadExit) {
        TaskExecuteNode *executor = PopTaskExecutor(mgr);
        if (executor == NULL) {
            break;
        }
        TaskNode *task = executor->task;
        APPSPAWN_LOGV("RunExecutor task: %{public}u", task->taskId);
        executor->executor(task->taskId, executor->context);
        // Report completion only AFTER the work is done: CheckTaskComplete
        // compares this counter with totalTask and may finish and free the
        // task as soon as it is reached. Neither task nor executor may be
        // touched after this increment.
        atomic_fetch_add(&task->finishTaskCount, 1);
        count++;
        if (count >= maxCount) {
            break;
        }
    }
    APPSPAWN_LOGV("RunExecutor executorCount: %{public}u end", mgr->executorCount);
}
145
/**
 * List comparison callback: compare a task's id with a wanted id.
 *
 * @param node list node embedded in a TaskNode
 * @param data pointer to the uint32_t task id to look for
 * @return 0 when the ids match, a negative value when the task's id is
 *         smaller, a positive value when it is larger
 */
static int TaskCompareTaskId(ListNode *node, void *data)
{
    const TaskNode *task = ListEntry(node, TaskNode, node);
    uint32_t wanted = *(const uint32_t *)data;

    // Compare explicitly: an unsigned subtraction converted to int reports
    // the wrong sign once the ids are more than INT_MAX apart.
    if (task->taskId == wanted) {
        return 0;
    }
    return (task->taskId < wanted) ? -1 : 1;
}
151
GetTask(ThreadManager * mgr,ListNode * queue,uint32_t taskId)152 static TaskNode *GetTask(ThreadManager *mgr, ListNode *queue, uint32_t taskId)
153 {
154 ListNode *node = NULL;
155 pthread_mutex_lock(&mgr->mutex);
156 node = OH_ListFind(queue, &taskId, TaskCompareTaskId);
157 pthread_mutex_unlock(&mgr->mutex);
158 if (node == NULL) {
159 return NULL;
160 }
161 return ListEntry(node, TaskNode, node);
162 }
163
/**
 * Release a task together with the executors it still owns.
 *
 * Nothing is released while task->node is still linked into a queue
 * (ListEmpty(task->node) is false); the caller has to detach the task first.
 *
 * @param task task to destroy; must not be used afterwards
 */
static void DeleteTask(TaskNode *task)
{
    APPSPAWN_LOGV("DeleteTask task: %{public}u ", task->taskId);

    if (!ListEmpty(task->node)) {
        return;
    }

    // Free every executor still attached to the task. Merely unlinking them
    // (as OH_ListRemoveAll with a NULL destroy callback does) leaks them.
    ListNode *node = task->executorList.next;
    while (node != &task->executorList) {
        TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node);
        OH_ListRemove(node);
        OH_ListInit(node);
        // Normally the executor has already been popped from the manager's
        // executor queue and executeNode is self-linked, which makes this a
        // no-op. It keeps that queue free of dangling nodes otherwise.
        // NOTE(review): done without mgr->mutex; callers are expected not to
        // delete a task whose executors are still queued on a live manager.
        OH_ListRemove(&executor->executeNode);
        OH_ListInit(&executor->executeNode);
        free(executor);
        node = task->executorList.next;
    }

    pthread_cond_destroy(&task->cond);
    pthread_mutex_destroy(&task->mutex);
    free(task);
}
176
PopTask(ThreadManager * mgr,ListNode * queue)177 static TaskNode *PopTask(ThreadManager *mgr, ListNode *queue)
178 {
179 TaskNode *task = NULL;
180 pthread_mutex_lock(&mgr->mutex);
181 ListNode *node = queue->next;
182 if (node != queue) {
183 OH_ListRemove(node);
184 OH_ListInit(node);
185 task = ListEntry(node, TaskNode, node);
186 }
187 pthread_mutex_unlock(&mgr->mutex);
188 return task;
189 }
190
/**
 * Append a task to the tail of a queue and wake every thread waiting on the
 * manager's condition variable. Both steps happen under mgr->mutex.
 *
 * @param mgr   thread manager
 * @param task  task to append
 * @param queue destination queue
 */
static void PushTask(ThreadManager *mgr, TaskNode *task, ListNode *queue)
{
    ListNode *link = &task->node;

    pthread_mutex_lock(&mgr->mutex);
    OH_ListAddTail(queue, link);
    pthread_cond_broadcast(&mgr->cond);
    pthread_mutex_unlock(&mgr->mutex);
}
198
/**
 * Detach a task from its queue and free all of its executors.
 *
 * The task node is unlinked under mgr->mutex. Each executor is then removed
 * from the task's executor list, unlinked from the manager's executor queue
 * when it is still queued there (executorCount is decremented in that case),
 * and freed. The task itself is not freed.
 *
 * @param mgr  thread manager
 * @param task task to detach
 */
static void SafeRemoveTask(ThreadManager *mgr, TaskNode *task)
{
    pthread_mutex_lock(&mgr->mutex);
    OH_ListRemove(&task->node);
    OH_ListInit(&task->node);
    pthread_mutex_unlock(&mgr->mutex);

    ListNode *head = &task->executorList;
    // Always take the current first element; the list shrinks by one per round.
    for (ListNode *cur = head->next; cur != head; cur = head->next) {
        TaskExecuteNode *item = ListEntry(cur, TaskExecuteNode, node);
        OH_ListRemove(cur);
        OH_ListInit(cur);

        pthread_mutex_lock(&mgr->mutex);
        if (!ListEmpty(item->executeNode)) {
            OH_ListRemove(&item->executeNode);
            OH_ListInit(&item->executeNode);
            mgr->executorCount--;
        }
        pthread_mutex_unlock(&mgr->mutex);

        free(item);
    }
}
223
/**
 * Start the next waiting task, if there is one: its executors are queued for
 * execution and the task is moved to the executing queue.
 *
 * @param mgr thread manager
 */
static void ExecuteTask(ThreadManager *mgr)
{
    TaskNode *next = PopTask(mgr, &mgr->waitingTaskQueue);
    if (next != NULL) {
        APPSPAWN_LOGV("ExecuteTask task: %{public}u ", next->taskId);
        (void)AddExecutor(mgr, next);
        PushTask(mgr, next, &mgr->executingTaskQueue);
    }
}
236
/**
 * Inspect the first task of the executing queue.
 *
 * - Not finished yet (finishTaskCount below totalTask): the task is put back
 *   at the tail of the executing queue.
 * - Finished with a finish callback: the callback is invoked and the task is
 *   deleted.
 * - Finished without a callback: task->cond is signalled and the task is left
 *   alive for the waiter.
 *
 * @param mgr thread manager
 */
static void CheckTaskComplete(ThreadManager *mgr)
{
    TaskNode *head = PopTask(mgr, &mgr->executingTaskQueue);
    if (head == NULL) {
        return;
    }

    if (atomic_load(&head->finishTaskCount) < head->totalTask) {
        // Still running: re-queue it so it is examined again later.
        PushTask(mgr, head, &mgr->executingTaskQueue);
        return;
    }

    if (head->finishProcess == NULL) {
        pthread_mutex_lock(&head->mutex);
        pthread_cond_signal(&head->cond);
        pthread_mutex_unlock(&head->mutex);
        return;
    }

    head->finishProcess(head->taskId, head->context);
    DeleteTask(head);
}
257
/**
 * List destroy callback for queues of TaskNode: unlink the node and hand the
 * owning task to DeleteTask.
 *
 * NOTE(review): unlike the other removals in this file, the node is not
 * re-initialised with OH_ListInit after OH_ListRemove. DeleteTask returns
 * early when ListEmpty(task->node) is false, so whether the task is actually
 * released depends on what OH_ListRemove leaves in the node - confirm.
 *
 * @param node list node embedded in a TaskNode
 */
static void TaskQueueDestroyProc(ListNode *node)
{
    TaskNode *owner = ListEntry(node, TaskNode, node);

    OH_ListRemove(&owner->node);
    DeleteTask(owner);
}
264
/**
 * Create the thread manager and start its manager thread.
 *
 * When a manager already exists it is returned as is and maxThreadCount is
 * ignored.
 *
 * @param maxThreadCount number of thread slots allocated in addition to slot 0
 *                       (slot 0 is used for the manager thread)
 * @param instance       receives the manager handle; must not be NULL
 * @return 0 on success, -1 on invalid arguments, allocation failure or when
 *         the manager thread cannot be started
 */
int CreateThreadMgr(uint32_t maxThreadCount, ThreadMgr *instance)
{
    APPSPAWN_CHECK(instance != NULL, return -1, "Invalid thread manager instance");
    if (g_threadManager != NULL) {
        *instance = (ThreadMgr)g_threadManager;
        return 0;
    }

    // ThreadManager already contains one ThreadNode; maxThreadCount more are
    // appended. Reject counts for which "maxThreadCount + 1" or the byte size
    // would wrap around, otherwise the slot loop below writes out of bounds.
    size_t extraBytes = (size_t)maxThreadCount * sizeof(ThreadNode);
    size_t totalBytes = sizeof(ThreadManager) + extraBytes;
    int countValid = (maxThreadCount + 1 > maxThreadCount) &&
        (extraBytes / sizeof(ThreadNode) == maxThreadCount) && (totalBytes > extraBytes);
    APPSPAWN_CHECK(countValid, return -1, "Invalid max thread count %{public}u", maxThreadCount);

    ThreadManager *mgr = (ThreadManager *)malloc(totalBytes);
    APPSPAWN_CHECK(mgr != NULL, return -1, "Failed to create thread manager");

    mgr->executorCount = 0;
    mgr->currTaskId = 0;
    mgr->validThreadCount = 0;
    mgr->maxThreadCount = maxThreadCount;
    mgr->lastAdjust.tv_sec = 0;
    mgr->lastAdjust.tv_nsec = 0;
    OH_ListInit(&mgr->taskList);
    OH_ListInit(&mgr->waitingTaskQueue);
    OH_ListInit(&mgr->executingTaskQueue);
    OH_ListInit(&mgr->executorQueue);
    if (pthread_mutex_init(&mgr->mutex, NULL) != 0) {
        APPSPAWN_LOGE("Failed to create thread manager");
        free(mgr);
        return -1;
    }
    SetCondAttr(&mgr->cond);

    for (uint32_t index = 0; index < maxThreadCount + 1; index++) {
        mgr->threadNode[index].index = index;
        mgr->threadNode[index].threadId = INVALID_THREAD_ID;
        atomic_init(&mgr->threadNode[index].threadExit, 0);
    }

    // The manager thread reads g_threadManager on entry, so publish it first.
    g_threadManager = mgr;
    int ret = pthread_create(&mgr->threadNode[0].threadId, NULL, ManagerThreadProc, (void *)&mgr->threadNode[0]);
    if (ret != 0) {
        APPSPAWN_LOGE("Failed to create thread for manager");
        g_threadManager = NULL;
        // Release the synchronisation objects as well, not only the memory.
        pthread_cond_destroy(&mgr->cond);
        pthread_mutex_destroy(&mgr->mutex);
        free(mgr);
        return -1;
    }
    *instance = (ThreadMgr)mgr;
    APPSPAWN_LOGV("Create thread manager success maxThreadCount: %{public}u", maxThreadCount);
    return 0;
}
303
/**
 * Stop all threads of the manager and release the manager.
 *
 * Every started thread is asked to exit, woken up and joined. All tasks still
 * held in the manager's queues are then released together with their
 * executors, and finally the manager itself is freed. The handle must not be
 * used after this call. The caller must make sure no other thread is still
 * using the manager or one of its tasks (for example blocked in
 * TaskSyncExecute).
 *
 * @param instance handle returned by CreateThreadMgr
 * @return 0 on success, -1 when instance is NULL
 */
int DestroyThreadMgr(ThreadMgr instance)
{
    APPSPAWN_LOGV("DestroyThreadMgr");
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");

    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
            atomic_store(&mgr->threadNode[index].threadExit, 1);
            APPSPAWN_LOGV("DestroyThreadMgr index %{public}d %{public}d",
                index, atomic_load(&mgr->threadNode[index].threadExit));
        }
    }
    // Wake every thread blocked on the condition so it notices the exit flag.
    pthread_mutex_lock(&mgr->mutex);
    pthread_cond_broadcast(&mgr->cond);
    pthread_mutex_unlock(&mgr->mutex);
    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
            pthread_join(mgr->threadNode[index].threadId, NULL);
            APPSPAWN_LOGV("DestroyThreadMgr index %{public}d end", index);
        }
    }

    // Release the tasks of every task queue. PopTask hands out each task with
    // a re-initialised node; SafeRemoveTask frees its executors and unlinks
    // them from the executor queue; TaskQueueDestroyProc finally deletes the
    // task. PopTask and SafeRemoveTask take mgr->mutex themselves, so it must
    // not be held here.
    ListNode *taskQueues[] = { &mgr->taskList, &mgr->waitingTaskQueue, &mgr->executingTaskQueue };
    for (uint32_t i = 0; i < sizeof(taskQueues) / sizeof(taskQueues[0]); i++) {
        TaskNode *task = PopTask(mgr, taskQueues[i]);
        while (task != NULL) {
            SafeRemoveTask(mgr, task);
            TaskQueueDestroyProc(&task->node);
            task = PopTask(mgr, taskQueues[i]);
        }
    }

    // The executor queue links TaskExecuteNode objects, not TaskNode objects,
    // so it must not be drained with TaskQueueDestroyProc. Its entries belong
    // to the tasks released above; only unlink whatever might be left.
    pthread_mutex_lock(&mgr->mutex);
    OH_ListRemoveAll(&mgr->executorQueue, NULL);
    mgr->executorCount = 0;
    pthread_mutex_unlock(&mgr->mutex);

    pthread_cond_destroy(&mgr->cond);
    pthread_mutex_destroy(&mgr->mutex);

    // Forget the instance so a later CreateThreadMgr builds a fresh manager
    // instead of returning this destroyed one.
    if (g_threadManager == mgr) {
        g_threadManager = NULL;
    }
    free(mgr);
    return 0;
}
337
/**
 * Create a new, empty task and register it in the manager's task list.
 *
 * @param instance   thread manager handle
 * @param taskHandle receives the id of the new task; must not be NULL
 * @return 0 on success, -1 on invalid arguments or when the task cannot be
 *         created
 */
int ThreadMgrAddTask(ThreadMgr instance, ThreadTaskHandle *taskHandle)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
    // Validate the output pointer before allocating so nothing leaks on failure.
    APPSPAWN_CHECK(taskHandle != NULL, return -1, "Invalid thread task handle");
    TaskNode *task = (TaskNode *)malloc(sizeof(TaskNode));
    APPSPAWN_CHECK(task != NULL, return -1, "Failed to create thread task");

    task->context = NULL;
    task->finishProcess = NULL;
    task->totalTask = 0;
    atomic_init(&task->taskFlags, 0);
    atomic_init(&task->finishTaskCount, 0);
    OH_ListInit(&task->node);
    OH_ListInit(&task->executorList);
    if (pthread_mutex_init(&task->mutex, NULL) != 0) {
        APPSPAWN_LOGE("Failed to create thread task");
        free(task);
        return -1;
    }
    SetCondAttr(&task->cond);

    pthread_mutex_lock(&mgr->mutex);
    uint32_t taskId = mgr->currTaskId++;
    task->taskId = taskId;
    OH_ListAddTail(&mgr->taskList, &task->node);
    pthread_mutex_unlock(&mgr->mutex);

    // Use the local copy from here on; the task is visible to other threads
    // once it is in the list.
    *taskHandle = taskId;
    APPSPAWN_LOGV("Create thread task success task id: %{public}u", taskId);
    return 0;
}
363
/**
 * Attach one executor (a unit of work) to a task that has not been started.
 *
 * The task is looked up in the manager's task list only.
 *
 * @param instance   thread manager handle
 * @param taskHandle id returned by ThreadMgrAddTask
 * @param executor   function to run for this unit of work; must not be NULL
 * @param context    argument handed to executor
 * @return 0 on success, -1 on invalid arguments, unknown task or allocation
 *         failure
 */
int ThreadMgrAddExecutor(ThreadMgr instance,
    ThreadTaskHandle taskHandle, TaskExecutor executor, const ThreadContext *context)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
    // RunExecutor calls through this pointer on another thread; reject NULL
    // here, where the caller can still see the error.
    APPSPAWN_CHECK(executor != NULL, return -1, "Invalid executor for task %{public}u", taskHandle);
    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);

    TaskExecuteNode *node = (TaskExecuteNode *)malloc(sizeof(TaskExecuteNode));
    APPSPAWN_CHECK(node != NULL, return -1, "Failed to create thread executor for task %{public}u", taskHandle);
    node->task = task;
    OH_ListInit(&node->node);
    OH_ListInit(&node->executeNode);
    node->context = context;
    node->executor = executor;
    task->totalTask++;
    OH_ListAddTail(&task->executorList, &node->node);
    return 0;
}
383
/**
 * Cancel a task: remove it from whichever manager queue holds it and release
 * it together with its executors.
 *
 * The task list, the waiting queue and the executing queue are searched in
 * that order; the first match is removed.
 *
 * @param instance   thread manager handle
 * @param taskHandle id of the task to cancel
 * @return -1 when instance is NULL, otherwise 0 (also when no such task exists)
 */
int ThreadMgrCancelTask(ThreadMgr instance, ThreadTaskHandle taskHandle)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");

    ListNode *searchOrder[] = { &mgr->taskList, &mgr->waitingTaskQueue, &mgr->executingTaskQueue };
    for (size_t i = 0; i < sizeof(searchOrder) / sizeof(searchOrder[0]); i++) {
        TaskNode *found = GetTask(mgr, searchOrder[i], taskHandle);
        if (found == NULL) {
            continue;
        }
        SafeRemoveTask(mgr, found);
        DeleteTask(found);
        break;
    }
    return 0;
}
408
/**
 * Start a task and block until the manager signals its completion.
 *
 * The task is moved from the task list to the waiting queue, then the caller
 * waits on task->cond in 60 second slices until the wait returns something
 * other than ETIMEDOUT. The task is deleted before returning.
 *
 * @param instance   thread manager handle
 * @param taskHandle id of a task that has not been started yet
 * @return -1 on invalid arguments or unknown task, otherwise the result of the
 *         final pthread_cond_timedwait (0 when the completion was signalled)
 */
int TaskSyncExecute(ThreadMgr instance, ThreadTaskHandle taskHandle)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);

    // Take task->mutex BEFORE the task becomes runnable and keep it until the
    // wait starts. CheckTaskComplete signals task->cond while holding
    // task->mutex, so the signal cannot be sent (and lost) before this thread
    // is actually waiting.
    pthread_mutex_lock(&task->mutex);

    // The manager's queues and condition are guarded by mgr->mutex, not by
    // the task's own mutex.
    pthread_mutex_lock(&mgr->mutex);
    OH_ListRemove(&task->node);
    OH_ListInit(&task->node);
    OH_ListAddTail(&mgr->waitingTaskQueue, &task->node);
    pthread_cond_broadcast(&mgr->cond);
    pthread_mutex_unlock(&mgr->mutex);
    APPSPAWN_LOGV("TaskSyncExecute task: %{public}u", task->taskId);

    struct timespec abstime;
    int ret = 0;
    do {
        ConvertToTimespec(60 * 1000, &abstime); // wait 60 * 1000 60s
        // task->mutex stays locked across iterations so no signal can slip
        // through between two waits.
        // NOTE(review): there is no completion predicate, so a spurious
        // wakeup is treated like a completion signal.
        ret = pthread_cond_timedwait(&task->cond, &task->mutex, &abstime);
        APPSPAWN_LOGV("TaskSyncExecute success task id: %{public}u ret: %{public}d", task->taskId, ret);
    } while (ret == ETIMEDOUT);
    pthread_mutex_unlock(&task->mutex);

    DeleteTask(task);
    return ret;
}
436
/**
 * Start a task asynchronously.
 *
 * The finish callback and its context are stored in the task, the task is
 * moved from the task list to the waiting queue under mgr->mutex, and the
 * threads waiting on the manager's condition are woken up.
 *
 * @param instance   thread manager handle
 * @param taskHandle id of a task that has not been started yet
 * @param process    callback stored as the task's finish processor
 * @param context    argument stored for the finish processor
 * @return 0 on success, -1 on invalid manager or unknown task
 */
int TaskExecute(ThreadMgr instance,
    ThreadTaskHandle taskHandle, TaskFinishProcessor process, const ThreadContext *context)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);

    task->finishProcess = process;
    task->context = context;

    ListNode *link = &task->node;
    pthread_mutex_lock(&mgr->mutex);
    OH_ListRemove(link);
    OH_ListInit(link);
    OH_ListAddTail(&mgr->waitingTaskQueue, link);
    pthread_cond_broadcast(&mgr->cond);
    pthread_mutex_unlock(&mgr->mutex);

    APPSPAWN_LOGV("TaskExecute task: %{public}u", task->taskId);
    return 0;
}
456
/**
 * Start additional worker threads when more executors are queued than worker
 * threads exist.
 *
 * Nothing happens when maxThreadCount workers already exist or when the number
 * of queued executors does not exceed the number of workers. Otherwise free
 * thread slots are filled until min(executorCount, maxThreadCount) workers
 * exist or a thread cannot be created.
 *
 * @param mgr thread manager
 */
static void CheckAndCreateNewThread(ThreadManager *mgr)
{
    if (mgr->maxThreadCount <= mgr->validThreadCount) {
        return;
    }
    if (mgr->executorCount <= mgr->validThreadCount) {
        return;
    }
    APPSPAWN_LOGV("CheckAndCreateNewThread maxThreadCount: %{public}u validThreadCount: %{public}u %{public}u",
        mgr->maxThreadCount, mgr->validThreadCount, mgr->executorCount);

    uint32_t totalThread = mgr->maxThreadCount;
    if (mgr->executorCount <= mgr->maxThreadCount) {
        totalThread = mgr->executorCount;
    }

    // Slot 0 is used for the manager thread (see CreateThreadMgr), so only
    // slots 1..maxThreadCount are candidates for workers.
    for (uint32_t index = 1; index < mgr->maxThreadCount + 1; index++) {
        ThreadNode *slot = &mgr->threadNode[index];
        if (slot->threadId != INVALID_THREAD_ID) {
            continue;
        }
        int ret = pthread_create(&slot->threadId, NULL, ThreadExecute, (void *)slot);
        if (ret != 0) {
            // POSIX leaves the thread id undefined after a failed
            // pthread_create; mark the slot unused again so DestroyThreadMgr
            // does not try to join it.
            slot->threadId = INVALID_THREAD_ID;
            APPSPAWN_LOGE("Failed to create thread for %{public}u", index);
            return;
        }
        APPSPAWN_LOGV("Create thread success index: %{public}u", slot->index);
        mgr->validThreadCount++;
        if (mgr->validThreadCount >= totalThread) {
            return;
        }
    }
}
488
ManagerThreadProc(void * args)489 static void *ManagerThreadProc(void *args)
490 {
491 ThreadManager *mgr = g_threadManager;
492 ThreadNode *threadNode = (ThreadNode *)args;
493 struct timespec abstime;
494 while (!threadNode->threadExit) {
495 pthread_mutex_lock(&mgr->mutex);
496 do {
497 uint32_t timeout = 60 * 1000; // 60 * 1000 60s
498 if (!ListEmpty(mgr->waitingTaskQueue)) {
499 break;
500 }
501 if (!ListEmpty(mgr->executingTaskQueue)) {
502 timeout = 500; // 500ms
503 }
504 ConvertToTimespec(timeout, &abstime);
505 int ret = pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
506 if (!ListEmpty(mgr->executingTaskQueue) || ret == ETIMEDOUT) {
507 break;
508 }
509 if (threadNode->threadExit) {
510 break;
511 }
512 } while (1);
513 pthread_mutex_unlock(&mgr->mutex);
514
515 ExecuteTask(mgr);
516 CheckAndCreateNewThread(mgr);
517
518 if (mgr->validThreadCount == 0) {
519 RunExecutor(mgr, threadNode, 5); // 5 max thread
520 }
521 CheckTaskComplete(mgr);
522 }
523 return 0;
524 }
525
ThreadExecute(void * args)526 static void *ThreadExecute(void *args)
527 {
528 ThreadManager *mgr = g_threadManager;
529 ThreadNode *threadNode = (ThreadNode *)args;
530 struct timespec abstime;
531 while (!threadNode->threadExit) {
532 pthread_mutex_lock(&mgr->mutex);
533 while (ListEmpty(mgr->executorQueue) && !threadNode->threadExit) {
534 ConvertToTimespec(60 * 1000, &abstime); // 60 * 1000 60s
535 pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
536 }
537 pthread_mutex_unlock(&mgr->mutex);
538 APPSPAWN_LOGV("bbbb threadNode->threadExit %{public}d", threadNode->threadExit);
539 RunExecutor(mgr, threadNode, 1);
540 }
541 return NULL;
542 }
543