1 /*
2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "softbus_proxychannel_pipeline.h"
17
18 #include <securec.h>
19 #include <stdatomic.h>
20
21 #include "bus_center_manager.h"
22 #include "common_list.h"
23 #include "lnn_lane_interface.h"
24 #include "message_handler.h"
25 #include "softbus_adapter_mem.h"
26 #include "softbus_adapter_socket.h"
27 #include "softbus_error_code.h"
28 #include "softbus_transmission_interface.h"
29 #include "softbus_utils.h"
30 #include "trans_log.h"
31
// Session name handed to TransOpenNetWorkingChannel() and TransRegisterNetworkingChannelListener() in this file.
#define SESSION_NAME "ohos.dsoftbus.inner.p2pchannel"
// Name copied into g_manager.handler.name by TransProxyPipelineInit().
#define PIPELINEHANDLER_NAME "ProxyChannelPipelineHandler"
// Number of g_manager.listeners slots scanned by the register/dispatch loops in this file.
// NOTE(review): the array itself is sized by MSG_TYPE_CNT (declared elsewhere) - confirm MSG_CNT <= MSG_TYPE_CNT.
#define MSG_CNT 2
35
/* Values stored in SoftBusMessage.what and dispatched by TransProxyPipelineHandleMessage(). */
enum PipelineLooperMsgType {
    LOOPER_MSG_TYPE_OPEN_CHANNEL,        // arg1 = requestId; handled by InnerOpenProxyChannel()
    LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL, // arg1 = channelId; handled by TransProxyPipelineCloseChannel()

    LOOPER_MSG_TYPE_ON_CHANNEL_OPENED,      // arg1 = channelId, arg2 = isServer, obj = heap copy of the uuid
    LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED, // arg1 = channelId; handled by InnerOnChannelOpenFailed()
};
43
/* One registration slot in g_manager.listeners: a message type and the listener bound to it. */
struct ListenerItem {
    TransProxyPipelineMsgType type;       // MSG_TYPE_INVALID is treated as "slot free" by the register loop
    ITransProxyPipelineListener listener; // copied by value from the caller in TransProxyPipelineRegisterListener()
};
48
/*
 * One entry of g_manager.channels. Entries are created either by TransProxyPipelineOpenChannel()
 * (request context filled, channelId still INVALID_CHANNEL_ID) or by InnerSaveChannel()
 * (only channelId and uuid filled, everything else zero from SoftBusCalloc).
 */
struct PipelineChannelItem {
    ListNode node; // linkage into g_manager.channels->list

    // for open channel request context
    int32_t requestId;                      // key used by CompareByRequestId()
    char networkId[NETWORK_ID_BUF_LEN];     // peer network id copied from the open request
    TransProxyPipelineChannelOption option; // option.bleDirect is read in InnerOpenProxyChannel()
    ITransProxyPipelineCallback callback;   // onChannelOpened / onChannelOpenFailed supplied by the requester

    // for channel opened context
    int32_t channelId;       // key used by CompareByChannelId(); set in OpenNetWorkingChannel() or InnerSaveChannel()
    int32_t ref;             // set to 1 in OpenNetWorkingChannel(), ++ in TransProxyReuseByChannelId(), -- in TransProxyPipelineCloseChannel()
    char uuid[UUID_BUF_LEN]; // key used by CompareByUuid(); an empty string never matches
};
63
/* Module-wide state of the proxy channel pipeline (single instance: g_manager). */
struct PipelineManager {
    _Atomic bool inited; // stored true at the end of a successful TransProxyPipelineInit()
    SoftBusMutex lock;   // taken by TransProxyPipelineRegisterListener() while it edits listeners
    struct ListenerItem listeners[MSG_TYPE_CNT]; // only the first MSG_CNT slots are scanned in this file
    SoftBusList *channels; // list of struct PipelineChannelItem; guarded by channels->lock

    SoftBusLooper *looper;  // looper obtained via GetLooper(LOOP_TYPE_DEFAULT)
    SoftBusHandler handler; // handler attached to every message this file posts
};
73
/* The single pipeline manager instance; its remaining fields are filled in by TransProxyPipelineInit(). */
static struct PipelineManager g_manager = {
    .inited = false,
    .listeners = {},
    .looper = NULL,
    .handler = {},
};
80
/* Predicate used by SearchChannelItemUnsafe(): returns true when `item` matches the opaque key `param`. */
typedef bool (*Comparable)(const struct PipelineChannelItem *item, const void *param);
/*
 * Scan g_manager.channels and return the entry accepted by `func`, or NULL when none is.
 * The scan never stops early, so if several entries match, the one visited last is returned.
 * No locking happens here; the callers visible in this file hold g_manager.channels->lock.
 */
static struct PipelineChannelItem *SearchChannelItemUnsafe(const void *param, Comparable func)
{
    struct PipelineChannelItem *found = NULL;
    struct PipelineChannelItem *cur = NULL;
    LIST_FOR_EACH_ENTRY(cur, &g_manager.channels->list, struct PipelineChannelItem, node) {
        if (!func(cur, param)) {
            continue;
        }
        found = cur;
    }
    return found;
}
93
CompareByRequestId(const struct PipelineChannelItem * item,const void * param)94 static bool CompareByRequestId(const struct PipelineChannelItem *item, const void *param)
95 {
96 return item->requestId == *(int32_t *)param;
97 }
98
CompareByChannelId(const struct PipelineChannelItem * item,const void * param)99 static bool CompareByChannelId(const struct PipelineChannelItem *item, const void *param)
100 {
101 return item->channelId == *(int32_t *)param;
102 }
103
CompareByUuid(const struct PipelineChannelItem * item,const void * param)104 static bool CompareByUuid(const struct PipelineChannelItem *item, const void *param)
105 {
106 return strlen(item->uuid) != 0 && strcmp(item->uuid, (const char *)param) == 0;
107 }
108
/*
 * FreeMessage hook installed on every message this file posts.
 * Releases the optional msg->obj payload first, then the message itself.
 */
static void TransProxyPipelineFreeMessage(SoftBusMessage *msg)
{
    TRANS_CHECK_AND_RETURN_LOGW(msg, TRANS_CTRL, "null msg");
    if (msg->obj == NULL) {
        SoftBusFree(msg);
        return;
    }
    SoftBusFree(msg->obj);
    msg->obj = NULL;
    SoftBusFree(msg);
}
118
TransProxyReuseByChannelId(int32_t channelId)119 int32_t TransProxyReuseByChannelId(int32_t channelId)
120 {
121 TRANS_LOGD(TRANS_CTRL, "enter.");
122 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
123 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
124 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
125 if (target == NULL) {
126 TRANS_LOGE(TRANS_CTRL, "channel not exist. channelId=%{public}d", channelId);
127 SoftBusMutexUnlock(&g_manager.channels->lock);
128 return SOFTBUS_NOT_FIND;
129 }
130 target->ref++;
131 SoftBusMutexUnlock(&g_manager.channels->lock);
132 return SOFTBUS_OK;
133 }
134
TransProxyPipelineGenRequestId(void)135 int32_t TransProxyPipelineGenRequestId(void)
136 {
137 static int32_t requestIdGenerator = 0;
138 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
139 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
140 int32_t retValue = ++requestIdGenerator;
141 SoftBusMutexUnlock(&g_manager.channels->lock);
142 return retValue;
143 }
144
/*
 * Bind `listener` to message type `type`.
 * Only MSG_TYPE_P2P_NEGO and MSG_TYPE_IP_PORT_EXCHANGE are accepted, and both
 * listener callbacks must be set. The first slot that already carries `type`
 * is overwritten (with a warning); otherwise the first free slot is claimed.
 * Returns SOFTBUS_OK, SOFTBUS_INVALID_PARAM, SOFTBUS_LOCK_ERR, or
 * SOFTBUS_TRANS_REGISTER_LISTENER_FAILED when no slot is available.
 */
int32_t TransProxyPipelineRegisterListener(TransProxyPipelineMsgType type, const ITransProxyPipelineListener *listener)
{
    TRANS_LOGD(TRANS_CTRL, "enter.");
    TRANS_CHECK_AND_RETURN_RET_LOGW(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "type is invalid. type=%{public}d", type);
    TRANS_CHECK_AND_RETURN_RET_LOGW(listener && listener->onDataReceived && listener->onDisconnected,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "listen is invalid");

    TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
        TRANS_CTRL, "lock failed");

    int32_t result = SOFTBUS_TRANS_REGISTER_LISTENER_FAILED;
    for (int32_t idx = 0; idx < MSG_CNT; idx++) {
        struct ListenerItem *slot = &g_manager.listeners[idx];
        const bool alreadyBound = (slot->type == type);
        if (!alreadyBound && slot->type != MSG_TYPE_INVALID) {
            continue; // slot belongs to another message type
        }
        if (alreadyBound) {
            TRANS_LOGW(TRANS_CTRL, "repeat register listener, overwrite it. type=%{public}d", type);
        } else {
            slot->type = type;
        }
        slot->listener = *listener;
        result = SOFTBUS_OK;
        break;
    }
    if (result != SOFTBUS_OK) {
        TRANS_LOGE(TRANS_CTRL, "register listener failed: no position. type=%{public}d", type);
    }
    SoftBusMutexUnlock(&g_manager.lock);
    return result;
}
173
TransProxyPipelineOpenChannel(int32_t requestId,const char * networkId,const TransProxyPipelineChannelOption * option,const ITransProxyPipelineCallback * callback)174 int32_t TransProxyPipelineOpenChannel(int32_t requestId, const char *networkId,
175 const TransProxyPipelineChannelOption *option, const ITransProxyPipelineCallback *callback)
176 {
177 TRANS_LOGD(TRANS_CTRL, "enter.");
178 if (!IsValidString(networkId, ID_MAX_LEN)) {
179 return SOFTBUS_INVALID_PARAM;
180 }
181 TRANS_CHECK_AND_RETURN_RET_LOGE(option != NULL, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "option invalid");
182 TRANS_CHECK_AND_RETURN_RET_LOGE(networkId, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "invalid network id");
183 TRANS_CHECK_AND_RETURN_RET_LOGE(callback && callback->onChannelOpened && callback->onChannelOpenFailed,
184 SOFTBUS_INVALID_PARAM, TRANS_CTRL, "invalid callback");
185
186 if (option->bleDirect) {
187 TRANS_CHECK_AND_RETURN_RET_LOGE(
188 ConnBleDirectIsEnable(BLE_COC), SOFTBUS_FUNC_NOT_SUPPORT, TRANS_CTRL, "ble direct is not enable");
189 }
190 struct PipelineChannelItem *item = (struct PipelineChannelItem *)SoftBusCalloc(sizeof(struct PipelineChannelItem));
191 TRANS_CHECK_AND_RETURN_RET_LOGE(
192 item != NULL, SOFTBUS_MALLOC_ERR, TRANS_CTRL, "malloc item failed, reqId=%{public}d", requestId);
193 item->requestId = requestId;
194 if (strcpy_s(item->networkId, NETWORK_ID_BUF_LEN, networkId) != EOK) {
195 TRANS_LOGE(TRANS_CTRL, "strcpy_s network id failed, reqId=%{public}d", requestId);
196 SoftBusFree(item);
197 return SOFTBUS_STRCPY_ERR;
198 }
199 item->option = *option;
200 item->callback = *callback;
201 item->channelId = INVALID_CHANNEL_ID;
202
203 struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
204 if (msg == NULL) {
205 TRANS_LOGE(TRANS_CTRL, "malloc msg failed, reqId=%{public}d", requestId);
206 SoftBusFree(item);
207 return SOFTBUS_MALLOC_ERR;
208 }
209 msg->what = LOOPER_MSG_TYPE_OPEN_CHANNEL;
210 msg->arg1 = (uint64_t)requestId;
211 msg->handler = &g_manager.handler;
212 msg->FreeMessage = TransProxyPipelineFreeMessage;
213
214 if (SoftBusMutexLock(&g_manager.channels->lock) != SOFTBUS_OK) {
215 TRANS_LOGE(TRANS_CTRL, "lock channels failed, reqId=%{public}d", requestId);
216 SoftBusFree(item);
217 SoftBusFree(msg);
218 return SOFTBUS_LOCK_ERR;
219 }
220 ListInit(&item->node);
221 ListAdd(&g_manager.channels->list, &item->node);
222 TRANS_LOGI(TRANS_CTRL, "add channelId=%{public}d", item->channelId);
223 g_manager.channels->cnt++;
224 SoftBusMutexUnlock(&g_manager.channels->lock);
225
226 g_manager.looper->PostMessage(g_manager.looper, msg);
227 return SOFTBUS_OK;
228 }
229
/*
 * Send `data` over `channelId`, prefixed with a 4-byte message-type header.
 *
 * The outgoing buffer is [SoftBusHtoLl(type)][dataLen payload bytes] and is handed to
 * TransSendNetworkingMessage() with CONN_HIGH priority.
 *
 * Returns SOFTBUS_OK on success; SOFTBUS_INVALID_PARAM for a NULL payload, an
 * unsupported type, or a dataLen so large that adding the header would wrap;
 * SOFTBUS_MALLOC_ERR / SOFTBUS_MEM_ERR on allocation / copy failure; otherwise
 * the error returned by TransSendNetworkingMessage().
 */
int32_t TransProxyPipelineSendMessage(
    int32_t channelId, const uint8_t *data, uint32_t dataLen, TransProxyPipelineMsgType type)
{
    TRANS_LOGD(TRANS_CTRL, "enter.");
    TRANS_CHECK_AND_RETURN_RET_LOGW(data, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "data is invalid");
    TRANS_CHECK_AND_RETURN_RET_LOGW(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE,
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "type is invalid. type=%{public}d ", type);
    // Reject lengths where header + payload would wrap: a wrapped sum would yield an
    // undersized allocation that the payload copy below would then overrun.
    TRANS_CHECK_AND_RETURN_RET_LOGW(dataLen <= UINT32_MAX - (uint32_t)sizeof(uint32_t),
        SOFTBUS_INVALID_PARAM, TRANS_CTRL, "dataLen is too large. dataLen=%{public}u", dataLen);

    const uint32_t totalLen = dataLen + (uint32_t)sizeof(uint32_t);
    char *sendData = (char *)SoftBusCalloc(totalLen);
    TRANS_CHECK_AND_RETURN_RET_LOGW(sendData, SOFTBUS_MALLOC_ERR, TRANS_CTRL, "malloc send data failed");

    // Header first, then the payload right behind it.
    *(uint32_t *)sendData = SoftBusHtoLl((uint32_t)type);
    if (memcpy_s(sendData + sizeof(uint32_t), dataLen, data, dataLen) != EOK) {
        TRANS_LOGE(TRANS_CTRL, "memcpy send data failed");
        SoftBusFree(sendData);
        return SOFTBUS_MEM_ERR;
    }

    int32_t ret = TransSendNetworkingMessage(channelId, sendData, totalLen, CONN_HIGH);
    if (ret != SOFTBUS_OK) {
        TRANS_LOGE(TRANS_CTRL, "trans send data failed");
    }
    // The buffer is released here on both the success and the failure path.
    SoftBusFree(sendData);
    return ret;
}
255
TransProxyPipelineGetChannelIdByNetworkId(const char * networkId)256 int32_t TransProxyPipelineGetChannelIdByNetworkId(const char *networkId)
257 {
258 TRANS_LOGD(TRANS_CTRL, "enter.");
259 if (!IsValidString(networkId, ID_MAX_LEN)) {
260 return SOFTBUS_INVALID_PARAM;
261 }
262 char uuid[UUID_BUF_LEN] = { 0 };
263 int32_t ret = LnnGetRemoteStrInfo(networkId, STRING_KEY_UUID, uuid, sizeof(uuid));
264 if (ret != SOFTBUS_OK) {
265 TRANS_LOGE(TRANS_CTRL, "get remote uuid by network id fail, ret=%{public}d", ret);
266 return INVALID_CHANNEL_ID;
267 }
268
269 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
270 INVALID_CHANNEL_ID, TRANS_CTRL, "lock failed");
271 struct PipelineChannelItem *target = SearchChannelItemUnsafe(uuid, CompareByUuid);
272 if (target == NULL) {
273 TRANS_LOGE(TRANS_CTRL, "channel not found");
274 SoftBusMutexUnlock(&g_manager.channels->lock);
275 return INVALID_CHANNEL_ID;
276 }
277 int32_t channelId = target->channelId;
278 SoftBusMutexUnlock(&g_manager.channels->lock);
279 return channelId;
280 }
281
TransProxyPipelineGetUuidByChannelId(int32_t channelId,char * uuid,uint32_t uuidLen)282 int32_t TransProxyPipelineGetUuidByChannelId(int32_t channelId, char *uuid, uint32_t uuidLen)
283 {
284 TRANS_LOGD(TRANS_CTRL, "enter.");
285 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
286 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
287 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
288 if (target == NULL) {
289 TRANS_LOGE(TRANS_CTRL, "channelId not exist. channelId=%{public}d", channelId);
290 SoftBusMutexUnlock(&g_manager.channels->lock);
291 return SOFTBUS_NOT_FIND;
292 }
293 if (strcpy_s(uuid, uuidLen, target->uuid) != EOK) {
294 SoftBusMutexUnlock(&g_manager.channels->lock);
295 return SOFTBUS_STRCPY_ERR;
296 }
297 SoftBusMutexUnlock(&g_manager.channels->lock);
298 return SOFTBUS_OK;
299 }
300
TransProxyPipelineCloseChannel(int32_t channelId)301 int32_t TransProxyPipelineCloseChannel(int32_t channelId)
302 {
303 TRANS_LOGI(TRANS_CTRL, "enter.");
304 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
305 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
306
307 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
308 if (target != NULL) {
309 target->ref--;
310 if (target->ref <= 0) {
311 ListDelete(&target->node);
312 g_manager.channels->cnt -= 1;
313 SoftBusFree(target);
314 SoftBusMutexUnlock(&g_manager.channels->lock);
315 TRANS_LOGW(TRANS_CTRL, "close channelId=%{public}d", channelId);
316 return TransCloseNetWorkingChannel(channelId);
317 }
318 TRANS_LOGI(TRANS_CTRL, "channelId=%{public}d, ref=%{public}d", channelId, target->ref);
319 }
320 SoftBusMutexUnlock(&g_manager.channels->lock);
321 return SOFTBUS_OK;
322 }
323
TransProxyPipelineCloseChannelDelay(int32_t channelId)324 int32_t TransProxyPipelineCloseChannelDelay(int32_t channelId)
325 {
326 #define DELAY_CLOSE_CHANNEL_MS 3000
327 TRANS_LOGD(TRANS_CTRL, "enter.");
328 TRANS_CHECK_AND_RETURN_RET_LOGW(channelId != INVALID_CHANNEL_ID, SOFTBUS_INVALID_PARAM,
329 TRANS_CTRL, "invalid channelId=%{public}d", channelId);
330 struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
331 if (msg == NULL) {
332 TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
333 return SOFTBUS_MALLOC_ERR;
334 }
335 msg->what = LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL;
336 msg->arg1 = (uint64_t)channelId;
337 msg->handler = &g_manager.handler;
338 msg->FreeMessage = TransProxyPipelineFreeMessage;
339 g_manager.looper->PostMessageDelay(g_manager.looper, msg, DELAY_CLOSE_CHANNEL_MS);
340 return SOFTBUS_OK;
341 }
342
InnerSaveChannel(int32_t channelId,const char * uuid)343 int32_t InnerSaveChannel(int32_t channelId, const char *uuid)
344 {
345 if (uuid == NULL) {
346 TRANS_LOGE(TRANS_CTRL, "invalid uuid");
347 return SOFTBUS_TRANS_INVALID_UUID;
348 }
349 TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
350 SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
351 struct PipelineChannelItem *item = (struct PipelineChannelItem *)SoftBusCalloc(sizeof(struct PipelineChannelItem));
352 if (item == NULL) {
353 SoftBusMutexUnlock(&g_manager.channels->lock);
354 return SOFTBUS_MALLOC_ERR;
355 }
356 item->channelId = channelId;
357 if (strcpy_s(item->uuid, UUID_BUF_LEN, uuid) != EOK) {
358 SoftBusFree(item);
359 SoftBusMutexUnlock(&g_manager.channels->lock);
360 return SOFTBUS_STRCPY_ERR;
361 }
362 ListInit(&item->node);
363 ListAdd(&g_manager.channels->list, &item->node);
364 TRANS_LOGI(TRANS_CTRL, "add channelId=%{public}d", item->channelId);
365 g_manager.channels->cnt += 1;
366 SoftBusMutexUnlock(&g_manager.channels->lock);
367 return SOFTBUS_OK;
368 }
369
TransProxyPipelineOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)370 static int TransProxyPipelineOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
371 {
372 TRANS_LOGD(TRANS_CTRL, "enter.");
373 if (uuid == NULL) {
374 TRANS_LOGE(TRANS_CTRL, "invalid uuid");
375 return SOFTBUS_TRANS_INVALID_UUID;
376 }
377 char *clone = (char *)SoftBusCalloc(UUID_BUF_LEN);
378 if (clone == NULL || strcpy_s(clone, UUID_BUF_LEN, uuid) != EOK) {
379 TRANS_LOGE(TRANS_CTRL, "copy uuid failed, channelId=%{public}d", channelId);
380 SoftBusFree(clone);
381 return SOFTBUS_MEM_ERR;
382 }
383 struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
384 if (msg == NULL) {
385 TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
386 SoftBusFree(clone);
387 return SOFTBUS_MALLOC_ERR;
388 }
389 msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPENED;
390 msg->arg1 = (uint64_t)channelId;
391 msg->arg2 = isServer;
392 msg->obj = clone;
393 msg->handler = &g_manager.handler;
394 msg->FreeMessage = TransProxyPipelineFreeMessage;
395 g_manager.looper->PostMessage(g_manager.looper, msg);
396 return SOFTBUS_OK;
397 }
398 #ifdef __cplusplus
399 extern "C" {
400 #endif
InnerOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)401 static void InnerOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
402 {
403 TRANS_LOGD(TRANS_CTRL, "enter.");
404 if (isServer) {
405 if (InnerSaveChannel(channelId, uuid) != SOFTBUS_OK) {
406 TRANS_LOGE(TRANS_CTRL, "save server channel failed");
407 TransCloseNetWorkingChannel(channelId);
408 }
409 return;
410 }
411 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
412 if (ret != SOFTBUS_OK) {
413 TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
414 TransCloseNetWorkingChannel(channelId);
415 return;
416 }
417
418 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
419 if (target == NULL) {
420 TRANS_LOGE(TRANS_CTRL, "channelId not found. channelId=%{public}d", channelId);
421 SoftBusMutexUnlock(&g_manager.channels->lock);
422 TransCloseNetWorkingChannel(channelId);
423 return;
424 }
425 int32_t requestId = target->requestId;
426 ITransProxyPipelineCallback callback = {
427 .onChannelOpened = target->callback.onChannelOpened,
428 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
429 };
430 if (strcpy_s(target->uuid, UUID_BUF_LEN, uuid) != EOK) {
431 TRANS_LOGE(TRANS_CTRL, "strcpy uuid failed, channelId=%{public}d", channelId);
432 ListDelete(&target->node);
433 SoftBusFree(target);
434 g_manager.channels->cnt -= 1;
435 ret = SOFTBUS_STRCPY_ERR;
436 }
437 SoftBusMutexUnlock(&g_manager.channels->lock);
438 if (ret != SOFTBUS_OK) {
439 TransCloseNetWorkingChannel(channelId);
440 callback.onChannelOpenFailed(requestId, ret);
441 } else {
442 callback.onChannelOpened(requestId, channelId);
443 }
444 }
445 #ifdef __cplusplus
446 }
447 #endif
TransProxyPipelineOnChannelOpenFailed(int32_t channelId,const char * uuid)448 static void TransProxyPipelineOnChannelOpenFailed(int32_t channelId, const char *uuid)
449 {
450 (void)uuid;
451 TRANS_LOGD(TRANS_CTRL, "enter.");
452 struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
453 if (msg == NULL) {
454 TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
455 return;
456 }
457 msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED;
458 msg->arg1 = (uint64_t)channelId;
459 msg->handler = &g_manager.handler;
460 msg->FreeMessage = TransProxyPipelineFreeMessage;
461 g_manager.looper->PostMessage(g_manager.looper, msg);
462 }
463
InnerOnChannelOpenFailed(int32_t channelId)464 static void InnerOnChannelOpenFailed(int32_t channelId)
465 {
466 TRANS_LOGD(TRANS_CTRL, "enter.");
467 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
468 if (ret != SOFTBUS_OK) {
469 TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
470 return;
471 }
472
473 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
474 if (target == NULL) {
475 TRANS_LOGE(TRANS_CTRL, "channelId not found. channelId=%{public}d", channelId);
476 SoftBusMutexUnlock(&g_manager.channels->lock);
477 return;
478 }
479 int32_t requestId = target->requestId;
480 ITransProxyPipelineCallback callback = {
481 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
482 };
483 ListDelete(&target->node);
484 TRANS_LOGI(TRANS_CTRL, "delete channelId=%{public}d", channelId);
485 SoftBusFree(target);
486 g_manager.channels->cnt -= 1;
487 SoftBusMutexUnlock(&g_manager.channels->lock);
488 callback.onChannelOpenFailed(requestId, SOFTBUS_TRANS_CHANNEL_OPEN_FAILED);
489 TRANS_LOGI(TRANS_CTRL, "exit");
490 }
491
TransProxyPipelineOnChannelClosed(int32_t channelId)492 static void TransProxyPipelineOnChannelClosed(int32_t channelId)
493 {
494 TRANS_LOGD(TRANS_CTRL, "enter.");
495 struct PipelineChannelItem *target = NULL;
496 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
497 if (ret != SOFTBUS_OK) {
498 TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
499 goto exit;
500 }
501 target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
502 if (target != NULL) {
503 ListDelete(&target->node);
504 TRANS_LOGI(TRANS_CTRL, "delete channelId=%{public}d", channelId);
505 SoftBusFree(target);
506 g_manager.channels->cnt -= 1;
507 }
508 SoftBusMutexUnlock(&g_manager.channels->lock);
509 exit:
510 for (int32_t i = 0; i < MSG_CNT; i++) {
511 if (g_manager.listeners[i].type != MSG_TYPE_INVALID && g_manager.listeners[i].listener.onDisconnected != NULL) {
512 g_manager.listeners[i].listener.onDisconnected(channelId);
513 }
514 }
515 }
516
TransProxyPipelineOnMessageReceived(int32_t channelId,const char * data,uint32_t len)517 static void TransProxyPipelineOnMessageReceived(int32_t channelId, const char *data, uint32_t len)
518 {
519 TRANS_LOGD(TRANS_CTRL, "enter.");
520 TRANS_CHECK_AND_RETURN_LOGW(data, TRANS_CTRL, "data is invalid");
521 TRANS_CHECK_AND_RETURN_LOGW(len > sizeof(uint32_t), TRANS_CTRL, "len is too short. len=%{public}d", len);
522
523 uint32_t msgType = SoftBusLtoHl(*(uint32_t *)data);
524 struct ListenerItem *target = NULL;
525 for (int32_t i = 0; i < MSG_CNT; i++) {
526 if ((uint32_t)(g_manager.listeners[i].type) == msgType) {
527 target = g_manager.listeners + i;
528 break;
529 }
530 }
531
532 if (target == NULL || target->listener.onDataReceived == NULL) {
533 TRANS_LOGE(TRANS_CTRL, "not listener for msgType=%{public}u", msgType);
534 return;
535 }
536 target->listener.onDataReceived(channelId, data + sizeof(uint32_t), len - sizeof(uint32_t));
537 }
538
/*
 * Second half of the open request: actually open the networking channel and
 * bind the resulting channel id to the pending entry for `requestId`.
 *
 * requestId: request whose pending entry is looked up again under the list lock.
 * callback:  scratch callback struct owned by the caller; its onChannelOpenFailed
 *            is refreshed from the entry and used to report an open failure.
 * preferred: preferred link list forwarded to TransOpenNetWorkingChannel().
 * networkId: peer network id forwarded to TransOpenNetWorkingChannel().
 * target:    ignored on entry; the entry is always re-searched because the list
 *            was unlocked while the channel was being opened.
 *
 * Any channel that was opened but cannot be bound to an entry (lock failure or
 * the entry disappeared) is closed again so it does not leak.
 */
static void OpenNetWorkingChannel(int32_t requestId, ITransProxyPipelineCallback *callback,
    LanePreferredLinkList *preferred, char *networkId, struct PipelineChannelItem *target)
{
    int32_t channelId = TransOpenNetWorkingChannel(SESSION_NAME, networkId, preferred);
    if (SoftBusMutexLock(&g_manager.channels->lock) != SOFTBUS_OK) {
        TRANS_LOGE(TRANS_CTRL, "fail to lock channels.");
        // Nobody will ever own this channel id, so release it right here.
        // NOTE(review): the pending entry stays in the list and the requester is not notified
        // on this path, same as before - confirm whether a failure callback is wanted.
        if (channelId != INVALID_CHANNEL_ID) {
            TransCloseNetWorkingChannel(channelId);
        }
        return;
    }
    target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
    if (target == NULL) {
        TRANS_LOGE(TRANS_CTRL,
            "open proxy session failed, reqId=%{public}d, channelId=%{public}d", requestId, channelId);
        (void)SoftBusMutexUnlock(&g_manager.channels->lock);
        if (channelId != INVALID_CHANNEL_ID) {
            TransCloseNetWorkingChannel(channelId);
        }
        return;
    }
    callback->onChannelOpenFailed = target->callback.onChannelOpenFailed;

    if (channelId == INVALID_CHANNEL_ID) {
        TRANS_LOGE(TRANS_CTRL, "open proxy channel failed, reqId=%{public}d", requestId);
        ListDelete(&target->node);
        g_manager.channels->cnt -= 1;
        SoftBusFree(target);
        (void)SoftBusMutexUnlock(&g_manager.channels->lock);
        // Report the failure only after the lock is released.
        callback->onChannelOpenFailed(requestId, SOFTBUS_ERR);
        return;
    }
    // Success: the entry now tracks the channel and holds its first reference.
    target->channelId = channelId;
    target->ref = 1;
    (void)SoftBusMutexUnlock(&g_manager.channels->lock);
}
570
InnerOpenProxyChannel(int32_t requestId)571 static void InnerOpenProxyChannel(int32_t requestId)
572 {
573 TRANS_LOGD(TRANS_CTRL, "enter.");
574 int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
575 if (ret != SOFTBUS_OK) {
576 TRANS_LOGE(TRANS_CTRL, "lock channels failed, reqId=%{public}d, ret=%{public}d", requestId, ret);
577 return;
578 }
579 struct PipelineChannelItem *target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
580 if (target == NULL) {
581 TRANS_LOGE(TRANS_CTRL, "channel not found. reqId=%{public}d", requestId);
582 (void)SoftBusMutexUnlock(&g_manager.channels->lock);
583 return;
584 }
585 ITransProxyPipelineCallback callback = {
586 .onChannelOpenFailed = target->callback.onChannelOpenFailed,
587 };
588 LanePreferredLinkList preferred = { 0 };
589 if (target->option.bleDirect) {
590 preferred.linkTypeNum = 1;
591 preferred.linkType[0] = LANE_COC_DIRECT;
592 }
593 char networkId[NETWORK_ID_BUF_LEN] = { 0 };
594 if (strcpy_s(networkId, sizeof(networkId), target->networkId) != EOK) {
595 TRANS_LOGE(TRANS_CTRL, "strcpy_s failed, reqId=%{public}d", requestId);
596 ListDelete(&target->node);
597 g_manager.channels->cnt -= 1;
598 SoftBusFree(target);
599 (void)SoftBusMutexUnlock(&g_manager.channels->lock);
600 callback.onChannelOpenFailed(requestId, SOFTBUS_STRCPY_ERR);
601 return;
602 }
603 target = NULL;
604 (void)SoftBusMutexUnlock(&g_manager.channels->lock);
605
606 OpenNetWorkingChannel(requestId, &callback, &preferred, networkId, target);
607 }
608 #ifdef __cplusplus
609 extern "C" {
610 #endif
/*
 * HandleMessage hook of g_manager.handler: routes a looper message to the
 * matching Inner* routine according to msg->what. arg1 carries the request id
 * (open) or the channel id (all other types); unknown types are only logged.
 */
static void TransProxyPipelineHandleMessage(SoftBusMessage *msg)
{
    TRANS_LOGD(TRANS_CTRL, "enter, messageType=%{public}d", msg->what);
    switch (msg->what) {
        case LOOPER_MSG_TYPE_ON_CHANNEL_OPENED:
            InnerOnChannelOpened((int32_t)msg->arg1, (char *)msg->obj, (unsigned char)msg->arg2);
            break;
        case LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED:
            InnerOnChannelOpenFailed((int32_t)msg->arg1);
            break;
        case LOOPER_MSG_TYPE_OPEN_CHANNEL:
            InnerOpenProxyChannel((int32_t)msg->arg1);
            break;
        case LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL:
            TransProxyPipelineCloseChannel((int32_t)msg->arg1);
            break;
        default:
            TRANS_LOGE(TRANS_CTRL, "unknown messageType=%{public}d", msg->what);
            break;
    }
}
632
TransProxyPipelineInit(void)633 int32_t TransProxyPipelineInit(void)
634 {
635 TRANS_LOGD(TRANS_CTRL, "enter.");
636 SoftBusList *channels = NULL;
637 int32_t ret = 0;
638 INetworkingListener listener = {
639 .onChannelOpened = TransProxyPipelineOnChannelOpened,
640 .onChannelOpenFailed = TransProxyPipelineOnChannelOpenFailed,
641 .onChannelClosed = TransProxyPipelineOnChannelClosed,
642 .onMessageReceived = TransProxyPipelineOnMessageReceived,
643 };
644
645 if (atomic_load_explicit(&(g_manager.inited), memory_order_acquire)) {
646 return SOFTBUS_OK;
647 };
648 channels = CreateSoftBusList();
649 if (channels == NULL) {
650 goto exit;
651 }
652 if (SoftBusMutexInit(&g_manager.lock, NULL) != SOFTBUS_OK) {
653 goto exit;
654 }
655 g_manager.channels = channels;
656
657 ret = TransRegisterNetworkingChannelListener(SESSION_NAME, &listener);
658 if (ret != SOFTBUS_OK) {
659 goto exit;
660 }
661 g_manager.looper = GetLooper(LOOP_TYPE_DEFAULT);
662 if (g_manager.looper == NULL) {
663 TRANS_LOGE(TRANS_INIT, "fail to get looper.");
664 return SOFTBUS_LOOPER_ERR;
665 }
666 g_manager.handler.looper = g_manager.looper;
667 strcpy_s(g_manager.handler.name, strlen(PIPELINEHANDLER_NAME) + 1, PIPELINEHANDLER_NAME);
668 g_manager.handler.HandleMessage = TransProxyPipelineHandleMessage;
669 atomic_store_explicit(&(g_manager.inited), true, memory_order_release);
670 return SOFTBUS_OK;
671 exit:
672 if (channels != NULL) {
673 TRANS_LOGE(TRANS_INIT, "softbus list is not null.");
674 DestroySoftBusList(channels);
675 }
676 g_manager.channels = NULL;
677 SoftBusMutexDestroy(&g_manager.lock);
678 atomic_store_explicit(&(g_manager.inited), false, memory_order_release);
679
680 return SOFTBUS_TRANS_INIT_FAILED;
681 }
682 #ifdef __cplusplus
683 }
684 #endif