1 /*
2  * Copyright (c) 2022-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "distributed_input_transport_base.h"
17 
18 #include <algorithm>
19 #include <cstring>
20 
21 #include "distributed_hardware_fwk_kit.h"
22 #include "ipc_skeleton.h"
23 #include "iservice_registry.h"
24 #include "system_ability_definition.h"
25 
26 #include "constants_dinput.h"
27 #include "dinput_context.h"
28 #include "dinput_errcode.h"
29 #include "dinput_hitrace.h"
30 #include "dinput_log.h"
31 #include "dinput_softbus_define.h"
32 #include "dinput_utils_tool.h"
33 #include "hidumper.h"
34 
35 #include "softbus_common.h"
36 
37 namespace OHOS {
38 namespace DistributedHardware {
39 namespace DistributedInput {
40 namespace {
41 const int32_t SESSION_STATUS_OPENED = 0;
42 const int32_t SESSION_STATUS_CLOSED = 1;
43 static QosTV g_qosInfo[] = {
44     { .qos = QOS_TYPE_MIN_BW, .value = 80 * 1024 * 1024},
45     { .qos = QOS_TYPE_MAX_LATENCY, .value = 8000 },
46     { .qos = QOS_TYPE_MIN_LATENCY, .value = 2000 }
47 };
48 static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(g_qosInfo[0]));
49 }
50 IMPLEMENT_SINGLE_INSTANCE(DistributedInputTransportBase);
~DistributedInputTransportBase()51 DistributedInputTransportBase::~DistributedInputTransportBase()
52 {
53     DHLOGI("Release Transport Session");
54     Release();
55 }
56 
OnBind(int32_t socket,PeerSocketInfo info)57 void OnBind(int32_t socket, PeerSocketInfo info)
58 {
59     DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionOpened(socket, info);
60 }
61 
OnShutdown(int32_t socket,ShutdownReason reason)62 void OnShutdown(int32_t socket, ShutdownReason reason)
63 {
64     DistributedInput::DistributedInputTransportBase::GetInstance().OnSessionClosed(socket, reason);
65 }
66 
OnBytes(int32_t socket,const void * data,uint32_t dataLen)67 void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
68 {
69     DistributedInput::DistributedInputTransportBase::GetInstance().OnBytesReceived(socket, data, dataLen);
70 }
71 
OnMessage(int32_t socket,const void * data,uint32_t dataLen)72 void OnMessage(int32_t socket, const void *data, uint32_t dataLen)
73 {
74     (void)socket;
75     (void)data;
76     (void)dataLen;
77     DHLOGI("socket: %{public}d, dataLen:%{public}d", socket, dataLen);
78 }
79 
OnStream(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)80 void OnStream(int32_t socket, const StreamData *data, const StreamData *ext,
81     const StreamFrameInfo *param)
82 {
83     (void)socket;
84     (void)data;
85     (void)ext;
86     (void)param;
87     DHLOGI("socket: %{public}d", socket);
88 }
89 
OnFile(int32_t socket,FileEvent * event)90 void OnFile(int32_t socket, FileEvent *event)
91 {
92     (void)event;
93     DHLOGI("socket: %{public}d", socket);
94 }
95 
OnQos(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)96 void OnQos(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
97 {
98     DHLOGI("OnQos, socket: %{public}d, QoSEvent: %{public}d, qosCount: %{public}u", socket, (int32_t)eventId, qosCount);
99     for (uint32_t idx = 0; idx < qosCount; idx++) {
100         DHLOGI("QosTV: type: %{public}d, value: %{public}d", (int32_t)qos[idx].qos, qos[idx].value);
101     }
102 }
103 
104 ISocketListener iSocketListener = {
105     .OnBind = OnBind,
106     .OnShutdown = OnShutdown,
107     .OnBytes = OnBytes,
108     .OnMessage = OnMessage,
109     .OnStream = OnStream,
110     .OnFile = OnFile,
111     .OnQos = OnQos
112 };
113 
Init()114 int32_t DistributedInputTransportBase::Init()
115 {
116     DHLOGI("Init Transport Base Session");
117     std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
118     if (isSessSerCreateFlag_.load()) {
119         DHLOGI("SessionServer already create success.");
120         return DH_SUCCESS;
121     }
122     int32_t socket = CreateServerSocket();
123     if (socket < DH_SUCCESS) {
124         DHLOGE("CreateServerSocket failed, ret: %{public}d", socket);
125         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
126     }
127 
128     int32_t ret = Listen(socket, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
129     if (ret != DH_SUCCESS) {
130         DHLOGE("Socket Listen failed, error code %{public}d.", ret);
131         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
132     }
133     isSessSerCreateFlag_.store(true);
134     localServerSocket_ = socket;
135     DHLOGI("Finish Init DSoftBus Server Socket, socket: %{public}d", socket);
136     return DH_SUCCESS;
137 }
138 
CreateServerSocket()139 int32_t DistributedInputTransportBase::CreateServerSocket()
140 {
141     DHLOGI("CreateServerSocket start");
142     auto localNode = std::make_unique<NodeBasicInfo>();
143     int32_t retCode = GetLocalNodeDeviceInfo(DINPUT_PKG_NAME.c_str(), localNode.get());
144     if (retCode != DH_SUCCESS) {
145         DHLOGE("Init Could not get local device id.");
146         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_INIT_FAIL;
147     }
148     std::string networkId = localNode->networkId;
149     localSessionName_ = SESSION_NAME + networkId.substr(0, INTERCEPT_STRING_LENGTH);
150     DHLOGI("CreateServerSocket local networkId is %{public}s, local socketName: %{public}s",
151         GetAnonyString(networkId).c_str(), localSessionName_.c_str());
152     SocketInfo info = {
153         .name = const_cast<char*>(localSessionName_.c_str()),
154         .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
155         .dataType = DATA_TYPE_BYTES
156     };
157     int32_t socket = Socket(info);
158     DHLOGI("CreateServerSocket Finish, socket: %{public}d", socket);
159     return socket;
160 }
161 
Release()162 void DistributedInputTransportBase::Release()
163 {
164     std::unique_lock<std::mutex> sessionLock(operationMutex_);
165     auto iter = remoteDevSessionMap_.begin();
166     for (; iter != remoteDevSessionMap_.end(); ++iter) {
167         DHLOGI("Shutdown client socket: %{public}d to remote dev: %{public}s", iter->second,
168             GetAnonyString(iter->first).c_str());
169         Shutdown(iter->second);
170     }
171 
172     {
173         std::unique_lock<std::mutex> sessionServerLock(sessSerOperMutex_);
174         if (!isSessSerCreateFlag_.load()) {
175             DHLOGI("DSoftBus Server Socket already remove success.");
176         } else {
177             DHLOGI("Shutdown DSoftBus Server Socket, socket: %{public}d", localServerSocket_.load());
178             Shutdown(localServerSocket_.load());
179             localServerSocket_ = -1;
180             isSessSerCreateFlag_.store(false);
181         }
182     }
183     remoteDevSessionMap_.clear();
184     channelStatusMap_.clear();
185 }
186 
CheckDeviceSessionState(const std::string & remoteDevId)187 int32_t DistributedInputTransportBase::CheckDeviceSessionState(const std::string &remoteDevId)
188 {
189     std::unique_lock<std::mutex> sessionLock(operationMutex_);
190     if (remoteDevSessionMap_.find(remoteDevId) == remoteDevSessionMap_.end()) {
191         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_DEVICE_SESSION_STATE;
192     }
193     DHLOGI("CheckDeviceSessionState has opened, remoteDevId: %{public}s", GetAnonyString(remoteDevId).c_str());
194     return DH_SUCCESS;
195 }
196 
GetDevIdBySessionId(int32_t sessionId)197 std::string DistributedInputTransportBase::GetDevIdBySessionId(int32_t sessionId)
198 {
199     std::unique_lock<std::mutex> sessionLock(operationMutex_);
200     for (auto iter = remoteDevSessionMap_.begin(); iter != remoteDevSessionMap_.end(); ++iter) {
201         if (iter->second == sessionId) {
202             return iter->first;
203         }
204     }
205     return "";
206 }
207 
CreateClientSocket(const std::string & remoteDevId)208 int32_t DistributedInputTransportBase::CreateClientSocket(const std::string &remoteDevId)
209 {
210     DHLOGI("CreateClientSocket start, peerNetworkId: %{public}s", GetAnonyString(remoteDevId).c_str());
211     std::string localSesionName = localSessionName_ + "_" + std::to_string(GetCurrentTimeUs());
212     std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
213     SocketInfo info = {
214         .name = const_cast<char*>(localSesionName.c_str()),
215         .peerName = const_cast<char*>(peerSessionName.c_str()),
216         .peerNetworkId = const_cast<char*>(remoteDevId.c_str()),
217         .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
218         .dataType = DATA_TYPE_BYTES
219     };
220     int32_t socket = Socket(info);
221     DHLOGI("Bind Socket server, socket: %{public}d, localSessionName: %{public}s, peerSessionName: %{public}s",
222         socket, localSesionName.c_str(), peerSessionName.c_str());
223     return socket;
224 }
225 
StartSession(const std::string & remoteDevId)226 int32_t DistributedInputTransportBase::StartSession(const std::string &remoteDevId)
227 {
228     int32_t ret = CheckDeviceSessionState(remoteDevId);
229     if (ret == DH_SUCCESS) {
230         DHLOGE("Softbus session has already opened, deviceId: %{public}s", GetAnonyString(remoteDevId).c_str());
231         return DH_SUCCESS;
232     }
233 
234     int socket = CreateClientSocket(remoteDevId);
235     if (socket < DH_SUCCESS) {
236         DHLOGE("StartSession failed, ret: %{public}d", socket);
237         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
238     }
239     StartAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
240     ret = Bind(socket, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
241     if (ret < DH_SUCCESS) {
242         DHLOGE("OpenSession fail, remoteDevId: %{public}s, socket: %{public}d", GetAnonyString(remoteDevId).c_str(),
243             socket);
244         FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
245         Shutdown(socket);
246         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_OPEN_SESSION_FAIL;
247     }
248 
249     std::string peerSessionName = SESSION_NAME + remoteDevId.substr(0, INTERCEPT_STRING_LENGTH);
250     HiDumper::GetInstance().CreateSessionInfo(remoteDevId, socket, localSessionName_, peerSessionName,
251         SessionStatus::OPENED);
252     DHLOGI("OpenSession success, remoteDevId:%{public}s, sessionId: %{public}d", GetAnonyString(remoteDevId).c_str(),
253         socket);
254     sessionId_ = socket;
255 
256     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
257     if (dhFwkKit != nullptr) {
258         DHLOGD("Enable low Latency!");
259         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
260     }
261 
262     PeerSocketInfo peerSocketInfo = {
263         .name = const_cast<char*>(peerSessionName.c_str()),
264         .networkId = const_cast<char*>(remoteDevId.c_str()),
265         .pkgName = const_cast<char*>(DINPUT_PKG_NAME.c_str()),
266         .dataType = DATA_TYPE_BYTES
267     };
268     OnSessionOpened(socket, peerSocketInfo);
269     return DH_SUCCESS;
270 }
271 
GetCurrentSessionId()272 int32_t DistributedInputTransportBase::GetCurrentSessionId()
273 {
274     return sessionId_;
275 }
276 
StopAllSession()277 void DistributedInputTransportBase::StopAllSession()
278 {
279     std::map<std::string, int32_t> remoteDevSessions;
280     {
281         std::unique_lock<std::mutex> sessionLock(operationMutex_);
282         std::for_each(remoteDevSessionMap_.begin(), remoteDevSessionMap_.end(),
283             [&remoteDevSessions] (const std::pair<std::string, int32_t> &pair) {
284             remoteDevSessions[pair.first] = pair.second;
285         });
286     }
287 
288     std::for_each(remoteDevSessions.begin(), remoteDevSessions.end(),
289         [this](const std::pair<std::string, int32_t> &pair) {
290         StopSession(pair.first);
291     });
292 }
293 
StopSession(const std::string & remoteDevId)294 void DistributedInputTransportBase::StopSession(const std::string &remoteDevId)
295 {
296     std::unique_lock<std::mutex> sessionLock(operationMutex_);
297     if (remoteDevSessionMap_.count(remoteDevId) == 0) {
298         DHLOGE("remoteDevSessionMap not find remoteDevId: %{public}s", GetAnonyString(remoteDevId).c_str());
299         return;
300     }
301     int32_t sessionId = remoteDevSessionMap_[remoteDevId];
302 
303     DHLOGI("RemoteDevId: %{public}s, sessionId: %{public}d", GetAnonyString(remoteDevId).c_str(), sessionId);
304     HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSING);
305     Shutdown(sessionId);
306     remoteDevSessionMap_.erase(remoteDevId);
307     channelStatusMap_.erase(remoteDevId);
308 
309     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
310     if (dhFwkKit != nullptr) {
311         DHLOGD("Disable low Latency!");
312         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
313     }
314 
315     HiDumper::GetInstance().SetSessionStatus(remoteDevId, SessionStatus::CLOSED);
316     HiDumper::GetInstance().DeleteSessionInfo(remoteDevId);
317 }
318 
RegisterSrcHandleSessionCallback(std::shared_ptr<DInputTransbaseSourceCallback> callback)319 void DistributedInputTransportBase::RegisterSrcHandleSessionCallback(
320     std::shared_ptr<DInputTransbaseSourceCallback> callback)
321 {
322     DHLOGI("RegisterSrcHandleSessionCallback");
323     srcCallback_ = callback;
324 }
325 
RegisterSinkHandleSessionCallback(std::shared_ptr<DInputTransbaseSinkCallback> callback)326 void DistributedInputTransportBase::RegisterSinkHandleSessionCallback(
327     std::shared_ptr<DInputTransbaseSinkCallback> callback)
328 {
329     DHLOGI("RegisterSinkHandleSessionCallback");
330     sinkCallback_ = callback;
331 }
332 
RegisterSourceManagerCallback(std::shared_ptr<DInputSourceManagerCallback> callback)333 void DistributedInputTransportBase::RegisterSourceManagerCallback(
334     std::shared_ptr<DInputSourceManagerCallback> callback)
335 {
336     DHLOGI("RegisterSourceManagerCallback");
337     srcMgrCallback_ = callback;
338 }
339 
RegisterSinkManagerCallback(std::shared_ptr<DInputSinkManagerCallback> callback)340 void DistributedInputTransportBase::RegisterSinkManagerCallback(
341     std::shared_ptr<DInputSinkManagerCallback> callback)
342 {
343     DHLOGI("RegisterSinkManagerCallback");
344     sinkMgrCallback_ = callback;
345 }
346 
RegisterSessionStateCb(sptr<ISessionStateCallback> callback)347 void DistributedInputTransportBase::RegisterSessionStateCb(sptr<ISessionStateCallback> callback)
348 {
349     DHLOGI("RegisterSessionStateCb");
350     SessionStateCallback_ = callback;
351 }
352 
UnregisterSessionStateCb()353 void DistributedInputTransportBase::UnregisterSessionStateCb()
354 {
355     DHLOGI("UnregisterSessionStateCb");
356     SessionStateCallback_ = nullptr;
357 }
358 
RunSessionStateCallback(const std::string & remoteDevId,const uint32_t sessionState)359 void DistributedInputTransportBase::RunSessionStateCallback(const std::string &remoteDevId,
360     const uint32_t sessionState)
361 {
362     DHLOGI("RunSessionStateCallback start.");
363     if (SessionStateCallback_ != nullptr) {
364         SessionStateCallback_->OnResult(remoteDevId, sessionState);
365         return;
366     }
367     DHLOGE("RunSessionStateCallback SessionStateCallback_ is null.");
368 }
369 
CountSession(const std::string & remoteDevId)370 int32_t DistributedInputTransportBase::CountSession(const std::string &remoteDevId)
371 {
372     return remoteDevSessionMap_.count(remoteDevId);
373 }
374 
EraseSessionId(const std::string & remoteDevId)375 void DistributedInputTransportBase::EraseSessionId(const std::string &remoteDevId)
376 {
377     remoteDevSessionMap_.erase(remoteDevId);
378 }
379 
OnSessionOpened(int32_t sessionId,const PeerSocketInfo & info)380 int32_t DistributedInputTransportBase::OnSessionOpened(int32_t sessionId, const PeerSocketInfo &info)
381 {
382     std::string peerDevId;
383     peerDevId.assign(info.networkId);
384     DHLOGI("OnSessionOpened, socket: %{public}d, peerSocketName: %{public}s, peerNetworkId: %{public}s, "
385         "peerPkgName: %{public}s", sessionId, info.name, GetAnonyString(peerDevId).c_str(), info.pkgName);
386     FinishAsyncTrace(DINPUT_HITRACE_LABEL, DINPUT_OPEN_SESSION_START, DINPUT_OPEN_SESSION_TASK);
387 
388     {
389         std::unique_lock<std::mutex> sessionLock(operationMutex_);
390         remoteDevSessionMap_[peerDevId] = sessionId;
391         channelStatusMap_[peerDevId] = true;
392     }
393     RunSessionStateCallback(peerDevId, SESSION_STATUS_OPENED);
394     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
395     if (dhFwkKit != nullptr) {
396         DHLOGD("Enable low Latency!");
397         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, ENABLE_LOW_LATENCY.dump());
398     }
399     DHLOGI("OnSessionOpened finish");
400     return DH_SUCCESS;
401 }
402 
OnSessionClosed(int32_t sessionId,ShutdownReason reason)403 void DistributedInputTransportBase::OnSessionClosed(int32_t sessionId, ShutdownReason reason)
404 {
405     DHLOGI("OnSessionClosed, socket: %{public}d, reason: %{public}d", sessionId, (int32_t)reason);
406     std::string deviceId = GetDevIdBySessionId(sessionId);
407     DHLOGI("OnSessionClosed notify session closed, sessionId: %{public}d, peer deviceId:%{public}s",
408         sessionId, GetAnonyString(deviceId).c_str());
409     RunSessionStateCallback(deviceId, SESSION_STATUS_CLOSED);
410 
411     {
412         std::unique_lock<std::mutex> sessionLock(operationMutex_);
413         if (CountSession(deviceId) > 0) {
414             EraseSessionId(deviceId);
415         }
416         channelStatusMap_.erase(deviceId);
417 
418         if (sinkCallback_ == nullptr) {
419             DHLOGE("sinkCallback is nullptr.");
420             return;
421         }
422         sinkCallback_->NotifySessionClosed(sessionId);
423 
424         if (srcCallback_ == nullptr) {
425             DHLOGE("srcCallback is nullptr.");
426             return;
427         }
428         srcCallback_->NotifySessionClosed();
429 
430         if (srcMgrCallback_ == nullptr) {
431             DHLOGE("srcMgrCallback is nullptr.");
432             return;
433         }
434         srcMgrCallback_->ResetSrcMgrResStatus();
435 
436         if (sinkMgrCallback_ == nullptr) {
437             DHLOGE("sinkMgrCallback is nullptr.");
438             return;
439         }
440         sinkMgrCallback_->ResetSinkMgrResStatus();
441     }
442 
443     std::shared_ptr<DistributedHardwareFwkKit> dhFwkKit = DInputContext::GetInstance().GetDHFwkKit();
444     if (dhFwkKit != nullptr) {
445         DHLOGD("Disable low Latency!");
446         dhFwkKit->PublishMessage(DHTopic::TOPIC_LOW_LATENCY, DISABLE_LOW_LATENCY.dump());
447     }
448     DHLOGI("OnSessionClosed finish");
449 }
450 
CheckRecivedData(const std::string & message)451 bool DistributedInputTransportBase::CheckRecivedData(const std::string &message)
452 {
453     nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
454     if (recMsg.is_discarded()) {
455         DHLOGE("OnBytesReceived jsonStr error.");
456         return false;
457     }
458 
459     if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
460         DHLOGE("The key is invalid.");
461         return false;
462     }
463 
464     return true;
465 }
466 
OnBytesReceived(int32_t sessionId,const void * data,uint32_t dataLen)467 void DistributedInputTransportBase::OnBytesReceived(int32_t sessionId, const void *data, uint32_t dataLen)
468 {
469     if (sessionId < 0 || data == nullptr || dataLen == 0 || dataLen > MSG_MAX_SIZE) {
470         DHLOGE("OnBytesReceived param check failed");
471         return;
472     }
473 
474     uint8_t *buf = reinterpret_cast<uint8_t *>(calloc(dataLen + 1, sizeof(uint8_t)));
475     if (buf == nullptr) {
476         DHLOGE("OnBytesReceived: malloc memory failed");
477         return;
478     }
479 
480     if (memcpy_s(buf, dataLen + 1,  reinterpret_cast<const uint8_t *>(data), dataLen) != EOK) {
481         DHLOGE("OnBytesReceived: memcpy memory failed");
482         free(buf);
483         return;
484     }
485 
486     std::string message(buf, buf + dataLen);
487     HandleSession(sessionId, message);
488 
489     free(buf);
490     return;
491 }
492 
HandleSession(int32_t sessionId,const std::string & message)493 void DistributedInputTransportBase::HandleSession(int32_t sessionId, const std::string &message)
494 {
495     if (CheckRecivedData(message) != true) {
496         return;
497     }
498     nlohmann::json recMsg = nlohmann::json::parse(message, nullptr, false);
499     if (recMsg.is_discarded()) {
500         DHLOGE("recMsg parse failed!");
501         return;
502     }
503     if (!IsUInt32(recMsg, DINPUT_SOFTBUS_KEY_CMD_TYPE)) {
504         DHLOGE("softbus cmd key is invalid");
505         return;
506     }
507     uint32_t cmdType = recMsg[DINPUT_SOFTBUS_KEY_CMD_TYPE];
508     if (cmdType < TRANS_MSG_SRC_SINK_SPLIT) {
509         if (srcCallback_ == nullptr) {
510             DHLOGE("srcCallback is nullptr.");
511             return;
512         }
513         srcCallback_->HandleSessionData(sessionId, message);
514         return;
515     }
516     if (cmdType > TRANS_MSG_SRC_SINK_SPLIT) {
517         if (sinkCallback_ == nullptr) {
518             DHLOGE("sinkCallback is nullptr.");
519             return;
520         }
521         sinkCallback_->HandleSessionData(sessionId, message);
522     }
523 }
524 
SendMsg(int32_t sessionId,std::string & message)525 int32_t DistributedInputTransportBase::SendMsg(int32_t sessionId, std::string &message)
526 {
527     if (message.size() > MSG_MAX_SIZE) {
528         DHLOGE("SendMessage error: message.size() > MSG_MAX_SIZE, msg size: %{public}zu", message.size());
529         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
530     }
531     uint8_t *buf = reinterpret_cast<uint8_t *>(calloc((MSG_MAX_SIZE), sizeof(uint8_t)));
532     if (buf == nullptr) {
533         DHLOGE("SendMsg: malloc memory failed");
534         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
535     }
536     int32_t outLen = 0;
537     if (memcpy_s(buf, MSG_MAX_SIZE, reinterpret_cast<const uint8_t *>(message.c_str()), message.size()) != EOK) {
538         DHLOGE("SendMsg: memcpy memory failed");
539         free(buf);
540         return ERR_DH_INPUT_SERVER_SOURCE_TRANSPORT_SENDMESSSAGE;
541     }
542     outLen = static_cast<int32_t>(message.size());
543     int32_t ret = SendBytes(sessionId, buf, outLen);
544     free(buf);
545     return ret;
546 }
547 
GetSessionIdByDevId(const std::string & srcId)548 int32_t DistributedInputTransportBase::GetSessionIdByDevId(const std::string &srcId)
549 {
550     std::unique_lock<std::mutex> sessionLock(operationMutex_);
551     std::map<std::string, int32_t>::iterator it = remoteDevSessionMap_.find(srcId);
552     if (it != remoteDevSessionMap_.end()) {
553         return it->second;
554     }
555     DHLOGE("get session id failed, srcId = %{public}s", GetAnonyString(srcId).c_str());
556     return ERR_DH_INPUT_SERVER_SINK_TRANSPORT_GET_SESSIONID_FAIL;
557 }
558 } // namespace DistributedInput
559 } // namespace DistributedHardware
560 } // namespace OHOS