1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dsoftbus_adapter_impl.h"
17
18 #ifdef ENABLE_PERFORMANCE_CHECK
19 #include <chrono>
20 #endif // ENABLE_PERFORMANCE_CHECK
21
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24
25 #include "cooperate_hisysevent.h"
26 #include "device_manager.h"
27 #include "dfs_session.h"
28 #include "securec.h"
29 #include "softbus_bus_center.h"
30 #include "softbus_error_code.h"
31
32 #include "devicestatus_define.h"
33 #include "i_ddm_adapter.h"
34 #include "utility.h"
35
36 #undef LOG_TAG
37 #define LOG_TAG "DSoftbusAdapterImpl"
38
39 namespace OHOS {
40 namespace Msdp {
41 namespace DeviceStatus {
namespace {
// Socket name used for the listening (server) side and as the peer name on the client side.
#define SERVER_SESSION_NAME "ohos.msdp.device_status.intention.serversession"
// Shorthand for the DeviceManager singleton.
#define D_DEV_MGR DistributedHardware::DeviceManager::GetInstance()
// Prefix of client socket names; a leading slice of the peer network id is appended to it.
const std::string CLIENT_SESSION_NAME = "ohos.msdp.device_status.intention.clientsession.";
// Number of leading network-id characters appended to CLIENT_SESSION_NAME.
constexpr size_t BIND_STRING_LENGTH = 15;
// Size of the buffers holding socket (session) names.
constexpr size_t DEVICE_NAME_SIZE_MAX = 256;
// Size of the buffers holding the package name and the peer network id.
constexpr size_t PKG_NAME_SIZE_MAX = 65;
// Value passed for QOS_TYPE_MIN_BW.
constexpr int32_t MIN_BW = 80 * 1024 * 1024;
// Value passed for both QOS_TYPE_MAX_LATENCY and QOS_TYPE_MIN_LATENCY.
constexpr int32_t LATENCY = 3000;
// Socket roles understood by InitSocket().
constexpr int32_t SOCKET_SERVER = 0;
constexpr int32_t SOCKET_CLIENT = 1;
} // namespace
54
// Mutex taken by GetInstance() when creating, and by DestroyInstance() when releasing, instance_.
std::mutex DSoftbusAdapterImpl::mutex_;
// The shared adapter object handed out by GetInstance(); empty until first use.
std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::instance_;
57
GetInstance()58 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::GetInstance()
59 {
60 if (instance_ == nullptr) {
61 std::lock_guard<std::mutex> lock(mutex_);
62 if (instance_ == nullptr) {
63 instance_ = std::make_shared<DSoftbusAdapterImpl>();
64 }
65 }
66 return instance_;
67 }
68
DestroyInstance()69 void DSoftbusAdapterImpl::DestroyInstance()
70 {
71 std::lock_guard<std::mutex> lock(mutex_);
72 instance_.reset();
73 }
74
// Shuts the adapter down on destruction by calling Disable().
DSoftbusAdapterImpl::~DSoftbusAdapterImpl()
{
    Disable();
}
79
// Sets up the server socket while holding lock_.
// Returns whatever SetupServer() returns.
int32_t DSoftbusAdapterImpl::Enable()
{
    CALL_DEBUG_ENTER;
    std::lock_guard guard(lock_);
    return SetupServer();
}
86
// Closes all sessions and the server socket (via ShutdownServer()) while holding lock_.
void DSoftbusAdapterImpl::Disable()
{
    CALL_DEBUG_ENTER;
    std::lock_guard guard(lock_);
    ShutdownServer();
}
93
AddObserver(std::shared_ptr<IDSoftbusObserver> observer)94 void DSoftbusAdapterImpl::AddObserver(std::shared_ptr<IDSoftbusObserver> observer)
95 {
96 CALL_DEBUG_ENTER;
97 std::lock_guard guard(lock_);
98 CHKPV(observer);
99 observers_.erase(Observer());
100 observers_.emplace(observer);
101 }
102
RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)103 void DSoftbusAdapterImpl::RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)
104 {
105 CALL_DEBUG_ENTER;
106 std::lock_guard guard(lock_);
107 if (auto iter = observers_.find(Observer(observer)); iter != observers_.end()) {
108 observers_.erase(iter);
109 }
110 observers_.erase(Observer());
111 }
112
CheckDeviceOnline(const std::string & networkId)113 bool DSoftbusAdapterImpl::CheckDeviceOnline(const std::string &networkId)
114 {
115 CALL_DEBUG_ENTER;
116 std::vector<DistributedHardware::DmDeviceInfo> deviceList;
117 if (D_DEV_MGR.GetTrustedDeviceList(FI_PKG_NAME, "", deviceList) != RET_OK) {
118 FI_HILOGE("GetTrustedDeviceList failed");
119 return false;
120 }
121 if (deviceList.empty()) {
122 FI_HILOGE("Trust device list size is invalid");
123 return false;
124 }
125 for (const auto &deviceInfo : deviceList) {
126 if (std::string(deviceInfo.networkId) == networkId) {
127 return true;
128 }
129 }
130 return false;
131 }
132
OpenSession(const std::string & networkId)133 int32_t DSoftbusAdapterImpl::OpenSession(const std::string &networkId)
134 {
135 CALL_DEBUG_ENTER;
136 std::lock_guard guard(lock_);
137 #ifdef ENABLE_PERFORMANCE_CHECK
138 auto startStamp = std::chrono::steady_clock::now();
139 #endif // ENABLE_PERFORMANCE_CHECK
140 if (!DSoftbusAdapterImpl::CheckDeviceOnline(networkId)) {
141 FI_HILOGE("CheckDeviceOnline failed, networkId:%{public}s", Utility::Anonymize(networkId).c_str());
142 return RET_ERR;
143 }
144 int32_t ret = OpenSessionLocked(networkId);
145 #ifdef ENABLE_PERFORMANCE_CHECK
146 auto openSessionDuration = std::chrono::duration_cast<std::chrono::milliseconds>(
147 std::chrono::steady_clock::now() - startStamp).count();
148 FI_HILOGI("[PERF] OpenSessionLocked ret:%{public}d, elapsed: %{public}lld ms", ret, openSessionDuration);
149 #endif // ENABLE_PERFORMANCE_CHECK
150 if (ret != RET_OK) {
151 CooperateDFX::WriteOpenSession(OHOS::HiviewDFX::HiSysEvent::EventType::FAULT);
152 } else {
153 CooperateDFX::WriteOpenSession(OHOS::HiviewDFX::HiSysEvent::EventType::BEHAVIOR);
154 }
155 return ret;
156 }
157
CloseSession(const std::string & networkId)158 void DSoftbusAdapterImpl::CloseSession(const std::string &networkId)
159 {
160 CALL_INFO_TRACE;
161 std::lock_guard guard(lock_);
162 if (auto iter = sessions_.find(networkId); iter != sessions_.end()) {
163 ::Shutdown(iter->second.socket_);
164 sessions_.erase(iter);
165 FI_HILOGI("Shutdown session(%{public}d, %{public}s)", iter->second.socket_,
166 Utility::Anonymize(networkId).c_str());
167 }
168 }
169
// Shuts down every recorded session; takes lock_ and delegates to CloseAllSessionsLocked().
void DSoftbusAdapterImpl::CloseAllSessions()
{
    CALL_INFO_TRACE;
    std::lock_guard guard(lock_);
    CloseAllSessionsLocked();
}
176
FindConnection(const std::string & networkId)177 int32_t DSoftbusAdapterImpl::FindConnection(const std::string &networkId)
178 {
179 CALL_DEBUG_ENTER;
180 auto iter = sessions_.find(networkId);
181 return (iter != sessions_.end() ? iter->second.socket_ : -1);
182 }
183
SendPacket(const std::string & networkId,NetPacket & packet)184 int32_t DSoftbusAdapterImpl::SendPacket(const std::string &networkId, NetPacket &packet)
185 {
186 CALL_DEBUG_ENTER;
187 std::lock_guard guard(lock_);
188 int32_t socket = FindConnection(networkId);
189 if (socket < 0) {
190 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
191 return RET_ERR;
192 }
193 StreamBuffer buffer;
194 if (!packet.MakeData(buffer)) {
195 FI_HILOGE("Failed to buffer packet");
196 return RET_ERR;
197 }
198 if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
199 FI_HILOGE("Packet is too large");
200 return RET_ERR;
201 }
202 int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size());
203 if (ret != SOFTBUS_OK) {
204 FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
205 return RET_ERR;
206 }
207 return RET_OK;
208 }
209
SendParcel(const std::string & networkId,Parcel & parcel)210 int32_t DSoftbusAdapterImpl::SendParcel(const std::string &networkId, Parcel &parcel)
211 {
212 CALL_DEBUG_ENTER;
213 std::lock_guard guard(lock_);
214 int32_t socket = FindConnection(networkId);
215 if (socket < 0) {
216 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
217 return RET_ERR;
218 }
219 int32_t ret = ::SendBytes(socket, reinterpret_cast<const void*>(parcel.GetData()), parcel.GetDataSize());
220 if (ret != SOFTBUS_OK) {
221 FI_HILOGE("DSOFTBUS::SendBytes fail, error:%{public}d", ret);
222 return RET_ERR;
223 }
224 return RET_OK;
225 }
226
BroadcastPacket(NetPacket & packet)227 int32_t DSoftbusAdapterImpl::BroadcastPacket(NetPacket &packet)
228 {
229 CALL_INFO_TRACE;
230 std::lock_guard guard(lock_);
231 if (sessions_.empty()) {
232 FI_HILOGE("No session connected");
233 return RET_ERR;
234 }
235 StreamBuffer buffer;
236 if (!packet.MakeData(buffer)) {
237 FI_HILOGE("Failed to buffer packet");
238 return RET_ERR;
239 }
240 if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
241 FI_HILOGE("Packet is too large");
242 return RET_ERR;
243 }
244 for (const auto &elem : sessions_) {
245 int32_t socket = elem.second.socket_;
246 if (socket < 0) {
247 FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(elem.first).c_str());
248 continue;
249 }
250 if (int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size()); ret != SOFTBUS_OK) {
251 FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
252 continue;
253 }
254 FI_HILOGI("BroadcastPacket to networkId:%{public}s success", Utility::Anonymize(elem.first).c_str());
255 }
256 return RET_OK;
257 }
258
OnBindLink(int32_t socket,PeerSocketInfo info)259 static void OnBindLink(int32_t socket, PeerSocketInfo info)
260 {
261 DSoftbusAdapterImpl::GetInstance()->OnBind(socket, info);
262 }
263
OnShutdownLink(int32_t socket,ShutdownReason reason)264 static void OnShutdownLink(int32_t socket, ShutdownReason reason)
265 {
266 DSoftbusAdapterImpl::GetInstance()->OnShutdown(socket, reason);
267 }
268
OnBytesAvailable(int32_t socket,const void * data,uint32_t dataLen)269 static void OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)
270 {
271 DSoftbusAdapterImpl::GetInstance()->OnBytes(socket, data, dataLen);
272 }
273
OnBind(int32_t socket,PeerSocketInfo info)274 void DSoftbusAdapterImpl::OnBind(int32_t socket, PeerSocketInfo info)
275 {
276 CALL_INFO_TRACE;
277 std::lock_guard guard(lock_);
278 std::string networkId = info.networkId;
279 FI_HILOGI("Bind session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
280 if (auto iter = sessions_.find(networkId); iter != sessions_.cend()) {
281 if (iter->second.socket_ == socket) {
282 FI_HILOGI("(%{public}d, %{public}s) has bound", iter->second.socket_,
283 Utility::Anonymize(networkId).c_str());
284 return;
285 }
286 FI_HILOGI("(%{public}d, %{public}s) need erase", iter->second.socket_, Utility::Anonymize(networkId).c_str());
287 sessions_.erase(iter);
288 }
289 ConfigTcpAlive(socket);
290 sessions_.emplace(networkId, Session(socket));
291
292 for (const auto &item : observers_) {
293 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
294 if (observer != nullptr) {
295 FI_HILOGD("Notify binding (%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
296 observer->OnBind(networkId);
297 }
298 }
299 }
300
OnShutdown(int32_t socket,ShutdownReason reason)301 void DSoftbusAdapterImpl::OnShutdown(int32_t socket, ShutdownReason reason)
302 {
303 CALL_INFO_TRACE;
304 std::lock_guard guard(lock_);
305 auto iter = std::find_if(sessions_.cbegin(), sessions_.cend(),
306 [socket](const auto &item) {
307 return (item.second.socket_ == socket);
308 });
309 if (iter == sessions_.cend()) {
310 FI_HILOGD("Session(%{public}d) is not bound", socket);
311 return;
312 }
313 std::string networkId = iter->first;
314 sessions_.erase(iter);
315 FI_HILOGI("Shutdown session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
316
317 for (const auto &item : observers_) {
318 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
319 if (observer != nullptr) {
320 FI_HILOGD("Notify shutdown of session(%{public}d, %{public}s)",
321 socket, Utility::Anonymize(networkId).c_str());
322 observer->OnShutdown(networkId);
323 }
324 }
325 }
326
OnBytes(int32_t socket,const void * data,uint32_t dataLen)327 void DSoftbusAdapterImpl::OnBytes(int32_t socket, const void *data, uint32_t dataLen)
328 {
329 CALL_DEBUG_ENTER;
330 std::lock_guard guard(lock_);
331 auto iter = std::find_if(sessions_.begin(), sessions_.end(),
332 [socket](const auto &item) {
333 return (item.second.socket_ == socket);
334 });
335 if (iter == sessions_.end()) {
336 FI_HILOGE("Invalid socket: %{public}d", socket);
337 return;
338 }
339 const std::string networkId = iter->first;
340
341 if (*reinterpret_cast<const uint32_t*>(data) < static_cast<uint32_t>(MessageId::MAX_MESSAGE_ID)) {
342 CircleStreamBuffer &circleBuffer = iter->second.buffer_;
343
344 if (!circleBuffer.Write(reinterpret_cast<const char*>(data), dataLen)) {
345 FI_HILOGE("Failed to write buffer");
346 }
347 HandleSessionData(networkId, circleBuffer);
348 } else {
349 HandleRawData(networkId, data, dataLen);
350 }
351 }
352
InitSocket(SocketInfo info,int32_t socketType,int32_t & socket)353 int32_t DSoftbusAdapterImpl::InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)
354 {
355 CALL_INFO_TRACE;
356 socket = ::Socket(info);
357 if (socket < 0) {
358 FI_HILOGE("DSOFTBUS::Socket failed");
359 return RET_ERR;
360 }
361 QosTV socketQos[] {
362 { .qos = QOS_TYPE_MIN_BW, .value = MIN_BW },
363 { .qos = QOS_TYPE_MAX_LATENCY, .value = LATENCY },
364 { .qos = QOS_TYPE_MIN_LATENCY, .value = LATENCY },
365 };
366 ISocketListener listener {
367 .OnBind = OnBindLink,
368 .OnShutdown = OnShutdownLink,
369 .OnBytes = OnBytesAvailable,
370 };
371 int32_t ret { -1 };
372
373 if (socketType == SOCKET_SERVER) {
374 ret = ::Listen(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
375 if (ret != 0) {
376 FI_HILOGE("DSOFTBUS::Listen failed");
377 }
378 } else if (socketType == SOCKET_CLIENT) {
379 ret = ::Bind(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
380 if (ret != 0) {
381 FI_HILOGE("DSOFTBUS::Bind failed");
382 }
383 }
384 if (ret != 0) {
385 ::Shutdown(socket);
386 socket = -1;
387 return ret;
388 }
389 return RET_OK;
390 }
391
SetupServer()392 int32_t DSoftbusAdapterImpl::SetupServer()
393 {
394 CALL_INFO_TRACE;
395 if (socketFd_ > 0) {
396 return RET_OK;
397 }
398 char name[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
399 char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
400 FI_HILOGI("Server session name: \'%{public}s\'", name);
401 FI_HILOGI("Package name: \'%{public}s\'", pkgName);
402 SocketInfo info {
403 .name = name,
404 .pkgName = pkgName,
405 .dataType = DATA_TYPE_BYTES
406 };
407 int32_t ret = InitSocket(info, SOCKET_SERVER, socketFd_);
408 if (ret != RET_OK) {
409 FI_HILOGE("Failed to setup server");
410 return ret;
411 }
412 return RET_OK;
413 }
414
ShutdownServer()415 void DSoftbusAdapterImpl::ShutdownServer()
416 {
417 CALL_INFO_TRACE;
418 CloseAllSessionsLocked();
419 if (socketFd_ > 0) {
420 ::Shutdown(socketFd_);
421 socketFd_ = -1;
422 }
423 }
424
OpenSessionLocked(const std::string & networkId)425 int32_t DSoftbusAdapterImpl::OpenSessionLocked(const std::string &networkId)
426 {
427 CALL_INFO_TRACE;
428 if (sessions_.find(networkId) != sessions_.end()) {
429 FI_HILOGD("InputSoftbus session has already opened");
430 return RET_OK;
431 }
432 std::string sessionName = CLIENT_SESSION_NAME + networkId.substr(0, BIND_STRING_LENGTH);
433 char name[DEVICE_NAME_SIZE_MAX] {};
434 if (strcpy_s(name, sizeof(name), sessionName.c_str()) != EOK) {
435 FI_HILOGE("Invalid name:%{public}s", sessionName.c_str());
436 return RET_ERR;
437 }
438 char peerName[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
439 char peerNetworkId[PKG_NAME_SIZE_MAX] {};
440 if (strcpy_s(peerNetworkId, sizeof(peerNetworkId), networkId.c_str()) != EOK) {
441 FI_HILOGE("Invalid peerNetworkId:%{public}s", Utility::Anonymize(networkId).c_str());
442 return RET_ERR;
443 }
444 char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
445 FI_HILOGI("Client session name: \'%{public}s\'", name);
446 FI_HILOGI("Peer name: \'%{public}s\'", peerName);
447 FI_HILOGI("Peer network id: \'%{public}s\'", Utility::Anonymize(peerNetworkId).c_str());
448 FI_HILOGI("Package name: \'%{public}s\'", pkgName);
449 SocketInfo info {
450 .name = name,
451 .peerName = peerName,
452 .peerNetworkId = peerNetworkId,
453 .pkgName = pkgName,
454 .dataType = DATA_TYPE_BYTES
455 };
456 int32_t socket { -1 };
457
458 int32_t ret = InitSocket(info, SOCKET_CLIENT, socket);
459 if (ret != RET_OK) {
460 FI_HILOGE("Failed to bind %{public}s", Utility::Anonymize(networkId).c_str());
461 return ret;
462 }
463 ConfigTcpAlive(socket);
464
465 sessions_.emplace(networkId, Session(socket));
466 OnConnectedLocked(networkId);
467 return RET_OK;
468 }
469
// Tells every registered observer that a connection to 'networkId' was established.
// Called from OpenSessionLocked() after the new session has been recorded.
void DSoftbusAdapterImpl::OnConnectedLocked(const std::string &networkId)
{
    CALL_INFO_TRACE;
    for (const auto &item : observers_) {
        std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
        // NOTE(review): CHKPC is defined elsewhere; presumably it skips to the next
        // observer when this one is null - confirm against the macro definition.
        CHKPC(observer);
        FI_HILOGI("Notify connected to networkId:%{public}s", Utility::Anonymize(networkId).c_str());
        observer->OnConnected(networkId);
    }
}
480
CloseAllSessionsLocked()481 void DSoftbusAdapterImpl::CloseAllSessionsLocked()
482 {
483 std::for_each(sessions_.begin(), sessions_.end(), [](const auto &item) {
484 ::Shutdown(item.second.socket_);
485 FI_HILOGI("Shutdown connection with \'%{public}s\'", Utility::Anonymize(item.first).c_str());
486 });
487 sessions_.clear();
488 }
489
ConfigTcpAlive(int32_t socket)490 void DSoftbusAdapterImpl::ConfigTcpAlive(int32_t socket)
491 {
492 CALL_DEBUG_ENTER;
493 if (socket < 0) {
494 FI_HILOGW("Config tcp alive, invalid sessionId");
495 return;
496 }
497 int32_t handle { -1 };
498 int32_t result = GetSessionHandle(socket, &handle);
499 if (result != RET_OK) {
500 FI_HILOGE("Failed to get the session handle, socketId:%{public}d, handle:%{public}d", socket, handle);
501 return;
502 }
503 int32_t keepAliveTimeout { 10 };
504 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPIDLE, &keepAliveTimeout, sizeof(keepAliveTimeout));
505 if (result != RET_OK) {
506 FI_HILOGE("Config tcp alive, setsockopt set idle failed, result:%{public}d", result);
507 return;
508 }
509 int32_t keepAliveCount { 5 };
510 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPCNT, &keepAliveCount, sizeof(keepAliveCount));
511 if (result != RET_OK) {
512 FI_HILOGE("Config tcp alive, setsockopt set cnt failed");
513 return;
514 }
515 int32_t interval { 1 };
516 result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
517 if (result != RET_OK) {
518 FI_HILOGE("Config tcp alive, setsockopt set intvl failed");
519 return;
520 }
521 int32_t enable { 1 };
522 result = setsockopt(handle, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
523 if (result != RET_OK) {
524 FI_HILOGE("Config tcp alive, setsockopt enable alive failed");
525 return;
526 }
527 int32_t TimeoutMs { 15000 };
528 result = setsockopt(handle, IPPROTO_TCP, TCP_USER_TIMEOUT, &TimeoutMs, sizeof(TimeoutMs));
529 if (result != RET_OK) {
530 FI_HILOGE("Failed to enable setsockopt for timeout, %{public}d", result);
531 return;
532 }
533 }
534
HandleSessionData(const std::string & networkId,CircleStreamBuffer & circleBuffer)535 void DSoftbusAdapterImpl::HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)
536 {
537 CALL_DEBUG_ENTER;
538 while (circleBuffer.ResidualSize() >= static_cast<int32_t>(sizeof(PackHead))) {
539 const char *buf = circleBuffer.ReadBuf();
540 const PackHead *head = reinterpret_cast<const PackHead *>(buf);
541
542 if ((head->size < 0) || (static_cast<size_t>(head->size) > MAX_PACKET_BUF_SIZE)) {
543 FI_HILOGE("Corrupted net packet");
544 break;
545 }
546 if ((head->size + static_cast<int32_t>(sizeof(PackHead))) > circleBuffer.ResidualSize()) {
547 FI_HILOGI("Incomplete package, package size:%{public}d, residual size:%{public}d",
548 (head->size + static_cast<int32_t>(sizeof(PackHead))), circleBuffer.ResidualSize());
549 break;
550 }
551 NetPacket packet(head->idMsg);
552
553 if ((head->size > 0) && !packet.Write(&buf[sizeof(PackHead)], head->size)) {
554 FI_HILOGE("Failed to fill packet, PacketSize:%{public}d", head->size);
555 break;
556 }
557 circleBuffer.SeekReadPos(packet.GetPacketLength());
558 HandlePacket(networkId, packet);
559 }
560 }
561
HandlePacket(const std::string & networkId,NetPacket & packet)562 void DSoftbusAdapterImpl::HandlePacket(const std::string &networkId, NetPacket &packet)
563 {
564 CALL_DEBUG_ENTER;
565 for (const auto &item : observers_) {
566 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
567 if ((observer != nullptr) &&
568 observer->OnPacket(networkId, packet)) {
569 return;
570 }
571 }
572 }
573
HandleRawData(const std::string & networkId,const void * data,uint32_t dataLen)574 void DSoftbusAdapterImpl::HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)
575 {
576 CALL_DEBUG_ENTER;
577 for (const auto &item : observers_) {
578 std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
579 if ((observer != nullptr) &&
580 observer->OnRawData(networkId, data, dataLen)) {
581 return;
582 }
583 }
584 }
585 } // namespace DeviceStatus
586 } // namespace Msdp
587 } // namespace OHOS
588