1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "softbus_proxychannel_pipeline.h"
17 
18 #include <securec.h>
19 #include <stdatomic.h>
20 
21 #include "bus_center_manager.h"
22 #include "common_list.h"
23 #include "lnn_lane_interface.h"
24 #include "message_handler.h"
25 #include "softbus_adapter_mem.h"
26 #include "softbus_adapter_socket.h"
27 #include "softbus_error_code.h"
28 #include "softbus_transmission_interface.h"
29 #include "softbus_utils.h"
30 #include "trans_log.h"
31 
32 #define SESSION_NAME "ohos.dsoftbus.inner.p2pchannel"
33 #define PIPELINEHANDLER_NAME "ProxyChannelPipelineHandler"
34 #define MSG_CNT 2
35 
36 enum PipelineLooperMsgType {
37     LOOPER_MSG_TYPE_OPEN_CHANNEL,
38     LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL,
39 
40     LOOPER_MSG_TYPE_ON_CHANNEL_OPENED,
41     LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED,
42 };
43 
44 struct ListenerItem {
45     TransProxyPipelineMsgType type;
46     ITransProxyPipelineListener listener;
47 };
48 
49 struct PipelineChannelItem {
50     ListNode node;
51 
52     // for open channel request context
53     int32_t requestId;
54     char networkId[NETWORK_ID_BUF_LEN];
55     TransProxyPipelineChannelOption option;
56     ITransProxyPipelineCallback callback;
57 
58     // for channel opened context
59     int32_t channelId;
60     int32_t ref;
61     char uuid[UUID_BUF_LEN];
62 };
63 
64 struct PipelineManager {
65     _Atomic bool inited;
66     SoftBusMutex lock;
67     struct ListenerItem listeners[MSG_TYPE_CNT];
68     SoftBusList *channels;
69 
70     SoftBusLooper *looper;
71     SoftBusHandler handler;
72 };
73 
74 static struct PipelineManager g_manager = {
75     .inited = false,
76     .listeners = {},
77     .looper = NULL,
78     .handler = {},
79 };
80 
81 typedef bool (*Comparable)(const struct PipelineChannelItem *item, const void *param);
SearchChannelItemUnsafe(const void * param,Comparable func)82 static struct PipelineChannelItem *SearchChannelItemUnsafe(const void *param, Comparable func)
83 {
84     struct PipelineChannelItem *target = NULL;
85     struct PipelineChannelItem *it = NULL;
86     LIST_FOR_EACH_ENTRY(it, &g_manager.channels->list, struct PipelineChannelItem, node) {
87         if (func(it, param)) {
88             target = it;
89         }
90     }
91     return target;
92 }
93 
CompareByRequestId(const struct PipelineChannelItem * item,const void * param)94 static bool CompareByRequestId(const struct PipelineChannelItem *item, const void *param)
95 {
96     return item->requestId == *(int32_t *)param;
97 }
98 
CompareByChannelId(const struct PipelineChannelItem * item,const void * param)99 static bool CompareByChannelId(const struct PipelineChannelItem *item, const void *param)
100 {
101     return item->channelId == *(int32_t *)param;
102 }
103 
CompareByUuid(const struct PipelineChannelItem * item,const void * param)104 static bool CompareByUuid(const struct PipelineChannelItem *item, const void *param)
105 {
106     return strlen(item->uuid) != 0 && strcmp(item->uuid, (const char *)param) == 0;
107 }
108 
TransProxyPipelineFreeMessage(SoftBusMessage * msg)109 static void TransProxyPipelineFreeMessage(SoftBusMessage *msg)
110 {
111     TRANS_CHECK_AND_RETURN_LOGW(msg, TRANS_CTRL, "null msg");
112     if (msg->obj != NULL) {
113         SoftBusFree(msg->obj);
114         msg->obj = NULL;
115     }
116     SoftBusFree(msg);
117 }
118 
TransProxyReuseByChannelId(int32_t channelId)119 int32_t TransProxyReuseByChannelId(int32_t channelId)
120 {
121     TRANS_LOGD(TRANS_CTRL, "enter.");
122     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
123         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
124     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
125     if (target == NULL) {
126         TRANS_LOGE(TRANS_CTRL, "channel not exist. channelId=%{public}d", channelId);
127         SoftBusMutexUnlock(&g_manager.channels->lock);
128         return SOFTBUS_NOT_FIND;
129     }
130     target->ref++;
131     SoftBusMutexUnlock(&g_manager.channels->lock);
132     return SOFTBUS_OK;
133 }
134 
TransProxyPipelineGenRequestId(void)135 int32_t TransProxyPipelineGenRequestId(void)
136 {
137     static int32_t requestIdGenerator = 0;
138     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
139         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
140     int32_t retValue = ++requestIdGenerator;
141     SoftBusMutexUnlock(&g_manager.channels->lock);
142     return retValue;
143 }
144 
TransProxyPipelineRegisterListener(TransProxyPipelineMsgType type,const ITransProxyPipelineListener * listener)145 int32_t TransProxyPipelineRegisterListener(TransProxyPipelineMsgType type, const ITransProxyPipelineListener *listener)
146 {
147     TRANS_LOGD(TRANS_CTRL, "enter.");
148     TRANS_CHECK_AND_RETURN_RET_LOGW(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE,
149         SOFTBUS_INVALID_PARAM, TRANS_CTRL, "type is invalid. type=%{public}d", type);
150     TRANS_CHECK_AND_RETURN_RET_LOGW(listener && listener->onDataReceived && listener->onDisconnected,
151         SOFTBUS_INVALID_PARAM, TRANS_CTRL, "listen is invalid");
152 
153     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.lock) == SOFTBUS_OK, SOFTBUS_LOCK_ERR,
154         TRANS_CTRL, "lock failed");
155     for (int32_t i = 0; i < MSG_CNT; i++) {
156         if (g_manager.listeners[i].type == type) {
157             TRANS_LOGW(TRANS_CTRL, "repeat register listener, overwrite it. type=%{public}d", type);
158             g_manager.listeners[i].listener = *listener;
159             SoftBusMutexUnlock(&g_manager.lock);
160             return SOFTBUS_OK;
161         }
162         if (g_manager.listeners[i].type == MSG_TYPE_INVALID) {
163             g_manager.listeners[i].type = type;
164             g_manager.listeners[i].listener = *listener;
165             SoftBusMutexUnlock(&g_manager.lock);
166             return SOFTBUS_OK;
167         }
168     }
169     TRANS_LOGE(TRANS_CTRL, "register listener failed: no position. type=%{public}d", type);
170     SoftBusMutexUnlock(&g_manager.lock);
171     return SOFTBUS_TRANS_REGISTER_LISTENER_FAILED;
172 }
173 
TransProxyPipelineOpenChannel(int32_t requestId,const char * networkId,const TransProxyPipelineChannelOption * option,const ITransProxyPipelineCallback * callback)174 int32_t TransProxyPipelineOpenChannel(int32_t requestId, const char *networkId,
175     const TransProxyPipelineChannelOption *option, const ITransProxyPipelineCallback *callback)
176 {
177     TRANS_LOGD(TRANS_CTRL, "enter.");
178     if (!IsValidString(networkId, ID_MAX_LEN)) {
179         return SOFTBUS_INVALID_PARAM;
180     }
181     TRANS_CHECK_AND_RETURN_RET_LOGE(option != NULL, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "option invalid");
182     TRANS_CHECK_AND_RETURN_RET_LOGE(networkId, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "invalid network id");
183     TRANS_CHECK_AND_RETURN_RET_LOGE(callback && callback->onChannelOpened && callback->onChannelOpenFailed,
184         SOFTBUS_INVALID_PARAM, TRANS_CTRL, "invalid callback");
185 
186     if (option->bleDirect) {
187         TRANS_CHECK_AND_RETURN_RET_LOGE(
188             ConnBleDirectIsEnable(BLE_COC), SOFTBUS_FUNC_NOT_SUPPORT, TRANS_CTRL, "ble direct is not enable");
189     }
190     struct PipelineChannelItem *item = (struct PipelineChannelItem *)SoftBusCalloc(sizeof(struct PipelineChannelItem));
191     TRANS_CHECK_AND_RETURN_RET_LOGE(
192         item != NULL, SOFTBUS_MALLOC_ERR, TRANS_CTRL, "malloc item failed, reqId=%{public}d", requestId);
193     item->requestId = requestId;
194     if (strcpy_s(item->networkId, NETWORK_ID_BUF_LEN, networkId) != EOK) {
195         TRANS_LOGE(TRANS_CTRL, "strcpy_s network id failed, reqId=%{public}d", requestId);
196         SoftBusFree(item);
197         return SOFTBUS_STRCPY_ERR;
198     }
199     item->option = *option;
200     item->callback = *callback;
201     item->channelId = INVALID_CHANNEL_ID;
202 
203     struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
204     if (msg == NULL) {
205         TRANS_LOGE(TRANS_CTRL, "malloc msg failed, reqId=%{public}d", requestId);
206         SoftBusFree(item);
207         return SOFTBUS_MALLOC_ERR;
208     }
209     msg->what = LOOPER_MSG_TYPE_OPEN_CHANNEL;
210     msg->arg1 = (uint64_t)requestId;
211     msg->handler = &g_manager.handler;
212     msg->FreeMessage = TransProxyPipelineFreeMessage;
213 
214     if (SoftBusMutexLock(&g_manager.channels->lock) != SOFTBUS_OK) {
215         TRANS_LOGE(TRANS_CTRL, "lock channels failed, reqId=%{public}d", requestId);
216         SoftBusFree(item);
217         SoftBusFree(msg);
218         return SOFTBUS_LOCK_ERR;
219     }
220     ListInit(&item->node);
221     ListAdd(&g_manager.channels->list, &item->node);
222     TRANS_LOGI(TRANS_CTRL, "add channelId=%{public}d", item->channelId);
223     g_manager.channels->cnt++;
224     SoftBusMutexUnlock(&g_manager.channels->lock);
225 
226     g_manager.looper->PostMessage(g_manager.looper, msg);
227     return SOFTBUS_OK;
228 }
229 
TransProxyPipelineSendMessage(int32_t channelId,const uint8_t * data,uint32_t dataLen,TransProxyPipelineMsgType type)230 int32_t TransProxyPipelineSendMessage(
231     int32_t channelId, const uint8_t *data, uint32_t dataLen, TransProxyPipelineMsgType type)
232 {
233     TRANS_LOGD(TRANS_CTRL, "enter.");
234     TRANS_CHECK_AND_RETURN_RET_LOGW(data, SOFTBUS_INVALID_PARAM, TRANS_CTRL, "data is invalid");
235     TRANS_CHECK_AND_RETURN_RET_LOGW(type == MSG_TYPE_P2P_NEGO || type == MSG_TYPE_IP_PORT_EXCHANGE,
236         SOFTBUS_INVALID_PARAM, TRANS_CTRL, "type is invalid. type=%{public}d ", type);
237 
238     char *sendData = (char *)SoftBusCalloc(dataLen + sizeof(uint32_t));
239     TRANS_CHECK_AND_RETURN_RET_LOGW(sendData, SOFTBUS_MALLOC_ERR, TRANS_CTRL, "malloc send data failed");
240     *(uint32_t *)sendData = SoftBusHtoLl((uint32_t)type);
241     if (memcpy_s(sendData + sizeof(uint32_t), dataLen, data, dataLen) != EOK) {
242         TRANS_LOGE(TRANS_CTRL, "memcpy send data failed");
243         SoftBusFree(sendData);
244         return SOFTBUS_MEM_ERR;
245     }
246     int32_t ret = TransSendNetworkingMessage(channelId, sendData, dataLen + sizeof(uint32_t), CONN_HIGH);
247     if (ret != SOFTBUS_OK) {
248         TRANS_LOGE(TRANS_CTRL, "trans send data failed");
249         SoftBusFree(sendData);
250         return ret;
251     }
252     SoftBusFree(sendData);
253     return SOFTBUS_OK;
254 }
255 
TransProxyPipelineGetChannelIdByNetworkId(const char * networkId)256 int32_t TransProxyPipelineGetChannelIdByNetworkId(const char *networkId)
257 {
258     TRANS_LOGD(TRANS_CTRL, "enter.");
259     if (!IsValidString(networkId, ID_MAX_LEN)) {
260         return SOFTBUS_INVALID_PARAM;
261     }
262     char uuid[UUID_BUF_LEN] = { 0 };
263     int32_t ret = LnnGetRemoteStrInfo(networkId, STRING_KEY_UUID, uuid, sizeof(uuid));
264     if (ret != SOFTBUS_OK) {
265         TRANS_LOGE(TRANS_CTRL, "get remote uuid by network id fail, ret=%{public}d", ret);
266         return INVALID_CHANNEL_ID;
267     }
268 
269     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
270         INVALID_CHANNEL_ID, TRANS_CTRL, "lock failed");
271     struct PipelineChannelItem *target = SearchChannelItemUnsafe(uuid, CompareByUuid);
272     if (target == NULL) {
273         TRANS_LOGE(TRANS_CTRL, "channel not found");
274         SoftBusMutexUnlock(&g_manager.channels->lock);
275         return INVALID_CHANNEL_ID;
276     }
277     int32_t channelId = target->channelId;
278     SoftBusMutexUnlock(&g_manager.channels->lock);
279     return channelId;
280 }
281 
TransProxyPipelineGetUuidByChannelId(int32_t channelId,char * uuid,uint32_t uuidLen)282 int32_t TransProxyPipelineGetUuidByChannelId(int32_t channelId, char *uuid, uint32_t uuidLen)
283 {
284     TRANS_LOGD(TRANS_CTRL, "enter.");
285     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
286         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
287     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
288     if (target == NULL) {
289         TRANS_LOGE(TRANS_CTRL, "channelId not exist. channelId=%{public}d", channelId);
290         SoftBusMutexUnlock(&g_manager.channels->lock);
291         return SOFTBUS_NOT_FIND;
292     }
293     if (strcpy_s(uuid, uuidLen, target->uuid) != EOK) {
294         SoftBusMutexUnlock(&g_manager.channels->lock);
295         return SOFTBUS_STRCPY_ERR;
296     }
297     SoftBusMutexUnlock(&g_manager.channels->lock);
298     return SOFTBUS_OK;
299 }
300 
TransProxyPipelineCloseChannel(int32_t channelId)301 int32_t TransProxyPipelineCloseChannel(int32_t channelId)
302 {
303     TRANS_LOGI(TRANS_CTRL, "enter.");
304     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
305         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
306 
307     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
308     if (target != NULL) {
309         target->ref--;
310         if (target->ref <= 0) {
311             ListDelete(&target->node);
312             g_manager.channels->cnt -= 1;
313             SoftBusFree(target);
314             SoftBusMutexUnlock(&g_manager.channels->lock);
315             TRANS_LOGW(TRANS_CTRL, "close channelId=%{public}d", channelId);
316             return TransCloseNetWorkingChannel(channelId);
317         }
318         TRANS_LOGI(TRANS_CTRL, "channelId=%{public}d, ref=%{public}d", channelId, target->ref);
319     }
320     SoftBusMutexUnlock(&g_manager.channels->lock);
321     return SOFTBUS_OK;
322 }
323 
TransProxyPipelineCloseChannelDelay(int32_t channelId)324 int32_t TransProxyPipelineCloseChannelDelay(int32_t channelId)
325 {
326 #define DELAY_CLOSE_CHANNEL_MS 3000
327     TRANS_LOGD(TRANS_CTRL, "enter.");
328     TRANS_CHECK_AND_RETURN_RET_LOGW(channelId != INVALID_CHANNEL_ID, SOFTBUS_INVALID_PARAM,
329         TRANS_CTRL, "invalid channelId=%{public}d", channelId);
330     struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
331     if (msg == NULL) {
332         TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
333         return SOFTBUS_MALLOC_ERR;
334     }
335     msg->what = LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL;
336     msg->arg1 = (uint64_t)channelId;
337     msg->handler = &g_manager.handler;
338     msg->FreeMessage = TransProxyPipelineFreeMessage;
339     g_manager.looper->PostMessageDelay(g_manager.looper, msg, DELAY_CLOSE_CHANNEL_MS);
340     return SOFTBUS_OK;
341 }
342 
InnerSaveChannel(int32_t channelId,const char * uuid)343 int32_t InnerSaveChannel(int32_t channelId, const char *uuid)
344 {
345     if (uuid == NULL) {
346         TRANS_LOGE(TRANS_CTRL, "invalid uuid");
347         return SOFTBUS_TRANS_INVALID_UUID;
348     }
349     TRANS_CHECK_AND_RETURN_RET_LOGW(SoftBusMutexLock(&g_manager.channels->lock) == SOFTBUS_OK,
350         SOFTBUS_LOCK_ERR, TRANS_CTRL, "lock failed");
351     struct PipelineChannelItem *item = (struct PipelineChannelItem *)SoftBusCalloc(sizeof(struct PipelineChannelItem));
352     if (item == NULL) {
353         SoftBusMutexUnlock(&g_manager.channels->lock);
354         return SOFTBUS_MALLOC_ERR;
355     }
356     item->channelId = channelId;
357     if (strcpy_s(item->uuid, UUID_BUF_LEN, uuid) != EOK) {
358         SoftBusFree(item);
359         SoftBusMutexUnlock(&g_manager.channels->lock);
360         return SOFTBUS_STRCPY_ERR;
361     }
362     ListInit(&item->node);
363     ListAdd(&g_manager.channels->list, &item->node);
364     TRANS_LOGI(TRANS_CTRL, "add channelId=%{public}d", item->channelId);
365     g_manager.channels->cnt += 1;
366     SoftBusMutexUnlock(&g_manager.channels->lock);
367     return SOFTBUS_OK;
368 }
369 
TransProxyPipelineOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)370 static int TransProxyPipelineOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
371 {
372     TRANS_LOGD(TRANS_CTRL, "enter.");
373     if (uuid == NULL) {
374         TRANS_LOGE(TRANS_CTRL, "invalid uuid");
375         return SOFTBUS_TRANS_INVALID_UUID;
376     }
377     char *clone = (char *)SoftBusCalloc(UUID_BUF_LEN);
378     if (clone == NULL || strcpy_s(clone, UUID_BUF_LEN, uuid) != EOK) {
379         TRANS_LOGE(TRANS_CTRL, "copy uuid failed, channelId=%{public}d", channelId);
380         SoftBusFree(clone);
381         return SOFTBUS_MEM_ERR;
382     }
383     struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
384     if (msg == NULL) {
385         TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
386         SoftBusFree(clone);
387         return SOFTBUS_MALLOC_ERR;
388     }
389     msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPENED;
390     msg->arg1 = (uint64_t)channelId;
391     msg->arg2 = isServer;
392     msg->obj = clone;
393     msg->handler = &g_manager.handler;
394     msg->FreeMessage = TransProxyPipelineFreeMessage;
395     g_manager.looper->PostMessage(g_manager.looper, msg);
396     return SOFTBUS_OK;
397 }
398 #ifdef  __cplusplus
399 extern "C" {
400 #endif
InnerOnChannelOpened(int32_t channelId,const char * uuid,unsigned char isServer)401 static void InnerOnChannelOpened(int32_t channelId, const char *uuid, unsigned char isServer)
402 {
403     TRANS_LOGD(TRANS_CTRL, "enter.");
404     if (isServer) {
405         if (InnerSaveChannel(channelId, uuid) != SOFTBUS_OK) {
406             TRANS_LOGE(TRANS_CTRL, "save server channel failed");
407             TransCloseNetWorkingChannel(channelId);
408         }
409         return;
410     }
411     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
412     if (ret != SOFTBUS_OK) {
413         TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
414         TransCloseNetWorkingChannel(channelId);
415         return;
416     }
417 
418     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
419     if (target == NULL) {
420         TRANS_LOGE(TRANS_CTRL, "channelId not found. channelId=%{public}d", channelId);
421         SoftBusMutexUnlock(&g_manager.channels->lock);
422         TransCloseNetWorkingChannel(channelId);
423         return;
424     }
425     int32_t requestId = target->requestId;
426     ITransProxyPipelineCallback callback = {
427         .onChannelOpened = target->callback.onChannelOpened,
428         .onChannelOpenFailed = target->callback.onChannelOpenFailed,
429     };
430     if (strcpy_s(target->uuid, UUID_BUF_LEN, uuid) != EOK) {
431         TRANS_LOGE(TRANS_CTRL, "strcpy uuid failed, channelId=%{public}d", channelId);
432         ListDelete(&target->node);
433         SoftBusFree(target);
434         g_manager.channels->cnt -= 1;
435         ret = SOFTBUS_STRCPY_ERR;
436     }
437     SoftBusMutexUnlock(&g_manager.channels->lock);
438     if (ret != SOFTBUS_OK) {
439         TransCloseNetWorkingChannel(channelId);
440         callback.onChannelOpenFailed(requestId, ret);
441     } else {
442         callback.onChannelOpened(requestId, channelId);
443     }
444 }
445 #ifdef  __cplusplus
446 }
447 #endif
TransProxyPipelineOnChannelOpenFailed(int32_t channelId,const char * uuid)448 static void TransProxyPipelineOnChannelOpenFailed(int32_t channelId, const char *uuid)
449 {
450     (void)uuid;
451     TRANS_LOGD(TRANS_CTRL, "enter.");
452     struct SoftBusMessage *msg = (struct SoftBusMessage *)SoftBusCalloc(sizeof(SoftBusMessage));
453     if (msg == NULL) {
454         TRANS_LOGE(TRANS_CTRL, "malloc msg failed, channelId=%{public}d", channelId);
455         return;
456     }
457     msg->what = LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED;
458     msg->arg1 = (uint64_t)channelId;
459     msg->handler = &g_manager.handler;
460     msg->FreeMessage = TransProxyPipelineFreeMessage;
461     g_manager.looper->PostMessage(g_manager.looper, msg);
462 }
463 
InnerOnChannelOpenFailed(int32_t channelId)464 static void InnerOnChannelOpenFailed(int32_t channelId)
465 {
466     TRANS_LOGD(TRANS_CTRL, "enter.");
467     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
468     if (ret != SOFTBUS_OK) {
469         TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
470         return;
471     }
472 
473     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
474     if (target == NULL) {
475         TRANS_LOGE(TRANS_CTRL, "channelId not found. channelId=%{public}d", channelId);
476         SoftBusMutexUnlock(&g_manager.channels->lock);
477         return;
478     }
479     int32_t requestId = target->requestId;
480     ITransProxyPipelineCallback callback = {
481         .onChannelOpenFailed = target->callback.onChannelOpenFailed,
482     };
483     ListDelete(&target->node);
484     TRANS_LOGI(TRANS_CTRL, "delete channelId=%{public}d", channelId);
485     SoftBusFree(target);
486     g_manager.channels->cnt -= 1;
487     SoftBusMutexUnlock(&g_manager.channels->lock);
488     callback.onChannelOpenFailed(requestId, SOFTBUS_TRANS_CHANNEL_OPEN_FAILED);
489     TRANS_LOGI(TRANS_CTRL, "exit");
490 }
491 
TransProxyPipelineOnChannelClosed(int32_t channelId)492 static void TransProxyPipelineOnChannelClosed(int32_t channelId)
493 {
494     TRANS_LOGD(TRANS_CTRL, "enter.");
495     struct PipelineChannelItem *target = NULL;
496     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
497     if (ret != SOFTBUS_OK) {
498         TRANS_LOGE(TRANS_CTRL, "lock channels failed, channelId=%{public}d, ret=%{public}d", channelId, ret);
499         goto exit;
500     }
501     target = SearchChannelItemUnsafe(&channelId, CompareByChannelId);
502     if (target != NULL) {
503         ListDelete(&target->node);
504         TRANS_LOGI(TRANS_CTRL, "delete channelId=%{public}d", channelId);
505         SoftBusFree(target);
506         g_manager.channels->cnt -= 1;
507     }
508     SoftBusMutexUnlock(&g_manager.channels->lock);
509 exit:
510     for (int32_t i = 0; i < MSG_CNT; i++) {
511         if (g_manager.listeners[i].type != MSG_TYPE_INVALID && g_manager.listeners[i].listener.onDisconnected != NULL) {
512             g_manager.listeners[i].listener.onDisconnected(channelId);
513         }
514     }
515 }
516 
TransProxyPipelineOnMessageReceived(int32_t channelId,const char * data,uint32_t len)517 static void TransProxyPipelineOnMessageReceived(int32_t channelId, const char *data, uint32_t len)
518 {
519     TRANS_LOGD(TRANS_CTRL, "enter.");
520     TRANS_CHECK_AND_RETURN_LOGW(data, TRANS_CTRL, "data is invalid");
521     TRANS_CHECK_AND_RETURN_LOGW(len > sizeof(uint32_t), TRANS_CTRL, "len is too short. len=%{public}d", len);
522 
523     uint32_t msgType = SoftBusLtoHl(*(uint32_t *)data);
524     struct ListenerItem *target = NULL;
525     for (int32_t i = 0; i < MSG_CNT; i++) {
526         if ((uint32_t)(g_manager.listeners[i].type) == msgType) {
527             target = g_manager.listeners + i;
528             break;
529         }
530     }
531 
532     if (target == NULL || target->listener.onDataReceived == NULL) {
533         TRANS_LOGE(TRANS_CTRL, "not listener for msgType=%{public}u", msgType);
534         return;
535     }
536     target->listener.onDataReceived(channelId, data + sizeof(uint32_t), len - sizeof(uint32_t));
537 }
538 
OpenNetWorkingChannel(int32_t requestId,ITransProxyPipelineCallback * callback,LanePreferredLinkList * preferred,char * networkId,struct PipelineChannelItem * target)539 static void OpenNetWorkingChannel(int32_t requestId, ITransProxyPipelineCallback *callback,
540     LanePreferredLinkList *preferred, char *networkId, struct PipelineChannelItem *target)
541 {
542     int32_t channelId = TransOpenNetWorkingChannel(SESSION_NAME, networkId, preferred);
543     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
544     TRANS_CHECK_AND_RETURN_LOGE(ret == SOFTBUS_OK, TRANS_CTRL, "fail to lock channels.");
545     target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
546     if (target == NULL) {
547         TRANS_LOGE(TRANS_CTRL,
548             "open proxy session failed, reqId=%{public}d, channelId=%{public}d", requestId, channelId);
549         (void)SoftBusMutexUnlock(&g_manager.channels->lock);
550         if (channelId != INVALID_CHANNEL_ID) {
551             TransCloseNetWorkingChannel(channelId);
552         }
553         return;
554     }
555     callback->onChannelOpenFailed = target->callback.onChannelOpenFailed;
556 
557     if (channelId == INVALID_CHANNEL_ID) {
558         TRANS_LOGE(TRANS_CTRL, "open proxy channel failed, reqId=%{public}d", requestId);
559         ListDelete(&target->node);
560         g_manager.channels->cnt -= 1;
561         SoftBusFree(target);
562         (void)SoftBusMutexUnlock(&g_manager.channels->lock);
563         callback->onChannelOpenFailed(requestId, SOFTBUS_ERR);
564         return;
565     }
566     target->channelId = channelId;
567     target->ref = 1;
568     (void)SoftBusMutexUnlock(&g_manager.channels->lock);
569 }
570 
InnerOpenProxyChannel(int32_t requestId)571 static void InnerOpenProxyChannel(int32_t requestId)
572 {
573     TRANS_LOGD(TRANS_CTRL, "enter.");
574     int32_t ret = SoftBusMutexLock(&g_manager.channels->lock);
575     if (ret != SOFTBUS_OK) {
576         TRANS_LOGE(TRANS_CTRL, "lock channels failed, reqId=%{public}d, ret=%{public}d", requestId, ret);
577         return;
578     }
579     struct PipelineChannelItem *target = SearchChannelItemUnsafe(&requestId, CompareByRequestId);
580     if (target == NULL) {
581         TRANS_LOGE(TRANS_CTRL, "channel not found. reqId=%{public}d", requestId);
582         (void)SoftBusMutexUnlock(&g_manager.channels->lock);
583         return;
584     }
585     ITransProxyPipelineCallback callback = {
586         .onChannelOpenFailed = target->callback.onChannelOpenFailed,
587     };
588     LanePreferredLinkList preferred = { 0 };
589     if (target->option.bleDirect) {
590         preferred.linkTypeNum = 1;
591         preferred.linkType[0] = LANE_COC_DIRECT;
592     }
593     char networkId[NETWORK_ID_BUF_LEN] = { 0 };
594     if (strcpy_s(networkId, sizeof(networkId), target->networkId) != EOK) {
595         TRANS_LOGE(TRANS_CTRL, "strcpy_s failed, reqId=%{public}d", requestId);
596         ListDelete(&target->node);
597         g_manager.channels->cnt -= 1;
598         SoftBusFree(target);
599         (void)SoftBusMutexUnlock(&g_manager.channels->lock);
600         callback.onChannelOpenFailed(requestId, SOFTBUS_STRCPY_ERR);
601         return;
602     }
603     target = NULL;
604     (void)SoftBusMutexUnlock(&g_manager.channels->lock);
605 
606     OpenNetWorkingChannel(requestId, &callback, &preferred, networkId, target);
607 }
608 #ifdef  __cplusplus
609 extern "C" {
610 #endif
TransProxyPipelineHandleMessage(SoftBusMessage * msg)611 static void TransProxyPipelineHandleMessage(SoftBusMessage *msg)
612 {
613     TRANS_LOGD(TRANS_CTRL, "enter, messageType=%{public}d", msg->what);
614     switch (msg->what) {
615         case LOOPER_MSG_TYPE_OPEN_CHANNEL:
616             InnerOpenProxyChannel(msg->arg1);
617             break;
618         case LOOPER_MSG_TYPE_DELEY_CLOSE_CHANNEL:
619             TransProxyPipelineCloseChannel(msg->arg1);
620             break;
621         case LOOPER_MSG_TYPE_ON_CHANNEL_OPEN_FAILED:
622             InnerOnChannelOpenFailed(msg->arg1);
623             break;
624         case LOOPER_MSG_TYPE_ON_CHANNEL_OPENED:
625             InnerOnChannelOpened(msg->arg1, (char *)msg->obj, msg->arg2);
626             break;
627         default:
628             TRANS_LOGE(TRANS_CTRL, "unknown messageType=%{public}d", msg->what);
629             break;
630     }
631 }
632 
TransProxyPipelineInit(void)633 int32_t TransProxyPipelineInit(void)
634 {
635     TRANS_LOGD(TRANS_CTRL, "enter.");
636     SoftBusList *channels = NULL;
637     int32_t ret = 0;
638     INetworkingListener listener = {
639         .onChannelOpened = TransProxyPipelineOnChannelOpened,
640         .onChannelOpenFailed = TransProxyPipelineOnChannelOpenFailed,
641         .onChannelClosed = TransProxyPipelineOnChannelClosed,
642         .onMessageReceived = TransProxyPipelineOnMessageReceived,
643     };
644 
645     if (atomic_load_explicit(&(g_manager.inited), memory_order_acquire)) {
646         return SOFTBUS_OK;
647     };
648     channels = CreateSoftBusList();
649     if (channels == NULL) {
650         goto exit;
651     }
652     if (SoftBusMutexInit(&g_manager.lock, NULL) != SOFTBUS_OK) {
653         goto exit;
654     }
655     g_manager.channels = channels;
656 
657     ret = TransRegisterNetworkingChannelListener(SESSION_NAME, &listener);
658     if (ret != SOFTBUS_OK) {
659         goto exit;
660     }
661     g_manager.looper = GetLooper(LOOP_TYPE_DEFAULT);
662     if (g_manager.looper == NULL) {
663         TRANS_LOGE(TRANS_INIT, "fail to get looper.");
664         return SOFTBUS_LOOPER_ERR;
665     }
666     g_manager.handler.looper = g_manager.looper;
667     strcpy_s(g_manager.handler.name, strlen(PIPELINEHANDLER_NAME) + 1, PIPELINEHANDLER_NAME);
668     g_manager.handler.HandleMessage = TransProxyPipelineHandleMessage;
669     atomic_store_explicit(&(g_manager.inited), true, memory_order_release);
670     return SOFTBUS_OK;
671 exit:
672     if (channels != NULL) {
673         TRANS_LOGE(TRANS_INIT, "softbus list is not null.");
674         DestroySoftBusList(channels);
675     }
676     g_manager.channels = NULL;
677     SoftBusMutexDestroy(&g_manager.lock);
678     atomic_store_explicit(&(g_manager.inited), false, memory_order_release);
679 
680     return SOFTBUS_TRANS_INIT_FAILED;
681 }
682 #ifdef  __cplusplus
683 }
684 #endif