1 /*
2 * Copyright (c) 2020-2023 Huawei Device Co., Ltd.
3 *
4 * HDF is dual licensed: you can use it either under the terms of
5 * the GPL, or the BSD license, at your option.
6 * See the LICENSE file in the root of this repository for complete details.
7 */
8
9 #include "securec.h"
10 #include "osal_mutex.h"
11 #include "osal_thread.h"
12 #include "osal_time.h"
13 #include "hdf_log.h"
14 #include "hdf_wlan_priority_queue.h"
15 #include "message_dispatcher.h"
16 #include "message_router_inner.h"
17
18 #ifdef USERSPACE_CLIENT_SUPPORT
19 #define HDF_LOG_TAG UMsgEngine
20 #else
21 #define HDF_LOG_TAG KMsgEngine
22 #endif
23
/*
 * Dispatcher that handles its message queue on a dedicated worker thread.
 *
 * Objects of this type are cast to and from MessageDispatcher in this file,
 * and the dispatcher fields (status, mutex, messageQueue, AppendMessage, ...)
 * are reached through INHERT_MESSAGE_DISPATCHER, so that macro must stay the
 * first member.
 */
typedef struct {
    INHERT_MESSAGE_DISPATCHER;
    /* Thread object that runs RunDispatcher once StartDispatcher succeeds. */
    OSAL_DECLARE_THREAD(dispatcherThread);
} LocalMessageDispatcher;
28
/*
 * Release the resources attached to a message context.
 *
 * The response buffer is always recycled. The request buffer and the context
 * object itself are released only for cross-node messages, or for messages
 * that are neither a sync request nor a sync response.
 *
 * @param context Message context to release; NULL is reported and ignored.
 */
void ReleaseMessageContext(MessageContext *context)
{
    if (context == NULL) {
        HDF_LOGE("%s: Input param is null!", __func__);
        return;
    }

    if (context->rspData != NULL) {
        HdfSbufRecycle(context->rspData);
        context->rspData = NULL;
    }

    /*
     * A local sync request may live in stack memory that the caller manages,
     * so its request buffer and the context are left untouched.
     */
    if (context->crossNode ||
        !(context->requestType == MESSAGE_TYPE_SYNC_REQ || context->requestType == MESSAGE_TYPE_SYNC_RSP)) {
        if (context->reqData != NULL) {
            HdfSbufRecycle(context->reqData);
            context->reqData = NULL;
        }
        OsalMemFree(context);
    }

    HDF_LOGD("%s: ReleaseMessageContext finished!", __func__);
}
51
ReleaseMessageMapper(struct ServiceDef * mapper)52 void ReleaseMessageMapper(struct ServiceDef *mapper)
53 {
54 if (mapper == NULL) {
55 return;
56 }
57 if (mapper->messages != NULL) {
58 OsalMemFree(mapper->messages);
59 mapper->messages = NULL;
60 }
61 OsalMemFree(mapper);
62 }
63
GetMsgDef(const struct ServiceDef * serviceDef,uint32_t commandId)64 struct MessageDef *GetMsgDef(const struct ServiceDef *serviceDef, uint32_t commandId)
65 {
66 struct MessageDef *msgDef = NULL;
67 if (serviceDef == NULL || serviceDef->messages == NULL) {
68 HDF_LOGE("%s:input is NULL!", __func__);
69 return NULL;
70 }
71 if (commandId >= serviceDef->messagesLength) {
72 HDF_LOGE("%s:commandId exceed service def!", __func__);
73 return NULL;
74 }
75
76 msgDef = serviceDef->messages + commandId;
77 if (msgDef->handler == NULL) {
78 HDF_LOGE("%s:command has no handler!", __func__);
79 return NULL;
80 }
81 return msgDef;
82 }
83
AppendToLocalDispatcher(MessageDispatcher * dispatcher,const uint8_t priority,MessageContext * context)84 ErrorCode AppendToLocalDispatcher(MessageDispatcher *dispatcher, const uint8_t priority, MessageContext *context)
85 {
86 if (context == NULL) {
87 HDF_LOGE("%s:Input context is NULL!", __func__);
88 return ME_ERROR_NULL_PTR;
89 }
90 if (dispatcher == NULL) {
91 HDF_LOGE("%s:Input dispatcher is NULL!", __func__);
92 return ME_ERROR_NULL_PTR;
93 }
94
95 if (dispatcher->messageQueue == NULL) {
96 HDF_LOGE("MessageQueue is NULL.");
97 return ME_ERROR_NULL_PTR;
98 }
99
100 if (dispatcher->status != ME_STATUS_RUNNING) {
101 HDF_LOGE("%s:Dispatcher is not running", __func__);
102 return ME_ERROR_DISPATCHER_NOT_RUNNING;
103 }
104 HDF_LOGD("%s:AppendToLocalDispatcher finished!", __func__);
105 return PushPriorityQueue(dispatcher->messageQueue, priority, context);
106 }
107
/*
 * Turn a request context into the matching response context in place.
 *
 * Sender and receiver ids are swapped, and the request type is mapped to the
 * response type at the same offset (MESSAGE_RSP_START + offset from
 * MESSAGE_REQ_START). Contexts that are not a sync or async request are left
 * unchanged.
 *
 * @param context Request context to convert; NULL is reported and ignored.
 */
void SetToResponse(MessageContext *context)
{
    ServiceId senderId;

    /* Public entry point: guard against NULL like the other exported helpers. */
    if (context == NULL) {
        HDF_LOGE("%s:Input context is NULL!", __func__);
        return;
    }

    if (context->requestType != MESSAGE_TYPE_ASYNC_REQ && context->requestType != MESSAGE_TYPE_SYNC_REQ) {
        HDF_LOGE("Only sync and async message can send response!type=%u", context->requestType);
        return;
    }

    senderId = context->senderId;
    context->senderId = context->receiverId;
    context->receiverId = senderId;
    context->requestType = MESSAGE_RSP_START + context->requestType - MESSAGE_REQ_START;
    HDF_LOGD("%s:SetToResponse finished", __func__);
}
121
/*
 * Deliver an async response to the requester's callback, if one is set, and
 * then release the message context.
 *
 * @param context Response context; NULL is ignored.
 */
static void HandleAsyncResponse(MessageContext *context)
{
    if (context != NULL) {
        if (context->callback != NULL) {
            context->callback((RequestContext *)context, context->reqData, context->rspData, context->responseStatus);
        }
        ReleaseMessageContext(context);
    }
}
134
/*
 * Signal the response semaphore of a sync message context.
 *
 * The context is not released here.
 *
 * @param context Response context; NULL is reported and ignored.
 */
static void HandleSyncResponse(MessageContext *context)
{
    if (context == NULL) {
        HDF_LOGE("Input context is NULL!");
        return;
    }

    if (OsalSemPost(&context->rspSemaphore) != HDF_SUCCESS) {
        HDF_LOGE("Send semaphore failed!CMD=%u,Sender=%u,Receiver=%u", context->commandId, context->senderId,
            context->receiverId);
    }
}
149
/*
 * Execute a request on its receiver service and route the response back.
 *
 * Steps:
 *   1. Take a reference on the receiver service and run its ExecRequestMsg.
 *   2. Convert the context into a response with SetToResponse (this swaps
 *      sender and receiver).
 *   3. An async response without a callback is released here; any other
 *      response is handed to the SendMessage of the service that
 *      context->receiverId names after the swap.
 *
 * On failure a sync context gets its response semaphore posted, and any
 * other context is released. Both service references are dropped before
 * returning.
 *
 * The context is dereferenced without a NULL check; the only caller visible
 * in this file (HandleMessage) checks it first.
 */
static void HandleRequestMessage(MessageContext *context)
{
    RemoteService *targetService = RefRemoteService(context->receiverId);
    ErrorCode errCode = ME_SUCCESS;
    RemoteService *rspService = NULL;
    do {
        if (targetService == NULL) {
            HDF_LOGE("%s:Service %u is not available!", __func__, context->receiverId);
            errCode = ME_ERROR_NULL_PTR;
            break;
        }

        if (targetService->ExecRequestMsg == NULL) {
            HDF_LOGE("%s:Service %u has no ExecMsg method!", __func__, context->receiverId);
            errCode = ME_ERROR_NULL_PTR;
            break;
        }
        /* Any result of ExecRequestMsg is not inspected here. */
        targetService->ExecRequestMsg(targetService, context);

        // Convert to response message
        SetToResponse(context);

        /* Nobody is waiting for this response: drop it. errCode stays ME_SUCCESS, so it is not released twice. */
        if (context->requestType == MESSAGE_TYPE_ASYNC_RSP && context->callback == NULL) {
            ReleaseMessageContext(context);
            break;
        }

        /* After SetToResponse, receiverId holds the id that was the sender of the request. */
        rspService = RefRemoteService(context->receiverId);
        if (rspService == NULL) {
            errCode = ME_ERROR_NO_SUCH_SERVICE;
            break;
        }
        if (rspService->SendMessage == NULL) {
            errCode = ME_ERROR_BAD_SERVICE;
            break;
        }
        errCode = rspService->SendMessage(rspService, context);
    } while (false);

    if (errCode != ME_SUCCESS) {
        /*
         * The type may still be a request (failure before SetToResponse) or
         * already a response (failure after it), so both sync types are checked.
         */
        if (context->requestType == MESSAGE_TYPE_SYNC_RSP || context->requestType == MESSAGE_TYPE_SYNC_REQ) {
            (void)OsalSemPost(&context->rspSemaphore);
        } else {
            ReleaseMessageContext(context);
        }
    }

    /* Drop the references taken through RefRemoteService above. */
    if (targetService != NULL && targetService->Disref != NULL) {
        targetService->Disref(targetService);
        targetService = NULL;
    }

    if (rspService != NULL && rspService->Disref != NULL) {
        rspService->Disref(rspService);
        rspService = NULL;
    }
}
207
/*
 * Route a dequeued message to the handler that matches its request type.
 *
 * @param context Message context taken from the queue; NULL is ignored.
 */
static void HandleMessage(MessageContext *context)
{
    if (context == NULL) {
        return;
    }

    switch (context->requestType) {
        case MESSAGE_TYPE_SYNC_REQ:
        case MESSAGE_TYPE_ASYNC_REQ:
            /* Both request kinds share one execution path. */
            HandleRequestMessage(context);
            break;
        case MESSAGE_TYPE_SYNC_RSP:
            HandleSyncResponse(context);
            break;
        case MESSAGE_TYPE_ASYNC_RSP:
            HandleAsyncResponse(context);
            break;
        default:
            HDF_LOGE("Unsupported message type %u", context->requestType);
            break;
    }
}
227
/*
 * Drain the dispatcher's queue and release every pending message context.
 *
 * Does nothing when the dispatcher or its queue is NULL. The queue is polled
 * with a zero timeout until it returns NULL.
 *
 * @param dispatcher Dispatcher whose queue is drained.
 */
static void ReleaseAllMessage(MessageDispatcher *dispatcher)
{
    MessageContext *context = NULL;

    /* A dispatcher that failed part-way through creation may have no queue. */
    if (dispatcher == NULL || dispatcher->messageQueue == NULL) {
        return;
    }

    HDF_LOGD("%s: ReleaseAllMessage starting...!", __func__);
    for (;;) {
        context = PopPriorityQueue(dispatcher->messageQueue, 0);
        if (context == NULL) {
            /* Queue is empty: stop without handing NULL to ReleaseMessageContext, which would log an error. */
            break;
        }
        ReleaseMessageContext(context);
    }
}
237
RunDispatcher(void * para)238 static int RunDispatcher(void *para)
239 {
240 MessageDispatcher *dispatcher = NULL;
241 MessageContext *context = NULL;
242 if (para == NULL) {
243 HDF_LOGE("Start dispatcher failed! cause:%s\n", "input para is NULL");
244 return ME_ERROR_NULL_PTR;
245 }
246 dispatcher = (MessageDispatcher *)para;
247 if (dispatcher->messageQueue == NULL) {
248 HDF_LOGE("Start dispatcher failed! cause:%s\n", "message queue is NULL");
249 return ME_ERROR_NULL_PTR;
250 }
251
252 if (dispatcher->Ref != NULL) {
253 dispatcher = dispatcher->Ref(dispatcher);
254 }
255
256 if (dispatcher->status != ME_STATUS_STARTTING) {
257 if (dispatcher->Disref != NULL) {
258 dispatcher->Disref(dispatcher);
259 }
260 HDF_LOGE("Start dispatcher failed! cause:%s\n", "dispatcher is not stopped");
261 return ME_ERROR_WRONG_STATUS;
262 } else {
263 dispatcher->status = ME_STATUS_RUNNING;
264 }
265 while (dispatcher->status == ME_STATUS_RUNNING) {
266 context = PopPriorityQueue(dispatcher->messageQueue, QUEUE_OPER_TIMEOUT);
267 if (context == NULL) {
268 continue;
269 }
270 HandleMessage(context);
271 }
272
273 ReleaseAllMessage(dispatcher);
274 dispatcher->status = ME_STATUS_TODESTROY;
275 if (dispatcher->Disref != NULL) {
276 dispatcher->Disref(dispatcher);
277 dispatcher = NULL;
278 }
279
280 HDF_LOGW("Dispatcher shutdown!");
281 return ME_SUCCESS;
282 }
283
StartDispatcher(MessageDispatcher * dispatcher)284 static ErrorCode StartDispatcher(MessageDispatcher *dispatcher)
285 {
286 HDF_STATUS status;
287 ErrorCode errCode;
288 LocalMessageDispatcher *localDispatcher = NULL;
289 struct OsalThreadParam config;
290 (void)memset_s(&config, sizeof(config), 0, sizeof(config));
291 if (dispatcher == NULL) {
292 return ME_ERROR_NULL_PTR;
293 }
294
295 status = OsalMutexTimedLock(&dispatcher->mutex, HDF_WAIT_FOREVER);
296 if (status != HDF_SUCCESS) {
297 return ME_ERROR_OPER_MUTEX_FAILED;
298 }
299
300 errCode = ME_SUCCESS;
301 do {
302 if (dispatcher->status != ME_STATUS_STOPPED) {
303 errCode = ME_ERROR_WRONG_STATUS;
304 break;
305 }
306 dispatcher->status = ME_STATUS_STARTTING;
307 config.name = "MessageDispatcher";
308 config.priority = OSAL_THREAD_PRI_DEFAULT;
309 config.stackSize = 0x2000;
310 localDispatcher = (LocalMessageDispatcher *)dispatcher;
311 status = OsalThreadCreate(&localDispatcher->dispatcherThread, RunDispatcher, localDispatcher);
312 if (status != HDF_SUCCESS) {
313 HDF_LOGE("%s:OsalThreadCreate failed!status=%d", __func__, status);
314 dispatcher->status = ME_STATUS_STOPPED;
315 errCode = ME_ERROR_CREATE_THREAD_FAILED;
316 break;
317 }
318
319 status = OsalThreadStart(&localDispatcher->dispatcherThread, &config);
320 if (status != HDF_SUCCESS) {
321 HDF_LOGE("%s:OsalThreadStart failed!status=%d", __func__, status);
322 dispatcher->status = ME_STATUS_STOPPED;
323 OsalThreadDestroy(&localDispatcher->dispatcherThread);
324 errCode = ME_ERROR_CREATE_THREAD_FAILED;
325 break;
326 }
327 } while (false);
328
329 status = OsalMutexUnlock(&dispatcher->mutex);
330 if (status != HDF_SUCCESS) {
331 HDF_LOGE("%s:Destroy mutex failed!", __func__);
332 }
333
334 if (errCode != ME_SUCCESS) {
335 return errCode;
336 }
337
338 do {
339 OsalMSleep(1);
340 } while (dispatcher->status == ME_STATUS_STARTTING);
341 return (dispatcher->status == ME_STATUS_RUNNING) ? ME_SUCCESS : ME_ERROR_WRONG_STATUS;
342 }
343
/*
 * Ask the dispatcher to stop.
 *
 * Under the dispatcher mutex the status is set to ME_STATUS_STOPPING when the
 * dispatcher is running or starting; RunDispatcher leaves its loop once the
 * status is no longer running. In any other state the request is rejected
 * and only logged. The function does not wait for the thread to finish.
 *
 * @param dispatcher Dispatcher to shut down; NULL is reported and ignored.
 */
static void ShutdownDispatcher(MessageDispatcher *dispatcher)
{
    HDF_STATUS status;

    if (dispatcher == NULL) {
        HDF_LOGE("%s: Input param is null!", __func__);
        return;
    }

    status = OsalMutexTimedLock(&dispatcher->mutex, HDF_WAIT_FOREVER);
    if (status != HDF_SUCCESS) {
        HDF_LOGE("Get lock failed!status=%d", status);
        return;
    }

    if (dispatcher->status != ME_STATUS_RUNNING && dispatcher->status != ME_STATUS_STARTTING) {
        HDF_LOGE("%s:wrong status.status=%d", __func__, dispatcher->status);
    } else {
        dispatcher->status = ME_STATUS_STOPPING;
        /* Report success only when the stop request was actually accepted. */
        HDF_LOGD("%s:ShutdownDispatcher successful!", __func__);
    }

    status = OsalMutexUnlock(&dispatcher->mutex);
    if (status != HDF_SUCCESS) {
        /* This is an unlock failure; the mutex is not being destroyed here. */
        HDF_LOGE("%s:Unlock mutex failed!status=%d", __func__, status);
    }
}
371
IMPLEMENT_SHARED_OBJ(MessageDispatcher);

/*
 * Tear down a local dispatcher: release queued messages, destroy the queue
 * and the mutex, and deinitialize the shared-object state.
 *
 * The dispatcher memory itself is not freed here. This function is the
 * destroy callback registered through INIT_SHARED_OBJ in CreateLocalDispatcher.
 *
 * @param dispatcher Dispatcher to tear down; NULL is reported and ignored.
 */
static void DestroyLocalDispatcher(MessageDispatcher *dispatcher)
{
    int32_t ret;

    if (dispatcher == NULL) {
        HDF_LOGE("%s: Input param is null!", __func__);
        return;
    }

    if (dispatcher->messageQueue != NULL) {
        /* Drain pending messages only while a queue exists, so a NULL queue is never polled. */
        ReleaseAllMessage(dispatcher);
        DestroyPriorityQueue(dispatcher->messageQueue);
        dispatcher->messageQueue = NULL;
    }

    ret = OsalMutexDestroy(&dispatcher->mutex);
    if (ret != HDF_SUCCESS) {
        HDF_LOGE("%s:Release mutex failed.ret=%d", __func__, ret);
    }

    DEINIT_SHARED_OBJ(MessageDispatcher, dispatcher);
    HDF_LOGD("%s:DestroyLocalDispatcher finished!", __func__);
}
396
CreateLocalDispatcher(MessageDispatcher ** dispatcher,const DispatcherConfig * config)397 ErrorCode CreateLocalDispatcher(MessageDispatcher **dispatcher, const DispatcherConfig *config)
398 {
399 LocalMessageDispatcher *localDispatcher = NULL;
400 int32_t ret;
401 ErrorCode errCode;
402 if (dispatcher == NULL || config == NULL) {
403 HDF_LOGE("%s: Input param is null!", __func__);
404 return ME_ERROR_NULL_PTR;
405 }
406
407 localDispatcher = (LocalMessageDispatcher *)OsalMemCalloc(sizeof(LocalMessageDispatcher));
408 if (localDispatcher == NULL) {
409 HDF_LOGE("%s: Request memory failed!", __func__);
410 return ME_ERROR_RES_LAKE;
411 }
412 do {
413 HDF_LOGI("%s: Create local dispatcher...!", __func__);
414 localDispatcher->status = ME_STATUS_STOPPED;
415 localDispatcher->AppendMessage = AppendToLocalDispatcher;
416 localDispatcher->Shutdown = ShutdownDispatcher;
417 localDispatcher->Start = StartDispatcher;
418
419 localDispatcher->messageQueue = CreatePriorityQueue(config->queueSize, config->priorityLevelCount);
420 if (localDispatcher->messageQueue == NULL) {
421 HDF_LOGE("%s: CreatePriorityQueue failed!", __func__);
422 errCode = ME_ERROR_OPER_QUEUE_FAILED;
423 break;
424 }
425
426 ret = OsalMutexInit(&localDispatcher->mutex);
427 if (ret != HDF_SUCCESS) {
428 HDF_LOGE("%s: Init mutex failed! ret=%d", __func__, ret);
429 errCode = ME_ERROR_OPER_MUTEX_FAILED;
430 break;
431 }
432
433 errCode = INIT_SHARED_OBJ(MessageDispatcher, (MessageDispatcher *)localDispatcher, DestroyLocalDispatcher);
434 if (errCode != ME_SUCCESS) {
435 HDF_LOGE("%s: INIT_SHARED_OBJ failed! errCode=%d", __func__, errCode);
436 break;
437 }
438 } while (false);
439
440 if (errCode == ME_SUCCESS) {
441 *dispatcher = (MessageDispatcher *)localDispatcher;
442 } else {
443 DestroyLocalDispatcher((MessageDispatcher *)localDispatcher);
444 OsalMemFree(localDispatcher);
445 }
446 HDF_LOGI("%s: CreateLocalDispatcher finished! errCode=%d", __func__, errCode);
447 return errCode;
448 }
449