1 /*
2  * Copyright (C) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dbinder_invoker.h"
17 
18 #include <inttypes.h>
19 #include <unistd.h>
20 
21 #include "securec.h"
22 #include "utils_list.h"
23 
24 #include "dbinder_types.h"
25 #include "ipc_process_skeleton.h"
26 #include "ipc_skeleton.h"
27 #include "ipc_thread_pool.h"
28 #include "rpc_errno.h"
29 #include "rpc_log.h"
30 #include "rpc_process_skeleton.h"
31 #include "rpc_session_handle.h"
32 #include "rpc_trans.h"
33 #include "rpc_trans_callback.h"
34 #include "rpc_types.h"
35 
36 #define BC_TRANSACTION 1076388608
37 #define BC_REPLY 1076388609
38 
39 static const int DBINDER_MAGICWORD = 0X4442494E;
40 static const int SOCKET_MAX_BUFF_SIZE = 1024 * 1024;
41 static RemoteInvoker *g_rpcInvoker = NULL;
42 
DeleteRpcInvoker(RemoteInvoker * remoteInvoker)43 void DeleteRpcInvoker(RemoteInvoker *remoteInvoker)
44 {
45     if (remoteInvoker == NULL) {
46         return;
47     }
48     free(remoteInvoker);
49 }
50 
GetSessionObject(uint32_t handle,uint32_t sessionId)51 static HandleSessionList *GetSessionObject(uint32_t handle, uint32_t sessionId)
52 {
53     if (handle != 0) {
54         /* transact case */
55         return QueryProxySession(handle);
56     } else {
57         /* reply case */
58         return QueryStubSession(sessionId);
59     }
60 }
61 
GetUniqueSeqNumber(int cmd)62 static uint64_t GetUniqueSeqNumber(int cmd)
63 {
64     if (cmd == BC_TRANSACTION) {
65         return ProcessGetSeqNumber();
66     } else if (cmd == BC_REPLY) {
67         ThreadContext *threadContext = GetCurrentThreadContext();
68         if (threadContext == NULL) {
69             RPC_LOG_ERROR("GetUniqueSeqNumber threadContext is null");
70             return 0;
71         }
72         return threadContext->seqNumber;
73     }
74     return 0;
75 }
76 
ToTransData(const IpcIo * data,dbinder_transaction_data * buf)77 static void ToTransData(const IpcIo *data, dbinder_transaction_data *buf)
78 {
79     buf->buffer_size = (data == NULL) ? 0 : (data->bufferCur - data->bufferBase);
80     buf->offsets = buf->buffer_size;
81     buf->offsets_size = (data == NULL) ? 0 :
82         (uint64_t)((data->offsetsCur - data->offsetsBase) * sizeof(size_t));
83 }
84 
ToIpcData(const dbinder_transaction_data * tr,IpcIo * data)85 static void ToIpcData(const dbinder_transaction_data *tr, IpcIo *data)
86 {
87     data->bufferBase = data->bufferCur = (char *)(tr->buffer);
88     data->offsetsBase = data->offsetsCur = (size_t *)(data->bufferBase + (size_t)tr->buffer_size);
89     data->bufferLeft = (size_t)tr->buffer_size;
90     data->offsetsLeft = (tr->offsets_size) / sizeof(size_t);
91     data->flag = IPC_IO_INITIALIZED;
92 }
93 
MoveIpcToTransData(IpcIo * data,dbinder_transaction_data * transData,int status)94 static int32_t MoveIpcToTransData(IpcIo *data, dbinder_transaction_data *transData, int status)
95 {
96     if (data != NULL) {
97         ToTransData(data, transData);
98         if (memcpy_s(transData->buffer, transData->buffer_size, data->bufferBase, transData->buffer_size) != EOK) {
99             RPC_LOG_ERROR("transData buffer memset failed");
100             return ERR_FAILED;
101         }
102         uint32 offsetsSize = transData->sizeOfSelf - sizeof(dbinder_transaction_data) - transData->buffer_size;
103         if (offsetsSize > 0) {
104             if (memcpy_s(transData->buffer + transData->buffer_size, offsetsSize,
105                 data->offsetsBase, offsetsSize) != EOK) {
106                 RPC_LOG_ERROR("transData buffer memset failed");
107                 return ERR_FAILED;
108             }
109         }
110     } else {
111         transData->flags |= TF_OP_STATUS_CODE;
112         transData->buffer_size = sizeof(size_t);
113         transData->offsets_size = (size_t)status;
114         transData->offsets = transData->buffer_size;
115     }
116     return ERR_NONE;
117 }
118 
ProcessNormalData(IpcIo * data,int32_t handle,int status,dbinder_transaction_data * transData)119 static int32_t ProcessNormalData(IpcIo *data, int32_t handle, int status, dbinder_transaction_data *transData)
120 {
121     if (transData == NULL) {
122         RPC_LOG_ERROR("ProcessNormalData transData is null");
123         return ERR_FAILED;
124     }
125 
126     uint32_t dataSize = (uint32_t)(data->offsetsCur - data->offsetsBase) +
127         (uint32_t)(data->bufferCur - data->bufferBase);
128     transData->buffer = (char *)malloc(dataSize);
129     if (transData->buffer == NULL) {
130         RPC_LOG_ERROR("transData buffer malloc failed");
131         return ERR_FAILED;
132     }
133     transData->sizeOfSelf = sizeof(dbinder_transaction_data) + dataSize;
134 
135     if (handle == 0) {
136         transData->cookie = 0;
137     } else {
138         HandleToIndexList *handleToIndex = QueryHandleToIndex(handle);
139         if (handleToIndex == NULL) {
140             RPC_LOG_ERROR("stubIndex not found for handle %d", handle);
141             return ERR_FAILED;
142         }
143         transData->cookie = handleToIndex->index;
144     }
145 
146     if (MoveIpcToTransData(data, transData, status) != ERR_NONE) {
147         RPC_LOG_ERROR("move parcel to transData failed, handle = %d", handle);
148         return ERR_FAILED;
149     }
150 
151     return ERR_NONE;
152 }
153 
MoveTransData2Buffer(HandleSessionList * sessionObject,dbinder_transaction_data * transData)154 static int32_t MoveTransData2Buffer(HandleSessionList *sessionObject, dbinder_transaction_data *transData)
155 {
156     sessionObject->buffer = (char *)malloc((size_t)transData->sizeOfSelf);
157     if (sessionObject->buffer == NULL) {
158         RPC_LOG_ERROR("sessionObject buffer malloc failed");
159         return ERR_FAILED;
160     }
161     sessionObject->len = transData->sizeOfSelf;
162 
163     if (memcpy_s(sessionObject->buffer, sizeof(dbinder_transaction_data),
164         transData, sizeof(dbinder_transaction_data)) != EOK) {
165         RPC_LOG_ERROR("sessionObject buffer memset failed");
166         free(sessionObject->buffer);
167         return ERR_FAILED;
168     }
169 
170     if (memcpy_s(sessionObject->buffer + sizeof(dbinder_transaction_data),
171         transData->buffer_size, transData->buffer, transData->buffer_size) != EOK) {
172         RPC_LOG_ERROR("sessionObject buffer memset failed");
173         free(sessionObject->buffer);
174         return ERR_FAILED;
175     }
176 
177     return ERR_NONE;
178 }
179 
WriteTransaction(int32_t cmd,MessageOption option,int32_t handle,int32_t sessionId,uint32_t code,IpcIo * data,uint64_t * seqNumber,int status)180 static HandleSessionList *WriteTransaction(int32_t cmd, MessageOption option, int32_t handle,
181     int32_t sessionId, uint32_t code, IpcIo *data, uint64_t *seqNumber, int status)
182 {
183     HandleSessionList *sessionObject = GetSessionObject(handle, sessionId);
184     if (sessionObject == NULL) {
185         RPC_LOG_ERROR("session is not exist for sessionId = %d, handle = %d", sessionId, handle);
186         return NULL;
187     }
188 
189     uint64_t seqNum = GetUniqueSeqNumber(cmd);
190     if (seqNum == 0) {
191         RPC_LOG_ERROR("seqNum invalid");
192         if (sessionObject->buffer != NULL) {
193             free(sessionObject->buffer);
194         }
195         return NULL;
196     }
197     *seqNumber = seqNum;
198 
199     dbinder_transaction_data transData = {
200         .magic = DBINDER_MAGICWORD,
201         .version = VERSION_NUM,
202         .cmd = cmd,
203         .code = code,
204         .flags = option.flags,
205         .seqNumber = *seqNumber,
206         .buffer = NULL
207     };
208 
209     if (ProcessNormalData(data, handle, status, &transData) != ERR_NONE) {
210         RPC_LOG_ERROR("ProcessNormalData failed");
211         if (transData.buffer != NULL) {
212             free(transData.buffer);
213             return NULL;
214         }
215     }
216 
217     if (MoveTransData2Buffer(sessionObject, &transData) != ERR_NONE) {
218         RPC_LOG_ERROR("move transaction data to buffer failed");
219         free(transData.buffer);
220         return NULL;
221     }
222 
223     free(transData.buffer);
224     return sessionObject;
225 }
226 
OnSendMessage(HandleSessionList * sessionOfPeer)227 static int32_t OnSendMessage(HandleSessionList *sessionOfPeer)
228 {
229     if (sessionOfPeer == NULL || sessionOfPeer->buffer == NULL) {
230         RPC_LOG_ERROR("sessionOfPeer or buffer is null");
231         return ERR_FAILED;
232     }
233     RpcSkeleton *rpcSkeleton = GetCurrentRpcSkeleton();
234     if (rpcSkeleton == NULL) {
235         RPC_LOG_ERROR("RpcSkeleton is null");
236         return ERR_FAILED;
237     }
238 
239     int32_t ret = rpcSkeleton->rpcTrans->Send((int)sessionOfPeer->sessionId,
240         (void *)sessionOfPeer->buffer, (uint32_t)sessionOfPeer->len);
241 
242     free(sessionOfPeer->buffer);
243     return ret;
244 }
245 
MakeThreadMessageInfo(uint64_t seqNumber,uint32_t handle)246 static ThreadMessageInfo *MakeThreadMessageInfo(uint64_t seqNumber, uint32_t handle)
247 {
248     ThreadMessageInfo *messageInfo = (ThreadMessageInfo *)malloc(sizeof(ThreadMessageInfo));
249     if (messageInfo == NULL) {
250         RPC_LOG_ERROR("messageInfo malloc failed");
251         return NULL;
252     }
253 
254     messageInfo->threadId = pthread_self();
255     messageInfo->seqNumber = seqNumber;
256     messageInfo->buffer = NULL;
257     messageInfo->offsets = 0;
258     messageInfo->sessionId = handle;
259     return messageInfo;
260 }
261 
HandleReply(uint64_t seqNumber,IpcIo * reply,uintptr_t * buffer)262 static int32_t HandleReply(uint64_t seqNumber, IpcIo *reply, uintptr_t *buffer)
263 {
264     if (reply == NULL) {
265         RPC_LOG_ERROR("no need reply, free the buffer");
266         return ERR_FAILED;
267     }
268 
269     ThreadMessageInfo *messageInfo = QueryThreadBySeqNumber(seqNumber);
270     if (messageInfo == NULL) {
271         RPC_LOG_ERROR("receive buffer is nullptr");
272         return ERR_NONE;
273     }
274 
275     if (messageInfo->flags & TF_OP_STATUS_CODE) {
276         int32_t err = messageInfo->offsetsSize;
277         return err;
278     }
279 
280     dbinder_transaction_data transData = {
281         .buffer_size = messageInfo->bufferSize,
282         .offsets_size = messageInfo->offsetsSize,
283         .offsets = messageInfo->offsets,
284         .buffer = messageInfo->buffer
285     };
286     ToIpcData(&transData, reply);
287     *buffer = (uintptr_t)messageInfo->buffer;
288 
289     return ERR_NONE;
290 }
291 
WaitForReply(uint64_t seqNumber,IpcIo * reply,uint32_t handle,uint32_t userWaitTime,uintptr_t * buffer)292 static int32_t WaitForReply(uint64_t seqNumber, IpcIo *reply, uint32_t handle, uint32_t userWaitTime, uintptr_t *buffer)
293 {
294     if (reply == NULL || userWaitTime == 0) {
295         return ERR_NONE;
296     }
297 
298     ThreadMessageInfo *messageInfo = MakeThreadMessageInfo(seqNumber, handle);
299     if (messageInfo == NULL) {
300         RPC_LOG_ERROR("make thread message info failed, no memory");
301         return ERR_FAILED;
302     }
303     if (AddSendThreadInWait(seqNumber, messageInfo, userWaitTime) != ERR_NONE) {
304         RPC_LOG_ERROR("sender thread wait reply message time out");
305         EraseThreadBySeqNumber(messageInfo);
306         free(messageInfo);
307         return ERR_FAILED;
308     }
309     int32_t result = HandleReply(seqNumber, reply, buffer);
310     EraseThreadBySeqNumber(messageInfo);
311     free(messageInfo);
312     return result;
313 }
314 
SendOrWaitForCompletion(uint32_t userWaitTime,uint64_t seqNumber,HandleSessionList * sessionOfPeer,IpcIo * reply,uintptr_t * buffer)315 static int32_t SendOrWaitForCompletion(uint32_t userWaitTime, uint64_t seqNumber,
316     HandleSessionList *sessionOfPeer, IpcIo *reply, uintptr_t *buffer)
317 {
318     if (seqNumber == 0) {
319         RPC_LOG_ERROR("seqNumber can not be zero");
320         return ERR_FAILED;
321     }
322     if (sessionOfPeer == NULL) {
323         RPC_LOG_ERROR("current session is invalid");
324         return ERR_FAILED;
325     }
326     int32_t result = OnSendMessage(sessionOfPeer);
327     if (result != ERR_NONE) {
328         RPC_LOG_ERROR("fail to send to remote session with error = %d", result);
329         // no return, for msg send failed maybe not mine
330     }
331     return WaitForReply(seqNumber, reply, sessionOfPeer->handle, userWaitTime, buffer);
332 }
333 
GetCallerSessionId(void)334 static int32_t GetCallerSessionId(void)
335 {
336     ThreadContext *threadContext = GetCurrentThreadContext();
337     return threadContext->sessionId;
338 }
339 
SendReply(IpcIo * reply,uint32_t flags,int32_t result)340 static int32_t SendReply(IpcIo *reply, uint32_t flags, int32_t result)
341 {
342     uint64_t seqNumber = 0;
343     MessageOption option = {
344         .flags = flags
345     };
346     HandleSessionList *sessionObject = WriteTransaction(BC_REPLY, option, 0, GetCallerSessionId(),
347         0, reply, &seqNumber, result);
348 
349     if (seqNumber == 0) {
350         RPC_LOG_ERROR("seqNumber can not be zero");
351         return ERR_FAILED;
352     }
353     SendOrWaitForCompletion(0, seqNumber, sessionObject, reply, NULL);
354     return ERR_NONE;
355 }
356 
ProcessTransaction(const dbinder_transaction_data * tr,uint32_t sessionId)357 static void ProcessTransaction(const dbinder_transaction_data *tr, uint32_t sessionId)
358 {
359     if (tr == NULL || tr->cookie == 0) {
360         return;
361     }
362 
363     IpcIo data;
364     IpcIo reply;
365     uint8_t replyAlloc[RPC_IPC_LENGTH];
366     IpcIoInit(&reply, replyAlloc, RPC_IPC_LENGTH, 0);
367     MessageOption option = {
368         .flags =  tr->flags
369     };
370     uint64_t senderSeqNumber = tr->seqNumber;
371 
372     ToIpcData(tr, &data);
373 
374     ThreadContext *threadContext = GetCurrentThreadContext();
375     const pid_t oldPid = threadContext->callerPid;
376     const pid_t oldUid = threadContext->callerUid;
377     char oldDeviceId[DEVICEID_LENGTH];
378     if (memcpy_s(oldDeviceId, DEVICEID_LENGTH, threadContext->callerDeviceID, DEVICEID_LENGTH) != EOK) {
379         RPC_LOG_ERROR("oldDeviceId memcpy failed");
380         return;
381     }
382 
383     StubObject *stubObject = QueryStubByIndex(tr->cookie);
384     if (stubObject == NULL) {
385         RPC_LOG_ERROR("stubIndex is invalid");
386         return;
387     }
388 
389     int32_t result = stubObject->func(tr->code, &data, &reply, option);
390     if (result != ERR_NONE) {
391         RPC_LOG_ERROR("stub is invalid, has not OnReceive or Request");
392     }
393     if (!(option.flags & TF_OP_ASYNC)) {
394         threadContext->sessionId = sessionId;
395         threadContext->seqNumber = senderSeqNumber;
396         SendReply(&reply, 0, result);
397         threadContext->sessionId = 0;
398         threadContext->seqNumber = 0;
399     }
400 
401     threadContext->callerPid = oldPid;
402     threadContext->callerUid = oldUid;
403     if (memcpy_s(threadContext->callerDeviceID, DEVICEID_LENGTH, oldDeviceId, DEVICEID_LENGTH) != EOK) {
404         RPC_LOG_ERROR("threadContext callerDeviceID memcpy failed");
405     }
406 }
407 
ProcessReply(const dbinder_transaction_data * tr,uint32_t sessionId)408 static void ProcessReply(const dbinder_transaction_data *tr, uint32_t sessionId)
409 {
410     ThreadMessageInfo *messageInfo = QueryThreadBySeqNumber(tr->seqNumber);
411     if (messageInfo == NULL) {
412         RPC_LOG_ERROR("no thread waiting reply message of this seqNumber");
413         /* messageInfo is null, no thread need to wakeup */
414         return;
415     }
416 
417     size_t bufferSize = tr->sizeOfSelf - sizeof(dbinder_transaction_data);
418     messageInfo->buffer = (void *)malloc(bufferSize);
419     if (messageInfo->buffer == NULL) {
420         RPC_LOG_ERROR("some thread is waiting for reply message, but no memory");
421         /* wake up sender thread */
422         WakeUpThreadBySeqNumber(tr->seqNumber, sessionId);
423         return;
424     }
425 
426     if (memcpy_s(messageInfo->buffer, bufferSize, tr->buffer, bufferSize) != EOK) {
427         RPC_LOG_ERROR("messageInfo buffer memset failed");
428         free(messageInfo->buffer);
429         WakeUpThreadBySeqNumber(tr->seqNumber, sessionId);
430         return;
431     }
432 
433     messageInfo->flags = tr->flags;
434     messageInfo->bufferSize = tr->buffer_size;
435     messageInfo->offsetsSize = tr->offsets_size;
436     messageInfo->offsets = tr->offsets;
437     messageInfo->sessionId = sessionId;
438 
439     /* wake up sender thread */
440     WakeUpThreadBySeqNumber(tr->seqNumber, sessionId);
441 }
442 
OnTransaction(ThreadProcessInfo * processInfo)443 static void OnTransaction(ThreadProcessInfo *processInfo)
444 {
445     if (processInfo == NULL) {
446         return;
447     }
448     dbinder_transaction_data *tr = (dbinder_transaction_data *)processInfo->buffer;
449     tr->buffer = (char *)(processInfo->buffer + sizeof(dbinder_transaction_data));
450 
451     if (tr->cmd == BC_TRANSACTION) {
452         ProcessTransaction(tr, processInfo->sessionId);
453     } else if (tr->cmd == BC_REPLY) {
454         ProcessReply(tr, processInfo->sessionId);
455     }
456 }
457 
MakeThreadProcessInfo(uint32_t handle,const char * inBuffer,uint32_t size)458 static ThreadProcessInfo *MakeThreadProcessInfo(uint32_t handle, const char *inBuffer, uint32_t size)
459 {
460     if (inBuffer == NULL || size < sizeof(dbinder_transaction_data)) {
461         RPC_LOG_ERROR("buffer is null or size invalid");
462         return NULL;
463     }
464 
465     ThreadProcessInfo *processInfo = (ThreadProcessInfo *)malloc(sizeof(ThreadProcessInfo));
466     if (processInfo == NULL) {
467         return NULL;
468     }
469     processInfo->buffer = (char *)malloc(size);
470     if (processInfo->buffer == NULL) {
471         free(processInfo);
472         return NULL;
473     }
474     if (memcpy_s(processInfo->buffer, size, inBuffer, size) != EOK) {
475         free(processInfo->buffer);
476         free(processInfo);
477         return NULL;
478     }
479     processInfo->sessionId = handle;
480     processInfo->packageSize = size;
481 
482     return processInfo;
483 }
484 
CreateProcessThread(void)485 static int32_t CreateProcessThread(void)
486 {
487     IpcSkeleton *current = GetCurrentSkeleton();
488     if (current == NULL) {
489         RPC_LOG_ERROR("current ipcskeleton is nullptr");
490         return ERR_FAILED;
491     }
492     if (current->threadPool->idleSocketThreadNum > 0) {
493         SpawnThread(SPAWN_PASSIVE, IF_PROT_DATABUS);
494         RPC_LOG_INFO("create Process thread success");
495         return ERR_NONE;
496     }
497     return ERR_FAILED;
498 }
499 
StartProcessLoop(uint32_t handle,const void * buffer,uint32_t size)500 static void StartProcessLoop(uint32_t handle, const void *buffer, uint32_t size)
501 {
502     ThreadProcessInfo *processInfo = MakeThreadProcessInfo(handle, buffer, size);
503     if (processInfo == NULL) {
504         RPC_LOG_ERROR("MakeThreadProcessInfo failed");
505         return;
506     }
507 
508     IdleDataThread *idleDataThread = GetIdleDataThread();
509     if (idleDataThread == NULL) {
510         if (CreateProcessThread() != ERR_NONE) {
511             RPC_LOG_ERROR("create IO thread failed");
512         }
513         do {
514             /*  no IO thread in idle state, wait a monent */
515             usleep(GET_IDLE_THREAD_WAIT_TIME);
516             idleDataThread = GetIdleDataThread();
517         } while (idleDataThread == NULL);
518     }
519     pthread_t threadId = idleDataThread->threadId;
520     processInfo->threadId = threadId;
521     AddDataInfoToThread(processInfo);
522     WakeUpDataThread(threadId);
523 }
524 
OnReceiveNewConnection(int sessionId)525 int32_t OnReceiveNewConnection(int sessionId)
526 {
527     uint32_t handle = sessionId;
528     IpcSkeleton *current = GetCurrentSkeleton();
529     if (current == NULL) {
530         RPC_LOG_ERROR("current ipcskeleton is nullptr");
531         return ERR_FAILED;
532     }
533 
534     HandleSessionList *stubSession = (HandleSessionList *)malloc(sizeof(HandleSessionList));
535     if (stubSession == NULL) {
536         RPC_LOG_ERROR("stubSession malloc failed");
537         return ERR_FAILED;
538     }
539     stubSession->handle = handle;
540     stubSession->sessionId = sessionId;
541     if (AttachStubSession(stubSession) != ERR_NONE) {
542         RPC_LOG_ERROR("AttachStubSession failed");
543         free(stubSession);
544         return ERR_FAILED;
545     }
546     return HandleNewConnection(RpcGetSessionIdList(), sessionId);
547 }
548 
OnDatabusSessionClosed(int sessionId)549 void OnDatabusSessionClosed(int sessionId)
550 {
551     if (sessionId < 0) {
552         return;
553     }
554 
555     uint32_t handle = sessionId;
556     HandleSessionList *handleSession = QueryStubSession(handle);
557     if (handleSession != NULL) {
558         DetachStubSession(handleSession);
559         free(handleSession);
560         RPC_LOG_INFO("OnDatabusSessionClosed called on rpc stub");
561         return;
562     }
563 
564     handleSession = QueryProxySessionBySessionId(sessionId);
565     if (handleSession == NULL) {
566         RPC_LOG_INFO("OnDatabusSessionClosed query session is null");
567         return;
568     }
569     DetachProxySession(handleSession);
570 
571     HandleToIndexList *handeleIndex = QueryHandleToIndex(handleSession->handle);
572     if (handeleIndex == NULL) {
573         RPC_LOG_INFO("OnDatabusSessionClosed query stub index is null");
574         return;
575     }
576     DetachHandleToIndex(handeleIndex);
577 
578     IpcSkeleton *ipcSkeleton  = GetCurrentSkeleton();
579     if (ipcSkeleton == NULL) {
580         RPC_LOG_ERROR("GetCurrentSkeleton return null");
581         return;
582     }
583 
584     DeathCallback *node = NULL;
585     UTILS_DL_LIST_FOR_EACH_ENTRY(node, &ipcSkeleton->objects, DeathCallback, list)
586     {
587         if (node->handle == handleSession->handle) {
588             RPC_LOG_INFO("OnDatabusSessionClosed SendObituary handle %d", node->handle);
589             SendObituary(node);
590             DeleteDeathCallback(node);
591             break;
592         }
593     }
594 }
595 
HasCompletePackage(const char * data,uint32_t readCursor,uint32_t len)596 static uint32_t HasCompletePackage(const char *data, uint32_t readCursor, uint32_t len)
597 {
598     const dbinder_transaction_data *tr = (const dbinder_transaction_data *)(data + readCursor);
599     if ((tr->magic == DBINDER_MAGICWORD) &&
600         (tr->sizeOfSelf <= SOCKET_MAX_BUFF_SIZE + sizeof(dbinder_transaction_data)) &&
601         (readCursor + tr->sizeOfSelf <= len)) {
602         return (uint32_t)tr->sizeOfSelf;
603     }
604     return 0;
605 }
606 
OnMessageAvailable(int sessionId,const void * data,uint32_t len)607 void OnMessageAvailable(int sessionId, const void *data, uint32_t len)
608 {
609     if (sessionId < 0 || data == NULL || len < sizeof(dbinder_transaction_data)) {
610         RPC_LOG_ERROR("session has wrong inputs");
611         return;
612     }
613 
614     uint32_t handle = sessionId;
615     uint32_t readSize = 0;
616     while (readSize + sizeof(dbinder_transaction_data) < len) {
617         uint32_t packageSize = HasCompletePackage(data, readSize, len);
618         if (packageSize > 0) {
619             StartProcessLoop(handle, data, packageSize);
620             readSize += packageSize;
621         } else {
622             // If the current is abnormal, the subsequent is no longer processed.
623             break;
624         }
625     }
626 }
627 
UpdateClientSession(int32_t handle,HandleSessionList * sessionObject,const char * serviceName,const char * deviceId)628 void UpdateClientSession(int32_t handle, HandleSessionList *sessionObject,
629     const char *serviceName, const char *deviceId)
630 {
631     if (handle < 0 || sessionObject == NULL || serviceName == NULL || deviceId == NULL) {
632         RPC_LOG_ERROR("UpdateClientSession params invalid");
633         return;
634     }
635 
636     RpcSkeleton *rpcSkeleton = GetCurrentRpcSkeleton();
637     if (rpcSkeleton == NULL) {
638         return;
639     }
640     int sessionId = rpcSkeleton->rpcTrans->Connect(serviceName, deviceId, NULL);
641     if (sessionId < 0) {
642         RPC_LOG_ERROR("UpdateClientSession connect failed");
643         return;
644     }
645     if (WaitForSessionIdReady(RpcGetSessionIdList(), sessionId) != ERR_NONE) {
646         RPC_LOG_ERROR("SendDataToRemote connect failed, sessionId=%d", sessionId);
647         return;
648     }
649 
650     sessionObject->handle = handle;
651     sessionObject->sessionId = sessionId;
652     if (AttachProxySession(sessionObject) != ERR_NONE) {
653         RPC_LOG_ERROR("UpdateClientSession AttachProxySession failed");
654     }
655 }
656 
CreateTransServer(const char * sessionName)657 int32_t CreateTransServer(const char *sessionName)
658 {
659     if (sessionName == NULL) {
660         return ERR_FAILED;
661     }
662     RpcSkeleton *rpcSkeleton = GetCurrentRpcSkeleton();
663     if (rpcSkeleton == NULL) {
664         return ERR_FAILED;
665     }
666 
667     if (rpcSkeleton->isServerCreated == 0) {
668         return ERR_NONE;
669     }
670 
671     pthread_mutex_lock(&rpcSkeleton->lock);
672     if (rpcSkeleton->isServerCreated == -1) {
673         if (rpcSkeleton->rpcTrans->StartListen(sessionName, GetRpcTransCallback()) != ERR_NONE) {
674             RPC_LOG_ERROR("CreateTransServer failed");
675             pthread_mutex_unlock(&rpcSkeleton->lock);
676             return ERR_FAILED;
677         }
678         rpcSkeleton->isServerCreated = 0;
679         pthread_mutex_unlock(&rpcSkeleton->lock);
680         return SpawnThread(SPAWN_ACTIVE, IF_PROT_DATABUS);
681     }
682     pthread_mutex_unlock(&rpcSkeleton->lock);
683 
684     return ERR_NONE;
685 }
686 
RpcAcquireHandle(int32_t handle)687 static int32_t RpcAcquireHandle(int32_t handle)
688 {
689     (void)handle;
690     return ERR_NONE;
691 }
692 
RpcReleaseHandle(int32_t handle)693 static int32_t RpcReleaseHandle(int32_t handle)
694 {
695     (void)handle;
696     return ERR_NONE;
697 }
698 
RpcInvokerSendRequest(SvcIdentity target,uint32_t code,IpcIo * data,IpcIo * reply,MessageOption option,uintptr_t * buffer)699 static int32_t RpcInvokerSendRequest(SvcIdentity target, uint32_t code, IpcIo *data, IpcIo *reply,
700     MessageOption option, uintptr_t *buffer)
701 {
702     RPC_LOG_INFO("RPCInvokerSendRequest called");
703     int32_t result;
704     uint64_t seqNumber = 0;
705 
706     uint32_t userWaitTime = option.waitTime;
707     if (userWaitTime > RPC_MAX_SEND_WAIT_TIME) {
708         userWaitTime = RPC_MAX_SEND_WAIT_TIME;
709     }
710 
711     HandleSessionList *sessinoObject = WriteTransaction(BC_TRANSACTION, option, target.handle,
712         0, code, data, &seqNumber, 0);
713     if (sessinoObject == NULL) {
714         return ERR_FAILED;
715     }
716 
717     if (option.flags & TF_OP_ASYNC) {
718         result = SendOrWaitForCompletion(userWaitTime, seqNumber, sessinoObject, NULL, buffer);
719     } else {
720         result = SendOrWaitForCompletion(userWaitTime, seqNumber, sessinoObject, reply, buffer);
721     }
722 
723     return result;
724 }
725 
RpcFreeBuffer(void * ptr)726 static int32_t RpcFreeBuffer(void *ptr)
727 {
728     if (ptr != NULL) {
729         free(ptr);
730     }
731     return ERR_NONE;
732 }
733 
RpcSetMaxWorkThread(int32_t maxThreadNum)734 static int32_t RpcSetMaxWorkThread(int32_t maxThreadNum)
735 {
736     (void)maxThreadNum;
737     return ERR_NONE;
738 }
739 
RpcJoinThread(bool initiative)740 static void RpcJoinThread(bool initiative)
741 {
742     pthread_t threadId = pthread_self();
743 
744     ThreadContext *threadContext = GetCurrentThreadContext();
745     if (threadContext == NULL) {
746         return;
747     }
748     threadContext->stopWorkThread = false;
749 
750     while (threadContext->stopWorkThread == false) {
751         AddDataThreadInWait(threadId);
752         ThreadProcessInfo *processInfo = PopDataInfoFromThread(threadId);
753         if (processInfo != NULL) {
754             OnTransaction(processInfo);
755             free(processInfo->buffer);
756             free(processInfo);
757         }
758     }
759 }
760 
RpcStopWorkThread(void)761 void RpcStopWorkThread(void)
762 {
763     IpcSkeleton *current = GetCurrentSkeleton();
764     if (current == NULL) {
765         return;
766     }
767 
768     ThreadContext *threadContext = GetCurrentThreadContext();
769     if (threadContext == NULL) {
770         return;
771     }
772     threadContext->stopWorkThread = true;
773 }
774 
RpcSetRegistryObject(SvcIdentity target,SvcIdentity * samgr)775 static int32_t RpcSetRegistryObject(SvcIdentity target, SvcIdentity *samgr)
776 {
777     (void)target;
778     (void)samgr;
779     return ERR_NONE;
780 }
781 
RpcAddDeathRecipient(int32_t handle,void * cookie)782 static int32_t RpcAddDeathRecipient(int32_t handle, void *cookie)
783 {
784     (void)handle;
785     (void)cookie;
786     return ERR_NONE;
787 }
788 
RpcRemoveDeathRecipient(int32_t handle,void * cookie)789 static int32_t RpcRemoveDeathRecipient(int32_t handle, void *cookie)
790 {
791     (void)handle;
792     (void)cookie;
793     return ERR_NONE;
794 }
795 
GetRpcInvoker(void)796 RemoteInvoker *GetRpcInvoker(void)
797 {
798     if (g_rpcInvoker == NULL) {
799         g_rpcInvoker = (RemoteInvoker *)malloc(sizeof(RemoteInvoker));
800         if (g_rpcInvoker != NULL) {
801             g_rpcInvoker->AcquireHandle = RpcAcquireHandle;
802             g_rpcInvoker->ReleaseHandle = RpcReleaseHandle;
803             g_rpcInvoker->SendRequest = RpcInvokerSendRequest;
804             g_rpcInvoker->FreeBuffer = RpcFreeBuffer;
805             g_rpcInvoker->SetMaxWorkThread = RpcSetMaxWorkThread;
806             g_rpcInvoker->JoinThread = RpcJoinThread;
807             g_rpcInvoker->ExitCurrentThread = RpcStopWorkThread;
808             g_rpcInvoker->SetRegistryObject = RpcSetRegistryObject;
809             g_rpcInvoker->AddDeathRecipient = RpcAddDeathRecipient;
810             g_rpcInvoker->RemoveDeathRecipient = RpcRemoveDeathRecipient;
811         }
812     }
813 
814     return g_rpcInvoker;
815 }