1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "ipc_thread_pool.h"
17
18 #include <unistd.h>
19
20 #include "ipc_process_skeleton.h"
21 #include "iremote_invoker.h"
22 #include "rpc_errno.h"
23 #include "rpc_log.h"
24 #include "rpc_os_adapter.h"
25 #include "rpc_types.h"
26 #include "securec.h"
27
28 #define PROTO_NUM 2
29
30 static pthread_key_t g_localKey = -1;
31 static RemoteInvoker *g_invoker[PROTO_NUM];
32
GetCurrentThreadContext(void)33 ThreadContext *GetCurrentThreadContext(void)
34 {
35 ThreadContext *current = NULL;
36 void *curTLS = pthread_getspecific(g_localKey);
37 if (curTLS != NULL) {
38 current = (ThreadContext *)curTLS;
39 } else {
40 current = (ThreadContext *)calloc(1, sizeof(ThreadContext));
41 if (current == NULL) {
42 return NULL;
43 }
44 current->threadId = pthread_self();
45 current->proto = IF_PROT_DEFAULT;
46 current->callerPid = RpcGetPid();
47 current->callerUid = RpcGetUid();
48 pthread_setspecific(g_localKey, current);
49 }
50 return current;
51 }
52
TlsDestructor(void * args)53 static void TlsDestructor(void *args)
54 {
55 ThreadContext *threadContext = (ThreadContext *)args;
56 RemoteInvoker *invoker = g_invoker[threadContext->proto];
57 free(threadContext);
58 if (invoker != NULL && invoker->ExitCurrentThread != NULL) {
59 (invoker->ExitCurrentThread)();
60 }
61 }
62
ThreadContextDestructor(int32_t proto)63 static void ThreadContextDestructor(int32_t proto)
64 {
65 ThreadPool *threadPool = GetCurrentSkeleton()->threadPool;
66 pthread_mutex_lock(&threadPool->lock);
67 if (proto == IF_PROT_BINDER) {
68 ++threadPool->idleThreadNum;
69 } else if (proto == IF_PROT_DATABUS) {
70 ++threadPool->idleSocketThreadNum;
71 }
72 pthread_mutex_unlock(&threadPool->lock);
73 }
74
GetAndUpdateInvoker(int32_t proto)75 static RemoteInvoker *GetAndUpdateInvoker(int32_t proto)
76 {
77 ThreadContext *threadContext = GetCurrentThreadContext();
78 if (threadContext == NULL) {
79 return NULL;
80 }
81 threadContext->proto = proto;
82 return g_invoker[proto];
83 }
84
ThreadHandler(void * args)85 static void *ThreadHandler(void *args)
86 {
87 ThreadContext *threadContext = (ThreadContext *)args;
88 int32_t proto = threadContext->proto;
89 int32_t policy = threadContext->policy;
90 free(threadContext);
91 threadContext = NULL;
92 RemoteInvoker *invoker = GetAndUpdateInvoker(proto);
93 if (invoker != NULL) {
94 switch (policy) {
95 case SPAWN_PASSIVE:
96 invoker->JoinThread(false);
97 break;
98 case SPAWN_ACTIVE:
99 invoker->JoinThread(true);
100 break;
101 default:
102 break;
103 }
104 }
105 ThreadContextDestructor(proto);
106 return NULL;
107 }
108
InitThreadPool(int32_t maxThreadNum)109 ThreadPool *InitThreadPool(int32_t maxThreadNum)
110 {
111 ThreadPool *threadPool = (ThreadPool*)calloc(1, sizeof(ThreadPool));
112 if (threadPool == NULL) {
113 return NULL;
114 }
115 threadPool->maxThreadNum = maxThreadNum + maxThreadNum;
116 threadPool->idleThreadNum = maxThreadNum;
117 threadPool->idleSocketThreadNum = maxThreadNum;
118 pthread_mutex_init(&threadPool->lock, NULL);
119 pthread_key_create(&g_localKey, TlsDestructor);
120 for (int32_t index = 0; index < PROTO_NUM; ++index) {
121 g_invoker[index] = InitRemoteInvoker(index);
122 }
123 return threadPool;
124 }
125
/*
 * Tears down a pool created by InitThreadPool: destroys its lock, deletes the
 * thread-specific key, frees the pool and then deinitialises and clears every
 * entry of the invoker table. A NULL pool is ignored and nothing is torn down.
 */
void DeinitThreadPool(ThreadPool *threadPool)
{
    if (threadPool == NULL) {
        return;
    }

    pthread_mutex_destroy(&threadPool->lock);
    pthread_key_delete(g_localKey);
    free(threadPool);

    int32_t slot = 0;
    while (slot < PROTO_NUM) {
        DeinitRemoteInvoker(g_invoker[slot], slot);
        g_invoker[slot] = NULL;
        ++slot;
    }
}
140
SpawnNewThread(ThreadPool * threadPool,int32_t policy,int32_t proto)141 int32_t SpawnNewThread(ThreadPool *threadPool, int32_t policy, int32_t proto)
142 {
143 if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
144 !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
145 RPC_LOG_ERROR("thread pool is full.");
146 return ERR_INVALID_PARAM;
147 }
148 pthread_t threadId;
149 if (pthread_mutex_lock(&threadPool->lock) != 0) {
150 RPC_LOG_ERROR("get thread pool lock failed.");
151 return ERR_FAILED;
152 }
153 if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
154 !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
155 pthread_mutex_unlock(&threadPool->lock);
156 RPC_LOG_ERROR("thread pool is full.");
157 return ERR_INVALID_PARAM;
158 }
159 ThreadContext *threadContext = (ThreadContext *)calloc(1, sizeof(ThreadContext));
160 if (threadContext == NULL) {
161 pthread_mutex_unlock(&threadPool->lock);
162 RPC_LOG_ERROR("create thread context failed.");
163 return ERR_FAILED;
164 }
165 threadContext->proto = proto;
166 threadContext->policy = policy;
167 int ret = pthread_create(&threadId, NULL, ThreadHandler, threadContext);
168 if (ret != 0) {
169 pthread_mutex_unlock(&threadPool->lock);
170 free(threadContext);
171 RPC_LOG_ERROR("spawn new thread failed.");
172 return ERR_FAILED;
173 }
174 pthread_detach(threadId);
175 if (proto == IF_PROT_BINDER) {
176 --threadPool->idleThreadNum;
177 } else if (proto == IF_PROT_DATABUS) {
178 --threadPool->idleSocketThreadNum;
179 }
180 pthread_mutex_unlock(&threadPool->lock);
181 return ERR_NONE;
182 }
183
/*
 * Raises the thread limit of a pool.
 *
 * maxThreadNum is the new per-protocol limit; the pool-wide maximum becomes
 * twice that value and the increase is split evenly between the two idle
 * counters. Requests that do not raise the current maximum are rejected with a
 * log message, as is a NULL pool. The function reports nothing to the caller.
 *
 * NOTE(review): maxThreadNum is doubled without an overflow check, so callers
 * are assumed to pass values well below INT32_MAX / 2 - confirm.
 */
void UpdateMaxThreadNum(ThreadPool *threadPool, int32_t maxThreadNum)
{
    if (threadPool == NULL) {
        RPC_LOG_ERROR("thread pool is null.");
        return;
    }
    int32_t totalNum = maxThreadNum + maxThreadNum;
    if (pthread_mutex_lock(&threadPool->lock) != 0) {
        RPC_LOG_ERROR("get thread pool lock failed.");
        return;
    }
    int32_t oldThreadNum = threadPool->maxThreadNum;
    if (totalNum <= oldThreadNum) {
        pthread_mutex_unlock(&threadPool->lock);
        RPC_LOG_ERROR("not support set lower max thread num.");
        return;
    }
    int32_t diff = totalNum - oldThreadNum;
    threadPool->maxThreadNum = totalNum;
    /* Each protocol receives its share of the newly allowed threads. */
    threadPool->idleThreadNum += diff / PROTO_NUM;
    threadPool->idleSocketThreadNum += diff / PROTO_NUM;
    pthread_mutex_unlock(&threadPool->lock);
}
203
GetRemoteInvoker(void)204 RemoteInvoker *GetRemoteInvoker(void)
205 {
206 ThreadContext *threadContext = GetCurrentThreadContext();
207 if (threadContext == NULL) {
208 return NULL;
209 }
210 return g_invoker[threadContext->proto];
211 }