1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <mutex>
16 #include <string>
17 #include <thread>
18 
19 #include "communicator_context.h"
20 #include "communication/connect_manager.h"
21 #include "data_level.h"
22 #include "device_manager_adapter.h"
23 #include "dfx_types.h"
24 #include "kvstore_utils.h"
25 #include "log_print.h"
26 #include "reporter.h"
27 #include "securec.h"
28 #include "session.h"
29 #include "softbus_adapter.h"
30 #include "softbus_bus_center.h"
31 #include "softbus_error_code.h"
32 #ifdef LOG_TAG
33 #undef LOG_TAG
34 #endif
35 #define LOG_TAG "SoftBusAdapter"
36 
37 namespace OHOS {
38 namespace AppDistributedKv {
39 using Context = DistributedData::CommunicatorContext;
40 constexpr uint32_t DEFAULT_MTU_SIZE = 4096 * 1024u;
41 constexpr uint32_t DEFAULT_TIMEOUT = 30 * 1000;
42 using namespace std;
43 using namespace OHOS::DistributedDataDfx;
44 using namespace OHOS::DistributedKv;
45 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
46 
47 class AppDataListenerWrap {
48 public:
49     static void SetDataHandler(SoftBusAdapter *handler);
50 
51     static void OnClientShutdown(int32_t socket, ShutdownReason reason);
52     static void OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
53     static void OnClientSocketChanged(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount);
54 
55     static void OnServerBind(int32_t socket, PeerSocketInfo info);
56     static void OnServerShutdown(int32_t socket, ShutdownReason reason);
57     static void OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
58 
59 private:
60     // notify all listeners when received message
61     static void NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
62         const PipeInfo &pipeInfo);
63     static std::string GetPipeId(const std::string &name);
64 
65     static SoftBusAdapter *softBusAdapter_;
66 };
67 
68 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
69 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
70 
71 namespace {
OnDataLevelChanged(const char * networkId,const DataLevel dataLevel)72 void OnDataLevelChanged(const char* networkId, const DataLevel dataLevel)
73 {
74     if (networkId == nullptr) {
75         return;
76     }
77     LevelInfo level = {
78         .dynamic = dataLevel.dynamicLevel,
79         .statics = dataLevel.staticLevel,
80         .switches = dataLevel.switchLevel,
81         .switchesLen = dataLevel.switchLength,
82     };
83     auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
84     SoftBusAdapter::GetInstance()->OnBroadcast({ uuid }, std::move(level));
85 }
86 
87 IDataLevelCb g_callback = {
88     .onDataLevelChanged = OnDataLevelChanged,
89 };
90 } // namespace
SoftBusAdapter()91 SoftBusAdapter::SoftBusAdapter()
92 {
93     ZLOGI("begin");
94     AppDataListenerWrap::SetDataHandler(this);
95 
96     clientListener_.OnShutdown = AppDataListenerWrap::OnClientShutdown;
97     clientListener_.OnBytes = AppDataListenerWrap::OnClientBytesReceived;
98     clientListener_.OnMessage = AppDataListenerWrap::OnClientBytesReceived;
99     clientListener_.OnQos = AppDataListenerWrap::OnClientSocketChanged;
100 
101     serverListener_.OnBind = AppDataListenerWrap::OnServerBind;
102     serverListener_.OnShutdown = AppDataListenerWrap::OnServerShutdown;
103     serverListener_.OnBytes = AppDataListenerWrap::OnServerBytesReceived;
104     serverListener_.OnMessage = AppDataListenerWrap::OnServerBytesReceived;
105 
106     auto status = DmAdapter::GetInstance().StartWatchDeviceChange(this, { "softBusAdapter" });
107     if (status != Status::SUCCESS) {
108         ZLOGW("register device change failed, status:%d", static_cast<int>(status));
109     }
110 
111     Context::GetInstance().SetSessionListener([this](const std::string &deviceId) {
112         StartCloseSessionTask(deviceId);
113     });
114 
115     ConnectManager::GetInstance()->RegisterCloseSessionTask([this](const std::string &networkId) {
116         return CloseSession(networkId);
117     });
118     ConnectManager::GetInstance()->RegisterSessionCloseListener("context", [](const std::string &networkId) {
119         auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
120         Context::GetInstance().NotifySessionClose(uuid);
121     });
122     ConnectManager::GetInstance()->OnStart();
123 }
124 
~SoftBusAdapter()125 SoftBusAdapter::~SoftBusAdapter()
126 {
127     ZLOGI("begin");
128     if (onBroadcast_) {
129         UnregDataLevelChangeCb(PKG_NAME);
130     }
131     connects_.Clear();
132     ConnectManager::GetInstance()->OnDestory();
133 }
134 
GetInstance()135 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
136 {
137     static std::once_flag onceFlag;
138     std::call_once(onceFlag, [&] {
139         instance_ = std::make_shared<SoftBusAdapter>();
140     });
141     return instance_;
142 }
143 
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)144 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
145 {
146     ZLOGD("begin");
147     if (observer == nullptr) {
148         return Status::INVALID_ARGUMENT;
149     }
150 
151     auto ret = dataChangeListeners_.Insert(pipeInfo.pipeId, observer);
152     if (!ret) {
153         ZLOGW("Add listener error or repeated adding.");
154         return Status::ERROR;
155     }
156 
157     return Status::SUCCESS;
158 }
159 
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)160 Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppDataChangeListener *observer,
161     const PipeInfo &pipeInfo)
162 {
163     ZLOGD("begin");
164     if (dataChangeListeners_.Erase(pipeInfo.pipeId) != 0) {
165         return Status::SUCCESS;
166     }
167     ZLOGW("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
168     return Status::ERROR;
169 }
170 
GetExpireTime(std::shared_ptr<SoftBusClient> & conn)171 void SoftBusAdapter::GetExpireTime(std::shared_ptr<SoftBusClient> &conn)
172 {
173     Time now = std::chrono::steady_clock::now();
174     auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
175     lock_guard<decltype(taskMutex_)> lock(taskMutex_);
176     if (taskId_ != ExecutorPool::INVALID_TASK_ID && expireTime < next_) {
177         taskId_ = Context::GetInstance().GetThreadPool()->Reset(taskId_, expireTime - now);
178         next_ = expireTime;
179     }
180 }
181 
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t length,const MessageInfo & info)182 std::pair<Status, int32_t> SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
183     const DataInfo &dataInfo, uint32_t length, const MessageInfo &info)
184 {
185     std::shared_ptr<SoftBusClient> conn;
186     bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
187     uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
188     connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, qosType, isOHOSType](const auto &key,
189         std::vector<std::shared_ptr<SoftBusClient>> &connects) -> bool {
190         for (auto &connect : connects) {
191             if (connect->GetQoSType() == qosType) {
192                 conn = connect;
193                 return true;
194             }
195         }
196         auto connect = std::make_shared<SoftBusClient>(pipeInfo, deviceId, qosType);
197         connects.emplace_back(connect);
198         conn = connect;
199         return true;
200     });
201     if (conn == nullptr) {
202         return std::make_pair(Status::ERROR, 0);
203     }
204     auto status = conn->CheckStatus();
205     if (status == Status::RATE_LIMIT) {
206         return std::make_pair(Status::RATE_LIMIT, 0);
207     }
208     if (status != Status::SUCCESS) {
209         return OpenConnect(conn, deviceId);
210     }
211     status = conn->SendData(dataInfo, &clientListener_);
212     if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) {
213         GetExpireTime(conn);
214     }
215     auto errCode = conn->GetSoftBusError();
216     return std::make_pair(status, errCode);
217 }
218 
OpenConnect(const std::shared_ptr<SoftBusClient> & conn,const DeviceId & deviceId)219 std::pair<Status, int32_t> SoftBusAdapter::OpenConnect(const std::shared_ptr<SoftBusClient> &conn,
220     const DeviceId &deviceId)
221 {
222     auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
223         auto conn = connect.lock();
224         if (conn != nullptr) {
225             conn->OpenConnect(&clientListener_);
226         }
227     };
228     auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
229     ConnectManager::GetInstance()->ApplyConnect(networkId, task);
230     return std::make_pair(Status::RATE_LIMIT, 0);
231 }
232 
StartCloseSessionTask(const std::string & deviceId)233 void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId)
234 {
235     std::shared_ptr<SoftBusClient> conn;
236     bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId);
237     uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
238     auto connects = connects_.Find(deviceId);
239     if (!connects.first) {
240         return;
241     }
242     for (auto &connect : connects.second) {
243         if (connect->GetQoSType() == qosType) {
244             conn = connect;
245             break;
246         }
247     }
248     if (conn == nullptr) {
249         return;
250     }
251     Time now = std::chrono::steady_clock::now();
252     auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
253     lock_guard<decltype(taskMutex_)> lock(taskMutex_);
254     if (taskId_ == ExecutorPool::INVALID_TASK_ID) {
255         ZLOGI("Start close session, deviceId:%{public}s", KvStoreUtils::ToBeAnonymous(deviceId).c_str());
256         taskId_ = Context::GetInstance().GetThreadPool()->Schedule(expireTime - now, GetCloseSessionTask());
257         next_ = expireTime;
258     }
259 }
260 
GetCloseSessionTask()261 SoftBusAdapter::Task SoftBusAdapter::GetCloseSessionTask()
262 {
263     return [this]() mutable {
264         Time now = std::chrono::steady_clock::now();
265         std::vector<std::shared_ptr<SoftBusClient>> connToClose;
266         connects_.ForEach([&now, &connToClose](const auto &key, auto &connects) -> bool {
267             std::vector<std::shared_ptr<SoftBusClient>> holdConnects;
268             for (auto conn : connects) {
269                 if (conn == nullptr) {
270                     continue;
271                 }
272                 auto expireTime = conn->GetExpireTime();
273                 if (expireTime <= now) {
274                     ZLOGI("[timeout] close session socket:%{public}d", conn->GetSocket());
275                     connToClose.emplace_back(conn);
276                 } else {
277                     holdConnects.emplace_back(conn);
278                 }
279             }
280             connects = std::move(holdConnects);
281             return false;
282         });
283         connects_.EraseIf([](const auto &key, const auto &conn) -> bool {
284             if (conn.empty()) {
285                 ConnectManager::GetInstance()->OnSessionClose(DmAdapter::GetInstance().GetDeviceInfo(key).networkId);
286             }
287             return conn.empty();
288         });
289         Time next = INVALID_NEXT;
290         lock_guard<decltype(taskMutex_)> lg(taskMutex_);
291         connects_.ForEach([&next](const auto &key, auto &connects) -> bool {
292             for (auto conn : connects) {
293                 if (conn == nullptr) {
294                     continue;
295                 }
296                 auto expireTime = conn->GetExpireTime();
297                 if (expireTime < next) {
298                     next = expireTime;
299                 }
300             }
301             return false;
302         });
303         if (next == INVALID_NEXT) {
304             taskId_ = ExecutorPool::INVALID_TASK_ID;
305             return;
306         }
307         taskId_ = Context::GetInstance().GetThreadPool()->Schedule(
308             next > now ? next - now : ExecutorPool::INVALID_DELAY, GetCloseSessionTask());
309         next_ = next;
310     };
311 }
312 
GetMtuSize(const DeviceId & deviceId)313 uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId)
314 {
315     uint32_t mtuSize = DEFAULT_MTU_SIZE;
316     connects_.ComputeIfPresent(deviceId.deviceId, [&mtuSize](auto, auto &connects) {
317         uint32_t mtu = 0;
318         for (auto conn : connects) {
319             if (conn == nullptr) {
320                 continue;
321             }
322             if (mtu < conn->GetMtuSize()) {
323                 mtu = conn->GetMtuSize();
324             }
325         }
326         if (mtu != 0) {
327             mtuSize = mtu;
328         }
329         return true;
330     });
331     return mtuSize;
332 }
333 
GetTimeout(const DeviceId & deviceId)334 uint32_t SoftBusAdapter::GetTimeout(const DeviceId &deviceId)
335 {
336     uint32_t interval = DEFAULT_TIMEOUT;
337     connects_.ComputeIfPresent(deviceId.deviceId, [&interval](auto, auto &connects) {
338         uint32_t time = 0;
339         for (auto conn : connects) {
340             if (conn == nullptr) {
341                 continue;
342             }
343             if (time < conn->GetTimeout()) {
344                 time = conn->GetTimeout();
345             }
346         }
347         if (time != 0) {
348             interval = time;
349         }
350         return true;
351     });
352     return interval;
353 }
354 
DelConnect(int32_t socket,bool isForce)355 std::string SoftBusAdapter::DelConnect(int32_t socket, bool isForce)
356 {
357     std::string name;
358     std::set<std::string> closedConnect;
359     connects_.EraseIf([socket, isForce, &name, &closedConnect](const auto &deviceId, auto &connects) -> bool {
360         if (!isForce && DmAdapter::GetInstance().IsOHOSType(deviceId)) {
361             return false;
362         }
363         for (auto iter = connects.begin(); iter != connects.end();) {
364             if (*iter != nullptr && **iter == socket) {
365                 name += deviceId;
366                 name += " ";
367                 iter = connects.erase(iter);
368             } else {
369                 iter++;
370             }
371         }
372         if (connects.empty()) {
373             closedConnect.insert(deviceId);
374             return true;
375         }
376         return false;
377     });
378     for (const auto &deviceId : closedConnect) {
379         auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId).networkId;
380         ConnectManager::GetInstance()->OnSessionClose(networkId);
381     }
382     return name;
383 }
384 
OnClientShutdown(int32_t socket,bool isForce)385 std::string SoftBusAdapter::OnClientShutdown(int32_t socket, bool isForce)
386 {
387     return DelConnect(socket, isForce);
388 }
389 
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)390 bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo,
391     __attribute__((unused)) const struct DeviceId &peer)
392 {
393     ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(),
394         KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str());
395     return true;
396 }
397 
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)398 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
399 {
400     ZLOGI("pipeInfo: %s flag: %d", pipeInfo.pipeId.c_str(), flag);
401     flag_ = flag;
402 }
403 
CreateSessionServerAdapter(const std::string & sessionName)404 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
405 {
406     ZLOGD("begin");
407     SocketInfo socketInfo;
408     std::string sessionServerName = sessionName;
409     socketInfo.name = const_cast<char *>(sessionServerName.c_str());
410     std::string pkgName = "ohos.distributeddata";
411     socketInfo.pkgName = pkgName.data();
412     socket_ = Socket(socketInfo);
413     return Listen(socket_, Qos, QOS_COUNT, &serverListener_);
414 }
415 
RemoveSessionServerAdapter(const std::string & sessionName) const416 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
417 {
418     ZLOGD("begin");
419     Shutdown(socket_);
420     return 0;
421 }
422 
NotifyDataListeners(const uint8_t * data,int size,const std::string & deviceId,const PipeInfo & pipeInfo)423 void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId,
424     const PipeInfo &pipeInfo)
425 {
426     ZLOGD("begin");
427     auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId,
428         [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) {
429             ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
430                 KvStoreUtils::ToBeAnonymous(deviceId).c_str());
431             DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId);
432             value->OnMessage(deviceInfo, data, size, pipeInfo);
433             TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size };
434             Reporter::GetInstance()->TrafficStatistic()->Report(ts);
435             return true;
436         });
437     if (!ret) {
438         ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str());
439     }
440 }
441 
Broadcast(const PipeInfo & pipeInfo,const LevelInfo & levelInfo)442 Status SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, const LevelInfo &levelInfo)
443 {
444     DataLevel level = {
445         .dynamicLevel = levelInfo.dynamic,
446         .staticLevel = levelInfo.statics,
447         .switchLevel = levelInfo.switches,
448         .switchLength = levelInfo.switchesLen,
449     };
450     auto status = SetDataLevel(&level);
451     if (status == SOFTBUS_FUNC_NOT_SUPPORT) {
452         return Status::NOT_SUPPORT_BROADCAST;
453     }
454     return status ? Status::ERROR : Status::SUCCESS;
455 }
456 
OnBroadcast(const DeviceId & device,const LevelInfo & levelInfo)457 void SoftBusAdapter::OnBroadcast(const DeviceId &device, const LevelInfo &levelInfo)
458 {
459     ZLOGI("device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
460     if (!onBroadcast_) {
461         ZLOGW("no listener device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
462         return;
463     }
464     onBroadcast_(device.deviceId, levelInfo);
465 }
466 
ListenBroadcastMsg(const PipeInfo & pipeInfo,std::function<void (const std::string &,const LevelInfo &)> listener)467 int32_t SoftBusAdapter::ListenBroadcastMsg(const PipeInfo &pipeInfo,
468     std::function<void(const std::string &, const LevelInfo &)> listener)
469 {
470     if (onBroadcast_) {
471         return SOFTBUS_ALREADY_EXISTED;
472     }
473     onBroadcast_ = std::move(listener);
474     return RegDataLevelChangeCb(pipeInfo.pipeId.c_str(), &g_callback);
475 }
476 
SetDataHandler(SoftBusAdapter * handler)477 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
478 {
479     ZLOGI("begin");
480     softBusAdapter_ = handler;
481 }
482 
OnClientShutdown(int32_t socket,ShutdownReason reason)483 void AppDataListenerWrap::OnClientShutdown(int32_t socket, ShutdownReason reason)
484 {
485     // when the local close the session, this callback function will not be triggered;
486     // when the current function is called, soft bus has released the session resource, only connId is valid;
487     std::string name = softBusAdapter_->OnClientShutdown(socket);
488     ZLOGI("[shutdown] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
489 }
490 
OnClientBytesReceived(int32_t socket,const void * data,uint32_t dataLen)491 void AppDataListenerWrap::OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen) {}
492 
OnClientSocketChanged(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)493 void AppDataListenerWrap::OnClientSocketChanged(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
494 {
495     if (eventId == QoSEvent::QOS_SATISFIED && qos != nullptr && qos[0].qos == QOS_TYPE_MIN_BW && qosCount == 1) {
496         auto name = softBusAdapter_->OnClientShutdown(socket, false);
497         ZLOGI("[SocketChanged] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
498     }
499 }
500 
OnServerBind(int32_t socket,PeerSocketInfo info)501 void AppDataListenerWrap::OnServerBind(int32_t socket, PeerSocketInfo info)
502 {
503     softBusAdapter_->OnBind(socket, info);
504     std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
505 
506     ZLOGI("[OnServerBind] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s", socket, info.name,
507         KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str());
508 }
509 
OnServerShutdown(int32_t socket,ShutdownReason reason)510 void AppDataListenerWrap::OnServerShutdown(int32_t socket, ShutdownReason reason)
511 {
512     softBusAdapter_->OnServerShutdown(socket);
513     ZLOGI("Shut down reason:%{public}d socket id:%{public}d", reason, socket);
514 }
515 
OnServerBytesReceived(int32_t socket,const void * data,uint32_t dataLen)516 void AppDataListenerWrap::OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen)
517 {
518     SoftBusAdapter::ServerSocketInfo info;
519     if (!softBusAdapter_->GetPeerSocketInfo(socket, info)) {
520         ZLOGE("Get peer socket info failed, socket id %{public}d", socket);
521         return;
522     };
523     std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
524     ZLOGD("[OnBytesReceived] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s, data len:%{public}u",
525         socket, info.name.c_str(), KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str(), dataLen);
526 
527     std::string pipeId = GetPipeId(info.name);
528     if (pipeId.empty()) {
529         ZLOGE("pipId is invalid");
530         return;
531     }
532 
533     NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, peerDevUuid, { pipeId, "" });
534 }
535 
GetPipeId(const std::string & name)536 std::string AppDataListenerWrap::GetPipeId(const std::string &name)
537 {
538     auto pos = name.find('_');
539     if (pos != std::string::npos) {
540         return name.substr(0, pos);
541     }
542     return name;
543 }
544 
NotifyDataListeners(const uint8_t * data,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)545 void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
546     const PipeInfo &pipeInfo)
547 {
548     softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo);
549 }
550 
GetPeerSocketInfo(int32_t socket,ServerSocketInfo & info)551 bool SoftBusAdapter::GetPeerSocketInfo(int32_t socket, ServerSocketInfo &info)
552 {
553     auto it = peerSocketInfos_.Find(socket);
554     if (it.first) {
555         info = it.second;
556         return true;
557     }
558     return false;
559 }
560 
OnBind(int32_t socket,PeerSocketInfo info)561 void SoftBusAdapter::OnBind(int32_t socket, PeerSocketInfo info)
562 {
563     ServerSocketInfo socketInfo;
564     socketInfo.name = info.name;
565     socketInfo.networkId = info.networkId;
566     socketInfo.pkgName = info.pkgName;
567     peerSocketInfos_.Insert(socket, socketInfo);
568 }
569 
OnServerShutdown(int32_t socket)570 void SoftBusAdapter::OnServerShutdown(int32_t socket)
571 {
572     peerSocketInfos_.Erase(socket);
573 }
574 
OnDeviceChanged(const AppDistributedKv::DeviceInfo & info,const AppDistributedKv::DeviceChangeType & type) const575 void SoftBusAdapter::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
576     const AppDistributedKv::DeviceChangeType &type) const
577 {
578     return;
579 }
580 
CloseSession(const std::string & networkId)581 bool SoftBusAdapter::CloseSession(const std::string &networkId)
582 {
583     auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
584     auto ret = connects_.Erase(uuid);
585     if (ret != 0) {
586         ConnectManager::GetInstance()->OnSessionClose(networkId);
587     }
588     return ret != 0;
589 }
590 } // namespace AppDistributedKv
591 } // namespace OHOS