1 /*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rpc_process_skeleton.h"
17
18 #include <stdlib.h>
19 #include <sys/time.h>
20 #include <errno.h>
21
22 #include "dbinder_types.h"
23 #include "ipc_proxy_inner.h"
24 #include "ipc_stub_inner.h"
25 #include "rpc_errno.h"
26 #include "rpc_log.h"
27 #include "rpc_session_handle.h"
28 #include "rpc_trans_callback.h"
29 #include "rpc_types.h"
30
31 static RpcSkeleton g_rpcSkeleton = {
32 .lock = PTHREAD_MUTEX_INITIALIZER,
33 .isServerCreated = -1
34 };
35 static pthread_mutex_t g_rpcSkeletonMutex = PTHREAD_MUTEX_INITIALIZER;
36
37 // rpc data cache
38 static StubObjectList g_stubObjectList = {.mutex = PTHREAD_MUTEX_INITIALIZER};
39 static ThreadProcessInfoList g_processInfoList = {.mutex = PTHREAD_MUTEX_INITIALIZER};
40 static SocketThreadLockInfoList g_socketLockInfoList = {.mutex = PTHREAD_MUTEX_INITIALIZER};
41 static IdleDataThreadsList g_idleDataThreadsList = {.mutex = PTHREAD_MUTEX_INITIALIZER};
42 static HandleSessionList g_stubSessionList;
43 static pthread_mutex_t g_stubSessionMutex = PTHREAD_MUTEX_INITIALIZER;
44 static HandleSessionList g_proxySessionList;
45 static pthread_mutex_t g_proxySessionMutex = PTHREAD_MUTEX_INITIALIZER;
46 static HandleToIndexList g_handleToIndexList;
47 static pthread_mutex_t g_handleToIndexMutex = PTHREAD_MUTEX_INITIALIZER;
48 static ThreadMessageInfo g_seqNumberToThread;
49 static pthread_mutex_t g_seqNumberToThreadMutex = PTHREAD_MUTEX_INITIALIZER;
50 static SessionIdList g_sessionIdList = {
51 .mutex = PTHREAD_MUTEX_INITIALIZER,
52 .condition = PTHREAD_COND_INITIALIZER
53 };
54
RpcProcessSkeleton(void)55 int32_t RpcProcessSkeleton(void)
56 {
57 pthread_mutex_lock(&g_rpcSkeletonMutex);
58
59 g_rpcSkeleton.rpcTrans = GetRpcTrans();
60 if (g_rpcSkeleton.rpcTrans == NULL) {
61 RPC_LOG_ERROR("GetRpcTrans return null");
62 pthread_mutex_unlock(&g_rpcSkeletonMutex);
63 return ERR_FAILED;
64 }
65 g_rpcSkeleton.seqNumber = 0;
66
67 UtilsListInit(&g_stubObjectList.stubObjects);
68 UtilsListInit(&g_processInfoList.processInfo);
69 UtilsListInit(&g_socketLockInfoList.socketLockInfo);
70 UtilsListInit(&g_idleDataThreadsList.idleDataThread);
71 UtilsListInit(&g_stubSessionList.list);
72 UtilsListInit(&g_proxySessionList.list);
73 UtilsListInit(&g_handleToIndexList.list);
74 UtilsListInit(&g_seqNumberToThread.list);
75 UtilsListInit(&g_sessionIdList.idList);
76
77 pthread_mutex_unlock(&g_rpcSkeletonMutex);
78 return ERR_NONE;
79 }
80
GetCurrentRpcSkeleton(void)81 RpcSkeleton *GetCurrentRpcSkeleton(void)
82 {
83 return &g_rpcSkeleton;
84 }
85
/*
 * Links stubObject into the process-wide stub object list.
 *
 * The list stores the caller's node itself (through stubObject->list), not a
 * copy, so the object must stay valid for as long as it is linked.
 *
 * Returns ERR_NONE on success, ERR_FAILED when stubObject is NULL.
 */
int32_t AddStubByIndex(StubObject *stubObject)
{
    /* Reject NULL up front instead of dereferencing it while holding the list lock. */
    if (stubObject == NULL) {
        RPC_LOG_ERROR("AddStubByIndex stubObject is null");
        return ERR_FAILED;
    }

    pthread_mutex_lock(&g_stubObjectList.mutex);
    UtilsListAdd(&g_stubObjectList.stubObjects, &stubObject->list);
    pthread_mutex_unlock(&g_stubObjectList.mutex);
    return ERR_NONE;
}
93
QueryStubByIndex(uint64_t stubIndex)94 StubObject *QueryStubByIndex(uint64_t stubIndex)
95 {
96 StubObject *node = NULL;
97 pthread_mutex_lock(&g_stubObjectList.mutex);
98 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_stubObjectList.stubObjects, StubObject, list)
99 {
100 if (node->stubIndex == stubIndex) {
101 pthread_mutex_unlock(&g_stubObjectList.mutex);
102 return node;
103 }
104 }
105 pthread_mutex_unlock(&g_stubObjectList.mutex);
106 return NULL;
107 }
108
/*
 * Links threadLockInfo into the per-thread lock info list under the list mutex.
 * Always returns ERR_NONE.
 */
static int32_t AttachThreadLockInfo(SocketThreadLockInfo *threadLockInfo)
{
    pthread_mutex_t *listMutex = &g_socketLockInfoList.mutex;

    pthread_mutex_lock(listMutex);
    UtilsListAdd(&g_socketLockInfoList.socketLockInfo, &threadLockInfo->list);
    pthread_mutex_unlock(listMutex);

    return ERR_NONE;
}
116
QueryThreadLockInfo(pthread_t threadId)117 static SocketThreadLockInfo *QueryThreadLockInfo(pthread_t threadId)
118 {
119 SocketThreadLockInfo *node = NULL;
120 pthread_mutex_lock(&g_socketLockInfoList.mutex);
121 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_socketLockInfoList.socketLockInfo, SocketThreadLockInfo, list)
122 {
123 if (pthread_equal(node->threadId, threadId) != 0) {
124 pthread_mutex_unlock(&g_socketLockInfoList.mutex);
125 return node;
126 }
127 }
128 pthread_mutex_unlock(&g_socketLockInfoList.mutex);
129 return NULL;
130 }
131
/*
 * Links idleDataThread into the idle data thread list under the list mutex.
 * Always returns ERR_NONE.
 */
static int32_t AddDataThreadToIdle(IdleDataThread *idleDataThread)
{
    pthread_mutex_t *listMutex = &g_idleDataThreadsList.mutex;

    pthread_mutex_lock(listMutex);
    UtilsListAdd(&g_idleDataThreadsList.idleDataThread, &idleDataThread->list);
    pthread_mutex_unlock(listMutex);

    return ERR_NONE;
}
139
/*
 * Unlinks idleDataThread from whatever list it is on, holding the idle data
 * thread list mutex while doing so.
 */
static void DeleteDataThreadFromIdle(IdleDataThread *idleDataThread)
{
    pthread_mutex_t *listMutex = &g_idleDataThreadsList.mutex;

    pthread_mutex_lock(listMutex);
    UtilsListDelete(&idleDataThread->list);
    pthread_mutex_unlock(listMutex);
}
146
AddDataThreadInWait(pthread_t threadId)147 void AddDataThreadInWait(pthread_t threadId)
148 {
149 SocketThreadLockInfo *threadLockInfo = QueryThreadLockInfo(threadId);
150 if (threadLockInfo == NULL) {
151 threadLockInfo = (SocketThreadLockInfo *)malloc(sizeof(SocketThreadLockInfo));
152 if (threadLockInfo == NULL) {
153 RPC_LOG_ERROR("SocketThreadLockInfo malloc failed");
154 return;
155 }
156 threadLockInfo->threadId = threadId;
157 if (pthread_mutex_init(&threadLockInfo->mutex, NULL) != 0) {
158 RPC_LOG_ERROR("SocketThreadLockInfo mutex init failed");
159 free(threadLockInfo);
160 return;
161 }
162 if (pthread_cond_init(&threadLockInfo->condition, NULL) != 0) {
163 RPC_LOG_ERROR("SocketThreadLockInfo cond init failed");
164 free(threadLockInfo);
165 return;
166 }
167 if (AttachThreadLockInfo(threadLockInfo) != ERR_NONE) {
168 free(threadLockInfo);
169 return;
170 }
171 }
172
173 pthread_mutex_lock(&threadLockInfo->mutex);
174 IdleDataThread idleDataThread = {.threadId = threadId};
175 if (AddDataThreadToIdle(&idleDataThread) != ERR_NONE) {
176 RPC_LOG_ERROR("AddDataThreadToIdle failed");
177 pthread_mutex_unlock(&threadLockInfo->mutex);
178 return;
179 }
180
181 pthread_cond_wait(&threadLockInfo->condition, &threadLockInfo->mutex);
182 DeleteDataThreadFromIdle(&idleDataThread);
183 pthread_mutex_unlock(&threadLockInfo->mutex);
184 }
185
WakeUpDataThread(pthread_t threadId)186 void WakeUpDataThread(pthread_t threadId)
187 {
188 SocketThreadLockInfo *threadLockInfo = QueryThreadLockInfo(threadId);
189 if (threadLockInfo != NULL) {
190 pthread_mutex_lock(&threadLockInfo->mutex);
191 pthread_cond_signal(&threadLockInfo->condition);
192 pthread_mutex_unlock(&threadLockInfo->mutex);
193 }
194 }
195
GetIdleDataThread(void)196 IdleDataThread *GetIdleDataThread(void)
197 {
198 IdleDataThread *node = NULL;
199 pthread_mutex_lock(&g_idleDataThreadsList.mutex);
200 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_idleDataThreadsList.idleDataThread, IdleDataThread, list)
201 {
202 pthread_mutex_unlock(&g_idleDataThreadsList.mutex);
203 return node;
204 }
205 pthread_mutex_unlock(&g_idleDataThreadsList.mutex);
206 return NULL;
207 }
208
/*
 * Queues processInfo on the thread process info list under the list mutex.
 * The node itself is linked, so it must outlive its time on the list.
 */
void AddDataInfoToThread(ThreadProcessInfo *processInfo)
{
    pthread_mutex_t *listMutex = &g_processInfoList.mutex;

    pthread_mutex_lock(listMutex);
    UtilsListAdd(&g_processInfoList.processInfo, &processInfo->list);
    pthread_mutex_unlock(listMutex);
}
215
PopDataInfoFromThread(pthread_t threadId)216 ThreadProcessInfo *PopDataInfoFromThread(pthread_t threadId)
217 {
218 ThreadProcessInfo *node = NULL;
219 pthread_mutex_lock(&g_processInfoList.mutex);
220 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_processInfoList.processInfo, ThreadProcessInfo, list)
221 {
222 if (pthread_equal(node->threadId, threadId) != 0) {
223 UtilsListDelete(&node->list);
224 pthread_mutex_unlock(&g_processInfoList.mutex);
225 return node;
226 }
227 }
228 pthread_mutex_unlock(&g_processInfoList.mutex);
229 return NULL;
230 }
231
/*
 * Links handleSession into the stub session list under g_stubSessionMutex.
 *
 * The node itself is linked (not copied), so it must remain valid until
 * DetachStubSession() is called for it.
 *
 * Returns ERR_NONE on success, ERR_FAILED when handleSession is NULL.
 */
int32_t AttachStubSession(HandleSessionList *handleSession)
{
    /* Reject NULL instead of dereferencing it while holding the list lock. */
    if (handleSession == NULL) {
        RPC_LOG_ERROR("AttachStubSession handleSession is null");
        return ERR_FAILED;
    }

    pthread_mutex_lock(&g_stubSessionMutex);
    UtilsListAdd(&g_stubSessionList.list, &handleSession->list);
    pthread_mutex_unlock(&g_stubSessionMutex);
    return ERR_NONE;
}
239
/*
 * Unlinks handleSession from the list it is on while holding
 * g_stubSessionMutex. The node is not freed here.
 */
void DetachStubSession(HandleSessionList *handleSession)
{
    pthread_mutex_t *listMutex = &g_stubSessionMutex;

    pthread_mutex_lock(listMutex);
    UtilsListDelete(&handleSession->list);
    pthread_mutex_unlock(listMutex);
}
246
QueryStubSession(uint32_t handle)247 HandleSessionList *QueryStubSession(uint32_t handle)
248 {
249 HandleSessionList *node = NULL;
250 pthread_mutex_lock(&g_stubSessionMutex);
251 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_stubSessionList.list, HandleSessionList, list)
252 {
253 if (node->handle == handle) {
254 pthread_mutex_unlock(&g_stubSessionMutex);
255 return node;
256 }
257 }
258 pthread_mutex_unlock(&g_stubSessionMutex);
259 return NULL;
260 }
261
/*
 * Links handleSession into the proxy session list under g_proxySessionMutex.
 *
 * The node itself is linked (not copied), so it must remain valid until
 * DetachProxySession() is called for it.
 *
 * Returns ERR_NONE on success, ERR_FAILED when handleSession is NULL.
 */
int32_t AttachProxySession(HandleSessionList *handleSession)
{
    /* Reject NULL instead of dereferencing it while holding the list lock. */
    if (handleSession == NULL) {
        RPC_LOG_ERROR("AttachProxySession handleSession is null");
        return ERR_FAILED;
    }

    pthread_mutex_lock(&g_proxySessionMutex);
    UtilsListAdd(&g_proxySessionList.list, &handleSession->list);
    pthread_mutex_unlock(&g_proxySessionMutex);
    return ERR_NONE;
}
269
/*
 * Unlinks handleSession from the list it is on while holding
 * g_proxySessionMutex. The node is not freed here.
 */
void DetachProxySession(HandleSessionList *handleSession)
{
    pthread_mutex_t *listMutex = &g_proxySessionMutex;

    pthread_mutex_lock(listMutex);
    UtilsListDelete(&handleSession->list);
    pthread_mutex_unlock(listMutex);
}
276
QueryProxySession(uint32_t handle)277 HandleSessionList *QueryProxySession(uint32_t handle)
278 {
279 HandleSessionList *node = NULL;
280 pthread_mutex_lock(&g_proxySessionMutex);
281 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_proxySessionList.list, HandleSessionList, list)
282 {
283 if (node->handle == handle) {
284 pthread_mutex_unlock(&g_proxySessionMutex);
285 return node;
286 }
287 }
288 pthread_mutex_unlock(&g_proxySessionMutex);
289 return NULL;
290 }
291
QueryProxySessionBySessionId(uint32_t sessionId)292 HandleSessionList *QueryProxySessionBySessionId(uint32_t sessionId)
293 {
294 HandleSessionList *node = NULL;
295 pthread_mutex_lock(&g_proxySessionMutex);
296 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_proxySessionList.list, HandleSessionList, list)
297 {
298 if (node->sessionId == sessionId) {
299 pthread_mutex_unlock(&g_proxySessionMutex);
300 return node;
301 }
302 }
303 pthread_mutex_unlock(&g_proxySessionMutex);
304 return NULL;
305 }
306
ProcessGetSeqNumber()307 uint64_t ProcessGetSeqNumber()
308 {
309 pthread_mutex_lock(&g_rpcSkeleton.lock);
310
311 ++g_rpcSkeleton.seqNumber; // can be overflow, and seqNumber do not use 0
312 if (g_rpcSkeleton.seqNumber == 0) {
313 ++g_rpcSkeleton.seqNumber;
314 }
315
316 pthread_mutex_unlock(&g_rpcSkeleton.lock);
317 return g_rpcSkeleton.seqNumber;
318 }
319
/*
 * Links handleToIndex into the handle-to-index list under g_handleToIndexMutex.
 *
 * The node itself is linked (not copied), so it must remain valid until
 * DetachHandleToIndex() is called for it.
 *
 * Returns ERR_NONE on success, ERR_FAILED when handleToIndex is NULL.
 */
int32_t AttachHandleToIndex(HandleToIndexList *handleToIndex)
{
    /* Reject NULL instead of dereferencing it while holding the list lock. */
    if (handleToIndex == NULL) {
        RPC_LOG_ERROR("AttachHandleToIndex handleToIndex is null");
        return ERR_FAILED;
    }

    pthread_mutex_lock(&g_handleToIndexMutex);
    UtilsListAdd(&g_handleToIndexList.list, &handleToIndex->list);
    pthread_mutex_unlock(&g_handleToIndexMutex);
    return ERR_NONE;
}
327
/*
 * Unlinks handleToIndex from the list it is on while holding
 * g_handleToIndexMutex. The node is not freed here.
 */
void DetachHandleToIndex(HandleToIndexList *handleToIndex)
{
    pthread_mutex_t *listMutex = &g_handleToIndexMutex;

    pthread_mutex_lock(listMutex);
    UtilsListDelete(&handleToIndex->list);
    pthread_mutex_unlock(listMutex);
}
334
QueryHandleToIndex(uint32_t handle)335 HandleToIndexList *QueryHandleToIndex(uint32_t handle)
336 {
337 HandleToIndexList *node = NULL;
338 pthread_mutex_lock(&g_handleToIndexMutex);
339 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_handleToIndexList.list, HandleToIndexList, list)
340 {
341 if (node->handle == handle) {
342 pthread_mutex_unlock(&g_handleToIndexMutex);
343 return node;
344 }
345 }
346 pthread_mutex_unlock(&g_handleToIndexMutex);
347 return NULL;
348 }
349
/*
 * Links messageInfo into the seqNumber-to-thread list under
 * g_seqNumberToThreadMutex. Always returns ERR_NONE.
 */
static int32_t AddThreadBySeqNumber(ThreadMessageInfo *messageInfo)
{
    pthread_mutex_t *listMutex = &g_seqNumberToThreadMutex;

    pthread_mutex_lock(listMutex);
    UtilsListAdd(&g_seqNumberToThread.list, &messageInfo->list);
    pthread_mutex_unlock(listMutex);

    return ERR_NONE;
}
357
/*
 * Registers a sending thread as waiting for a reply and blocks it.
 *
 * messageInfo is linked into the seqNumber-to-thread list, then the calling
 * thread waits on the condition variable registered for
 * messageInfo->threadId (created on first use) for at most userWaitTime
 * seconds.
 *
 * seqNumber    sequence number, used for logging only in this function.
 * messageInfo  wait record to register; must not be NULL.
 * userWaitTime timeout in seconds, added to the current wall-clock time.
 *
 * Returns ERR_NONE when the wait ended without error, ERR_FAILED on invalid
 * input, allocation/initialization failure, timeout, or any other wait error.
 *
 * This function never unlinks messageInfo again, not even on failure;
 * presumably the caller does so via EraseThreadBySeqNumber() - verify
 * against callers.
 */
int32_t AddSendThreadInWait(uint64_t seqNumber, ThreadMessageInfo *messageInfo, uint32_t userWaitTime)
{
    if (messageInfo == NULL) {
        RPC_LOG_ERROR("AddSendThreadInWait messageInfo is null");
        return ERR_FAILED;
    }

    if (AddThreadBySeqNumber(messageInfo) != ERR_NONE) {
        /* uint64_t is not guaranteed to be unsigned long long; cast to match %llu. */
        RPC_LOG_ERROR("add seqNumber = %llu failed", (unsigned long long)seqNumber);
        return ERR_FAILED;
    }

    SocketThreadLockInfo *threadLockInfo = QueryThreadLockInfo(messageInfo->threadId);
    if (threadLockInfo == NULL) {
        threadLockInfo = (SocketThreadLockInfo *)malloc(sizeof(SocketThreadLockInfo));
        if (threadLockInfo == NULL) {
            RPC_LOG_ERROR("threadLockInfo malloc failed");
            return ERR_FAILED;
        }
        threadLockInfo->threadId = messageInfo->threadId;

        /* Check both initializers, as AddDataThreadInWait does, and undo partial setup. */
        if (pthread_mutex_init(&threadLockInfo->mutex, NULL) != 0) {
            RPC_LOG_ERROR("threadLockInfo mutex init failed");
            free(threadLockInfo);
            return ERR_FAILED;
        }
        if (pthread_cond_init(&threadLockInfo->condition, NULL) != 0) {
            RPC_LOG_ERROR("threadLockInfo cond init failed");
            pthread_mutex_destroy(&threadLockInfo->mutex);
            free(threadLockInfo);
            return ERR_FAILED;
        }

        if (AttachThreadLockInfo(threadLockInfo) != ERR_NONE) {
            RPC_LOG_ERROR("AttachThreadLockInfo fail");
            pthread_cond_destroy(&threadLockInfo->condition);
            pthread_mutex_destroy(&threadLockInfo->mutex);
            free(threadLockInfo);
            return ERR_FAILED;
        }
    }

    pthread_mutex_lock(&threadLockInfo->mutex);

    struct timespec waitTime;
    struct timeval now;
    if (gettimeofday(&now, NULL) != 0) {
        RPC_LOG_ERROR("gettimeofday failed");
        pthread_mutex_unlock(&threadLockInfo->mutex);
        return ERR_FAILED;
    }

    /* Absolute deadline: current wall-clock time plus userWaitTime seconds. */
    waitTime.tv_sec = now.tv_sec + userWaitTime;
    waitTime.tv_nsec = now.tv_usec * USECTONSEC;
    /*
     * NOTE(review): there is no predicate around this wait, so a signal sent
     * before the mutex above was acquired is missed and a spurious wakeup is
     * treated as a reply - confirm callers validate the reply afterwards.
     */
    int waitRet = pthread_cond_timedwait(&threadLockInfo->condition, &threadLockInfo->mutex, &waitTime);
    pthread_mutex_unlock(&threadLockInfo->mutex);
    if (waitRet == ETIMEDOUT) {
        RPC_LOG_ERROR("send thread wait for reply timeout");
        return ERR_FAILED;
    }
    if (waitRet != 0) {
        /* Any other error also means no reply was signalled; do not report success. */
        RPC_LOG_ERROR("send thread wait for reply failed, ret = %d", waitRet);
        return ERR_FAILED;
    }

    return ERR_NONE;
}
406
/*
 * Unlinks messageInfo from the list it is on while holding
 * g_seqNumberToThreadMutex. The record is not freed here.
 */
void EraseThreadBySeqNumber(ThreadMessageInfo *messageInfo)
{
    pthread_mutex_t *listMutex = &g_seqNumberToThreadMutex;

    pthread_mutex_lock(listMutex);
    UtilsListDelete(&messageInfo->list);
    pthread_mutex_unlock(listMutex);
}
413
QueryThreadBySeqNumber(uint64_t seqNumber)414 ThreadMessageInfo *QueryThreadBySeqNumber(uint64_t seqNumber)
415 {
416 ThreadMessageInfo *node = NULL;
417 pthread_mutex_lock(&g_seqNumberToThreadMutex);
418 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_seqNumberToThread.list, ThreadMessageInfo, list)
419 {
420 if (node->seqNumber == seqNumber) {
421 pthread_mutex_unlock(&g_seqNumberToThreadMutex);
422 return node;
423 }
424 }
425 pthread_mutex_unlock(&g_seqNumberToThreadMutex);
426 return NULL;
427 }
428
WakeUpThreadBySeqNumber(uint64_t seqNumber,uint32_t handle)429 void WakeUpThreadBySeqNumber(uint64_t seqNumber, uint32_t handle)
430 {
431 ThreadMessageInfo *messageInfo = QueryThreadBySeqNumber(seqNumber);
432 if (messageInfo == NULL) {
433 RPC_LOG_ERROR("error! messageInfo is nullptr");
434 return;
435 }
436
437 if (handle != messageInfo->sessionId) {
438 RPC_LOG_ERROR("error! handle is not equal messageInfo, handle = %u, messageFd = %u", handle,
439 messageInfo->sessionId);
440 return;
441 }
442 if (pthread_equal(messageInfo->threadId, pthread_self()) == 0) {
443 SocketThreadLockInfo *threadLockInfo = QueryThreadLockInfo(messageInfo->threadId);
444 if (threadLockInfo != NULL) {
445 /* wake up this IO thread to process socket stream
446 * Wake up the client processing thread
447 */
448 pthread_mutex_lock(&threadLockInfo->mutex);
449 pthread_cond_signal(&threadLockInfo->condition);
450 pthread_mutex_unlock(&threadLockInfo->mutex);
451 }
452 }
453 }
454
/*
 * Dispatches an RPC-internal request code to its stub handler.
 *
 * INVOKE_LISTEN_THREAD, GET_UIDPID_INFO and GRANT_DATABUS_NAME are forwarded
 * to their handlers and the handler's result is returned. Any other code
 * yields ERR_NOT_RPC. objectStub is only dereferenced for INVOKE_LISTEN_THREAD.
 */
int32_t RpcOnRemoteRequestInner(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option,
    IpcObjectStub *objectStub)
{
    switch (code) {
        case INVOKE_LISTEN_THREAD:
            return InvokerListenThreadStub(code, data, reply, option, objectStub->func);
        case GET_UIDPID_INFO:
            return GetPidAndUidInfoStub(code, data, reply, option);
        case GRANT_DATABUS_NAME:
            return GrantDataBusNameStub(code, data, reply, option);
        default:
            return ERR_NOT_RPC;
    }
}
478
/*
 * Logs the handle of svc and forwards svc to UpdateProto().
 * Despite the name, no condition is evaluated here: UpdateProto() is always called.
 */
void UpdateProtoIfNeed(SvcIdentity *svc)
{
    RPC_LOG_INFO("rpc manager update proto, handle %d", svc->handle);

    UpdateProto(svc);
}
484
GetNewStubIndex(void)485 uint64_t GetNewStubIndex(void)
486 {
487 pthread_mutex_lock(&g_rpcSkeleton.lock);
488 uint64_t stubIndex = ++g_rpcSkeleton.stubIndex;
489 pthread_mutex_unlock(&g_rpcSkeleton.lock);
490 return stubIndex;
491 }
492
RpcGetSessionIdList(void)493 SessionIdList *RpcGetSessionIdList(void)
494 {
495 return &g_sessionIdList;
496 }