1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <mutex>
16 #include <string>
17 #include <thread>
18
19 #include "communicator_context.h"
20 #include "communication/connect_manager.h"
21 #include "data_level.h"
22 #include "device_manager_adapter.h"
23 #include "dfx_types.h"
24 #include "kvstore_utils.h"
25 #include "log_print.h"
26 #include "reporter.h"
27 #include "securec.h"
28 #include "session.h"
29 #include "softbus_adapter.h"
30 #include "softbus_bus_center.h"
31 #include "softbus_error_code.h"
32 #ifdef LOG_TAG
33 #undef LOG_TAG
34 #endif
35 #define LOG_TAG "SoftBusAdapter"
36
37 namespace OHOS {
38 namespace AppDistributedKv {
39 using Context = DistributedData::CommunicatorContext;
40 constexpr uint32_t DEFAULT_MTU_SIZE = 4096 * 1024u;
41 constexpr uint32_t DEFAULT_TIMEOUT = 30 * 1000;
42 using namespace std;
43 using namespace OHOS::DistributedDataDfx;
44 using namespace OHOS::DistributedKv;
45 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
46
47 class AppDataListenerWrap {
48 public:
49 static void SetDataHandler(SoftBusAdapter *handler);
50
51 static void OnClientShutdown(int32_t socket, ShutdownReason reason);
52 static void OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
53 static void OnClientSocketChanged(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount);
54
55 static void OnServerBind(int32_t socket, PeerSocketInfo info);
56 static void OnServerShutdown(int32_t socket, ShutdownReason reason);
57 static void OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen);
58
59 private:
60 // notify all listeners when received message
61 static void NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
62 const PipeInfo &pipeInfo);
63 static std::string GetPipeId(const std::string &name);
64
65 static SoftBusAdapter *softBusAdapter_;
66 };
67
68 SoftBusAdapter *AppDataListenerWrap::softBusAdapter_;
69 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::instance_;
70
71 namespace {
OnDataLevelChanged(const char * networkId,const DataLevel dataLevel)72 void OnDataLevelChanged(const char* networkId, const DataLevel dataLevel)
73 {
74 if (networkId == nullptr) {
75 return;
76 }
77 LevelInfo level = {
78 .dynamic = dataLevel.dynamicLevel,
79 .statics = dataLevel.staticLevel,
80 .switches = dataLevel.switchLevel,
81 .switchesLen = dataLevel.switchLength,
82 };
83 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
84 SoftBusAdapter::GetInstance()->OnBroadcast({ uuid }, std::move(level));
85 }
86
87 IDataLevelCb g_callback = {
88 .onDataLevelChanged = OnDataLevelChanged,
89 };
90 } // namespace
SoftBusAdapter()91 SoftBusAdapter::SoftBusAdapter()
92 {
93 ZLOGI("begin");
94 AppDataListenerWrap::SetDataHandler(this);
95
96 clientListener_.OnShutdown = AppDataListenerWrap::OnClientShutdown;
97 clientListener_.OnBytes = AppDataListenerWrap::OnClientBytesReceived;
98 clientListener_.OnMessage = AppDataListenerWrap::OnClientBytesReceived;
99 clientListener_.OnQos = AppDataListenerWrap::OnClientSocketChanged;
100
101 serverListener_.OnBind = AppDataListenerWrap::OnServerBind;
102 serverListener_.OnShutdown = AppDataListenerWrap::OnServerShutdown;
103 serverListener_.OnBytes = AppDataListenerWrap::OnServerBytesReceived;
104 serverListener_.OnMessage = AppDataListenerWrap::OnServerBytesReceived;
105
106 auto status = DmAdapter::GetInstance().StartWatchDeviceChange(this, { "softBusAdapter" });
107 if (status != Status::SUCCESS) {
108 ZLOGW("register device change failed, status:%d", static_cast<int>(status));
109 }
110
111 Context::GetInstance().SetSessionListener([this](const std::string &deviceId) {
112 StartCloseSessionTask(deviceId);
113 });
114
115 ConnectManager::GetInstance()->RegisterCloseSessionTask([this](const std::string &networkId) {
116 return CloseSession(networkId);
117 });
118 ConnectManager::GetInstance()->RegisterSessionCloseListener("context", [](const std::string &networkId) {
119 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
120 Context::GetInstance().NotifySessionClose(uuid);
121 });
122 ConnectManager::GetInstance()->OnStart();
123 }
124
~SoftBusAdapter()125 SoftBusAdapter::~SoftBusAdapter()
126 {
127 ZLOGI("begin");
128 if (onBroadcast_) {
129 UnregDataLevelChangeCb(PKG_NAME);
130 }
131 connects_.Clear();
132 ConnectManager::GetInstance()->OnDestory();
133 }
134
GetInstance()135 std::shared_ptr<SoftBusAdapter> SoftBusAdapter::GetInstance()
136 {
137 static std::once_flag onceFlag;
138 std::call_once(onceFlag, [&] {
139 instance_ = std::make_shared<SoftBusAdapter>();
140 });
141 return instance_;
142 }
143
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)144 Status SoftBusAdapter::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
145 {
146 ZLOGD("begin");
147 if (observer == nullptr) {
148 return Status::INVALID_ARGUMENT;
149 }
150
151 auto ret = dataChangeListeners_.Insert(pipeInfo.pipeId, observer);
152 if (!ret) {
153 ZLOGW("Add listener error or repeated adding.");
154 return Status::ERROR;
155 }
156
157 return Status::SUCCESS;
158 }
159
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)160 Status SoftBusAdapter::StopWatchDataChange(__attribute__((unused)) const AppDataChangeListener *observer,
161 const PipeInfo &pipeInfo)
162 {
163 ZLOGD("begin");
164 if (dataChangeListeners_.Erase(pipeInfo.pipeId) != 0) {
165 return Status::SUCCESS;
166 }
167 ZLOGW("stop data observer error, pipeInfo:%{public}s", pipeInfo.pipeId.c_str());
168 return Status::ERROR;
169 }
170
GetExpireTime(std::shared_ptr<SoftBusClient> & conn)171 void SoftBusAdapter::GetExpireTime(std::shared_ptr<SoftBusClient> &conn)
172 {
173 Time now = std::chrono::steady_clock::now();
174 auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
175 lock_guard<decltype(taskMutex_)> lock(taskMutex_);
176 if (taskId_ != ExecutorPool::INVALID_TASK_ID && expireTime < next_) {
177 taskId_ = Context::GetInstance().GetThreadPool()->Reset(taskId_, expireTime - now);
178 next_ = expireTime;
179 }
180 }
181
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t length,const MessageInfo & info)182 std::pair<Status, int32_t> SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId,
183 const DataInfo &dataInfo, uint32_t length, const MessageInfo &info)
184 {
185 std::shared_ptr<SoftBusClient> conn;
186 bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId.deviceId);
187 uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
188 connects_.Compute(deviceId.deviceId, [&pipeInfo, &deviceId, &conn, qosType, isOHOSType](const auto &key,
189 std::vector<std::shared_ptr<SoftBusClient>> &connects) -> bool {
190 for (auto &connect : connects) {
191 if (connect->GetQoSType() == qosType) {
192 conn = connect;
193 return true;
194 }
195 }
196 auto connect = std::make_shared<SoftBusClient>(pipeInfo, deviceId, qosType);
197 connects.emplace_back(connect);
198 conn = connect;
199 return true;
200 });
201 if (conn == nullptr) {
202 return std::make_pair(Status::ERROR, 0);
203 }
204 auto status = conn->CheckStatus();
205 if (status == Status::RATE_LIMIT) {
206 return std::make_pair(Status::RATE_LIMIT, 0);
207 }
208 if (status != Status::SUCCESS) {
209 return OpenConnect(conn, deviceId);
210 }
211 status = conn->SendData(dataInfo, &clientListener_);
212 if ((status != Status::NETWORK_ERROR) && (status != Status::RATE_LIMIT)) {
213 GetExpireTime(conn);
214 }
215 auto errCode = conn->GetSoftBusError();
216 return std::make_pair(status, errCode);
217 }
218
OpenConnect(const std::shared_ptr<SoftBusClient> & conn,const DeviceId & deviceId)219 std::pair<Status, int32_t> SoftBusAdapter::OpenConnect(const std::shared_ptr<SoftBusClient> &conn,
220 const DeviceId &deviceId)
221 {
222 auto task = [this, connect = std::weak_ptr<SoftBusClient>(conn)]() {
223 auto conn = connect.lock();
224 if (conn != nullptr) {
225 conn->OpenConnect(&clientListener_);
226 }
227 };
228 auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId.deviceId).networkId;
229 ConnectManager::GetInstance()->ApplyConnect(networkId, task);
230 return std::make_pair(Status::RATE_LIMIT, 0);
231 }
232
StartCloseSessionTask(const std::string & deviceId)233 void SoftBusAdapter::StartCloseSessionTask(const std::string &deviceId)
234 {
235 std::shared_ptr<SoftBusClient> conn;
236 bool isOHOSType = DmAdapter::GetInstance().IsOHOSType(deviceId);
237 uint32_t qosType = isOHOSType ? SoftBusClient::QOS_HML : SoftBusClient::QOS_BR;
238 auto connects = connects_.Find(deviceId);
239 if (!connects.first) {
240 return;
241 }
242 for (auto &connect : connects.second) {
243 if (connect->GetQoSType() == qosType) {
244 conn = connect;
245 break;
246 }
247 }
248 if (conn == nullptr) {
249 return;
250 }
251 Time now = std::chrono::steady_clock::now();
252 auto expireTime = conn->GetExpireTime() > now ? conn->GetExpireTime() : now;
253 lock_guard<decltype(taskMutex_)> lock(taskMutex_);
254 if (taskId_ == ExecutorPool::INVALID_TASK_ID) {
255 ZLOGI("Start close session, deviceId:%{public}s", KvStoreUtils::ToBeAnonymous(deviceId).c_str());
256 taskId_ = Context::GetInstance().GetThreadPool()->Schedule(expireTime - now, GetCloseSessionTask());
257 next_ = expireTime;
258 }
259 }
260
GetCloseSessionTask()261 SoftBusAdapter::Task SoftBusAdapter::GetCloseSessionTask()
262 {
263 return [this]() mutable {
264 Time now = std::chrono::steady_clock::now();
265 std::vector<std::shared_ptr<SoftBusClient>> connToClose;
266 connects_.ForEach([&now, &connToClose](const auto &key, auto &connects) -> bool {
267 std::vector<std::shared_ptr<SoftBusClient>> holdConnects;
268 for (auto conn : connects) {
269 if (conn == nullptr) {
270 continue;
271 }
272 auto expireTime = conn->GetExpireTime();
273 if (expireTime <= now) {
274 ZLOGI("[timeout] close session socket:%{public}d", conn->GetSocket());
275 connToClose.emplace_back(conn);
276 } else {
277 holdConnects.emplace_back(conn);
278 }
279 }
280 connects = std::move(holdConnects);
281 return false;
282 });
283 connects_.EraseIf([](const auto &key, const auto &conn) -> bool {
284 if (conn.empty()) {
285 ConnectManager::GetInstance()->OnSessionClose(DmAdapter::GetInstance().GetDeviceInfo(key).networkId);
286 }
287 return conn.empty();
288 });
289 Time next = INVALID_NEXT;
290 lock_guard<decltype(taskMutex_)> lg(taskMutex_);
291 connects_.ForEach([&next](const auto &key, auto &connects) -> bool {
292 for (auto conn : connects) {
293 if (conn == nullptr) {
294 continue;
295 }
296 auto expireTime = conn->GetExpireTime();
297 if (expireTime < next) {
298 next = expireTime;
299 }
300 }
301 return false;
302 });
303 if (next == INVALID_NEXT) {
304 taskId_ = ExecutorPool::INVALID_TASK_ID;
305 return;
306 }
307 taskId_ = Context::GetInstance().GetThreadPool()->Schedule(
308 next > now ? next - now : ExecutorPool::INVALID_DELAY, GetCloseSessionTask());
309 next_ = next;
310 };
311 }
312
GetMtuSize(const DeviceId & deviceId)313 uint32_t SoftBusAdapter::GetMtuSize(const DeviceId &deviceId)
314 {
315 uint32_t mtuSize = DEFAULT_MTU_SIZE;
316 connects_.ComputeIfPresent(deviceId.deviceId, [&mtuSize](auto, auto &connects) {
317 uint32_t mtu = 0;
318 for (auto conn : connects) {
319 if (conn == nullptr) {
320 continue;
321 }
322 if (mtu < conn->GetMtuSize()) {
323 mtu = conn->GetMtuSize();
324 }
325 }
326 if (mtu != 0) {
327 mtuSize = mtu;
328 }
329 return true;
330 });
331 return mtuSize;
332 }
333
GetTimeout(const DeviceId & deviceId)334 uint32_t SoftBusAdapter::GetTimeout(const DeviceId &deviceId)
335 {
336 uint32_t interval = DEFAULT_TIMEOUT;
337 connects_.ComputeIfPresent(deviceId.deviceId, [&interval](auto, auto &connects) {
338 uint32_t time = 0;
339 for (auto conn : connects) {
340 if (conn == nullptr) {
341 continue;
342 }
343 if (time < conn->GetTimeout()) {
344 time = conn->GetTimeout();
345 }
346 }
347 if (time != 0) {
348 interval = time;
349 }
350 return true;
351 });
352 return interval;
353 }
354
DelConnect(int32_t socket,bool isForce)355 std::string SoftBusAdapter::DelConnect(int32_t socket, bool isForce)
356 {
357 std::string name;
358 std::set<std::string> closedConnect;
359 connects_.EraseIf([socket, isForce, &name, &closedConnect](const auto &deviceId, auto &connects) -> bool {
360 if (!isForce && DmAdapter::GetInstance().IsOHOSType(deviceId)) {
361 return false;
362 }
363 for (auto iter = connects.begin(); iter != connects.end();) {
364 if (*iter != nullptr && **iter == socket) {
365 name += deviceId;
366 name += " ";
367 iter = connects.erase(iter);
368 } else {
369 iter++;
370 }
371 }
372 if (connects.empty()) {
373 closedConnect.insert(deviceId);
374 return true;
375 }
376 return false;
377 });
378 for (const auto &deviceId : closedConnect) {
379 auto networkId = DmAdapter::GetInstance().GetDeviceInfo(deviceId).networkId;
380 ConnectManager::GetInstance()->OnSessionClose(networkId);
381 }
382 return name;
383 }
384
OnClientShutdown(int32_t socket,bool isForce)385 std::string SoftBusAdapter::OnClientShutdown(int32_t socket, bool isForce)
386 {
387 return DelConnect(socket, isForce);
388 }
389
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)390 bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo,
391 __attribute__((unused)) const struct DeviceId &peer)
392 {
393 ZLOGI("pipeInfo:%{public}s deviceId:%{public}s", pipeInfo.pipeId.c_str(),
394 KvStoreUtils::ToBeAnonymous(peer.deviceId).c_str());
395 return true;
396 }
397
SetMessageTransFlag(const PipeInfo & pipeInfo,bool flag)398 void SoftBusAdapter::SetMessageTransFlag(const PipeInfo &pipeInfo, bool flag)
399 {
400 ZLOGI("pipeInfo: %s flag: %d", pipeInfo.pipeId.c_str(), flag);
401 flag_ = flag;
402 }
403
CreateSessionServerAdapter(const std::string & sessionName)404 int SoftBusAdapter::CreateSessionServerAdapter(const std::string &sessionName)
405 {
406 ZLOGD("begin");
407 SocketInfo socketInfo;
408 std::string sessionServerName = sessionName;
409 socketInfo.name = const_cast<char *>(sessionServerName.c_str());
410 std::string pkgName = "ohos.distributeddata";
411 socketInfo.pkgName = pkgName.data();
412 socket_ = Socket(socketInfo);
413 return Listen(socket_, Qos, QOS_COUNT, &serverListener_);
414 }
415
RemoveSessionServerAdapter(const std::string & sessionName) const416 int SoftBusAdapter::RemoveSessionServerAdapter(const std::string &sessionName) const
417 {
418 ZLOGD("begin");
419 Shutdown(socket_);
420 return 0;
421 }
422
NotifyDataListeners(const uint8_t * data,int size,const std::string & deviceId,const PipeInfo & pipeInfo)423 void SoftBusAdapter::NotifyDataListeners(const uint8_t *data, int size, const std::string &deviceId,
424 const PipeInfo &pipeInfo)
425 {
426 ZLOGD("begin");
427 auto ret = dataChangeListeners_.ComputeIfPresent(pipeInfo.pipeId,
428 [&data, &size, &deviceId, &pipeInfo](const auto &key, const AppDataChangeListener *&value) {
429 ZLOGD("ready to notify, pipeName:%{public}s, deviceId:%{public}s.", pipeInfo.pipeId.c_str(),
430 KvStoreUtils::ToBeAnonymous(deviceId).c_str());
431 DeviceInfo deviceInfo = DmAdapter::GetInstance().GetDeviceInfo(deviceId);
432 value->OnMessage(deviceInfo, data, size, pipeInfo);
433 TrafficStat ts{ pipeInfo.pipeId, deviceId, 0, size };
434 Reporter::GetInstance()->TrafficStatistic()->Report(ts);
435 return true;
436 });
437 if (!ret) {
438 ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str());
439 }
440 }
441
Broadcast(const PipeInfo & pipeInfo,const LevelInfo & levelInfo)442 Status SoftBusAdapter::Broadcast(const PipeInfo &pipeInfo, const LevelInfo &levelInfo)
443 {
444 DataLevel level = {
445 .dynamicLevel = levelInfo.dynamic,
446 .staticLevel = levelInfo.statics,
447 .switchLevel = levelInfo.switches,
448 .switchLength = levelInfo.switchesLen,
449 };
450 auto status = SetDataLevel(&level);
451 if (status == SOFTBUS_FUNC_NOT_SUPPORT) {
452 return Status::NOT_SUPPORT_BROADCAST;
453 }
454 return status ? Status::ERROR : Status::SUCCESS;
455 }
456
OnBroadcast(const DeviceId & device,const LevelInfo & levelInfo)457 void SoftBusAdapter::OnBroadcast(const DeviceId &device, const LevelInfo &levelInfo)
458 {
459 ZLOGI("device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
460 if (!onBroadcast_) {
461 ZLOGW("no listener device:%{public}s", KvStoreUtils::ToBeAnonymous(device.deviceId).c_str());
462 return;
463 }
464 onBroadcast_(device.deviceId, levelInfo);
465 }
466
ListenBroadcastMsg(const PipeInfo & pipeInfo,std::function<void (const std::string &,const LevelInfo &)> listener)467 int32_t SoftBusAdapter::ListenBroadcastMsg(const PipeInfo &pipeInfo,
468 std::function<void(const std::string &, const LevelInfo &)> listener)
469 {
470 if (onBroadcast_) {
471 return SOFTBUS_ALREADY_EXISTED;
472 }
473 onBroadcast_ = std::move(listener);
474 return RegDataLevelChangeCb(pipeInfo.pipeId.c_str(), &g_callback);
475 }
476
SetDataHandler(SoftBusAdapter * handler)477 void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler)
478 {
479 ZLOGI("begin");
480 softBusAdapter_ = handler;
481 }
482
OnClientShutdown(int32_t socket,ShutdownReason reason)483 void AppDataListenerWrap::OnClientShutdown(int32_t socket, ShutdownReason reason)
484 {
485 // when the local close the session, this callback function will not be triggered;
486 // when the current function is called, soft bus has released the session resource, only connId is valid;
487 std::string name = softBusAdapter_->OnClientShutdown(socket);
488 ZLOGI("[shutdown] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
489 }
490
OnClientBytesReceived(int32_t socket,const void * data,uint32_t dataLen)491 void AppDataListenerWrap::OnClientBytesReceived(int32_t socket, const void *data, uint32_t dataLen) {}
492
OnClientSocketChanged(int32_t socket,QoSEvent eventId,const QosTV * qos,uint32_t qosCount)493 void AppDataListenerWrap::OnClientSocketChanged(int32_t socket, QoSEvent eventId, const QosTV *qos, uint32_t qosCount)
494 {
495 if (eventId == QoSEvent::QOS_SATISFIED && qos != nullptr && qos[0].qos == QOS_TYPE_MIN_BW && qosCount == 1) {
496 auto name = softBusAdapter_->OnClientShutdown(socket, false);
497 ZLOGI("[SocketChanged] socket:%{public}d, name:%{public}s", socket, KvStoreUtils::ToBeAnonymous(name).c_str());
498 }
499 }
500
OnServerBind(int32_t socket,PeerSocketInfo info)501 void AppDataListenerWrap::OnServerBind(int32_t socket, PeerSocketInfo info)
502 {
503 softBusAdapter_->OnBind(socket, info);
504 std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
505
506 ZLOGI("[OnServerBind] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s", socket, info.name,
507 KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str());
508 }
509
OnServerShutdown(int32_t socket,ShutdownReason reason)510 void AppDataListenerWrap::OnServerShutdown(int32_t socket, ShutdownReason reason)
511 {
512 softBusAdapter_->OnServerShutdown(socket);
513 ZLOGI("Shut down reason:%{public}d socket id:%{public}d", reason, socket);
514 }
515
OnServerBytesReceived(int32_t socket,const void * data,uint32_t dataLen)516 void AppDataListenerWrap::OnServerBytesReceived(int32_t socket, const void *data, uint32_t dataLen)
517 {
518 SoftBusAdapter::ServerSocketInfo info;
519 if (!softBusAdapter_->GetPeerSocketInfo(socket, info)) {
520 ZLOGE("Get peer socket info failed, socket id %{public}d", socket);
521 return;
522 };
523 std::string peerDevUuid = DmAdapter::GetInstance().GetUuidByNetworkId(std::string(info.networkId));
524 ZLOGD("[OnBytesReceived] socket:%{public}d, peer name:%{public}s, peer devId:%{public}s, data len:%{public}u",
525 socket, info.name.c_str(), KvStoreUtils::ToBeAnonymous(peerDevUuid).c_str(), dataLen);
526
527 std::string pipeId = GetPipeId(info.name);
528 if (pipeId.empty()) {
529 ZLOGE("pipId is invalid");
530 return;
531 }
532
533 NotifyDataListeners(reinterpret_cast<const uint8_t *>(data), dataLen, peerDevUuid, { pipeId, "" });
534 }
535
GetPipeId(const std::string & name)536 std::string AppDataListenerWrap::GetPipeId(const std::string &name)
537 {
538 auto pos = name.find('_');
539 if (pos != std::string::npos) {
540 return name.substr(0, pos);
541 }
542 return name;
543 }
544
NotifyDataListeners(const uint8_t * data,const int size,const std::string & deviceId,const PipeInfo & pipeInfo)545 void AppDataListenerWrap::NotifyDataListeners(const uint8_t *data, const int size, const std::string &deviceId,
546 const PipeInfo &pipeInfo)
547 {
548 softBusAdapter_->NotifyDataListeners(data, size, deviceId, pipeInfo);
549 }
550
GetPeerSocketInfo(int32_t socket,ServerSocketInfo & info)551 bool SoftBusAdapter::GetPeerSocketInfo(int32_t socket, ServerSocketInfo &info)
552 {
553 auto it = peerSocketInfos_.Find(socket);
554 if (it.first) {
555 info = it.second;
556 return true;
557 }
558 return false;
559 }
560
OnBind(int32_t socket,PeerSocketInfo info)561 void SoftBusAdapter::OnBind(int32_t socket, PeerSocketInfo info)
562 {
563 ServerSocketInfo socketInfo;
564 socketInfo.name = info.name;
565 socketInfo.networkId = info.networkId;
566 socketInfo.pkgName = info.pkgName;
567 peerSocketInfos_.Insert(socket, socketInfo);
568 }
569
OnServerShutdown(int32_t socket)570 void SoftBusAdapter::OnServerShutdown(int32_t socket)
571 {
572 peerSocketInfos_.Erase(socket);
573 }
574
OnDeviceChanged(const AppDistributedKv::DeviceInfo & info,const AppDistributedKv::DeviceChangeType & type) const575 void SoftBusAdapter::OnDeviceChanged(const AppDistributedKv::DeviceInfo &info,
576 const AppDistributedKv::DeviceChangeType &type) const
577 {
578 return;
579 }
580
CloseSession(const std::string & networkId)581 bool SoftBusAdapter::CloseSession(const std::string &networkId)
582 {
583 auto uuid = DmAdapter::GetInstance().GetUuidByNetworkId(networkId);
584 auto ret = connects_.Erase(uuid);
585 if (ret != 0) {
586 ConnectManager::GetInstance()->OnSessionClose(networkId);
587 }
588 return ret != 0;
589 }
590 } // namespace AppDistributedKv
591 } // namespace OHOS