1 /*
2  * Copyright (C) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rpc_process_skeleton.h"
17 
18 #include <stdlib.h>
19 #include <sys/time.h>
20 #include <errno.h>
21 
22 #include "dbinder_types.h"
23 #include "ipc_proxy_inner.h"
24 #include "ipc_stub_inner.h"
25 #include "rpc_errno.h"
26 #include "rpc_log.h"
27 #include "rpc_session_handle.h"
28 #include "rpc_trans_callback.h"
29 #include "rpc_types.h"
30 
/*
 * Process-wide skeleton state handed out by GetCurrentRpcSkeleton().
 * Its embedded lock is taken by ProcessGetSeqNumber() and GetNewStubIndex()
 * while they update the seqNumber / stubIndex counters.
 */
static RpcSkeleton g_rpcSkeleton = {
    .lock = PTHREAD_MUTEX_INITIALIZER,
    .isServerCreated = -1
};
/* Held by RpcProcessSkeleton() for the whole initialization sequence. */
static pthread_mutex_t g_rpcSkeletonMutex = PTHREAD_MUTEX_INITIALIZER;

// rpc data cache
/* Caches whose list head and guarding mutex live in the same struct. */
static StubObjectList g_stubObjectList = {.mutex = PTHREAD_MUTEX_INITIALIZER};
static ThreadProcessInfoList g_processInfoList = {.mutex = PTHREAD_MUTEX_INITIALIZER};
static SocketThreadLockInfoList g_socketLockInfoList = {.mutex = PTHREAD_MUTEX_INITIALIZER};
static IdleDataThreadsList g_idleDataThreadsList = {.mutex = PTHREAD_MUTEX_INITIALIZER};
/* Caches guarded by the standalone mutex declared directly after each of them. */
static HandleSessionList g_stubSessionList;
static pthread_mutex_t g_stubSessionMutex = PTHREAD_MUTEX_INITIALIZER;
static HandleSessionList g_proxySessionList;
static pthread_mutex_t g_proxySessionMutex = PTHREAD_MUTEX_INITIALIZER;
static HandleToIndexList g_handleToIndexList;
static pthread_mutex_t g_handleToIndexMutex = PTHREAD_MUTEX_INITIALIZER;
static ThreadMessageInfo g_seqNumberToThread;
static pthread_mutex_t g_seqNumberToThreadMutex = PTHREAD_MUTEX_INITIALIZER;
/* Session id cache exposed through RpcGetSessionIdList(); carries its own mutex and condition. */
static SessionIdList g_sessionIdList = {
    .mutex = PTHREAD_MUTEX_INITIALIZER,
    .condition = PTHREAD_COND_INITIALIZER
};
54 
RpcProcessSkeleton(void)55 int32_t RpcProcessSkeleton(void)
56 {
57     pthread_mutex_lock(&g_rpcSkeletonMutex);
58 
59     g_rpcSkeleton.rpcTrans = GetRpcTrans();
60     if (g_rpcSkeleton.rpcTrans == NULL) {
61         RPC_LOG_ERROR("GetRpcTrans return null");
62         pthread_mutex_unlock(&g_rpcSkeletonMutex);
63         return ERR_FAILED;
64     }
65     g_rpcSkeleton.seqNumber = 0;
66 
67     UtilsListInit(&g_stubObjectList.stubObjects);
68     UtilsListInit(&g_processInfoList.processInfo);
69     UtilsListInit(&g_socketLockInfoList.socketLockInfo);
70     UtilsListInit(&g_idleDataThreadsList.idleDataThread);
71     UtilsListInit(&g_stubSessionList.list);
72     UtilsListInit(&g_proxySessionList.list);
73     UtilsListInit(&g_handleToIndexList.list);
74     UtilsListInit(&g_seqNumberToThread.list);
75     UtilsListInit(&g_sessionIdList.idList);
76 
77     pthread_mutex_unlock(&g_rpcSkeletonMutex);
78     return ERR_NONE;
79 }
80 
GetCurrentRpcSkeleton(void)81 RpcSkeleton *GetCurrentRpcSkeleton(void)
82 {
83     return &g_rpcSkeleton;
84 }
85 
/*
 * Links a stub object into the stub cache under the cache mutex.
 *
 * stubObject: node to insert; its embedded list member is linked in place, so
 *             the object must stay valid for as long as it is cached.
 *
 * Returns ERR_NONE on success, or ERR_FAILED when stubObject is NULL.
 */
int32_t AddStubByIndex(StubObject *stubObject)
{
    if (stubObject == NULL) {
        RPC_LOG_ERROR("AddStubByIndex stubObject is null");
        return ERR_FAILED;
    }

    pthread_mutex_lock(&g_stubObjectList.mutex);
    UtilsListAdd(&g_stubObjectList.stubObjects, &stubObject->list);
    pthread_mutex_unlock(&g_stubObjectList.mutex);
    return ERR_NONE;
}
93 
QueryStubByIndex(uint64_t stubIndex)94 StubObject *QueryStubByIndex(uint64_t stubIndex)
95 {
96     StubObject *node = NULL;
97     pthread_mutex_lock(&g_stubObjectList.mutex);
98     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_stubObjectList.stubObjects, StubObject, list)
99     {
100         if (node->stubIndex == stubIndex) {
101             pthread_mutex_unlock(&g_stubObjectList.mutex);
102             return node;
103         }
104     }
105     pthread_mutex_unlock(&g_stubObjectList.mutex);
106     return NULL;
107 }
108 
/*
 * Links a per-thread lock record into the socket lock cache.
 * The insertion happens under the cache mutex; the function always returns ERR_NONE.
 */
static int32_t AttachThreadLockInfo(SocketThreadLockInfo *threadLockInfo)
{
    pthread_mutex_t *cacheGuard = &g_socketLockInfoList.mutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListAdd(&g_socketLockInfoList.socketLockInfo, &threadLockInfo->list);
    pthread_mutex_unlock(cacheGuard);

    return ERR_NONE;
}
116 
QueryThreadLockInfo(pthread_t threadId)117 static SocketThreadLockInfo *QueryThreadLockInfo(pthread_t threadId)
118 {
119     SocketThreadLockInfo *node = NULL;
120     pthread_mutex_lock(&g_socketLockInfoList.mutex);
121     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_socketLockInfoList.socketLockInfo, SocketThreadLockInfo, list)
122     {
123         if (pthread_equal(node->threadId, threadId) != 0) {
124             pthread_mutex_unlock(&g_socketLockInfoList.mutex);
125             return node;
126         }
127     }
128     pthread_mutex_unlock(&g_socketLockInfoList.mutex);
129     return NULL;
130 }
131 
/*
 * Links an idle-thread record into the idle data thread cache.
 * The insertion happens under the cache mutex; the function always returns ERR_NONE.
 */
static int32_t AddDataThreadToIdle(IdleDataThread *idleDataThread)
{
    pthread_mutex_t *cacheGuard = &g_idleDataThreadsList.mutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListAdd(&g_idleDataThreadsList.idleDataThread, &idleDataThread->list);
    pthread_mutex_unlock(cacheGuard);

    return ERR_NONE;
}
139 
/*
 * Unlinks an idle-thread record from the idle data thread cache while holding
 * the cache mutex. The record itself is not freed.
 */
static void DeleteDataThreadFromIdle(IdleDataThread *idleDataThread)
{
    pthread_mutex_t *cacheGuard = &g_idleDataThreadsList.mutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListDelete(&idleDataThread->list);
    pthread_mutex_unlock(cacheGuard);
}
146 
AddDataThreadInWait(pthread_t threadId)147 void AddDataThreadInWait(pthread_t threadId)
148 {
149     SocketThreadLockInfo *threadLockInfo = QueryThreadLockInfo(threadId);
150     if (threadLockInfo == NULL) {
151         threadLockInfo = (SocketThreadLockInfo *)malloc(sizeof(SocketThreadLockInfo));
152         if (threadLockInfo == NULL) {
153             RPC_LOG_ERROR("SocketThreadLockInfo malloc failed");
154             return;
155         }
156         threadLockInfo->threadId = threadId;
157         if (pthread_mutex_init(&threadLockInfo->mutex, NULL) != 0) {
158             RPC_LOG_ERROR("SocketThreadLockInfo mutex init failed");
159             free(threadLockInfo);
160             return;
161         }
162         if (pthread_cond_init(&threadLockInfo->condition, NULL) != 0) {
163             RPC_LOG_ERROR("SocketThreadLockInfo cond init failed");
164             free(threadLockInfo);
165             return;
166         }
167         if (AttachThreadLockInfo(threadLockInfo) != ERR_NONE) {
168             free(threadLockInfo);
169             return;
170         }
171     }
172 
173     pthread_mutex_lock(&threadLockInfo->mutex);
174     IdleDataThread idleDataThread = {.threadId = threadId};
175     if (AddDataThreadToIdle(&idleDataThread) != ERR_NONE) {
176         RPC_LOG_ERROR("AddDataThreadToIdle failed");
177         pthread_mutex_unlock(&threadLockInfo->mutex);
178         return;
179     }
180 
181     pthread_cond_wait(&threadLockInfo->condition, &threadLockInfo->mutex);
182     DeleteDataThreadFromIdle(&idleDataThread);
183     pthread_mutex_unlock(&threadLockInfo->mutex);
184 }
185 
WakeUpDataThread(pthread_t threadId)186 void WakeUpDataThread(pthread_t threadId)
187 {
188     SocketThreadLockInfo *threadLockInfo = QueryThreadLockInfo(threadId);
189     if (threadLockInfo != NULL) {
190         pthread_mutex_lock(&threadLockInfo->mutex);
191         pthread_cond_signal(&threadLockInfo->condition);
192         pthread_mutex_unlock(&threadLockInfo->mutex);
193     }
194 }
195 
GetIdleDataThread(void)196 IdleDataThread *GetIdleDataThread(void)
197 {
198     IdleDataThread *node = NULL;
199     pthread_mutex_lock(&g_idleDataThreadsList.mutex);
200     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_idleDataThreadsList.idleDataThread, IdleDataThread, list)
201     {
202         pthread_mutex_unlock(&g_idleDataThreadsList.mutex);
203         return node;
204     }
205     pthread_mutex_unlock(&g_idleDataThreadsList.mutex);
206     return NULL;
207 }
208 
/*
 * Queues a process-info record in the process info cache under the cache mutex.
 * The record is linked in place through its embedded list member.
 */
void AddDataInfoToThread(ThreadProcessInfo *processInfo)
{
    pthread_mutex_t *cacheGuard = &g_processInfoList.mutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListAdd(&g_processInfoList.processInfo, &processInfo->list);
    pthread_mutex_unlock(cacheGuard);
}
215 
PopDataInfoFromThread(pthread_t threadId)216 ThreadProcessInfo *PopDataInfoFromThread(pthread_t threadId)
217 {
218     ThreadProcessInfo *node = NULL;
219     pthread_mutex_lock(&g_processInfoList.mutex);
220     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_processInfoList.processInfo, ThreadProcessInfo, list)
221     {
222         if (pthread_equal(node->threadId, threadId) != 0) {
223             UtilsListDelete(&node->list);
224             pthread_mutex_unlock(&g_processInfoList.mutex);
225             return node;
226         }
227     }
228     pthread_mutex_unlock(&g_processInfoList.mutex);
229     return NULL;
230 }
231 
/*
 * Links a session record into the stub session cache under g_stubSessionMutex.
 *
 * handleSession: record to insert; it is linked in place through its embedded
 *                list member.
 *
 * Returns ERR_NONE on success, or ERR_FAILED when handleSession is NULL.
 */
int32_t AttachStubSession(HandleSessionList *handleSession)
{
    if (handleSession == NULL) {
        RPC_LOG_ERROR("AttachStubSession handleSession is null");
        return ERR_FAILED;
    }

    pthread_mutex_lock(&g_stubSessionMutex);
    UtilsListAdd(&g_stubSessionList.list, &handleSession->list);
    pthread_mutex_unlock(&g_stubSessionMutex);
    return ERR_NONE;
}
239 
/*
 * Unlinks a session record from the stub session cache while holding
 * g_stubSessionMutex. The record itself is not freed.
 */
void DetachStubSession(HandleSessionList *handleSession)
{
    pthread_mutex_t *cacheGuard = &g_stubSessionMutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListDelete(&handleSession->list);
    pthread_mutex_unlock(cacheGuard);
}
246 
QueryStubSession(uint32_t handle)247 HandleSessionList *QueryStubSession(uint32_t handle)
248 {
249     HandleSessionList *node = NULL;
250     pthread_mutex_lock(&g_stubSessionMutex);
251     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_stubSessionList.list, HandleSessionList, list)
252     {
253         if (node->handle == handle) {
254             pthread_mutex_unlock(&g_stubSessionMutex);
255             return node;
256         }
257     }
258     pthread_mutex_unlock(&g_stubSessionMutex);
259     return NULL;
260 }
261 
/*
 * Links a session record into the proxy session cache under g_proxySessionMutex.
 *
 * handleSession: record to insert; it is linked in place through its embedded
 *                list member.
 *
 * Returns ERR_NONE on success, or ERR_FAILED when handleSession is NULL.
 */
int32_t AttachProxySession(HandleSessionList *handleSession)
{
    if (handleSession == NULL) {
        RPC_LOG_ERROR("AttachProxySession handleSession is null");
        return ERR_FAILED;
    }

    pthread_mutex_lock(&g_proxySessionMutex);
    UtilsListAdd(&g_proxySessionList.list, &handleSession->list);
    pthread_mutex_unlock(&g_proxySessionMutex);
    return ERR_NONE;
}
269 
/*
 * Unlinks a session record from the proxy session cache while holding
 * g_proxySessionMutex. The record itself is not freed.
 */
void DetachProxySession(HandleSessionList *handleSession)
{
    pthread_mutex_t *cacheGuard = &g_proxySessionMutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListDelete(&handleSession->list);
    pthread_mutex_unlock(cacheGuard);
}
276 
QueryProxySession(uint32_t handle)277 HandleSessionList *QueryProxySession(uint32_t handle)
278 {
279     HandleSessionList *node = NULL;
280     pthread_mutex_lock(&g_proxySessionMutex);
281     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_proxySessionList.list, HandleSessionList, list)
282     {
283         if (node->handle == handle) {
284             pthread_mutex_unlock(&g_proxySessionMutex);
285             return node;
286         }
287     }
288     pthread_mutex_unlock(&g_proxySessionMutex);
289     return NULL;
290 }
291 
QueryProxySessionBySessionId(uint32_t sessionId)292 HandleSessionList *QueryProxySessionBySessionId(uint32_t sessionId)
293 {
294     HandleSessionList *node = NULL;
295     pthread_mutex_lock(&g_proxySessionMutex);
296     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_proxySessionList.list, HandleSessionList, list)
297     {
298         if (node->sessionId == sessionId) {
299             pthread_mutex_unlock(&g_proxySessionMutex);
300             return node;
301         }
302     }
303     pthread_mutex_unlock(&g_proxySessionMutex);
304     return NULL;
305 }
306 
ProcessGetSeqNumber()307 uint64_t ProcessGetSeqNumber()
308 {
309     pthread_mutex_lock(&g_rpcSkeleton.lock);
310 
311     ++g_rpcSkeleton.seqNumber; // can be overflow, and seqNumber do not use 0
312     if (g_rpcSkeleton.seqNumber == 0) {
313         ++g_rpcSkeleton.seqNumber;
314     }
315 
316     pthread_mutex_unlock(&g_rpcSkeleton.lock);
317     return g_rpcSkeleton.seqNumber;
318 }
319 
/*
 * Links a handle-to-index record into its cache under g_handleToIndexMutex.
 *
 * handleToIndex: record to insert; it is linked in place through its embedded
 *                list member.
 *
 * Returns ERR_NONE on success, or ERR_FAILED when handleToIndex is NULL.
 */
int32_t AttachHandleToIndex(HandleToIndexList *handleToIndex)
{
    if (handleToIndex == NULL) {
        RPC_LOG_ERROR("AttachHandleToIndex handleToIndex is null");
        return ERR_FAILED;
    }

    pthread_mutex_lock(&g_handleToIndexMutex);
    UtilsListAdd(&g_handleToIndexList.list, &handleToIndex->list);
    pthread_mutex_unlock(&g_handleToIndexMutex);
    return ERR_NONE;
}
327 
/*
 * Unlinks a handle-to-index record from its cache while holding
 * g_handleToIndexMutex. The record itself is not freed.
 */
void DetachHandleToIndex(HandleToIndexList *handleToIndex)
{
    pthread_mutex_t *cacheGuard = &g_handleToIndexMutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListDelete(&handleToIndex->list);
    pthread_mutex_unlock(cacheGuard);
}
334 
QueryHandleToIndex(uint32_t handle)335 HandleToIndexList *QueryHandleToIndex(uint32_t handle)
336 {
337     HandleToIndexList *node = NULL;
338     pthread_mutex_lock(&g_handleToIndexMutex);
339     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_handleToIndexList.list, HandleToIndexList, list)
340     {
341         if (node->handle == handle) {
342             pthread_mutex_unlock(&g_handleToIndexMutex);
343             return node;
344         }
345     }
346     pthread_mutex_unlock(&g_handleToIndexMutex);
347     return NULL;
348 }
349 
/*
 * Links a message-info record into the sequence-number cache.
 * The insertion happens under g_seqNumberToThreadMutex; the function always
 * returns ERR_NONE.
 */
static int32_t AddThreadBySeqNumber(ThreadMessageInfo *messageInfo)
{
    pthread_mutex_t *cacheGuard = &g_seqNumberToThreadMutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListAdd(&g_seqNumberToThread.list, &messageInfo->list);
    pthread_mutex_unlock(cacheGuard);

    return ERR_NONE;
}
357 
/*
 * Registers a pending request and blocks the sending thread until it is
 * signalled or the timeout expires.
 *
 * seqNumber:    sequence number of the request; only used for logging here.
 * messageInfo:  record describing the waiting thread; it is linked into the
 *               sequence-number cache and its threadId selects the lock record.
 * userWaitTime: timeout in seconds, added to the current wall-clock time.
 *
 * Returns ERR_NONE when the wait ended without error, ERR_FAILED on invalid
 * input, setup failure, timeout or any other wait error.
 *
 * messageInfo is not unlinked on failure.
 * NOTE(review): presumably the caller removes it with EraseThreadBySeqNumber() - confirm.
 */
int32_t AddSendThreadInWait(uint64_t seqNumber, ThreadMessageInfo *messageInfo, uint32_t userWaitTime)
{
    if (messageInfo == NULL) {
        RPC_LOG_ERROR("AddSendThreadInWait messageInfo is null");
        return ERR_FAILED;
    }
    if (AddThreadBySeqNumber(messageInfo) != ERR_NONE) {
        /* uint64_t is not guaranteed to be unsigned long long, so cast for %llu. */
        RPC_LOG_ERROR("add seqNumber = %llu failed", (unsigned long long)seqNumber);
        return ERR_FAILED;
    }

    SocketThreadLockInfo *threadLockInfo = QueryThreadLockInfo(messageInfo->threadId);
    if (threadLockInfo == NULL) {
        threadLockInfo = (SocketThreadLockInfo *)malloc(sizeof(SocketThreadLockInfo));
        if (threadLockInfo == NULL) {
            RPC_LOG_ERROR("threadLockInfo malloc failed");
            return ERR_FAILED;
        }
        threadLockInfo->threadId = messageInfo->threadId;

        if (pthread_mutex_init(&threadLockInfo->mutex, NULL) != 0) {
            RPC_LOG_ERROR("threadLockInfo mutex init failed");
            free(threadLockInfo);
            return ERR_FAILED;
        }
        if (pthread_cond_init(&threadLockInfo->condition, NULL) != 0) {
            RPC_LOG_ERROR("threadLockInfo cond init failed");
            pthread_mutex_destroy(&threadLockInfo->mutex);
            free(threadLockInfo);
            return ERR_FAILED;
        }
        if (AttachThreadLockInfo(threadLockInfo) != ERR_NONE) {
            RPC_LOG_ERROR("AttachThreadLockInfo fail");
            pthread_cond_destroy(&threadLockInfo->condition);
            pthread_mutex_destroy(&threadLockInfo->mutex);
            free(threadLockInfo);
            return ERR_FAILED;
        }
    }

    pthread_mutex_lock(&threadLockInfo->mutex);

    struct timespec waitTime;
    struct timeval now;
    if (gettimeofday(&now, NULL) != 0) {
        RPC_LOG_ERROR("gettimeofday failed");
        pthread_mutex_unlock(&threadLockInfo->mutex);
        return ERR_FAILED;
    }

    /* Absolute deadline: now + userWaitTime seconds, keeping the sub-second part of now. */
    waitTime.tv_sec = now.tv_sec + userWaitTime;
    waitTime.tv_nsec = now.tv_usec * USECTONSEC;
    /* NOTE(review): no predicate guards this wait, so a spurious wakeup is reported as success. */
    int ret = pthread_cond_timedwait(&threadLockInfo->condition, &threadLockInfo->mutex, &waitTime);
    pthread_mutex_unlock(&threadLockInfo->mutex);
    if (ret == ETIMEDOUT) {
        RPC_LOG_ERROR("send thread wait for reply timeout");
        return ERR_FAILED;
    }
    if (ret != 0) {
        /* Any other error means the wait never really happened. */
        RPC_LOG_ERROR("send thread wait for reply failed, ret = %d", ret);
        return ERR_FAILED;
    }

    return ERR_NONE;
}
406 
/*
 * Unlinks a message-info record from the sequence-number cache while holding
 * g_seqNumberToThreadMutex. The record itself is not freed.
 */
void EraseThreadBySeqNumber(ThreadMessageInfo *messageInfo)
{
    pthread_mutex_t *cacheGuard = &g_seqNumberToThreadMutex;

    pthread_mutex_lock(cacheGuard);
    UtilsListDelete(&messageInfo->list);
    pthread_mutex_unlock(cacheGuard);
}
413 
QueryThreadBySeqNumber(uint64_t seqNumber)414 ThreadMessageInfo *QueryThreadBySeqNumber(uint64_t seqNumber)
415 {
416     ThreadMessageInfo *node = NULL;
417     pthread_mutex_lock(&g_seqNumberToThreadMutex);
418     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &g_seqNumberToThread.list, ThreadMessageInfo, list)
419     {
420         if (node->seqNumber == seqNumber) {
421             pthread_mutex_unlock(&g_seqNumberToThreadMutex);
422             return node;
423         }
424     }
425     pthread_mutex_unlock(&g_seqNumberToThreadMutex);
426     return NULL;
427 }
428 
WakeUpThreadBySeqNumber(uint64_t seqNumber,uint32_t handle)429 void WakeUpThreadBySeqNumber(uint64_t seqNumber, uint32_t handle)
430 {
431     ThreadMessageInfo *messageInfo = QueryThreadBySeqNumber(seqNumber);
432     if (messageInfo == NULL) {
433         RPC_LOG_ERROR("error! messageInfo is nullptr");
434         return;
435     }
436 
437     if (handle != messageInfo->sessionId) {
438         RPC_LOG_ERROR("error! handle is not equal messageInfo, handle = %u, messageFd = %u", handle,
439             messageInfo->sessionId);
440         return;
441     }
442     if (pthread_equal(messageInfo->threadId, pthread_self()) == 0) {
443         SocketThreadLockInfo *threadLockInfo = QueryThreadLockInfo(messageInfo->threadId);
444         if (threadLockInfo != NULL) {
445             /* wake up this IO thread to process socket stream
446              * Wake up the client processing thread
447              */
448             pthread_mutex_lock(&threadLockInfo->mutex);
449             pthread_cond_signal(&threadLockInfo->condition);
450             pthread_mutex_unlock(&threadLockInfo->mutex);
451         }
452     }
453 }
454 
/*
 * Dispatches an RPC-internal request code to its stub handler.
 *
 * code:       request code; INVOKE_LISTEN_THREAD, GET_UIDPID_INFO and
 *             GRANT_DATABUS_NAME are handled here.
 * data/reply: passed through unchanged to the selected handler.
 * option:     passed through unchanged to the selected handler.
 * objectStub: only dereferenced for INVOKE_LISTEN_THREAD, whose handler
 *             receives objectStub->func.
 *
 * Returns the handler's result, or ERR_NOT_RPC for any other code.
 */
int32_t RpcOnRemoteRequestInner(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option,
    IpcObjectStub *objectStub)
{
    switch (code) {
        case INVOKE_LISTEN_THREAD:
            return InvokerListenThreadStub(code, data, reply, option, objectStub->func);
        case GET_UIDPID_INFO:
            return GetPidAndUidInfoStub(code, data, reply, option);
        case GRANT_DATABUS_NAME:
            return GrantDataBusNameStub(code, data, reply, option);
        default:
            return ERR_NOT_RPC;
    }
}
478 
/*
 * Logs the handle of the given service identity and forwards it to UpdateProto().
 *
 * svc: identity to update; a NULL pointer is rejected with an error log
 *      instead of being dereferenced.
 */
void UpdateProtoIfNeed(SvcIdentity *svc)
{
    if (svc == NULL) {
        RPC_LOG_ERROR("UpdateProtoIfNeed svc is null");
        return;
    }

    RPC_LOG_INFO("rpc manager update proto, handle %d", svc->handle);
    UpdateProto(svc);
}
484 
GetNewStubIndex(void)485 uint64_t GetNewStubIndex(void)
486 {
487     pthread_mutex_lock(&g_rpcSkeleton.lock);
488     uint64_t stubIndex = ++g_rpcSkeleton.stubIndex;
489     pthread_mutex_unlock(&g_rpcSkeleton.lock);
490     return stubIndex;
491 }
492 
RpcGetSessionIdList(void)493 SessionIdList *RpcGetSessionIdList(void)
494 {
495     return &g_sessionIdList;
496 }