1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "ipc_thread_pool.h"
17 
18 #include <unistd.h>
19 
20 #include "ipc_process_skeleton.h"
21 #include "iremote_invoker.h"
22 #include "rpc_errno.h"
23 #include "rpc_log.h"
24 #include "rpc_os_adapter.h"
25 #include "rpc_types.h"
26 #include "securec.h"
27 
/* Size of the g_invoker table; protocol ids in [0, PROTO_NUM) index into it. */
#define PROTO_NUM 2

/*
 * Thread-specific-storage key under which each thread's ThreadContext is kept.
 * Starts as the placeholder -1 and is assigned by pthread_key_create() in
 * InitThreadPool().
 */
static pthread_key_t g_localKey = -1;
/* One invoker per protocol id, filled in by InitThreadPool() and reset to NULL by DeinitThreadPool(). */
static RemoteInvoker *g_invoker[PROTO_NUM];
32 
GetCurrentThreadContext(void)33 ThreadContext *GetCurrentThreadContext(void)
34 {
35     ThreadContext *current = NULL;
36     void *curTLS = pthread_getspecific(g_localKey);
37     if (curTLS != NULL) {
38         current = (ThreadContext *)curTLS;
39     } else {
40         current = (ThreadContext *)calloc(1, sizeof(ThreadContext));
41         if (current == NULL) {
42             return NULL;
43         }
44         current->threadId = pthread_self();
45         current->proto = IF_PROT_DEFAULT;
46         current->callerPid = RpcGetPid();
47         current->callerUid = RpcGetUid();
48         pthread_setspecific(g_localKey, current);
49     }
50     return current;
51 }
52 
TlsDestructor(void * args)53 static void TlsDestructor(void *args)
54 {
55     ThreadContext *threadContext = (ThreadContext *)args;
56     RemoteInvoker *invoker = g_invoker[threadContext->proto];
57     free(threadContext);
58     if (invoker != NULL && invoker->ExitCurrentThread != NULL) {
59         (invoker->ExitCurrentThread)();
60     }
61 }
62 
ThreadContextDestructor(int32_t proto)63 static void ThreadContextDestructor(int32_t proto)
64 {
65     ThreadPool *threadPool = GetCurrentSkeleton()->threadPool;
66     pthread_mutex_lock(&threadPool->lock);
67     if (proto == IF_PROT_BINDER) {
68         ++threadPool->idleThreadNum;
69     } else if (proto == IF_PROT_DATABUS) {
70         ++threadPool->idleSocketThreadNum;
71     }
72     pthread_mutex_unlock(&threadPool->lock);
73 }
74 
GetAndUpdateInvoker(int32_t proto)75 static RemoteInvoker *GetAndUpdateInvoker(int32_t proto)
76 {
77     ThreadContext *threadContext = GetCurrentThreadContext();
78     if (threadContext == NULL) {
79         return NULL;
80     }
81     threadContext->proto = proto;
82     return g_invoker[proto];
83 }
84 
ThreadHandler(void * args)85 static void *ThreadHandler(void *args)
86 {
87     ThreadContext *threadContext = (ThreadContext *)args;
88     int32_t proto = threadContext->proto;
89     int32_t policy = threadContext->policy;
90     free(threadContext);
91     threadContext = NULL;
92     RemoteInvoker *invoker = GetAndUpdateInvoker(proto);
93     if (invoker != NULL) {
94         switch (policy) {
95             case SPAWN_PASSIVE:
96                 invoker->JoinThread(false);
97                 break;
98             case SPAWN_ACTIVE:
99                 invoker->JoinThread(true);
100                 break;
101             default:
102                 break;
103         }
104     }
105     ThreadContextDestructor(proto);
106     return NULL;
107 }
108 
InitThreadPool(int32_t maxThreadNum)109 ThreadPool *InitThreadPool(int32_t maxThreadNum)
110 {
111     ThreadPool *threadPool = (ThreadPool*)calloc(1, sizeof(ThreadPool));
112     if (threadPool == NULL) {
113         return NULL;
114     }
115     threadPool->maxThreadNum = maxThreadNum + maxThreadNum;
116     threadPool->idleThreadNum = maxThreadNum;
117     threadPool->idleSocketThreadNum = maxThreadNum;
118     pthread_mutex_init(&threadPool->lock, NULL);
119     pthread_key_create(&g_localKey, TlsDestructor);
120     for (int32_t index = 0; index < PROTO_NUM; ++index) {
121         g_invoker[index] = InitRemoteInvoker(index);
122     }
123     return threadPool;
124 }
125 
/*
 * Tears down a pool created by InitThreadPool().
 *
 * Destroys the pool lock, deletes the g_localKey storage key, frees the pool
 * and then deinitializes and clears every entry of g_invoker. A NULL pool is
 * ignored and leaves the key and invokers untouched.
 */
void DeinitThreadPool(ThreadPool *threadPool)
{
    if (threadPool != NULL) {
        pthread_mutex_destroy(&threadPool->lock);
        pthread_key_delete(g_localKey);
        free(threadPool);

        for (int32_t proto = 0; proto < PROTO_NUM; proto++) {
            DeinitRemoteInvoker(g_invoker[proto], proto);
            g_invoker[proto] = NULL;
        }
    }
}
140 
SpawnNewThread(ThreadPool * threadPool,int32_t policy,int32_t proto)141 int32_t SpawnNewThread(ThreadPool *threadPool, int32_t policy, int32_t proto)
142 {
143     if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
144         !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
145         RPC_LOG_ERROR("thread pool is full.");
146         return ERR_INVALID_PARAM;
147     }
148     pthread_t threadId;
149     if (pthread_mutex_lock(&threadPool->lock) != 0) {
150         RPC_LOG_ERROR("get thread pool lock failed.");
151         return ERR_FAILED;
152     }
153     if (!(proto == IF_PROT_BINDER && threadPool->idleThreadNum > 0) &&
154         !(proto == IF_PROT_DATABUS && threadPool->idleSocketThreadNum > 0)) {
155         pthread_mutex_unlock(&threadPool->lock);
156         RPC_LOG_ERROR("thread pool is full.");
157         return ERR_INVALID_PARAM;
158     }
159     ThreadContext *threadContext = (ThreadContext *)calloc(1, sizeof(ThreadContext));
160     if (threadContext == NULL) {
161         pthread_mutex_unlock(&threadPool->lock);
162         RPC_LOG_ERROR("create thread context failed.");
163         return ERR_FAILED;
164     }
165     threadContext->proto = proto;
166     threadContext->policy = policy;
167     int ret = pthread_create(&threadId, NULL, ThreadHandler, threadContext);
168     if (ret != 0) {
169         pthread_mutex_unlock(&threadPool->lock);
170         free(threadContext);
171         RPC_LOG_ERROR("spawn new thread failed.");
172         return ERR_FAILED;
173     }
174     pthread_detach(threadId);
175     if (proto == IF_PROT_BINDER) {
176         --threadPool->idleThreadNum;
177     } else if (proto == IF_PROT_DATABUS) {
178         --threadPool->idleSocketThreadNum;
179     }
180     pthread_mutex_unlock(&threadPool->lock);
181     return ERR_NONE;
182 }
183 
/*
 * Raises the pool's thread limit.
 *
 * maxThreadNum is the new per-protocol limit; the pool's total becomes twice
 * that value and the increase is split evenly between the two idle counters.
 * The limit can only grow: a total that is not larger than the current one is
 * rejected with an error log and the pool is left unchanged. A NULL pool is
 * rejected the same way.
 */
void UpdateMaxThreadNum(ThreadPool *threadPool, int32_t maxThreadNum)
{
    if (threadPool == NULL) {
        RPC_LOG_ERROR("thread pool is null.");
        return;
    }
    int32_t totalNum = maxThreadNum + maxThreadNum;
    if (pthread_mutex_lock(&threadPool->lock) != 0) {
        RPC_LOG_ERROR("get thread pool lock failed.");
        return;
    }
    int32_t oldThreadNum = threadPool->maxThreadNum;
    if (totalNum <= oldThreadNum) {
        pthread_mutex_unlock(&threadPool->lock);
        RPC_LOG_ERROR("not support set lower max thread num.");
        return;
    }
    int32_t diff = totalNum - oldThreadNum;
    threadPool->maxThreadNum = totalNum;
    /* Each protocol receives an equal share of the newly added slots. */
    threadPool->idleThreadNum += diff / PROTO_NUM;
    threadPool->idleSocketThreadNum += diff / PROTO_NUM;
    pthread_mutex_unlock(&threadPool->lock);
}
203 
GetRemoteInvoker(void)204 RemoteInvoker *GetRemoteInvoker(void)
205 {
206     ThreadContext *threadContext = GetCurrentThreadContext();
207     if (threadContext == NULL) {
208         return NULL;
209     }
210     return g_invoker[threadContext->proto];
211 }