1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dh_transport.h"
17 
18 #include <cinttypes>
19 
20 #include "cJSON.h"
21 #include <securec.h>
22 
23 #include "anonymous_string.h"
24 #include "constants.h"
25 #include "dh_comm_tool.h"
26 #include "dh_context.h"
27 #include "dh_transport_obj.h"
28 #include "dh_utils_tool.h"
29 #include "distributed_hardware_errno.h"
30 #include "distributed_hardware_log.h"
31 
32 namespace OHOS {
33 namespace DistributedHardware {
34 #undef DH_LOG_TAG
35 #define DH_LOG_TAG "DHTransport"
36 namespace {
37 // Dsoftbus sendBytes max message length: 4MB
38 const uint32_t MAX_SEND_MSG_LENGTH = 4 * 1024 * 1024;
39 const uint32_t INTERCEPT_STRING_LENGTH = 20;
40 static QosTV g_qosInfo[] = {
41     { .qos = QOS_TYPE_MIN_BW, .value = 256 * 1024},
42     { .qos = QOS_TYPE_MAX_LATENCY, .value = 8000 },
43     { .qos = QOS_TYPE_MIN_LATENCY, .value = 2000 }
44 };
45 static uint32_t g_qosTvParamIndex = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(g_qosInfo[0]));
46 static std::weak_ptr<DHCommTool> g_dhCommToolWPtr_;
47 }
48 
DHTransport(std::shared_ptr<DHCommTool> dhCommToolPtr)49 DHTransport::DHTransport(std::shared_ptr<DHCommTool> dhCommToolPtr) : remoteDevSocketIds_({}), localServerSocket_(-1),
50     localSocketName_(""), isSocketSvrCreateFlag_(false), dhCommToolWPtr_(dhCommToolPtr)
51 {
52     DHLOGI("Ctor DHTransport");
53     g_dhCommToolWPtr_ = dhCommToolPtr;
54 }
55 
OnSocketOpened(int32_t socketId,const PeerSocketInfo & info)56 int32_t DHTransport::OnSocketOpened(int32_t socketId, const PeerSocketInfo &info)
57 {
58     DHLOGI("OnSocketOpened, socket: %{public}d, peerSocketName: %{public}s, peerNetworkId: %{public}s, "
59         "peerPkgName: %{public}s", socketId, info.name, GetAnonyString(info.networkId).c_str(), info.pkgName);
60     std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
61     remoteDevSocketIds_[info.networkId] = socketId;
62     return DH_FWK_SUCCESS;
63 }
64 
OnSocketClosed(int32_t socketId,ShutdownReason reason)65 void DHTransport::OnSocketClosed(int32_t socketId, ShutdownReason reason)
66 {
67     DHLOGI("OnSocketClosed, socket: %{public}d, reason: %{public}d", socketId, (int32_t)reason);
68     std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
69     for (auto iter = remoteDevSocketIds_.begin(); iter != remoteDevSocketIds_.end(); ++iter) {
70         if (iter->second == socketId) {
71             remoteDevSocketIds_.erase(iter);
72             break;
73         }
74     }
75 }
76 
OnBytesReceived(int32_t socketId,const void * data,uint32_t dataLen)77 void DHTransport::OnBytesReceived(int32_t socketId, const void *data, uint32_t dataLen)
78 {
79     if (socketId < 0 || data == nullptr || dataLen == 0 || dataLen > MAX_SEND_MSG_LENGTH) {
80         DHLOGE("OnBytesReceived param check failed");
81         return;
82     }
83 
84     std::string remoteNeworkId = GetRemoteNetworkIdBySocketId(socketId);
85     if (remoteNeworkId.empty()) {
86         DHLOGE("Can not find the remote network id by socketId: %{public}d", socketId);
87         return;
88     }
89 
90     uint8_t *buf = reinterpret_cast<uint8_t *>(calloc(dataLen + 1, sizeof(uint8_t)));
91     if (buf == nullptr) {
92         DHLOGE("OnBytesReceived: malloc memory failed");
93         return;
94     }
95 
96     if (memcpy_s(buf, dataLen + 1,  reinterpret_cast<const uint8_t *>(data), dataLen) != EOK) {
97         DHLOGE("OnBytesReceived: memcpy memory failed");
98         free(buf);
99         return;
100     }
101 
102     std::string message(buf, buf + dataLen);
103     DHLOGI("Receive message size: %{public}" PRIu32, dataLen);
104     HandleReceiveMessage(message);
105     free(buf);
106     return;
107 }
108 
HandleReceiveMessage(const std::string & payload)109 void DHTransport::HandleReceiveMessage(const std::string &payload)
110 {
111     if (!IsMessageLengthValid(payload)) {
112         return;
113     }
114     std::string rawPayload = Decompress(payload);
115 
116     cJSON *root = cJSON_Parse(rawPayload.c_str());
117     if (root == NULL) {
118         DHLOGE("the msg is not json format");
119         return;
120     }
121     std::shared_ptr<CommMsg> commMsg = std::make_shared<CommMsg>();
122     FromJson(root, *commMsg);
123     cJSON_Delete(root);
124 
125     DHLOGI("Receive DH msg, code: %{public}d, msg: %{public}s", commMsg->code, GetAnonyString(commMsg->msg).c_str());
126     AppExecFwk::InnerEvent::Pointer msgEvent = AppExecFwk::InnerEvent::Get(commMsg->code, commMsg);
127     std::shared_ptr<DHCommTool> dhCommToolSPtr = dhCommToolWPtr_.lock();
128     if (dhCommToolSPtr == nullptr) {
129         DHLOGE("Can not get DHCommTool ptr");
130         return;
131     }
132     if (dhCommToolSPtr->GetEventHandler() == nullptr) {
133         DHLOGE("Can not get eventHandler");
134         return;
135     }
136     dhCommToolSPtr->GetEventHandler()->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE);
137 }
138 
GetDHCommToolPtr()139 std::shared_ptr<DHCommTool> GetDHCommToolPtr()
140 {
141     if (g_dhCommToolWPtr_.expired()) {
142         DHLOGE("DHCommTool Weak ptr expired");
143         return nullptr;
144     }
145 
146     std::shared_ptr<DHCommTool> dhCommToolSPtr = g_dhCommToolWPtr_.lock();
147     if (dhCommToolSPtr == nullptr) {
148         DHLOGE("Can not get DHCommTool ptr");
149         return nullptr;
150     }
151 
152     return dhCommToolSPtr;
153 }
154 
OnBind(int32_t socket,PeerSocketInfo info)155 void OnBind(int32_t socket, PeerSocketInfo info)
156 {
157     std::shared_ptr<DHCommTool> dhCommToolSPtr = GetDHCommToolPtr();
158     if (dhCommToolSPtr == nullptr) {
159         DHLOGE("Can not get DHCommTool ptr");
160         return;
161     }
162     dhCommToolSPtr->GetDHTransportPtr()->OnSocketOpened(socket, info);
163 }
164 
OnShutdown(int32_t socket,ShutdownReason reason)165 void OnShutdown(int32_t socket, ShutdownReason reason)
166 {
167     std::shared_ptr<DHCommTool> dhCommToolSPtr = GetDHCommToolPtr();
168     if (dhCommToolSPtr == nullptr) {
169         DHLOGE("Can not get DHCommTool ptr");
170         return;
171     }
172     dhCommToolSPtr->GetDHTransportPtr()->OnSocketClosed(socket, reason);
173 }
174 
OnBytes(int32_t socket,const void * data,uint32_t dataLen)175 void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
176 {
177     std::shared_ptr<DHCommTool> dhCommToolSPtr = GetDHCommToolPtr();
178     if (dhCommToolSPtr == nullptr) {
179         DHLOGE("Can not get DHCommTool ptr");
180         return;
181     }
182     dhCommToolSPtr->GetDHTransportPtr()->OnBytesReceived(socket, data, dataLen);
183 }
184 
OnMessage(int32_t socket,const void * data,uint32_t dataLen)185 void OnMessage(int32_t socket, const void *data, uint32_t dataLen)
186 {
187     (void)socket;
188     (void)data;
189     (void)dataLen;
190     DHLOGI("socket: %{public}d, dataLen:%{public}" PRIu32, socket, dataLen);
191 }
192 
OnStream(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)193 void OnStream(int32_t socket, const StreamData *data, const StreamData *ext,
194     const StreamFrameInfo *param)
195 {
196     (void)socket;
197     (void)data;
198     (void)ext;
199     (void)param;
200     DHLOGI("socket: %{public}d", socket);
201 }
202 
OnFile(int32_t socket,FileEvent * event)203 void OnFile(int32_t socket, FileEvent *event)
204 {
205     (void)event;
206     DHLOGI("socket: %{public}d", socket);
207 }
208 
OnQos(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)209 void OnQos(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
210 {
211     if (qosCount == 0 || qosCount > MAX_ROUND_SIZE) {
212         DHLOGE("qosCount is invalid!");
213         return;
214     }
215     DHLOGI("OnQos, socket: %{public}d, QoSEvent: %{public}d, qosCount: %{public}" PRIu32,
216         socket, (int32_t)eventId, qosCount);
217     for (uint32_t idx = 0; idx < qosCount; idx++) {
218         DHLOGI("QosTV: type: %{public}d, value: %{public}d", (int32_t)qos[idx].qos, qos[idx].value);
219     }
220 }
221 
// Callback table passed to Listen() and Bind(); each entry points at the free function of the
// same name defined above in this file.
ISocketListener iSocketListener = {
    .OnBind = OnBind,
    .OnShutdown = OnShutdown,
    .OnBytes = OnBytes,
    .OnMessage = OnMessage,
    .OnStream = OnStream,
    .OnFile = OnFile,
    .OnQos = OnQos
};
231 
CreateServerSocket()232 int32_t DHTransport::CreateServerSocket()
233 {
234     DHLOGI("CreateServerSocket start");
235     std::string networkId = GetLocalNetworkId();
236     localSocketName_ = DH_FWK_SESSION_NAME + networkId.substr(0, INTERCEPT_STRING_LENGTH);
237     DHLOGI("CreateServerSocket local networkId is %{public}s, local socketName: %{public}s",
238         GetAnonyString(networkId).c_str(), localSocketName_.c_str());
239     SocketInfo info = {
240         .name = const_cast<char*>(localSocketName_.c_str()),
241         .pkgName = const_cast<char*>(DH_FWK_PKG_NAME.c_str()),
242         .dataType = DATA_TYPE_BYTES
243     };
244     int32_t socket = Socket(info);
245     DHLOGI("CreateServerSocket Finish, socket: %{public}d", socket);
246     return socket;
247 }
248 
CreateClientSocket(const std::string & remoteNetworkId)249 int32_t DHTransport::CreateClientSocket(const std::string &remoteNetworkId)
250 {
251     if (!IsIdLengthValid(remoteNetworkId)) {
252         return ERR_DH_FWK_PARA_INVALID;
253     }
254     DHLOGI("CreateClientSocket start, peerNetworkId: %{public}s", GetAnonyString(remoteNetworkId).c_str());
255     std::string peerSocketName = DH_FWK_SESSION_NAME + remoteNetworkId.substr(0, INTERCEPT_STRING_LENGTH);
256     SocketInfo info = {
257         .name = const_cast<char*>(localSocketName_.c_str()),
258         .peerName = const_cast<char*>(peerSocketName.c_str()),
259         .peerNetworkId = const_cast<char*>(remoteNetworkId.c_str()),
260         .pkgName = const_cast<char*>(DH_FWK_PKG_NAME.c_str()),
261         .dataType = DATA_TYPE_BYTES
262     };
263     int32_t socket = Socket(info);
264     DHLOGI("Bind Socket server, socket: %{public}d, localSocketName: %{public}s, peerSocketName: %{public}s",
265         socket, localSocketName_.c_str(), peerSocketName.c_str());
266     return socket;
267 }
268 
Init()269 int32_t DHTransport::Init()
270 {
271     DHLOGI("Init DHTransport");
272     if (isSocketSvrCreateFlag_.load()) {
273         DHLOGI("SocketServer already create success.");
274         return DH_FWK_SUCCESS;
275     }
276     int32_t socket = CreateServerSocket();
277     if (socket < DH_FWK_SUCCESS) {
278         DHLOGE("CreateSocketServer failed, ret: %{public}d", socket);
279         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
280     }
281 
282     int32_t ret = Listen(socket, g_qosInfo, g_qosTvParamIndex, &iSocketListener);
283     if (ret != DH_FWK_SUCCESS) {
284         DHLOGE("Socket Listen failed, error code %{public}d.", ret);
285         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
286     }
287     isSocketSvrCreateFlag_.store(true);
288     localServerSocket_ = socket;
289     DHLOGI("Finish Init DSoftBus Server Socket, socket: %{public}d", socket);
290     return DH_FWK_SUCCESS;
291 }
292 
UnInit()293 int32_t DHTransport::UnInit()
294 {
295     {
296         std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
297         for (auto iter = remoteDevSocketIds_.begin(); iter != remoteDevSocketIds_.end(); ++iter) {
298             DHLOGI("Shutdown client socket: %{public}d to remote dev: %{public}s", iter->second,
299                 GetAnonyString(iter->first).c_str());
300             Shutdown(iter->second);
301         }
302         remoteDevSocketIds_.clear();
303     }
304 
305     if (!isSocketSvrCreateFlag_.load()) {
306         DHLOGI("DSoftBus Server Socket already remove success.");
307     } else {
308         DHLOGI("Shutdown DSoftBus Server Socket, socket: %{public}d", localServerSocket_.load());
309         Shutdown(localServerSocket_.load());
310         localServerSocket_ = -1;
311         isSocketSvrCreateFlag_.store(false);
312     }
313     return DH_FWK_SUCCESS;
314 }
315 
IsDeviceSessionOpened(const std::string & remoteNetworkId,int32_t & socketId)316 bool DHTransport::IsDeviceSessionOpened(const std::string &remoteNetworkId, int32_t &socketId)
317 {
318     if (!IsIdLengthValid(remoteNetworkId)) {
319         return false;
320     }
321     std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
322     if (remoteDevSocketIds_.find(remoteNetworkId) == remoteDevSocketIds_.end()) {
323         return false;
324     }
325     socketId = remoteDevSocketIds_.at(remoteNetworkId);
326     DHLOGI("DeviceSession has opened, remoteNetworkId: %{public}s, socketId: %{public}d",
327         GetAnonyString(remoteNetworkId).c_str(), socketId);
328     return true;
329 }
330 
GetRemoteNetworkIdBySocketId(int32_t socketId)331 std::string DHTransport::GetRemoteNetworkIdBySocketId(int32_t socketId)
332 {
333     std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
334     std::string networkId = "";
335     for (auto const &item : remoteDevSocketIds_) {
336         if (item.second == socketId) {
337             networkId = item.first;
338             break;
339         }
340     }
341     return networkId;
342 }
343 
ClearDeviceSocketOpened(const std::string & remoteDevId)344 void DHTransport::ClearDeviceSocketOpened(const std::string &remoteDevId)
345 {
346     if (!IsIdLengthValid(remoteDevId)) {
347         return;
348     }
349     std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
350     remoteDevSocketIds_.erase(remoteDevId);
351 }
352 
StartSocket(const std::string & remoteNetworkId)353 int32_t DHTransport::StartSocket(const std::string &remoteNetworkId)
354 {
355     if (!IsIdLengthValid(remoteNetworkId)) {
356         return ERR_DH_FWK_PARA_INVALID;
357     }
358     int32_t socketId = -1;
359     if (IsDeviceSessionOpened(remoteNetworkId, socketId)) {
360         DHLOGE("Softbus session has already opened, deviceId: %{public}s", GetAnonyString(remoteNetworkId).c_str());
361         return DH_FWK_SUCCESS;
362     }
363 
364     int32_t socket = CreateClientSocket(remoteNetworkId);
365     if (socket < DH_FWK_SUCCESS) {
366         DHLOGE("StartSocket failed, ret: %{public}d", socket);
367         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
368     }
369 
370     int32_t ret = Bind(socket, g_qosInfo, g_qosTvParamIndex, &iSocketListener);
371     if (ret < DH_FWK_SUCCESS) {
372         DHLOGE("OpenSession fail, remoteNetworkId: %{public}s, socket: %{public}d, ret: %{public}d",
373             GetAnonyString(remoteNetworkId).c_str(), socket, ret);
374         Shutdown(socket);
375         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
376     }
377 
378     DHLOGI("Bind Socket success, remoteNetworkId:%{public}s, socketId: %{public}d",
379         GetAnonyString(remoteNetworkId).c_str(), socket);
380     std::string peerSocketName = DH_FWK_SESSION_NAME + remoteNetworkId.substr(0, INTERCEPT_STRING_LENGTH);
381     PeerSocketInfo peerSocketInfo = {
382         .name = const_cast<char*>(peerSocketName.c_str()),
383         .networkId = const_cast<char*>(remoteNetworkId.c_str()),
384         .pkgName = const_cast<char*>(DH_FWK_PKG_NAME.c_str()),
385         .dataType = DATA_TYPE_BYTES
386     };
387     OnSocketOpened(socket, peerSocketInfo);
388     return DH_FWK_SUCCESS;
389 }
390 
StopSocket(const std::string & remoteNetworkId)391 int32_t DHTransport::StopSocket(const std::string &remoteNetworkId)
392 {
393     if (!IsIdLengthValid(remoteNetworkId)) {
394         return ERR_DH_FWK_PARA_INVALID;
395     }
396     int32_t socketId = -1;
397     if (!IsDeviceSessionOpened(remoteNetworkId, socketId)) {
398         DHLOGI("remote dev may be not opened, remoteNetworkId: %{public}s", GetAnonyString(remoteNetworkId).c_str());
399         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
400     }
401 
402     DHLOGI("StopSocket remoteNetworkId: %{public}s, socketId: %{public}d",
403         GetAnonyString(remoteNetworkId).c_str(), socketId);
404     Shutdown(socketId);
405     ClearDeviceSocketOpened(remoteNetworkId);
406     return DH_FWK_SUCCESS;
407 }
408 
Send(const std::string & remoteNetworkId,const std::string & payload)409 int32_t DHTransport::Send(const std::string &remoteNetworkId, const std::string &payload)
410 {
411     if (!IsIdLengthValid(remoteNetworkId) || !IsMessageLengthValid(payload)) {
412         return ERR_DH_FWK_PARA_INVALID;
413     }
414     int32_t socketId = -1;
415     if (!IsDeviceSessionOpened(remoteNetworkId, socketId)) {
416         DHLOGI("The session is not open, target networkId: %{public}s", GetAnonyString(remoteNetworkId).c_str());
417         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
418     }
419     std::string compressedPayLoad = Compress(payload);
420     uint32_t compressedPayLoadSize = compressedPayLoad.size();
421     DHLOGI("Send payload size: %{public}" PRIu32 ", after compressed size: %{public}" PRIu32
422         ", target networkId: %{public}s, socketId: %{public}d", static_cast<uint32_t>(payload.size()),
423         compressedPayLoadSize, GetAnonyString(remoteNetworkId).c_str(), socketId);
424 
425     if (compressedPayLoadSize > MAX_SEND_MSG_LENGTH) {
426         DHLOGE("Send error: msg size: %{public}" PRIu32 " too long", compressedPayLoadSize);
427         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
428     }
429     uint8_t *buf = reinterpret_cast<uint8_t *>(calloc((compressedPayLoadSize), sizeof(uint8_t)));
430     if (buf == nullptr) {
431         DHLOGE("Send: malloc memory failed");
432         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
433     }
434 
435     if (memcpy_s(buf, compressedPayLoadSize, reinterpret_cast<const uint8_t *>(compressedPayLoad.c_str()),
436                  compressedPayLoadSize) != EOK) {
437         DHLOGE("Send: memcpy memory failed");
438         free(buf);
439         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
440     }
441 
442     int32_t ret = SendBytes(socketId, buf, compressedPayLoadSize);
443     free(buf);
444     if (ret != DH_FWK_SUCCESS) {
445         DHLOGE("dsoftbus send error, ret: %{public}d", ret);
446         return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
447     }
448     DHLOGI("Send payload success");
449     return DH_FWK_SUCCESS;
450 }
451 } // DistributedHardware
452 } // OHOS