1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dh_transport.h"
17
18 #include <cinttypes>
19
20 #include "cJSON.h"
21 #include <securec.h>
22
23 #include "anonymous_string.h"
24 #include "constants.h"
25 #include "dh_comm_tool.h"
26 #include "dh_context.h"
27 #include "dh_transport_obj.h"
28 #include "dh_utils_tool.h"
29 #include "distributed_hardware_errno.h"
30 #include "distributed_hardware_log.h"
31
32 namespace OHOS {
33 namespace DistributedHardware {
34 #undef DH_LOG_TAG
35 #define DH_LOG_TAG "DHTransport"
36 namespace {
37 // Dsoftbus sendBytes max message length: 4MB
38 const uint32_t MAX_SEND_MSG_LENGTH = 4 * 1024 * 1024;
39 const uint32_t INTERCEPT_STRING_LENGTH = 20;
40 static QosTV g_qosInfo[] = {
41 { .qos = QOS_TYPE_MIN_BW, .value = 256 * 1024},
42 { .qos = QOS_TYPE_MAX_LATENCY, .value = 8000 },
43 { .qos = QOS_TYPE_MIN_LATENCY, .value = 2000 }
44 };
45 static uint32_t g_qosTvParamIndex = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(g_qosInfo[0]));
46 static std::weak_ptr<DHCommTool> g_dhCommToolWPtr_;
47 }
48
DHTransport(std::shared_ptr<DHCommTool> dhCommToolPtr)49 DHTransport::DHTransport(std::shared_ptr<DHCommTool> dhCommToolPtr) : remoteDevSocketIds_({}), localServerSocket_(-1),
50 localSocketName_(""), isSocketSvrCreateFlag_(false), dhCommToolWPtr_(dhCommToolPtr)
51 {
52 DHLOGI("Ctor DHTransport");
53 g_dhCommToolWPtr_ = dhCommToolPtr;
54 }
55
OnSocketOpened(int32_t socketId,const PeerSocketInfo & info)56 int32_t DHTransport::OnSocketOpened(int32_t socketId, const PeerSocketInfo &info)
57 {
58 DHLOGI("OnSocketOpened, socket: %{public}d, peerSocketName: %{public}s, peerNetworkId: %{public}s, "
59 "peerPkgName: %{public}s", socketId, info.name, GetAnonyString(info.networkId).c_str(), info.pkgName);
60 std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
61 remoteDevSocketIds_[info.networkId] = socketId;
62 return DH_FWK_SUCCESS;
63 }
64
OnSocketClosed(int32_t socketId,ShutdownReason reason)65 void DHTransport::OnSocketClosed(int32_t socketId, ShutdownReason reason)
66 {
67 DHLOGI("OnSocketClosed, socket: %{public}d, reason: %{public}d", socketId, (int32_t)reason);
68 std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
69 for (auto iter = remoteDevSocketIds_.begin(); iter != remoteDevSocketIds_.end(); ++iter) {
70 if (iter->second == socketId) {
71 remoteDevSocketIds_.erase(iter);
72 break;
73 }
74 }
75 }
76
OnBytesReceived(int32_t socketId,const void * data,uint32_t dataLen)77 void DHTransport::OnBytesReceived(int32_t socketId, const void *data, uint32_t dataLen)
78 {
79 if (socketId < 0 || data == nullptr || dataLen == 0 || dataLen > MAX_SEND_MSG_LENGTH) {
80 DHLOGE("OnBytesReceived param check failed");
81 return;
82 }
83
84 std::string remoteNeworkId = GetRemoteNetworkIdBySocketId(socketId);
85 if (remoteNeworkId.empty()) {
86 DHLOGE("Can not find the remote network id by socketId: %{public}d", socketId);
87 return;
88 }
89
90 uint8_t *buf = reinterpret_cast<uint8_t *>(calloc(dataLen + 1, sizeof(uint8_t)));
91 if (buf == nullptr) {
92 DHLOGE("OnBytesReceived: malloc memory failed");
93 return;
94 }
95
96 if (memcpy_s(buf, dataLen + 1, reinterpret_cast<const uint8_t *>(data), dataLen) != EOK) {
97 DHLOGE("OnBytesReceived: memcpy memory failed");
98 free(buf);
99 return;
100 }
101
102 std::string message(buf, buf + dataLen);
103 DHLOGI("Receive message size: %{public}" PRIu32, dataLen);
104 HandleReceiveMessage(message);
105 free(buf);
106 return;
107 }
108
HandleReceiveMessage(const std::string & payload)109 void DHTransport::HandleReceiveMessage(const std::string &payload)
110 {
111 if (!IsMessageLengthValid(payload)) {
112 return;
113 }
114 std::string rawPayload = Decompress(payload);
115
116 cJSON *root = cJSON_Parse(rawPayload.c_str());
117 if (root == NULL) {
118 DHLOGE("the msg is not json format");
119 return;
120 }
121 std::shared_ptr<CommMsg> commMsg = std::make_shared<CommMsg>();
122 FromJson(root, *commMsg);
123 cJSON_Delete(root);
124
125 DHLOGI("Receive DH msg, code: %{public}d, msg: %{public}s", commMsg->code, GetAnonyString(commMsg->msg).c_str());
126 AppExecFwk::InnerEvent::Pointer msgEvent = AppExecFwk::InnerEvent::Get(commMsg->code, commMsg);
127 std::shared_ptr<DHCommTool> dhCommToolSPtr = dhCommToolWPtr_.lock();
128 if (dhCommToolSPtr == nullptr) {
129 DHLOGE("Can not get DHCommTool ptr");
130 return;
131 }
132 if (dhCommToolSPtr->GetEventHandler() == nullptr) {
133 DHLOGE("Can not get eventHandler");
134 return;
135 }
136 dhCommToolSPtr->GetEventHandler()->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE);
137 }
138
GetDHCommToolPtr()139 std::shared_ptr<DHCommTool> GetDHCommToolPtr()
140 {
141 if (g_dhCommToolWPtr_.expired()) {
142 DHLOGE("DHCommTool Weak ptr expired");
143 return nullptr;
144 }
145
146 std::shared_ptr<DHCommTool> dhCommToolSPtr = g_dhCommToolWPtr_.lock();
147 if (dhCommToolSPtr == nullptr) {
148 DHLOGE("Can not get DHCommTool ptr");
149 return nullptr;
150 }
151
152 return dhCommToolSPtr;
153 }
154
OnBind(int32_t socket,PeerSocketInfo info)155 void OnBind(int32_t socket, PeerSocketInfo info)
156 {
157 std::shared_ptr<DHCommTool> dhCommToolSPtr = GetDHCommToolPtr();
158 if (dhCommToolSPtr == nullptr) {
159 DHLOGE("Can not get DHCommTool ptr");
160 return;
161 }
162 dhCommToolSPtr->GetDHTransportPtr()->OnSocketOpened(socket, info);
163 }
164
OnShutdown(int32_t socket,ShutdownReason reason)165 void OnShutdown(int32_t socket, ShutdownReason reason)
166 {
167 std::shared_ptr<DHCommTool> dhCommToolSPtr = GetDHCommToolPtr();
168 if (dhCommToolSPtr == nullptr) {
169 DHLOGE("Can not get DHCommTool ptr");
170 return;
171 }
172 dhCommToolSPtr->GetDHTransportPtr()->OnSocketClosed(socket, reason);
173 }
174
OnBytes(int32_t socket,const void * data,uint32_t dataLen)175 void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
176 {
177 std::shared_ptr<DHCommTool> dhCommToolSPtr = GetDHCommToolPtr();
178 if (dhCommToolSPtr == nullptr) {
179 DHLOGE("Can not get DHCommTool ptr");
180 return;
181 }
182 dhCommToolSPtr->GetDHTransportPtr()->OnBytesReceived(socket, data, dataLen);
183 }
184
OnMessage(int32_t socket,const void * data,uint32_t dataLen)185 void OnMessage(int32_t socket, const void *data, uint32_t dataLen)
186 {
187 (void)socket;
188 (void)data;
189 (void)dataLen;
190 DHLOGI("socket: %{public}d, dataLen:%{public}" PRIu32, socket, dataLen);
191 }
192
OnStream(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)193 void OnStream(int32_t socket, const StreamData *data, const StreamData *ext,
194 const StreamFrameInfo *param)
195 {
196 (void)socket;
197 (void)data;
198 (void)ext;
199 (void)param;
200 DHLOGI("socket: %{public}d", socket);
201 }
202
OnFile(int32_t socket,FileEvent * event)203 void OnFile(int32_t socket, FileEvent *event)
204 {
205 (void)event;
206 DHLOGI("socket: %{public}d", socket);
207 }
208
OnQos(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)209 void OnQos(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
210 {
211 if (qosCount == 0 || qosCount > MAX_ROUND_SIZE) {
212 DHLOGE("qosCount is invalid!");
213 return;
214 }
215 DHLOGI("OnQos, socket: %{public}d, QoSEvent: %{public}d, qosCount: %{public}" PRIu32,
216 socket, (int32_t)eventId, qosCount);
217 for (uint32_t idx = 0; idx < qosCount; idx++) {
218 DHLOGI("QosTV: type: %{public}d, value: %{public}d", (int32_t)qos[idx].qos, qos[idx].value);
219 }
220 }
221
222 ISocketListener iSocketListener = {
223 .OnBind = OnBind,
224 .OnShutdown = OnShutdown,
225 .OnBytes = OnBytes,
226 .OnMessage = OnMessage,
227 .OnStream = OnStream,
228 .OnFile = OnFile,
229 .OnQos = OnQos
230 };
231
CreateServerSocket()232 int32_t DHTransport::CreateServerSocket()
233 {
234 DHLOGI("CreateServerSocket start");
235 std::string networkId = GetLocalNetworkId();
236 localSocketName_ = DH_FWK_SESSION_NAME + networkId.substr(0, INTERCEPT_STRING_LENGTH);
237 DHLOGI("CreateServerSocket local networkId is %{public}s, local socketName: %{public}s",
238 GetAnonyString(networkId).c_str(), localSocketName_.c_str());
239 SocketInfo info = {
240 .name = const_cast<char*>(localSocketName_.c_str()),
241 .pkgName = const_cast<char*>(DH_FWK_PKG_NAME.c_str()),
242 .dataType = DATA_TYPE_BYTES
243 };
244 int32_t socket = Socket(info);
245 DHLOGI("CreateServerSocket Finish, socket: %{public}d", socket);
246 return socket;
247 }
248
CreateClientSocket(const std::string & remoteNetworkId)249 int32_t DHTransport::CreateClientSocket(const std::string &remoteNetworkId)
250 {
251 if (!IsIdLengthValid(remoteNetworkId)) {
252 return ERR_DH_FWK_PARA_INVALID;
253 }
254 DHLOGI("CreateClientSocket start, peerNetworkId: %{public}s", GetAnonyString(remoteNetworkId).c_str());
255 std::string peerSocketName = DH_FWK_SESSION_NAME + remoteNetworkId.substr(0, INTERCEPT_STRING_LENGTH);
256 SocketInfo info = {
257 .name = const_cast<char*>(localSocketName_.c_str()),
258 .peerName = const_cast<char*>(peerSocketName.c_str()),
259 .peerNetworkId = const_cast<char*>(remoteNetworkId.c_str()),
260 .pkgName = const_cast<char*>(DH_FWK_PKG_NAME.c_str()),
261 .dataType = DATA_TYPE_BYTES
262 };
263 int32_t socket = Socket(info);
264 DHLOGI("Bind Socket server, socket: %{public}d, localSocketName: %{public}s, peerSocketName: %{public}s",
265 socket, localSocketName_.c_str(), peerSocketName.c_str());
266 return socket;
267 }
268
Init()269 int32_t DHTransport::Init()
270 {
271 DHLOGI("Init DHTransport");
272 if (isSocketSvrCreateFlag_.load()) {
273 DHLOGI("SocketServer already create success.");
274 return DH_FWK_SUCCESS;
275 }
276 int32_t socket = CreateServerSocket();
277 if (socket < DH_FWK_SUCCESS) {
278 DHLOGE("CreateSocketServer failed, ret: %{public}d", socket);
279 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
280 }
281
282 int32_t ret = Listen(socket, g_qosInfo, g_qosTvParamIndex, &iSocketListener);
283 if (ret != DH_FWK_SUCCESS) {
284 DHLOGE("Socket Listen failed, error code %{public}d.", ret);
285 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
286 }
287 isSocketSvrCreateFlag_.store(true);
288 localServerSocket_ = socket;
289 DHLOGI("Finish Init DSoftBus Server Socket, socket: %{public}d", socket);
290 return DH_FWK_SUCCESS;
291 }
292
UnInit()293 int32_t DHTransport::UnInit()
294 {
295 {
296 std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
297 for (auto iter = remoteDevSocketIds_.begin(); iter != remoteDevSocketIds_.end(); ++iter) {
298 DHLOGI("Shutdown client socket: %{public}d to remote dev: %{public}s", iter->second,
299 GetAnonyString(iter->first).c_str());
300 Shutdown(iter->second);
301 }
302 remoteDevSocketIds_.clear();
303 }
304
305 if (!isSocketSvrCreateFlag_.load()) {
306 DHLOGI("DSoftBus Server Socket already remove success.");
307 } else {
308 DHLOGI("Shutdown DSoftBus Server Socket, socket: %{public}d", localServerSocket_.load());
309 Shutdown(localServerSocket_.load());
310 localServerSocket_ = -1;
311 isSocketSvrCreateFlag_.store(false);
312 }
313 return DH_FWK_SUCCESS;
314 }
315
IsDeviceSessionOpened(const std::string & remoteNetworkId,int32_t & socketId)316 bool DHTransport::IsDeviceSessionOpened(const std::string &remoteNetworkId, int32_t &socketId)
317 {
318 if (!IsIdLengthValid(remoteNetworkId)) {
319 return false;
320 }
321 std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
322 if (remoteDevSocketIds_.find(remoteNetworkId) == remoteDevSocketIds_.end()) {
323 return false;
324 }
325 socketId = remoteDevSocketIds_.at(remoteNetworkId);
326 DHLOGI("DeviceSession has opened, remoteNetworkId: %{public}s, socketId: %{public}d",
327 GetAnonyString(remoteNetworkId).c_str(), socketId);
328 return true;
329 }
330
GetRemoteNetworkIdBySocketId(int32_t socketId)331 std::string DHTransport::GetRemoteNetworkIdBySocketId(int32_t socketId)
332 {
333 std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
334 std::string networkId = "";
335 for (auto const &item : remoteDevSocketIds_) {
336 if (item.second == socketId) {
337 networkId = item.first;
338 break;
339 }
340 }
341 return networkId;
342 }
343
ClearDeviceSocketOpened(const std::string & remoteDevId)344 void DHTransport::ClearDeviceSocketOpened(const std::string &remoteDevId)
345 {
346 if (!IsIdLengthValid(remoteDevId)) {
347 return;
348 }
349 std::lock_guard<std::mutex> lock(rmtSocketIdMtx_);
350 remoteDevSocketIds_.erase(remoteDevId);
351 }
352
StartSocket(const std::string & remoteNetworkId)353 int32_t DHTransport::StartSocket(const std::string &remoteNetworkId)
354 {
355 if (!IsIdLengthValid(remoteNetworkId)) {
356 return ERR_DH_FWK_PARA_INVALID;
357 }
358 int32_t socketId = -1;
359 if (IsDeviceSessionOpened(remoteNetworkId, socketId)) {
360 DHLOGE("Softbus session has already opened, deviceId: %{public}s", GetAnonyString(remoteNetworkId).c_str());
361 return DH_FWK_SUCCESS;
362 }
363
364 int32_t socket = CreateClientSocket(remoteNetworkId);
365 if (socket < DH_FWK_SUCCESS) {
366 DHLOGE("StartSocket failed, ret: %{public}d", socket);
367 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
368 }
369
370 int32_t ret = Bind(socket, g_qosInfo, g_qosTvParamIndex, &iSocketListener);
371 if (ret < DH_FWK_SUCCESS) {
372 DHLOGE("OpenSession fail, remoteNetworkId: %{public}s, socket: %{public}d, ret: %{public}d",
373 GetAnonyString(remoteNetworkId).c_str(), socket, ret);
374 Shutdown(socket);
375 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
376 }
377
378 DHLOGI("Bind Socket success, remoteNetworkId:%{public}s, socketId: %{public}d",
379 GetAnonyString(remoteNetworkId).c_str(), socket);
380 std::string peerSocketName = DH_FWK_SESSION_NAME + remoteNetworkId.substr(0, INTERCEPT_STRING_LENGTH);
381 PeerSocketInfo peerSocketInfo = {
382 .name = const_cast<char*>(peerSocketName.c_str()),
383 .networkId = const_cast<char*>(remoteNetworkId.c_str()),
384 .pkgName = const_cast<char*>(DH_FWK_PKG_NAME.c_str()),
385 .dataType = DATA_TYPE_BYTES
386 };
387 OnSocketOpened(socket, peerSocketInfo);
388 return DH_FWK_SUCCESS;
389 }
390
StopSocket(const std::string & remoteNetworkId)391 int32_t DHTransport::StopSocket(const std::string &remoteNetworkId)
392 {
393 if (!IsIdLengthValid(remoteNetworkId)) {
394 return ERR_DH_FWK_PARA_INVALID;
395 }
396 int32_t socketId = -1;
397 if (!IsDeviceSessionOpened(remoteNetworkId, socketId)) {
398 DHLOGI("remote dev may be not opened, remoteNetworkId: %{public}s", GetAnonyString(remoteNetworkId).c_str());
399 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
400 }
401
402 DHLOGI("StopSocket remoteNetworkId: %{public}s, socketId: %{public}d",
403 GetAnonyString(remoteNetworkId).c_str(), socketId);
404 Shutdown(socketId);
405 ClearDeviceSocketOpened(remoteNetworkId);
406 return DH_FWK_SUCCESS;
407 }
408
Send(const std::string & remoteNetworkId,const std::string & payload)409 int32_t DHTransport::Send(const std::string &remoteNetworkId, const std::string &payload)
410 {
411 if (!IsIdLengthValid(remoteNetworkId) || !IsMessageLengthValid(payload)) {
412 return ERR_DH_FWK_PARA_INVALID;
413 }
414 int32_t socketId = -1;
415 if (!IsDeviceSessionOpened(remoteNetworkId, socketId)) {
416 DHLOGI("The session is not open, target networkId: %{public}s", GetAnonyString(remoteNetworkId).c_str());
417 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
418 }
419 std::string compressedPayLoad = Compress(payload);
420 uint32_t compressedPayLoadSize = compressedPayLoad.size();
421 DHLOGI("Send payload size: %{public}" PRIu32 ", after compressed size: %{public}" PRIu32
422 ", target networkId: %{public}s, socketId: %{public}d", static_cast<uint32_t>(payload.size()),
423 compressedPayLoadSize, GetAnonyString(remoteNetworkId).c_str(), socketId);
424
425 if (compressedPayLoadSize > MAX_SEND_MSG_LENGTH) {
426 DHLOGE("Send error: msg size: %{public}" PRIu32 " too long", compressedPayLoadSize);
427 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
428 }
429 uint8_t *buf = reinterpret_cast<uint8_t *>(calloc((compressedPayLoadSize), sizeof(uint8_t)));
430 if (buf == nullptr) {
431 DHLOGE("Send: malloc memory failed");
432 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
433 }
434
435 if (memcpy_s(buf, compressedPayLoadSize, reinterpret_cast<const uint8_t *>(compressedPayLoad.c_str()),
436 compressedPayLoadSize) != EOK) {
437 DHLOGE("Send: memcpy memory failed");
438 free(buf);
439 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
440 }
441
442 int32_t ret = SendBytes(socketId, buf, compressedPayLoadSize);
443 free(buf);
444 if (ret != DH_FWK_SUCCESS) {
445 DHLOGE("dsoftbus send error, ret: %{public}d", ret);
446 return ERR_DH_FWK_COMPONENT_TRANSPORT_OPT_FAILED;
447 }
448 DHLOGI("Send payload success");
449 return DH_FWK_SUCCESS;
450 }
451 } // DistributedHardware
452 } // OHOS