1 /*
2 * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "distributed_input_transport_base.h"
17
18 #include <algorithm>
19 #include <cstring>
20
21 #include "distributed_hardware_fwk_kit.h"
22 #include "ipc_skeleton.h"
23 #include "iservice_registry.h"
24 #include "system_ability_definition.h"
25
26 #include "constants_dinput.h"
27 #include "dinput_context.h"
28 #include "dinput_errcode.h"
29 #include "dinput_hitrace.h"
30 #include "dinput_log.h"
31 #include "dinput_softbus_define.h"
32 #include "dinput_utils_tool.h"
33 #include "hidumper.h"
34
35 #include "softbus_common.h"
36
37 namespace OHOS {
38 namespace DistributedHardware {
39 namespace DistributedInput {
40 namespace {
41 const int32_t SESSION_STATUS_OPENED = 0;
42 const int32_t SESSION_STATUS_CLOSED = 1;
43 static QosTV g_qosInfo[] = {
44 { .qos = QOS_TYPE_MIN_BW, .value = 80 * 1024 * 1024},
45 { .qos = QOS_TYPE_MAX_LATENCY, .value = 8000 },
46 { .qos = QOS_TYPE_MIN_LATENCY, .value = 2000 }
47 };
48 static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(g_qosInfo[0]));
49 }
50 IMPLEMENT_SINGLE_INSTANCE(DistributedInputTransportBase);
~DistributedInputTransportBase()51 DistributedInputTransportBase::~DistributedInputTransportBase()
52 {
53 DHLOGI("Release Transport Session");
54 Release();
55 }
56
OnBind(int32_t socket,PeerSocketInfo info)57 void OnBind(int32_t socket, PeerSocketInfo info)
58 {
59 DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionOpened(socket, info);
60 }
61
OnShutdown(int32_t socket,ShutdownReason reason)62 void OnShutdown(int32_t socket, ShutdownReason reason)
63 {
64 DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionClosed(socket, reason);
65 }
66
OnBytes(int32_t socket,const void * data,uint32_t dataLen)67 void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
68 {
69 DistributedInput::DistributedInputTransportBase::GetInstance().OnBytesReceived(socket, data, dataLen);
70 }
71
OnMessage(int32_t socket,const void * data,uint32_t dataLen)72 void OnMessage(int32_t socket, const void *data, uint32_t dataLen)
73 {
74 (void)socket;
75 (void)data;
76 (void)dataLen;
77 DHLOGI("socket: %{public}d, dataLen:%{public}d", socket, dataLen);
78 }
79
OnStream(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)80 void OnStream(int32_t socket, const StreamData *data, const StreamData *ext,
81 const StreamFrameInfo *param)
82 {
83 (void)socket;
84 (void)data;
85 (void)ext;
86 (void)param;
87 DHLOGI("socket: %{public}d", socket);
88 }
89
OnFile(int32_t socket,FileEvent * event)90 void OnFile(int32_t socket, FileEvent *event)
91 {
92 (void)event;
93 DHLOGI("socket: %{public}d", socket);
94 }
95
OnQos(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)96 void OnQos(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
97 {
98 DHLOGI("OnQos, socket: %{public}d, QoSEvent: %{public}d, qosCount: %{public}u", socket, (int32_t)eventId, qosCount);
99 for (uint32_t idx = 0; idx < qosCount; idx++) {
100 DHLOGI("QosTV: type: %{public}d, value: %{public}d", (int32_t)qos[idx].qos, qos[idx].value);
101 }
102 }
103
104 ISocketListener iSocketListener = {
105 .OnBind = OnBind,
106 .OnShutdown = OnShutdown,
107 .OnBytes = OnBytes,
108 .OnMessage = OnMessage,
109 .OnStream = OnStream,
110 .OnFile = OnFile,
111 .OnQos = OnQos
112 };
113
Init()114 int32_t DistributedInputTransportBase::Init()
115 {
116 DHLOGI("Init Transport Base Session");
117 std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
118 if (isSessSerCreateFlag_.load()) {
119 DHLOGI("SessionServer already create success.");
120 return DH_SUCCESS;
121 }
122 int32_t socket = CreateServerSocket();
123 if (socket < DH_SUCCESS) {
124 DHLOGE("CreateServerSocket failed, ret: %{public}d", socket);
125 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
126 }
127
128 int32_t ret = Listen(socket, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
129 if (ret != DH_SUCCESS) {
130 DHLOGE("Socket Listen failed, error code %{public}d.", ret);
131 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
132 }
133 isSessSerCreateFlag_.store(true);
134 localServerSocket_ = socket;
135 DHLOGI("Finish Init DSoftBus Server Socket, socket: %{public}d", socket);
136 return DH_SUCCESS;
137 }
138
CreateServerSocket()139 int32_t DistributedInputTransportBase::CreateServerSocket()
140 {
141 DHLOGI("CreateServerSocket start");
142 auto localNode = std::make_unique<NodeBasicInfo>();
143 int32_t retCode = GetLocalNodeDeviceInfo(DINPUT_PKG_NAME.c_str(), localNode.get());
144 if (retCode != DH_SUCCESS) {
145 DHLOGE("Init Could not get local device id.");
146 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
147 }
148 std::string networkId = localNode->networkId;
149 localSessionName_ = SESSION_NAME + networkId.substr(0, INTERCEPT_STRING_LENGTH);
150 DHLOGI("CreateServerSocket local networkId is %{public}s, local socketName: %{public}s",
151 GetAnonyString(networkId).c_str(), localSessionName_.c_str());
152 SocketInfo info = {
153 .name = const_cast<char*>(localSessionName_.c_str()),
154 .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
155 .dataType = DATA_TYPE_BYTES
156 };
157 int32_t socket = Socket(info);
158 DHLOGI("CreateServerSocket Finish, socket: %{public}d", socket);
159 return socket;
160 }
161
Release()162 void DistributedInputTransportBase::Release()
163 {
164 std::unique_lock<std::mutex> sessionLock(operationMutex_);
165 auto iter = remoteDevSessionMap_.begin();
166 for (; iter != remoteDevSessionMap_.end(); ++iter) {
167 DHLOGI("Shutdown client socket: %{public}d to remote dev: %{public}s", iter->second,
168 GetAnonyString(iter->first).c_str());
169 Shutdown(iter->second);
170 }
171
172 {
173 std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
174 if (!isSessSerCreateFlag_.load()) {
175 DHLOGI("DSoftBus Server Socket already remove success.");
176 } else {
177 DHLOGI("Shutdown DSoftBus Server Socket, socket: %{public}d", localServerSocket_.load());
178 Shutdown(localServerSocket_.load());
179 localServerSocket_ = -1;
180 isSessSerCreateFlag_.store(false);
181 }
182 }
183 remoteDevSessionMap_.clear();
184 channelStatusMap_.clear();
185 }
186
CheckDeviceSessionState(const std::string & remoteDevId)187 int32_t DistributedInputTransportBase::CheckDeviceSessionState(const std::string &remoteDevId)
188 {
189 std::unique_lock<std::mutex> sessionLock(operationMutex_);
190 if (remoteDevSessionMap_.find(remoteDevId) == remoteDevSessionMap_.end()) {
191 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_DEVICE_SESSION_STATE;
192 }
193 DHLOGI("CheckDeviceSessionState has opened, remoteDevId: %{public}s", GetAnonyString(remoteDevId).c_str());
194 return DH_SUCCESS;
195 }
196
GetDevIdBySessionId(int32_t sessionId)197 std::string DistributedInputTransportBase::GetDevIdBySessionId(int32_t sessionId)
198 {
199 std::unique_lock<std::mutex> sessionLock(operationMutex_);
200 for (auto iter = remoteDevSessionMap_.begin(); iter != remoteDevSessionMap_.end(); ++iter) {
201 if (iter->second == sessionId) {
202 return iter->first;
203 }
204 }
205 return "";
206 }
207
CreateClientSocket(const std::string & remoteDevId)208 int32_t DistributedInputTransportBase::CreateClientSocket(const std::string &remoteDevId)
209 {
210 DHLOGI("CreateClientSocket start, peerNetworkId: %{public}s", GetAnonyString(remoteDevId).c_str());
211 std::string localSesionName = localSessionName_ + "_" + std::to_string(GetCurrentTimeUs());
212 std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
213 SocketInfo info = {
214 .name = const_cast<char*>(localSesionName.c_str()),
215 .peerName = const_cast<char*>(peerSessionName.c_str()),
216 .peerNetworkId = const_cast<char*>(remoteDevId.c_str()),
217 .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
218 .dataType = DATA_TYPE_BYTES
219 };
220 int32_t socket = Socket(info);
221 DHLOGI("Bind Socket server, socket: %{public}d, localSessionName: %{public}s, peerSessionName: %{public}s",
222 socket, localSesionName.c_str(), peerSessionName.c_str());
223 return socket;
224 }
225
StartSession(const std::string & remoteDevId)226 int32_t DistributedInputTransportBase::StartSession(const std::string &remoteDevId)
227 {
228 int32_t ret = CheckDeviceSessionState(remoteDevId);
229 if (ret == DH_SUCCESS) {
230 DHLOGE("Softbus session has already opened, deviceId: %{public}s", GetAnonyString(remoteDevId).c_str());
231 return DH_SUCCESS;
232 }
233
234 int socket = CreateClientSocket(remoteDevId);
235 if (socket < DH_SUCCESS) {
236 DHLOGE("StartSession failed, ret: %{public}d", socket);
237 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
238 }
239 StartAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
240 ret = Bind(socket, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
241 if (ret < DH_SUCCESS) {
242 DHLOGE("OpenSession fail, remoteDevId: %{public}s, socket: %{public}d", GetAnonyString(remoteDevId).c_str(),
243 socket);
244 FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
245 Shutdown(socket);
246 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
247 }
248
249 std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
250 HiDumper::GetInstance().CreateSessionInfo(remoteDevId, socket, localSessionName_, peerSessionName,
251 SessionStatus::OPENED);
252 DHLOGI("OpenSession success, remoteDevId:%{public}s, sessionId: %{public}d", GetAnonyString(remoteDevId).c_str(),
253 socket);
254 sessionId_ = socket;
255
256 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
257 if (dhFwkKit != nullptr) {
258 DHLOGD("Enable low Latency!");
259 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
260 }
261
262 PeerSocketInfo peerSocketInfo = {
263 .name = const_cast<char*>(peerSessionName.c_str()),
264 .networkId = const_cast<char*>(remoteDevId.c_str()),
265 .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
266 .dataType = DATA_TYPE_BYTES
267 };
268 OnSessionOpened(socket, peerSocketInfo);
269 return DH_SUCCESS;
270 }
271
GetCurrentSessionId()272 int32_t DistributedInputTransportBase::GetCurrentSessionId()
273 {
274 return sessionId_;
275 }
276
StopAllSession()277 void DistributedInputTransportBase::StopAllSession()
278 {
279 std::map<std::string, int32_t> remoteDevSessions;
280 {
281 std::unique_lock<std::mutex> sessionLock(operationMutex_);
282 std::for_each(remoteDevSessionMap_.begin(), remoteDevSessionMap_.end(),
283 [&remoteDevSessions] (const std::pair<std::string, int32_t> &pair) {
284 remoteDevSessions[pair.first] = pair.second;
285 });
286 }
287
288 std::for_each(remoteDevSessions.begin(), remoteDevSessions.end(),
289 [this](const std::pair<std::string, int32_t> &pair) {
290 StopSession(pair.first);
291 });
292 }
293
StopSession(const std::string & remoteDevId)294 void DistributedInputTransportBase::StopSession(const std::string &remoteDevId)
295 {
296 std::unique_lock<std::mutex> sessionLock(operationMutex_);
297 if (remoteDevSessionMap_.count(remoteDevId) == 0) {
298 DHLOGE("remoteDevSessionMap not find remoteDevId: %{public}s", GetAnonyString(remoteDevId).c_str());
299 return;
300 }
301 int32_t sessionId = remoteDevSessionMap_[remoteDevId];
302
303 DHLOGI("RemoteDevId: %{public}s, sessionId: %{public}d", GetAnonyString(remoteDevId).c_str(), sessionId);
304 HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSING);
305 Shutdown(sessionId);
306 remoteDevSessionMap_.erase(remoteDevId);
307 channelStatusMap_.erase(remoteDevId);
308
309 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
310 if (dhFwkKit != nullptr) {
311 DHLOGD("Disable low Latency!");
312 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
313 }
314
315 HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSED);
316 HiDumper::GetInstance().DeleteSessionInfo(remoteDevId);
317 }
318
RegisterSrcHandleSessionCallback(std::shared_ptr<DInputTransbaseSourceCallback> callback)319 void DistributedInputTransportBase::RegisterSrcHandleSessionCallback(
320 std::shared_ptr<DInputTransbaseSourceCallback> callback)
321 {
322 DHLOGI("RegisterSrcHandleSessionCallback");
323 srcCallback_ = callback;
324 }
325
RegisterSinkHandleSessionCallback(std::shared_ptr<DInputTransbaseSinkCallback> callback)326 void DistributedInputTransportBase::RegisterSinkHandleSessionCallback(
327 std::shared_ptr<DInputTransbaseSinkCallback> callback)
328 {
329 DHLOGI("RegisterSinkHandleSessionCallback");
330 sinkCallback_ = callback;
331 }
332
RegisterSourceManagerCallback(std::shared_ptr<DInputSourceManagerCallback> callback)333 void DistributedInputTransportBase::RegisterSourceManagerCallback(
334 std::shared_ptr<DInputSourceManagerCallback> callback)
335 {
336 DHLOGI("RegisterSourceManagerCallback");
337 srcMgrCallback_ = callback;
338 }
339
RegisterSinkManagerCallback(std::shared_ptr<DInputSinkManagerCallback> callback)340 void DistributedInputTransportBase::RegisterSinkManagerCallback(
341 std::shared_ptr<DInputSinkManagerCallback> callback)
342 {
343 DHLOGI("RegisterSinkManagerCallback");
344 sinkMgrCallback_ = callback;
345 }
346
RegisterSessionStateCb(sptr<ISessionStateCallback> callback)347 void DistributedInputTransportBase::RegisterSessionStateCb(sptr<ISessionStateCallback> callback)
348 {
349 DHLOGI("RegisterSessionStateCb");
350 SessionStateCallback_ = callback;
351 }
352
UnregisterSessionStateCb()353 void DistributedInputTransportBase::UnregisterSessionStateCb()
354 {
355 DHLOGI("UnregisterSessionStateCb");
356 SessionStateCallback_ = nullptr;
357 }
358
RunSessionStateCallback(const std::string & remoteDevId,const uint32_t sessionState)359 void DistributedInputTransportBase::RunSessionStateCallback(const std::string &remoteDevId,
360 const uint32_t sessionState)
361 {
362 DHLOGI("RunSessionStateCallback start.");
363 if (SessionStateCallback_ != nullptr) {
364 SessionStateCallback_->OnResult(remoteDevId, sessionState);
365 return;
366 }
367 DHLOGE("RunSessionStateCallback SessionStateCallback_ is null.");
368 }
369
CountSession(const std::string & remoteDevId)370 int32_t DistributedInputTransportBase::CountSession(const std::string &remoteDevId)
371 {
372 return remoteDevSessionMap_.count(remoteDevId);
373 }
374
EraseSessionId(const std::string & remoteDevId)375 void DistributedInputTransportBase::EraseSessionId(const std::string &remoteDevId)
376 {
377 remoteDevSessionMap_.erase(remoteDevId);
378 }
379
OnSessionOpened(int32_t sessionId,const PeerSocketInfo & info)380 int32_t DistributedInputTransportBase::OnSessionOpened(int32_t sessionId, const PeerSocketInfo &info)
381 {
382 std::string peerDevId;
383 peerDevId.assign(info.networkId);
384 DHLOGI("OnSessionOpened, socket: %{public}d, peerSocketName: %{public}s, peerNetworkId: %{public}s, "
385 "peerPkgName: %{public}s", sessionId, info.name, GetAnonyString(peerDevId).c_str(), info.pkgName);
386 FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
387
388 {
389 std::unique_lock<std::mutex> sessionLock(operationMutex_);
390 remoteDevSessionMap_[peerDevId] = sessionId;
391 channelStatusMap_[peerDevId] = true;
392 }
393 RunSessionStateCallback(peerDevId, SESSION_STATUS_OPENED);
394 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
395 if (dhFwkKit != nullptr) {
396 DHLOGD("Enable low Latency!");
397 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
398 }
399 DHLOGI("OnSessionOpened finish");
400 return DH_SUCCESS;
401 }
402
OnSessionClosed(int32_t sessionId,ShutdownReason reason)403 void DistributedInputTransportBase::OnSessionClosed(int32_t sessionId, ShutdownReason reason)
404 {
405 DHLOGI("OnSessionClosed, socket: %{public}d, reason: %{public}d", sessionId, (int32_t)reason);
406 std::string deviceId = GetDevIdBySessionId(sessionId);
407 DHLOGI("OnSessionClosed notify session closed, sessionId: %{public}d, peer deviceId:%{public}s",
408 sessionId, GetAnonyString(deviceId).c_str());
409 RunSessionStateCallback(deviceId, SESSION_STATUS_CLOSED);
410
411 {
412 std::unique_lock<std::mutex> sessionLock(operationMutex_);
413 if (CountSession(deviceId) > 0) {
414 EraseSessionId(deviceId);
415 }
416 channelStatusMap_.erase(deviceId);
417
418 if (sinkCallback_ == nullptr) {
419 DHLOGE("sinkCallback is nullptr.");
420 return;
421 }
422 sinkCallback_->NotifySessionClosed(sessionId);
423
424 if (srcCallback_ == nullptr) {
425 DHLOGE("srcCallback is nullptr.");
426 return;
427 }
428 srcCallback_->NotifySessionClosed();
429
430 if (srcMgrCallback_ == nullptr) {
431 DHLOGE("srcMgrCallback is nullptr.");
432 return;
433 }
434 srcMgrCallback_->ResetSrcMgrResStatus();
435
436 if (sinkMgrCallback_ == nullptr) {
437 DHLOGE("sinkMgrCallback is nullptr.");
438 return;
439 }
440 sinkMgrCallback_->ResetSinkMgrResStatus();
441 }
442
443 std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
444 if (dhFwkKit != nullptr) {
445 DHLOGD("Disable low Latency!");
446 dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
447 }
448 DHLOGI("OnSessionClosed finish");
449 }
450
CheckRecivedData(const std::string & message)451 bool DistributedInputTransportBase::CheckRecivedData(const std::string &message)
452 {
453 nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
454 if (recMsg.is_discarded()) {
455 DHLOGE("OnBytesReceived jsonStr error.");
456 return false;
457 }
458
459 if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
460 DHLOGE("The key is invalid.");
461 return false;
462 }
463
464 return true;
465 }
466
OnBytesReceived(int32_t sessionId,const void * data,uint32_t dataLen)467 void DistributedInputTransportBase::OnBytesReceived(int32_t sessionId, const void *data, uint32_t dataLen)
468 {
469 if (sessionId < 0 || data == nullptr || dataLen == 0 || dataLen > MSG_MAX_SIZE) {
470 DHLOGE("OnBytesReceived param check failed");
471 return;
472 }
473
474 uint8_t *buf = reinterpret_cast<uint8_t *>(calloc(dataLen + 1, sizeof(uint8_t)));
475 if (buf == nullptr) {
476 DHLOGE("OnBytesReceived: malloc memory failed");
477 return;
478 }
479
480 if (memcpy_s(buf, dataLen + 1, reinterpret_cast<const uint8_t *>(data), dataLen) != EOK) {
481 DHLOGE("OnBytesReceived: memcpy memory failed");
482 free(buf);
483 return;
484 }
485
486 std::string message(buf, buf + dataLen);
487 HandleSession(sessionId, message);
488
489 free(buf);
490 return;
491 }
492
HandleSession(int32_t sessionId,const std::string & message)493 void DistributedInputTransportBase::HandleSession(int32_t sessionId, const std::string &message)
494 {
495 if (CheckRecivedData(message) != true) {
496 return;
497 }
498 nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
499 if (recMsg.is_discarded()) {
500 DHLOGE("recMsg parse failed!");
501 return;
502 }
503 if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
504 DHLOGE("softbus cmd key is invalid");
505 return;
506 }
507 uint32_t cmdType = recMsg[DINPUT_SOFTBUS_KEY_CMD_TYPE];
508 if (cmdType < TRANS_MSG_SRC_SINK_SPLIT) {
509 if (srcCallback_ == nullptr) {
510 DHLOGE("srcCallback is nullptr.");
511 return;
512 }
513 srcCallback_->HandleSessionData(sessionId, message);
514 return;
515 }
516 if (cmdType > TRANS_MSG_SRC_SINK_SPLIT) {
517 if (sinkCallback_ == nullptr) {
518 DHLOGE("sinkCallback is nullptr.");
519 return;
520 }
521 sinkCallback_->HandleSessionData(sessionId, message);
522 }
523 }
524
SendMsg(int32_t sessionId,std::string & message)525 int32_t DistributedInputTransportBase::SendMsg(int32_t sessionId, std::string &message)
526 {
527 if (message.size() > MSG_MAX_SIZE) {
528 DHLOGE("SendMessage error: message.size() > MSG_MAX_SIZE, msg size: %{public}zu", message.size());
529 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
530 }
531 uint8_t *buf = reinterpret_cast<uint8_t *>(calloc((MSG_MAX_SIZE), sizeof(uint8_t)));
532 if (buf == nullptr) {
533 DHLOGE("SendMsg: malloc memory failed");
534 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
535 }
536 int32_t outLen = 0;
537 if (memcpy_s(buf, MSG_MAX_SIZE, reinterpret_cast<const uint8_t *>(message.c_str()), message.size()) != EOK) {
538 DHLOGE("SendMsg: memcpy memory failed");
539 free(buf);
540 return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
541 }
542 outLen = static_cast<int32_t>(message.size());
543 int32_t ret = SendBytes(sessionId, buf, outLen);
544 free(buf);
545 return ret;
546 }
547
GetSessionIdByDevId(const std::string & srcId)548 int32_t DistributedInputTransportBase::GetSessionIdByDevId(const std::string &srcId)
549 {
550 std::unique_lock<std::mutex> sessionLock(operationMutex_);
551 std::map<std::string, int32_t>::iterator it = remoteDevSessionMap_.find(srcId);
552 if (it != remoteDevSessionMap_.end()) {
553 return it->second;
554 }
555 DHLOGE("get session id failed, srcId = %{public}s", GetAnonyString(srcId).c_str());
556 return ERR_DH_INPUT_SERVER_SINK_TRANSPORT_GET_SESSIONID_FAIL;
557 }
558 } // namespace DistributedInput
559 } // namespace DistributedHardware
560 } // namespace OHOS