1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "thread_manager.h"
16 
17 #include <errno.h>
18 #include <pthread.h>
19 #include <stdatomic.h>
20 
21 #include "appspawn_utils.h"
22 #include "list.h"
23 
24 typedef struct {
25     atomic_uint threadExit;
26     uint32_t index;
27     pthread_t threadId;
28 } ThreadNode;
29 
30 typedef struct {
31     pthread_mutex_t mutex;        // 保护执行队列
32     pthread_cond_t cond;          // 线程等待条件
33     ListNode taskList;            // 任务队列,任务还没有启动
34     ListNode waitingTaskQueue;    // 启动的任务,排队等待执行
35     ListNode executingTaskQueue;  // 正在执行
36     ListNode executorQueue;       // 执行节点,保存 TaskExecuteNode
37     uint32_t executorCount;
38     uint32_t maxThreadCount;
39     uint32_t currTaskId;
40     struct timespec lastAdjust;
41     uint32_t validThreadCount;
42     ThreadNode threadNode[1];  // 线程信息,控制线程的退出和结束
43 } ThreadManager;
44 
45 typedef struct {
46     uint32_t taskId;
47     ListNode node;
48     ListNode executorList;
49     uint32_t totalTask;
50     atomic_uint taskFlags;  // 表示任务是否被取消,各线程检查后决定任务线程是否结束
51     atomic_uint finishTaskCount;
52     const ThreadContext *context;
53     TaskFinishProcessor finishProcess;
54     pthread_mutex_t mutex;  // 保护执行队列
55     pthread_cond_t cond;    // 同步执行时,等待确认
56 } TaskNode;
57 
58 typedef struct {
59     ListNode node;         // 保存sub task到对应的task,方便管理
60     ListNode executeNode;  // 等待处理的任务节点
61     TaskNode *task;
62     const ThreadContext *context;
63     TaskExecutor executor;
64 } TaskExecuteNode;
65 
66 static ThreadManager *g_threadManager = NULL;
67 
68 static void *ManagerThreadProc(void *args);
69 static void *ThreadExecute(void *args);
70 
SetCondAttr(pthread_cond_t * cond)71 static void SetCondAttr(pthread_cond_t *cond)
72 {
73     pthread_condattr_t attr;
74     pthread_condattr_init(&attr);
75     pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
76     pthread_cond_init(cond, &attr);
77     pthread_condattr_destroy(&attr);
78 }
79 
ConvertToTimespec(int time,struct timespec * tm)80 static void ConvertToTimespec(int time, struct timespec *tm)
81 {
82     struct timespec start;
83     clock_gettime(CLOCK_MONOTONIC, &start);
84     uint64_t ns = time;
85     ns *= APPSPAWN_MSEC_TO_NSEC;
86     ns += start.tv_sec * APPSPAWN_SEC_TO_NSEC + start.tv_nsec;
87     tm->tv_sec = ns / APPSPAWN_SEC_TO_NSEC;
88     tm->tv_nsec = ns % APPSPAWN_SEC_TO_NSEC;
89 }
90 
PopTaskExecutor(ThreadManager * mgr)91 static TaskExecuteNode *PopTaskExecutor(ThreadManager *mgr)
92 {
93     TaskExecuteNode *executor = NULL;
94     pthread_mutex_lock(&mgr->mutex);
95     ListNode *node = mgr->executorQueue.next;
96     if (node != &mgr->executorQueue) {
97         OH_ListRemove(node);
98         OH_ListInit(node);
99         executor = ListEntry(node, TaskExecuteNode, executeNode);
100         mgr->executorCount--;
101     }
102     pthread_mutex_unlock(&mgr->mutex);
103     return executor;
104 }
105 
AddExecutor(ThreadManager * mgr,const TaskNode * task)106 static int AddExecutor(ThreadManager *mgr, const TaskNode *task)
107 {
108     ListNode *node = task->executorList.next;
109     while (node != &task->executorList) {
110         TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node);
111         APPSPAWN_LOGV("AddExecutor task: %{public}u executorCount: %{public}u executor: %{public}u",
112             task->taskId, mgr->executorCount, executor->task->taskId);
113 
114         // 插入尾部执行
115         pthread_mutex_lock(&mgr->mutex);
116         OH_ListRemove(&executor->executeNode);
117         OH_ListInit(&executor->executeNode);
118         OH_ListAddTail(&mgr->executorQueue, &executor->executeNode);
119         mgr->executorCount++;
120         pthread_mutex_unlock(&mgr->mutex);
121 
122         node = node->next;
123     }
124     return 0;
125 }
126 
RunExecutor(ThreadManager * mgr,ThreadNode * threadNode,uint32_t maxCount)127 static void RunExecutor(ThreadManager *mgr, ThreadNode *threadNode, uint32_t maxCount)
128 {
129     APPSPAWN_LOGV("RunExecutor in thread: %{public}d executorCount: %{public}u ",
130         threadNode->index, mgr->executorCount);
131     TaskExecuteNode *executor = PopTaskExecutor(mgr);
132     uint32_t count = 0;
133     while (executor != NULL && !threadNode->threadExit) {
134         APPSPAWN_LOGV("RunExecutor task: %{public}u", executor->task->taskId);
135         atomic_fetch_add(&executor->task->finishTaskCount, 1);
136         executor->executor(executor->task->taskId, executor->context);
137         count++;
138         if (count >= maxCount) {
139             break;
140         }
141         executor = PopTaskExecutor(mgr);
142     }
143     APPSPAWN_LOGV("RunExecutor executorCount: %{public}u end", mgr->executorCount);
144 }
145 
TaskCompareTaskId(ListNode * node,void * data)146 static int TaskCompareTaskId(ListNode *node, void *data)
147 {
148     TaskNode *task = ListEntry(node, TaskNode, node);
149     return task->taskId - *(uint32_t *)data;
150 }
151 
GetTask(ThreadManager * mgr,ListNode * queue,uint32_t taskId)152 static TaskNode *GetTask(ThreadManager *mgr, ListNode *queue, uint32_t taskId)
153 {
154     ListNode *node = NULL;
155     pthread_mutex_lock(&mgr->mutex);
156     node = OH_ListFind(queue, &taskId, TaskCompareTaskId);
157     pthread_mutex_unlock(&mgr->mutex);
158     if (node == NULL) {
159         return NULL;
160     }
161     return ListEntry(node, TaskNode, node);
162 }
163 
DeleteTask(TaskNode * task)164 static void DeleteTask(TaskNode *task)
165 {
166     APPSPAWN_LOGV("DeleteTask task: %{public}u ", task->taskId);
167 
168     if (!ListEmpty(task->node)) {
169         return;
170     }
171     OH_ListRemoveAll(&task->executorList, NULL);
172     pthread_cond_destroy(&task->cond);
173     pthread_mutex_destroy(&task->mutex);
174     free(task);
175 }
176 
PopTask(ThreadManager * mgr,ListNode * queue)177 static TaskNode *PopTask(ThreadManager *mgr, ListNode *queue)
178 {
179     TaskNode *task = NULL;
180     pthread_mutex_lock(&mgr->mutex);
181     ListNode *node = queue->next;
182     if (node != queue) {
183         OH_ListRemove(node);
184         OH_ListInit(node);
185         task = ListEntry(node, TaskNode, node);
186     }
187     pthread_mutex_unlock(&mgr->mutex);
188     return task;
189 }
190 
PushTask(ThreadManager * mgr,TaskNode * task,ListNode * queue)191 static void PushTask(ThreadManager *mgr, TaskNode *task, ListNode *queue)
192 {
193     pthread_mutex_lock(&mgr->mutex);
194     OH_ListAddTail(queue, &task->node);
195     pthread_cond_broadcast(&mgr->cond);
196     pthread_mutex_unlock(&mgr->mutex);
197 }
198 
SafeRemoveTask(ThreadManager * mgr,TaskNode * task)199 static void SafeRemoveTask(ThreadManager *mgr, TaskNode *task)
200 {
201     pthread_mutex_lock(&mgr->mutex);
202     OH_ListRemove(&task->node);
203     OH_ListInit(&task->node);
204     pthread_mutex_unlock(&mgr->mutex);
205 
206     ListNode *node = task->executorList.next;
207     while (node != &task->executorList) {
208         OH_ListRemove(node);
209         OH_ListInit(node);
210         TaskExecuteNode *executor = ListEntry(node, TaskExecuteNode, node);
211         pthread_mutex_lock(&mgr->mutex);
212         if (!ListEmpty(executor->executeNode)) {
213             OH_ListRemove(&executor->executeNode);
214             OH_ListInit(&executor->executeNode);
215             mgr->executorCount--;
216         }
217         pthread_mutex_unlock(&mgr->mutex);
218         free(executor);
219 
220         node = task->executorList.next;
221     }
222 }
223 
ExecuteTask(ThreadManager * mgr)224 static void ExecuteTask(ThreadManager *mgr)
225 {
226     TaskNode *task = PopTask(mgr, &mgr->waitingTaskQueue);
227     if (task == NULL) {
228         return;
229     }
230 
231     APPSPAWN_LOGV("ExecuteTask task: %{public}u ", task->taskId);
232     AddExecutor(mgr, task);
233     PushTask(mgr, task, &mgr->executingTaskQueue);
234     return;
235 }
236 
CheckTaskComplete(ThreadManager * mgr)237 static void CheckTaskComplete(ThreadManager *mgr)
238 {
239     TaskNode *task = PopTask(mgr, &mgr->executingTaskQueue);
240     if (task == NULL) {
241         return;
242     }
243     if (task->totalTask <= atomic_load(&task->finishTaskCount)) {
244         if (task->finishProcess != NULL) {
245             task->finishProcess(task->taskId, task->context);
246             DeleteTask(task);
247             return;
248         }
249         pthread_mutex_lock(&task->mutex);
250         pthread_cond_signal(&task->cond);
251         pthread_mutex_unlock(&task->mutex);
252         return;
253     }
254     PushTask(mgr, task, &mgr->executingTaskQueue);
255     return;
256 }
257 
TaskQueueDestroyProc(ListNode * node)258 static void TaskQueueDestroyProc(ListNode *node)
259 {
260     OH_ListRemove(node);
261     TaskNode *task = ListEntry(node, TaskNode, node);
262     DeleteTask(task);
263 }
264 
CreateThreadMgr(uint32_t maxThreadCount,ThreadMgr * instance)265 int CreateThreadMgr(uint32_t maxThreadCount, ThreadMgr *instance)
266 {
267     if (g_threadManager != NULL) {
268         *instance = (ThreadMgr)g_threadManager;
269         return 0;
270     }
271 
272     ThreadManager *mgr = (ThreadManager *)malloc(sizeof(ThreadManager) + maxThreadCount * sizeof(ThreadNode));
273     APPSPAWN_CHECK(mgr != NULL, return -1, "Failed to create thread manager");
274 
275     mgr->executorCount = 0;
276     mgr->currTaskId = 0;
277     mgr->validThreadCount = 0;
278     mgr->maxThreadCount = maxThreadCount;
279     OH_ListInit(&mgr->taskList);
280     OH_ListInit(&mgr->waitingTaskQueue);
281     OH_ListInit(&mgr->executingTaskQueue);
282     OH_ListInit(&mgr->executorQueue);
283     pthread_mutex_init(&mgr->mutex, NULL);
284     SetCondAttr(&mgr->cond);
285 
286     for (uint32_t index = 0; index < maxThreadCount + 1; index++) {
287         mgr->threadNode[index].index = index;
288         mgr->threadNode[index].threadId = INVALID_THREAD_ID;
289         atomic_init(&mgr->threadNode[index].threadExit, 0);
290     }
291     g_threadManager = mgr;
292     int ret = pthread_create(&mgr->threadNode[0].threadId, NULL, ManagerThreadProc, (void *)&mgr->threadNode[0]);
293     if (ret != 0) {
294         APPSPAWN_LOGE("Failed to create thread for manager");
295         g_threadManager = NULL;
296         free(mgr);
297         return -1;
298     }
299     *instance = (ThreadMgr)mgr;
300     APPSPAWN_LOGV("Create thread manager success maxThreadCount: %{public}u", maxThreadCount);
301     return 0;
302 }
303 
DestroyThreadMgr(ThreadMgr instance)304 int DestroyThreadMgr(ThreadMgr instance)
305 {
306     APPSPAWN_LOGV("DestroyThreadMgr");
307     ThreadManager *mgr = (ThreadManager *)instance;
308     APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
309 
310     for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
311         if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
312             atomic_store(&mgr->threadNode[index].threadExit, 1);
313             APPSPAWN_LOGV("DestroyThreadMgr index %{public}d %{public}d", index, mgr->threadNode[index].threadExit);
314         }
315     }
316     pthread_mutex_lock(&mgr->mutex);
317     pthread_cond_broadcast(&mgr->cond);
318     pthread_mutex_unlock(&mgr->mutex);
319     for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
320         if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
321             pthread_join(mgr->threadNode[index].threadId, NULL);
322             APPSPAWN_LOGV("DestroyThreadMgr index %{public}d end", index);
323         }
324     }
325 
326     pthread_mutex_lock(&mgr->mutex);
327     OH_ListRemoveAll(&mgr->taskList, TaskQueueDestroyProc);
328     OH_ListRemoveAll(&mgr->waitingTaskQueue, TaskQueueDestroyProc);
329     OH_ListRemoveAll(&mgr->executingTaskQueue, TaskQueueDestroyProc);
330     OH_ListRemoveAll(&mgr->executorQueue, TaskQueueDestroyProc);
331     pthread_mutex_unlock(&mgr->mutex);
332 
333     pthread_cond_destroy(&mgr->cond);
334     pthread_mutex_destroy(&mgr->mutex);
335     return 0;
336 }
337 
ThreadMgrAddTask(ThreadMgr instance,ThreadTaskHandle * taskHandle)338 int ThreadMgrAddTask(ThreadMgr instance, ThreadTaskHandle *taskHandle)
339 {
340     ThreadManager *mgr = (ThreadManager *)instance;
341     APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
342     TaskNode *task = (TaskNode *)malloc(sizeof(TaskNode));
343     APPSPAWN_CHECK(task != NULL, return -1, "Failed to create thread task");
344 
345     task->context = NULL;
346     task->finishProcess = NULL;
347     task->totalTask = 0;
348     atomic_init(&task->taskFlags, 0);
349     atomic_init(&task->finishTaskCount, 0);
350     OH_ListInit(&task->node);
351     OH_ListInit(&task->executorList);
352     pthread_mutex_init(&task->mutex, NULL);
353     SetCondAttr(&task->cond);
354 
355     pthread_mutex_lock(&mgr->mutex);
356     task->taskId = mgr->currTaskId++;
357     OH_ListAddTail(&mgr->taskList, &task->node);
358     pthread_mutex_unlock(&mgr->mutex);
359     *taskHandle = task->taskId;
360     APPSPAWN_LOGV("Create thread task success task id: %{public}u", task->taskId);
361     return 0;
362 }
363 
ThreadMgrAddExecutor(ThreadMgr instance,ThreadTaskHandle taskHandle,TaskExecutor executor,const ThreadContext * context)364 int ThreadMgrAddExecutor(ThreadMgr instance,
365     ThreadTaskHandle taskHandle, TaskExecutor executor, const ThreadContext *context)
366 {
367     ThreadManager *mgr = (ThreadManager *)instance;
368     APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
369     TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
370     APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
371 
372     TaskExecuteNode *node = (TaskExecuteNode *)malloc(sizeof(TaskExecuteNode));
373     APPSPAWN_CHECK(node != NULL, return -1, "Failed to create thread executor for task %{public}u", taskHandle);
374     node->task = task;
375     OH_ListInit(&node->node);
376     OH_ListInit(&node->executeNode);
377     node->context = context;
378     node->executor = executor;
379     task->totalTask++;
380     OH_ListAddTail(&task->executorList, &node->node);
381     return 0;
382 }
383 
ThreadMgrCancelTask(ThreadMgr instance,ThreadTaskHandle taskHandle)384 int ThreadMgrCancelTask(ThreadMgr instance, ThreadTaskHandle taskHandle)
385 {
386     ThreadManager *mgr = (ThreadManager *)instance;
387     APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
388     TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
389     if (task != NULL) {
390         SafeRemoveTask(mgr, task);
391         DeleteTask(task);
392         return 0;
393     }
394     task = GetTask(mgr, &mgr->waitingTaskQueue, taskHandle);
395     if (task != NULL) {
396         SafeRemoveTask(mgr, task);
397         DeleteTask(task);
398         return 0;
399     }
400     task = GetTask(mgr, &mgr->executingTaskQueue, taskHandle);
401     if (task != NULL) {
402         SafeRemoveTask(mgr, task);
403         DeleteTask(task);
404         return 0;
405     }
406     return 0;
407 }
408 
TaskSyncExecute(ThreadMgr instance,ThreadTaskHandle taskHandle)409 int TaskSyncExecute(ThreadMgr instance, ThreadTaskHandle taskHandle)
410 {
411     ThreadManager *mgr = (ThreadManager *)instance;
412     APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
413     TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
414     APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
415 
416     pthread_mutex_lock(&task->mutex);
417     OH_ListRemove(&task->node);
418     OH_ListInit(&task->node);
419     OH_ListAddTail(&mgr->waitingTaskQueue, &task->node);
420     pthread_cond_broadcast(&mgr->cond);
421     pthread_mutex_unlock(&task->mutex);
422     APPSPAWN_LOGV("TaskSyncExecute task: %{public}u", task->taskId);
423     struct timespec abstime;
424     int ret = 0;
425     do {
426         ConvertToTimespec(60 * 1000, &abstime);  // wait 60 * 1000 60s
427         pthread_mutex_lock(&task->mutex);
428         ret = pthread_cond_timedwait(&task->cond, &task->mutex, &abstime);
429         pthread_mutex_unlock(&task->mutex);
430         APPSPAWN_LOGV("TaskSyncExecute success task id: %{public}u ret: %{public}d", task->taskId, ret);
431     } while (ret == ETIMEDOUT);
432 
433     DeleteTask(task);
434     return ret;
435 }
436 
TaskExecute(ThreadMgr instance,ThreadTaskHandle taskHandle,TaskFinishProcessor process,const ThreadContext * context)437 int TaskExecute(ThreadMgr instance,
438     ThreadTaskHandle taskHandle, TaskFinishProcessor process, const ThreadContext *context)
439 {
440     ThreadManager *mgr = (ThreadManager *)instance;
441     APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
442     TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
443     APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);
444 
445     task->finishProcess = process;
446     task->context = context;
447     pthread_mutex_lock(&mgr->mutex);
448     OH_ListRemove(&task->node);
449     OH_ListInit(&task->node);
450     OH_ListAddTail(&mgr->waitingTaskQueue, &task->node);
451     pthread_cond_broadcast(&mgr->cond);
452     pthread_mutex_unlock(&mgr->mutex);
453     APPSPAWN_LOGV("TaskExecute task: %{public}u", task->taskId);
454     return 0;
455 }
456 
CheckAndCreateNewThread(ThreadManager * mgr)457 static void CheckAndCreateNewThread(ThreadManager *mgr)
458 {
459     if (mgr->maxThreadCount <= mgr->validThreadCount) {
460         return;
461     }
462     if (mgr->executorCount <= mgr->validThreadCount) {
463         return;
464     }
465     APPSPAWN_LOGV("CheckAndCreateNewThread maxThreadCount: %{public}u validThreadCount: %{public}u %{public}u",
466         mgr->maxThreadCount, mgr->validThreadCount, mgr->executorCount);
467 
468     uint32_t totalThread = mgr->maxThreadCount;
469     if (mgr->executorCount <= mgr->maxThreadCount) {
470         totalThread = mgr->executorCount;
471     }
472 
473     for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
474         if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
475             continue;
476         }
477         int ret = pthread_create(&mgr->threadNode[index].threadId,
478             NULL, ThreadExecute, (void *)&(mgr->threadNode[index]));
479         APPSPAWN_CHECK(ret == 0, return, "Failed to create thread for %{public}u", index);
480         APPSPAWN_LOGV("Create thread success index: %{public}u", mgr->threadNode[index].index);
481         mgr->validThreadCount++;
482         if (mgr->validThreadCount >= totalThread) {
483             return;
484         }
485     }
486     return;
487 }
488 
ManagerThreadProc(void * args)489 static void *ManagerThreadProc(void *args)
490 {
491     ThreadManager *mgr = g_threadManager;
492     ThreadNode *threadNode = (ThreadNode *)args;
493     struct timespec abstime;
494     while (!threadNode->threadExit) {
495         pthread_mutex_lock(&mgr->mutex);
496         do {
497             uint32_t timeout = 60 * 1000; // 60 * 1000 60s
498             if (!ListEmpty(mgr->waitingTaskQueue)) {
499                 break;
500             }
501             if (!ListEmpty(mgr->executingTaskQueue)) {
502                 timeout = 500; // 500ms
503             }
504             ConvertToTimespec(timeout, &abstime);
505             int ret = pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
506             if (!ListEmpty(mgr->executingTaskQueue) || ret == ETIMEDOUT) {
507                 break;
508             }
509             if (threadNode->threadExit) {
510                 break;
511             }
512         } while (1);
513         pthread_mutex_unlock(&mgr->mutex);
514 
515         ExecuteTask(mgr);
516         CheckAndCreateNewThread(mgr);
517 
518         if (mgr->validThreadCount == 0) {
519             RunExecutor(mgr, threadNode, 5); // 5 max thread
520         }
521         CheckTaskComplete(mgr);
522     }
523     return 0;
524 }
525 
ThreadExecute(void * args)526 static void *ThreadExecute(void *args)
527 {
528     ThreadManager *mgr = g_threadManager;
529     ThreadNode *threadNode = (ThreadNode *)args;
530     struct timespec abstime;
531     while (!threadNode->threadExit) {
532         pthread_mutex_lock(&mgr->mutex);
533         while (ListEmpty(mgr->executorQueue) && !threadNode->threadExit) {
534             ConvertToTimespec(60 * 1000, &abstime); // 60 * 1000 60s
535             pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
536         }
537         pthread_mutex_unlock(&mgr->mutex);
538         APPSPAWN_LOGV("bbbb threadNode->threadExit %{public}d", threadNode->threadExit);
539         RunExecutor(mgr, threadNode, 1);
540     }
541     return NULL;
542 }
543