1 /*
2 * Copyright (C) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dbinder_invoker.h"
17
18 #include <inttypes.h>
19 #include <unistd.h>
20
21 #include "securec.h"
22 #include "utils_list.h"
23
24 #include "dbinder_types.h"
25 #include "ipc_process_skeleton.h"
26 #include "ipc_skeleton.h"
27 #include "ipc_thread_pool.h"
28 #include "rpc_errno.h"
29 #include "rpc_log.h"
30 #include "rpc_process_skeleton.h"
31 #include "rpc_session_handle.h"
32 #include "rpc_trans.h"
33 #include "rpc_trans_callback.h"
34 #include "rpc_types.h"
35
/*
 * Command codes carried in dbinder_transaction_data.cmd; OnTransaction() dispatches on them.
 * NOTE(review): the values look like the binder driver's BC_TRANSACTION/BC_REPLY encodings -
 * confirm against the binder header before changing them.
 */
#define BC_TRANSACTION 1076388608
#define BC_REPLY 1076388609

/* Marker stamped into every outgoing header and checked by HasCompletePackage(); ASCII "DBIN". */
static const int DBINDER_MAGICWORD = 0X4442494E;
/* Largest payload size (bytes following the header) that HasCompletePackage() accepts. */
static const int SOCKET_MAX_BUFF_SIZE = 1024 * 1024;
/* Cached invoker instance, allocated on first use by GetRpcInvoker(). */
static RemoteInvoker *g_rpcInvoker = NULL;
42
/*
 * Releases an invoker object.
 *
 * If the object being released is the instance cached in g_rpcInvoker, the cache is
 * cleared too, so a later GetRpcInvoker() call allocates a fresh invoker instead of
 * returning a pointer to freed memory. Passing NULL is a no-op.
 */
void DeleteRpcInvoker(RemoteInvoker *remoteInvoker)
{
    if (remoteInvoker == NULL) {
        return;
    }
    if (remoteInvoker == g_rpcInvoker) {
        g_rpcInvoker = NULL;
    }
    free(remoteInvoker);
}
50
GetSessionObject(uint32_t handle,uint32_t sessionId)51 static HandleSessionList *GetSessionObject(uint32_t handle, uint32_t sessionId)
52 {
53 if (handle != 0) {
54 /* transact case */
55 return QueryProxySession(handle);
56 } else {
57 /* reply case */
58 return QueryStubSession(sessionId);
59 }
60 }
61
GetUniqueSeqNumber(int cmd)62 static uint64_t GetUniqueSeqNumber(int cmd)
63 {
64 if (cmd == BC_TRANSACTION) {
65 return ProcessGetSeqNumber();
66 } else if (cmd == BC_REPLY) {
67 ThreadContext *threadContext = GetCurrentThreadContext();
68 if (threadContext == NULL) {
69 RPC_LOG_ERROR("GetUniqueSeqNumber threadContext is null");
70 return 0;
71 }
72 return threadContext->seqNumber;
73 }
74 return 0;
75 }
76
/*
 * Fills the size fields of a transaction header from an IpcIo.
 * buffer_size is the number of bytes written to the data area, offsets is set equal to
 * buffer_size, and offsets_size is the number of written offset entries times sizeof(size_t).
 * A NULL data pointer yields all-zero sizes.
 */
static void ToTransData(const IpcIo *data, dbinder_transaction_data *buf)
{
    if (data == NULL) {
        buf->buffer_size = 0;
        buf->offsets = buf->buffer_size;
        buf->offsets_size = 0;
        return;
    }

    buf->buffer_size = data->bufferCur - data->bufferBase;
    buf->offsets = buf->buffer_size;
    buf->offsets_size = (uint64_t)((data->offsetsCur - data->offsetsBase) * sizeof(size_t));
}
84
/*
 * Makes an IpcIo view over the payload described by a transaction header.
 * The data area starts at tr->buffer and is tr->buffer_size bytes long; the offsets area is
 * taken to start directly behind it and to hold tr->offsets_size / sizeof(size_t) entries.
 * No bytes are copied: the IpcIo points into tr->buffer.
 */
static void ToIpcData(const dbinder_transaction_data *tr, IpcIo *data)
{
    char *payload = (char *)(tr->buffer);
    size_t payloadLen = (size_t)tr->buffer_size;

    data->bufferCur = payload;
    data->bufferBase = payload;
    data->offsetsCur = (size_t *)(payload + payloadLen);
    data->offsetsBase = data->offsetsCur;
    data->bufferLeft = payloadLen;
    data->offsetsLeft = (tr->offsets_size) / sizeof(size_t);
    data->flag = IPC_IO_INITIALIZED;
}
93
/*
 * Copies the content of an IpcIo into the payload buffer of a transaction.
 *
 * data      - source parcel; NULL means "status only": no bytes are copied, the
 *             TF_OP_STATUS_CODE flag is set and status is carried in offsets_size.
 * transData - destination; transData->buffer and transData->sizeOfSelf must already be set
 *             by the caller (see ProcessNormalData()).
 * status    - status code, used only when data is NULL.
 *
 * The offsets area is copied directly behind the data area; its length is whatever is left
 * of sizeOfSelf after the header and the data area.
 * Returns ERR_NONE on success, ERR_FAILED if a copy fails or the sizes are inconsistent.
 */
static int32_t MoveIpcToTransData(IpcIo *data, dbinder_transaction_data *transData, int status)
{
    if (data == NULL) {
        transData->flags |= TF_OP_STATUS_CODE;
        transData->buffer_size = sizeof(size_t);
        transData->offsets_size = (size_t)status;
        transData->offsets = transData->buffer_size;
        return ERR_NONE;
    }

    ToTransData(data, transData);

    /* Guard the subtraction below against unsigned wrap-around. */
    if (transData->sizeOfSelf < sizeof(dbinder_transaction_data) ||
        transData->buffer_size > transData->sizeOfSelf - sizeof(dbinder_transaction_data)) {
        RPC_LOG_ERROR("transData size is inconsistent");
        return ERR_FAILED;
    }

    /* memcpy_s rejects a zero-sized destination, so an empty data area skips the copy. */
    if (transData->buffer_size > 0 &&
        memcpy_s(transData->buffer, transData->buffer_size, data->bufferBase, transData->buffer_size) != EOK) {
        RPC_LOG_ERROR("transData buffer memcpy failed");
        return ERR_FAILED;
    }

    uint32_t offsetsSize =
        (uint32_t)(transData->sizeOfSelf - sizeof(dbinder_transaction_data) - transData->buffer_size);
    if (offsetsSize > 0 &&
        memcpy_s(transData->buffer + transData->buffer_size, offsetsSize, data->offsetsBase, offsetsSize) != EOK) {
        RPC_LOG_ERROR("transData offsets memcpy failed");
        return ERR_FAILED;
    }
    return ERR_NONE;
}
118
/*
 * Allocates the payload buffer of a transaction and fills it from an IpcIo.
 *
 * data      - parcel to serialize; NULL produces a status-only message (see
 *             MoveIpcToTransData()), for which a sizeof(size_t)-byte zeroed payload is reserved
 *             to match the buffer_size that MoveIpcToTransData() reports.
 * handle    - 0 for replies (cookie = 0); otherwise the cookie is the stub index registered
 *             for the handle.
 * status    - forwarded to MoveIpcToTransData().
 * transData - header to fill; on return transData->buffer is heap memory owned by the CALLER,
 *             which must free it on both success and failure.
 *
 * The payload is sized as data bytes plus offset entries * sizeof(size_t), which matches the
 * offsets_size that ToTransData() writes into the header.
 * Returns ERR_NONE on success, ERR_FAILED otherwise.
 */
static int32_t ProcessNormalData(IpcIo *data, int32_t handle, int status, dbinder_transaction_data *transData)
{
    if (transData == NULL) {
        RPC_LOG_ERROR("ProcessNormalData transData is null");
        return ERR_FAILED;
    }

    uint32_t dataSize;
    if (data == NULL) {
        dataSize = (uint32_t)sizeof(size_t);
    } else {
        uint32_t bufferBytes = (uint32_t)(data->bufferCur - data->bufferBase);
        uint32_t offsetsBytes = (uint32_t)((size_t)(data->offsetsCur - data->offsetsBase) * sizeof(size_t));
        dataSize = bufferBytes + offsetsBytes;
    }

    /*
     * calloc keeps uninitialised heap bytes out of the outgoing message; at least one byte is
     * requested so that an empty payload cannot be mistaken for an allocation failure.
     */
    transData->buffer = (char *)calloc(1, (dataSize == 0) ? 1 : dataSize);
    if (transData->buffer == NULL) {
        RPC_LOG_ERROR("transData buffer malloc failed");
        return ERR_FAILED;
    }
    transData->sizeOfSelf = sizeof(dbinder_transaction_data) + dataSize;

    if (handle == 0) {
        transData->cookie = 0;
    } else {
        HandleToIndexList *handleToIndex = QueryHandleToIndex(handle);
        if (handleToIndex == NULL) {
            RPC_LOG_ERROR("stubIndex not found for handle %d", handle);
            return ERR_FAILED;
        }
        transData->cookie = handleToIndex->index;
    }

    if (MoveIpcToTransData(data, transData, status) != ERR_NONE) {
        RPC_LOG_ERROR("move parcel to transData failed, handle = %d", handle);
        return ERR_FAILED;
    }

    return ERR_NONE;
}
153
/*
 * Serializes a transaction (header followed by its whole payload) into a freshly allocated
 * buffer stored in sessionObject->buffer; sessionObject->len receives the total size.
 *
 * The payload copied is everything after the header (sizeOfSelf - header size), i.e. the data
 * area AND the offsets area, so every byte that is later sent has been initialised.
 * On failure the session buffer is released and reset to NULL.
 * Returns ERR_NONE on success, ERR_FAILED otherwise.
 */
static int32_t MoveTransData2Buffer(HandleSessionList *sessionObject, dbinder_transaction_data *transData)
{
    if (transData->sizeOfSelf < sizeof(dbinder_transaction_data)) {
        RPC_LOG_ERROR("transData size is smaller than its header");
        return ERR_FAILED;
    }
    size_t payloadSize = (size_t)transData->sizeOfSelf - sizeof(dbinder_transaction_data);

    sessionObject->buffer = (char *)malloc((size_t)transData->sizeOfSelf);
    if (sessionObject->buffer == NULL) {
        RPC_LOG_ERROR("sessionObject buffer malloc failed");
        return ERR_FAILED;
    }
    sessionObject->len = transData->sizeOfSelf;

    if (memcpy_s(sessionObject->buffer, sizeof(dbinder_transaction_data),
        transData, sizeof(dbinder_transaction_data)) != EOK) {
        RPC_LOG_ERROR("sessionObject header memcpy failed");
        free(sessionObject->buffer);
        sessionObject->buffer = NULL;
        return ERR_FAILED;
    }

    /* memcpy_s rejects a zero-sized destination, so a header-only message skips this copy. */
    if (payloadSize > 0 &&
        memcpy_s(sessionObject->buffer + sizeof(dbinder_transaction_data),
        payloadSize, transData->buffer, payloadSize) != EOK) {
        RPC_LOG_ERROR("sessionObject payload memcpy failed");
        free(sessionObject->buffer);
        sessionObject->buffer = NULL;
        return ERR_FAILED;
    }

    return ERR_NONE;
}
179
WriteTransaction(int32_t cmd,MessageOption option,int32_t handle,int32_t sessionId,uint32_t code,IpcIo * data,uint64_t * seqNumber,int status)180 static HandleSessionList *WriteTransaction(int32_t cmd, MessageOption option, int32_t handle,
181 int32_t sessionId, uint32_t code, IpcIo *data, uint64_t *seqNumber, int status)
182 {
183 HandleSessionList *sessionObject = GetSessionObject(handle, sessionId);
184 if (sessionObject == NULL) {
185 RPC_LOG_ERROR("session is not exist for sessionId = %d, handle = %d", sessionId, handle);
186 return NULL;
187 }
188
189 uint64_t seqNum = GetUniqueSeqNumber(cmd);
190 if (seqNum == 0) {
191 RPC_LOG_ERROR("seqNum invalid");
192 if (sessionObject->buffer != NULL) {
193 free(sessionObject->buffer);
194 }
195 return NULL;
196 }
197 *seqNumber = seqNum;
198
199 dbinder_transaction_data transData = {
200 .magic = DBINDER_MAGICWORD,
201 .version = VERSION_NUM,
202 .cmd = cmd,
203 .code = code,
204 .flags = option.flags,
205 .seqNumber = *seqNumber,
206 .buffer = NULL
207 };
208
209 if (ProcessNormalData(data, handle, status, &transData) != ERR_NONE) {
210 RPC_LOG_ERROR("ProcessNormalData failed");
211 if (transData.buffer != NULL) {
212 free(transData.buffer);
213 return NULL;
214 }
215 }
216
217 if (MoveTransData2Buffer(sessionObject, &transData) != ERR_NONE) {
218 RPC_LOG_ERROR("move transaction data to buffer failed");
219 free(transData.buffer);
220 return NULL;
221 }
222
223 free(transData.buffer);
224 return sessionObject;
225 }
226
/*
 * Sends the serialized message held in sessionOfPeer->buffer through the RPC transport and
 * releases that buffer.
 *
 * The buffer is freed and the pointer reset to NULL on every path past the argument check,
 * including the one where no RPC skeleton is available, so it is neither leaked nor left
 * dangling in the session.
 * Returns the transport's Send() result, or ERR_FAILED if nothing could be sent.
 */
static int32_t OnSendMessage(HandleSessionList *sessionOfPeer)
{
    if (sessionOfPeer == NULL || sessionOfPeer->buffer == NULL) {
        RPC_LOG_ERROR("sessionOfPeer or buffer is null");
        return ERR_FAILED;
    }

    int32_t ret = ERR_FAILED;
    RpcSkeleton *rpcSkeleton = GetCurrentRpcSkeleton();
    if (rpcSkeleton == NULL) {
        RPC_LOG_ERROR("RpcSkeleton is null");
    } else {
        ret = rpcSkeleton->rpcTrans->Send((int)sessionOfPeer->sessionId,
            (void *)sessionOfPeer->buffer, (uint32_t)sessionOfPeer->len);
    }

    free(sessionOfPeer->buffer);
    sessionOfPeer->buffer = NULL;
    return ret;
}
245
MakeThreadMessageInfo(uint64_t seqNumber,uint32_t handle)246 static ThreadMessageInfo *MakeThreadMessageInfo(uint64_t seqNumber, uint32_t handle)
247 {
248 ThreadMessageInfo *messageInfo = (ThreadMessageInfo *)malloc(sizeof(ThreadMessageInfo));
249 if (messageInfo == NULL) {
250 RPC_LOG_ERROR("messageInfo malloc failed");
251 return NULL;
252 }
253
254 messageInfo->threadId = pthread_self();
255 messageInfo->seqNumber = seqNumber;
256 messageInfo->buffer = NULL;
257 messageInfo->offsets = 0;
258 messageInfo->sessionId = handle;
259 return messageInfo;
260 }
261
/*
 * Hands the reply recorded for seqNumber over to the caller.
 *
 * seqNumber - sequence number of the request whose reply is expected.
 * reply     - out: initialised as a view over the received payload (no copy).
 * buffer    - out: receives the address of the heap buffer backing reply; ownership of that
 *             buffer passes to the caller.
 *
 * If the reply carries TF_OP_STATUS_CODE, the status stored in offsetsSize is returned and
 * the received payload is released, since nothing is handed to the caller.
 * Returns ERR_FAILED if reply is NULL, or if no payload buffer is available or buffer is NULL
 * (the payload is released in that case); ERR_NONE otherwise, including when no wait record
 * exists for seqNumber.
 */
static int32_t HandleReply(uint64_t seqNumber, IpcIo *reply, uintptr_t *buffer)
{
    if (reply == NULL) {
        RPC_LOG_ERROR("no need reply, free the buffer");
        return ERR_FAILED;
    }

    ThreadMessageInfo *messageInfo = QueryThreadBySeqNumber(seqNumber);
    if (messageInfo == NULL) {
        RPC_LOG_ERROR("receive buffer is nullptr");
        return ERR_NONE;
    }

    if (messageInfo->flags & TF_OP_STATUS_CODE) {
        int32_t err = messageInfo->offsetsSize;
        /* status-only reply: the payload is not handed over, so release it here */
        free(messageInfo->buffer);
        messageInfo->buffer = NULL;
        return err;
    }

    if (buffer == NULL || messageInfo->buffer == NULL) {
        RPC_LOG_ERROR("reply buffer is unavailable");
        free(messageInfo->buffer);
        messageInfo->buffer = NULL;
        return ERR_FAILED;
    }

    dbinder_transaction_data transData = {
        .buffer_size = messageInfo->bufferSize,
        .offsets_size = messageInfo->offsetsSize,
        .offsets = messageInfo->offsets,
        .buffer = messageInfo->buffer
    };
    ToIpcData(&transData, reply);
    *buffer = (uintptr_t)messageInfo->buffer;

    return ERR_NONE;
}
291
/*
 * Blocks the calling thread until the reply for seqNumber arrives or the wait fails, then
 * hands the reply to the caller through HandleReply().
 *
 * Nothing is awaited (ERR_NONE is returned at once) when reply is NULL or userWaitTime is 0.
 * The temporary wait record is always unregistered and freed before returning.
 * Returns ERR_FAILED if the record cannot be allocated or the wait fails; otherwise the
 * result of HandleReply().
 */
static int32_t WaitForReply(uint64_t seqNumber, IpcIo *reply, uint32_t handle, uint32_t userWaitTime, uintptr_t *buffer)
{
    if (reply == NULL || userWaitTime == 0) {
        return ERR_NONE;
    }

    ThreadMessageInfo *waitRecord = MakeThreadMessageInfo(seqNumber, handle);
    if (waitRecord == NULL) {
        RPC_LOG_ERROR("make thread message info failed, no memory");
        return ERR_FAILED;
    }

    int32_t result = ERR_FAILED;
    if (AddSendThreadInWait(seqNumber, waitRecord, userWaitTime) == ERR_NONE) {
        result = HandleReply(seqNumber, reply, buffer);
    } else {
        RPC_LOG_ERROR("sender thread wait reply message time out");
    }

    /* shared cleanup for both outcomes */
    EraseThreadBySeqNumber(waitRecord);
    free(waitRecord);
    return result;
}
314
/*
 * Sends the message prepared in sessionOfPeer and then waits for its reply.
 *
 * A failed send is only logged; the wait is still entered, so the return value is that of
 * WaitForReply(). ERR_FAILED is returned up front when seqNumber is 0 or the session is NULL.
 */
static int32_t SendOrWaitForCompletion(uint32_t userWaitTime, uint64_t seqNumber,
    HandleSessionList *sessionOfPeer, IpcIo *reply, uintptr_t *buffer)
{
    if (seqNumber == 0) {
        RPC_LOG_ERROR("seqNumber can not be zero");
        return ERR_FAILED;
    }
    if (sessionOfPeer == NULL) {
        RPC_LOG_ERROR("current session is invalid");
        return ERR_FAILED;
    }

    int32_t sendResult = OnSendMessage(sessionOfPeer);
    if (sendResult != ERR_NONE) {
        /* deliberately not returning here: the send failure may not belong to this message */
        RPC_LOG_ERROR("fail to send to remote session with error = %d", sendResult);
    }

    uint32_t peerHandle = sessionOfPeer->handle;
    return WaitForReply(seqNumber, reply, peerHandle, userWaitTime, buffer);
}
333
GetCallerSessionId(void)334 static int32_t GetCallerSessionId(void)
335 {
336 ThreadContext *threadContext = GetCurrentThreadContext();
337 return threadContext->sessionId;
338 }
339
/*
 * Sends the reply of the request currently being served back to its caller.
 *
 * reply  - parcel holding the reply data.
 * flags  - message flags copied into the header.
 * result - status of the stub call, forwarded as the status argument of WriteTransaction().
 *
 * The message is sent without waiting (wait time 0). Returns ERR_FAILED if no sequence
 * number could be obtained or no session was available to send on; otherwise the result of
 * SendOrWaitForCompletion().
 */
static int32_t SendReply(IpcIo *reply, uint32_t flags, int32_t result)
{
    uint64_t seqNumber = 0;
    MessageOption option = {
        .flags = flags
    };
    HandleSessionList *sessionObject = WriteTransaction(BC_REPLY, option, 0, GetCallerSessionId(),
        0, reply, &seqNumber, result);

    if (seqNumber == 0) {
        RPC_LOG_ERROR("seqNumber can not be zero");
        return ERR_FAILED;
    }
    /* A NULL session is rejected inside SendOrWaitForCompletion(); report that to the caller. */
    return SendOrWaitForCompletion(0, seqNumber, sessionObject, reply, NULL);
}
356
ProcessTransaction(const dbinder_transaction_data * tr,uint32_t sessionId)357 static void ProcessTransaction(const dbinder_transaction_data *tr, uint32_t sessionId)
358 {
359 if (tr == NULL || tr->cookie == 0) {
360 return;
361 }
362
363 IpcIo data;
364 IpcIo reply;
365 uint8_t replyAlloc[RPC_IPC_LENGTH];
366 IpcIoInit(&reply, replyAlloc, RPC_IPC_LENGTH, 0);
367 MessageOption option = {
368 .flags = tr->flags
369 };
370 uint64_t senderSeqNumber = tr->seqNumber;
371
372 ToIpcData(tr, &data);
373
374 ThreadContext *threadContext = GetCurrentThreadContext();
375 const pid_t oldPid = threadContext->callerPid;
376 const pid_t oldUid = threadContext->callerUid;
377 char oldDeviceId[DEVICEID_LENGTH];
378 if (memcpy_s(oldDeviceId, DEVICEID_LENGTH, threadContext->callerDeviceID, DEVICEID_LENGTH) != EOK) {
379 RPC_LOG_ERROR("oldDeviceId memcpy failed");
380 return;
381 }
382
383 StubObject *stubObject = QueryStubByIndex(tr->cookie);
384 if (stubObject == NULL) {
385 RPC_LOG_ERROR("stubIndex is invalid");
386 return;
387 }
388
389 int32_t result = stubObject->func(tr->code, &data, &reply, option);
390 if (result != ERR_NONE) {
391 RPC_LOG_ERROR("stub is invalid, has not OnReceive or Request");
392 }
393 if (!(option.flags & TF_OP_ASYNC)) {
394 threadContext->sessionId = sessionId;
395 threadContext->seqNumber = senderSeqNumber;
396 SendReply(&reply, 0, result);
397 threadContext->sessionId = 0;
398 threadContext->seqNumber = 0;
399 }
400
401 threadContext->callerPid = oldPid;
402 threadContext->callerUid = oldUid;
403 if (memcpy_s(threadContext->callerDeviceID, DEVICEID_LENGTH, oldDeviceId, DEVICEID_LENGTH) != EOK) {
404 RPC_LOG_ERROR("threadContext callerDeviceID memcpy failed");
405 }
406 }
407
ProcessReply(const dbinder_transaction_data * tr,uint32_t sessionId)408 static void ProcessReply(const dbinder_transaction_data *tr, uint32_t sessionId)
409 {
410 ThreadMessageInfo *messageInfo = QueryThreadBySeqNumber(tr->seqNumber);
411 if (messageInfo == NULL) {
412 RPC_LOG_ERROR("no thread waiting reply message of this seqNumber");
413 /* messageInfo is null, no thread need to wakeup */
414 return;
415 }
416
417 size_t bufferSize = tr->sizeOfSelf - sizeof(dbinder_transaction_data);
418 messageInfo->buffer = (void *)malloc(bufferSize);
419 if (messageInfo->buffer == NULL) {
420 RPC_LOG_ERROR("some thread is waiting for reply message, but no memory");
421 /* wake up sender thread */
422 WakeUpThreadBySeqNumber(tr->seqNumber, sessionId);
423 return;
424 }
425
426 if (memcpy_s(messageInfo->buffer, bufferSize, tr->buffer, bufferSize) != EOK) {
427 RPC_LOG_ERROR("messageInfo buffer memset failed");
428 free(messageInfo->buffer);
429 WakeUpThreadBySeqNumber(tr->seqNumber, sessionId);
430 return;
431 }
432
433 messageInfo->flags = tr->flags;
434 messageInfo->bufferSize = tr->buffer_size;
435 messageInfo->offsetsSize = tr->offsets_size;
436 messageInfo->offsets = tr->offsets;
437 messageInfo->sessionId = sessionId;
438
439 /* wake up sender thread */
440 WakeUpThreadBySeqNumber(tr->seqNumber, sessionId);
441 }
442
/*
 * Entry point for one received package: points the header's buffer field at the payload that
 * follows the header inside processInfo->buffer, then dispatches on the command code.
 * Packages with any command other than BC_TRANSACTION or BC_REPLY are ignored.
 */
static void OnTransaction(ThreadProcessInfo *processInfo)
{
    if (processInfo == NULL) {
        return;
    }

    dbinder_transaction_data *header = (dbinder_transaction_data *)processInfo->buffer;
    header->buffer = (char *)(processInfo->buffer + sizeof(dbinder_transaction_data));

    switch (header->cmd) {
        case BC_TRANSACTION:
            ProcessTransaction(header, processInfo->sessionId);
            break;
        case BC_REPLY:
            ProcessReply(header, processInfo->sessionId);
            break;
        default:
            break;
    }
}
457
MakeThreadProcessInfo(uint32_t handle,const char * inBuffer,uint32_t size)458 static ThreadProcessInfo *MakeThreadProcessInfo(uint32_t handle, const char *inBuffer, uint32_t size)
459 {
460 if (inBuffer == NULL || size < sizeof(dbinder_transaction_data)) {
461 RPC_LOG_ERROR("buffer is null or size invalid");
462 return NULL;
463 }
464
465 ThreadProcessInfo *processInfo = (ThreadProcessInfo *)malloc(sizeof(ThreadProcessInfo));
466 if (processInfo == NULL) {
467 return NULL;
468 }
469 processInfo->buffer = (char *)malloc(size);
470 if (processInfo->buffer == NULL) {
471 free(processInfo);
472 return NULL;
473 }
474 if (memcpy_s(processInfo->buffer, size, inBuffer, size) != EOK) {
475 free(processInfo->buffer);
476 free(processInfo);
477 return NULL;
478 }
479 processInfo->sessionId = handle;
480 processInfo->packageSize = size;
481
482 return processInfo;
483 }
484
CreateProcessThread(void)485 static int32_t CreateProcessThread(void)
486 {
487 IpcSkeleton *current = GetCurrentSkeleton();
488 if (current == NULL) {
489 RPC_LOG_ERROR("current ipcskeleton is nullptr");
490 return ERR_FAILED;
491 }
492 if (current->threadPool->idleSocketThreadNum > 0) {
493 SpawnThread(SPAWN_PASSIVE, IF_PROT_DATABUS);
494 RPC_LOG_INFO("create Process thread success");
495 return ERR_NONE;
496 }
497 return ERR_FAILED;
498 }
499
StartProcessLoop(uint32_t handle,const void * buffer,uint32_t size)500 static void StartProcessLoop(uint32_t handle, const void *buffer, uint32_t size)
501 {
502 ThreadProcessInfo *processInfo = MakeThreadProcessInfo(handle, buffer, size);
503 if (processInfo == NULL) {
504 RPC_LOG_ERROR("MakeThreadProcessInfo failed");
505 return;
506 }
507
508 IdleDataThread *idleDataThread = GetIdleDataThread();
509 if (idleDataThread == NULL) {
510 if (CreateProcessThread() != ERR_NONE) {
511 RPC_LOG_ERROR("create IO thread failed");
512 }
513 do {
514 /* no IO thread in idle state, wait a monent */
515 usleep(GET_IDLE_THREAD_WAIT_TIME);
516 idleDataThread = GetIdleDataThread();
517 } while (idleDataThread == NULL);
518 }
519 pthread_t threadId = idleDataThread->threadId;
520 processInfo->threadId = threadId;
521 AddDataInfoToThread(processInfo);
522 WakeUpDataThread(threadId);
523 }
524
OnReceiveNewConnection(int sessionId)525 int32_t OnReceiveNewConnection(int sessionId)
526 {
527 uint32_t handle = sessionId;
528 IpcSkeleton *current = GetCurrentSkeleton();
529 if (current == NULL) {
530 RPC_LOG_ERROR("current ipcskeleton is nullptr");
531 return ERR_FAILED;
532 }
533
534 HandleSessionList *stubSession = (HandleSessionList *)malloc(sizeof(HandleSessionList));
535 if (stubSession == NULL) {
536 RPC_LOG_ERROR("stubSession malloc failed");
537 return ERR_FAILED;
538 }
539 stubSession->handle = handle;
540 stubSession->sessionId = sessionId;
541 if (AttachStubSession(stubSession) != ERR_NONE) {
542 RPC_LOG_ERROR("AttachStubSession failed");
543 free(stubSession);
544 return ERR_FAILED;
545 }
546 return HandleNewConnection(RpcGetSessionIdList(), sessionId);
547 }
548
OnDatabusSessionClosed(int sessionId)549 void OnDatabusSessionClosed(int sessionId)
550 {
551 if (sessionId < 0) {
552 return;
553 }
554
555 uint32_t handle = sessionId;
556 HandleSessionList *handleSession = QueryStubSession(handle);
557 if (handleSession != NULL) {
558 DetachStubSession(handleSession);
559 free(handleSession);
560 RPC_LOG_INFO("OnDatabusSessionClosed called on rpc stub");
561 return;
562 }
563
564 handleSession = QueryProxySessionBySessionId(sessionId);
565 if (handleSession == NULL) {
566 RPC_LOG_INFO("OnDatabusSessionClosed query session is null");
567 return;
568 }
569 DetachProxySession(handleSession);
570
571 HandleToIndexList *handeleIndex = QueryHandleToIndex(handleSession->handle);
572 if (handeleIndex == NULL) {
573 RPC_LOG_INFO("OnDatabusSessionClosed query stub index is null");
574 return;
575 }
576 DetachHandleToIndex(handeleIndex);
577
578 IpcSkeleton *ipcSkeleton = GetCurrentSkeleton();
579 if (ipcSkeleton == NULL) {
580 RPC_LOG_ERROR("GetCurrentSkeleton return null");
581 return;
582 }
583
584 DeathCallback *node = NULL;
585 UTILS_DL_LIST_FOR_EACH_ENTRY(node, &ipcSkeleton->objects, DeathCallback, list)
586 {
587 if (node->handle == handleSession->handle) {
588 RPC_LOG_INFO("OnDatabusSessionClosed SendObituary handle %d", node->handle);
589 SendObituary(node);
590 DeleteDeathCallback(node);
591 break;
592 }
593 }
594 }
595
HasCompletePackage(const char * data,uint32_t readCursor,uint32_t len)596 static uint32_t HasCompletePackage(const char *data, uint32_t readCursor, uint32_t len)
597 {
598 const dbinder_transaction_data *tr = (const dbinder_transaction_data *)(data + readCursor);
599 if ((tr->magic == DBINDER_MAGICWORD) &&
600 (tr->sizeOfSelf <= SOCKET_MAX_BUFF_SIZE + sizeof(dbinder_transaction_data)) &&
601 (readCursor + tr->sizeOfSelf <= len)) {
602 return (uint32_t)tr->sizeOfSelf;
603 }
604 return 0;
605 }
606
OnMessageAvailable(int sessionId,const void * data,uint32_t len)607 void OnMessageAvailable(int sessionId, const void *data, uint32_t len)
608 {
609 if (sessionId < 0 || data == NULL || len < sizeof(dbinder_transaction_data)) {
610 RPC_LOG_ERROR("session has wrong inputs");
611 return;
612 }
613
614 uint32_t handle = sessionId;
615 uint32_t readSize = 0;
616 while (readSize + sizeof(dbinder_transaction_data) < len) {
617 uint32_t packageSize = HasCompletePackage(data, readSize, len);
618 if (packageSize > 0) {
619 StartProcessLoop(handle, data, packageSize);
620 readSize += packageSize;
621 } else {
622 // If the current is abnormal, the subsequent is no longer processed.
623 break;
624 }
625 }
626 }
627
/*
 * Connects to a remote service and registers the resulting session as the proxy session
 * of handle.
 *
 * handle        - proxy handle; must not be negative.
 * sessionObject - caller-provided session record; on success its handle and sessionId are
 *                 filled in and it is passed to AttachProxySession().
 * serviceName   - passed to the transport's Connect().
 * deviceId      - passed to the transport's Connect().
 *
 * Failures are only logged; the function has no return value.
 */
void UpdateClientSession(int32_t handle, HandleSessionList *sessionObject,
    const char *serviceName, const char *deviceId)
{
    if (handle < 0 || sessionObject == NULL || serviceName == NULL || deviceId == NULL) {
        RPC_LOG_ERROR("UpdateClientSession params invalid");
        return;
    }

    RpcSkeleton *skeleton = GetCurrentRpcSkeleton();
    if (skeleton == NULL) {
        return;
    }

    int connectedId = skeleton->rpcTrans->Connect(serviceName, deviceId, NULL);
    if (connectedId < 0) {
        RPC_LOG_ERROR("UpdateClientSession connect failed");
        return;
    }

    int32_t readyState = WaitForSessionIdReady(RpcGetSessionIdList(), connectedId);
    if (readyState != ERR_NONE) {
        RPC_LOG_ERROR("SendDataToRemote connect failed, sessionId=%d", connectedId);
        return;
    }

    sessionObject->handle = handle;
    sessionObject->sessionId = connectedId;
    if (AttachProxySession(sessionObject) != ERR_NONE) {
        RPC_LOG_ERROR("UpdateClientSession AttachProxySession failed");
    }
}
656
CreateTransServer(const char * sessionName)657 int32_t CreateTransServer(const char *sessionName)
658 {
659 if (sessionName == NULL) {
660 return ERR_FAILED;
661 }
662 RpcSkeleton *rpcSkeleton = GetCurrentRpcSkeleton();
663 if (rpcSkeleton == NULL) {
664 return ERR_FAILED;
665 }
666
667 if (rpcSkeleton->isServerCreated == 0) {
668 return ERR_NONE;
669 }
670
671 pthread_mutex_lock(&rpcSkeleton->lock);
672 if (rpcSkeleton->isServerCreated == -1) {
673 if (rpcSkeleton->rpcTrans->StartListen(sessionName, GetRpcTransCallback()) != ERR_NONE) {
674 RPC_LOG_ERROR("CreateTransServer failed");
675 pthread_mutex_unlock(&rpcSkeleton->lock);
676 return ERR_FAILED;
677 }
678 rpcSkeleton->isServerCreated = 0;
679 pthread_mutex_unlock(&rpcSkeleton->lock);
680 return SpawnThread(SPAWN_ACTIVE, IF_PROT_DATABUS);
681 }
682 pthread_mutex_unlock(&rpcSkeleton->lock);
683
684 return ERR_NONE;
685 }
686
RpcAcquireHandle(int32_t handle)687 static int32_t RpcAcquireHandle(int32_t handle)
688 {
689 (void)handle;
690 return ERR_NONE;
691 }
692
RpcReleaseHandle(int32_t handle)693 static int32_t RpcReleaseHandle(int32_t handle)
694 {
695 (void)handle;
696 return ERR_NONE;
697 }
698
/*
 * RemoteInvoker::SendRequest implementation for RPC: serializes a request for target.handle,
 * sends it and waits for the reply.
 *
 * target - only target.handle is used.
 * code   - request code.
 * data   - request parcel.
 * reply  - receives the reply; not used for requests flagged TF_OP_ASYNC.
 * option - flags and wait time; the wait time is capped at RPC_MAX_SEND_WAIT_TIME.
 * buffer - out: address of the heap buffer backing reply, to be released by the caller.
 *
 * Returns ERR_FAILED if the request could not be built; otherwise the result of
 * SendOrWaitForCompletion().
 */
static int32_t RpcInvokerSendRequest(SvcIdentity target, uint32_t code, IpcIo *data, IpcIo *reply,
    MessageOption option, uintptr_t *buffer)
{
    RPC_LOG_INFO("RPCInvokerSendRequest called");

    uint32_t userWaitTime = option.waitTime;
    userWaitTime = (userWaitTime > RPC_MAX_SEND_WAIT_TIME) ? RPC_MAX_SEND_WAIT_TIME : userWaitTime;

    uint64_t seqNumber = 0;
    HandleSessionList *session = WriteTransaction(BC_TRANSACTION, option, target.handle,
        0, code, data, &seqNumber, 0);
    if (session == NULL) {
        return ERR_FAILED;
    }

    /* asynchronous requests do not collect a reply */
    IpcIo *replyTarget = (option.flags & TF_OP_ASYNC) ? NULL : reply;
    return SendOrWaitForCompletion(userWaitTime, seqNumber, session, replyTarget, buffer);
}
725
RpcFreeBuffer(void * ptr)726 static int32_t RpcFreeBuffer(void *ptr)
727 {
728 if (ptr != NULL) {
729 free(ptr);
730 }
731 return ERR_NONE;
732 }
733
RpcSetMaxWorkThread(int32_t maxThreadNum)734 static int32_t RpcSetMaxWorkThread(int32_t maxThreadNum)
735 {
736 (void)maxThreadNum;
737 return ERR_NONE;
738 }
739
RpcJoinThread(bool initiative)740 static void RpcJoinThread(bool initiative)
741 {
742 pthread_t threadId = pthread_self();
743
744 ThreadContext *threadContext = GetCurrentThreadContext();
745 if (threadContext == NULL) {
746 return;
747 }
748 threadContext->stopWorkThread = false;
749
750 while (threadContext->stopWorkThread == false) {
751 AddDataThreadInWait(threadId);
752 ThreadProcessInfo *processInfo = PopDataInfoFromThread(threadId);
753 if (processInfo != NULL) {
754 OnTransaction(processInfo);
755 free(processInfo->buffer);
756 free(processInfo);
757 }
758 }
759 }
760
RpcStopWorkThread(void)761 void RpcStopWorkThread(void)
762 {
763 IpcSkeleton *current = GetCurrentSkeleton();
764 if (current == NULL) {
765 return;
766 }
767
768 ThreadContext *threadContext = GetCurrentThreadContext();
769 if (threadContext == NULL) {
770 return;
771 }
772 threadContext->stopWorkThread = true;
773 }
774
/*
 * RemoteInvoker::SetRegistryObject implementation for RPC.
 * Does nothing: both arguments are ignored and ERR_NONE is always returned.
 */
static int32_t RpcSetRegistryObject(SvcIdentity target, SvcIdentity *samgr)
{
    /* parameters intentionally unused */
    (void)samgr;
    (void)target;

    return ERR_NONE;
}
781
RpcAddDeathRecipient(int32_t handle,void * cookie)782 static int32_t RpcAddDeathRecipient(int32_t handle, void *cookie)
783 {
784 (void)handle;
785 (void)cookie;
786 return ERR_NONE;
787 }
788
RpcRemoveDeathRecipient(int32_t handle,void * cookie)789 static int32_t RpcRemoveDeathRecipient(int32_t handle, void *cookie)
790 {
791 (void)handle;
792 (void)cookie;
793 return ERR_NONE;
794 }
795
GetRpcInvoker(void)796 RemoteInvoker *GetRpcInvoker(void)
797 {
798 if (g_rpcInvoker == NULL) {
799 g_rpcInvoker = (RemoteInvoker *)malloc(sizeof(RemoteInvoker));
800 if (g_rpcInvoker != NULL) {
801 g_rpcInvoker->AcquireHandle = RpcAcquireHandle;
802 g_rpcInvoker->ReleaseHandle = RpcReleaseHandle;
803 g_rpcInvoker->SendRequest = RpcInvokerSendRequest;
804 g_rpcInvoker->FreeBuffer = RpcFreeBuffer;
805 g_rpcInvoker->SetMaxWorkThread = RpcSetMaxWorkThread;
806 g_rpcInvoker->JoinThread = RpcJoinThread;
807 g_rpcInvoker->ExitCurrentThread = RpcStopWorkThread;
808 g_rpcInvoker->SetRegistryObject = RpcSetRegistryObject;
809 g_rpcInvoker->AddDeathRecipient = RpcAddDeathRecipient;
810 g_rpcInvoker->RemoveDeathRecipient = RpcRemoveDeathRecipient;
811 }
812 }
813
814 return g_rpcInvoker;
815 }