1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsoftbus_adapter_impl.h"
17 
18 #ifdef ENABLE_PERFORMANCE_CHECK
19 #include <chrono>
20 #endif // ENABLE_PERFORMANCE_CHECK
21 
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 
25 #include "device_manager.h"
26 #include "dfs_session.h"
27 #include "securec.h"
28 #include "softbus_bus_center.h"
29 #include "softbus_error_code.h"
30 
31 #include "devicestatus_define.h"
32 #include "i_ddm_adapter.h"
33 #include "utility.h"
34 
35 #undef LOG_TAG
36 #define LOG_TAG "DSoftbusAdapterImpl"
37 
38 namespace OHOS {
39 namespace Msdp {
40 namespace DeviceStatus {
41 namespace {
42 #define SERVER_SESSION_NAME "ohos.msdp.device_status.intention.serversession"
43 #define D_DEV_MGR DistributedHardware::DeviceManager::GetInstance()
44 const std::string CLIENT_SESSION_NAME { "ohos.msdp.device_status.intention.clientsession." };
45 constexpr size_t BIND_STRING_LENGTH { 15 };
46 constexpr size_t DEVICE_NAME_SIZE_MAX { 256 };
47 constexpr size_t PKG_NAME_SIZE_MAX { 65 };
48 constexpr int32_t MIN_BW { 80 * 1024 * 1024 };
49 constexpr int32_t LATENCY { 3000 };
50 constexpr int32_t SOCKET_SERVER { 0 };
51 constexpr int32_t SOCKET_CLIENT { 1 };
52 constexpr int32_t INVALID_SOCKET { -1 };
53 }
54 
55 std::mutex DSoftbusAdapterImpl::mutex_;
56 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::instance_;
57 
GetInstance()58 std::shared_ptr<DSoftbusAdapterImpl> DSoftbusAdapterImpl::GetInstance()
59 {
60     if (instance_ == nullptr) {
61         std::lock_guard<std::mutex> lock(mutex_);
62         if (instance_ == nullptr) {
63             instance_ = std::make_shared<DSoftbusAdapterImpl>();
64         }
65     }
66     return instance_;
67 }
68 
DestroyInstance()69 void DSoftbusAdapterImpl::DestroyInstance()
70 {
71     std::lock_guard<std::mutex> lock(mutex_);
72     instance_.reset();
73 }
74 
~DSoftbusAdapterImpl()75 DSoftbusAdapterImpl::~DSoftbusAdapterImpl()
76 {
77     Disable();
78 }
79 
Enable()80 int32_t DSoftbusAdapterImpl::Enable()
81 {
82     CALL_DEBUG_ENTER;
83     std::lock_guard guard(lock_);
84     return SetupServer();
85 }
86 
Disable()87 void DSoftbusAdapterImpl::Disable()
88 {
89     CALL_DEBUG_ENTER;
90     std::lock_guard guard(lock_);
91     ShutdownServer();
92 }
93 
AddObserver(std::shared_ptr<IDSoftbusObserver> observer)94 void DSoftbusAdapterImpl::AddObserver(std::shared_ptr<IDSoftbusObserver> observer)
95 {
96     CALL_DEBUG_ENTER;
97     std::lock_guard guard(lock_);
98     CHKPV(observer);
99     observers_.erase(Observer());
100     observers_.emplace(observer);
101 }
102 
RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)103 void DSoftbusAdapterImpl::RemoveObserver(std::shared_ptr<IDSoftbusObserver> observer)
104 {
105     CALL_DEBUG_ENTER;
106     std::lock_guard guard(lock_);
107     if (auto iter = observers_.find(Observer(observer)); iter != observers_.end()) {
108         observers_.erase(iter);
109     }
110     observers_.erase(Observer());
111 }
112 
CheckDeviceOnline(const std::string & networkId)113 bool DSoftbusAdapterImpl::CheckDeviceOnline(const std::string &networkId)
114 {
115     CALL_DEBUG_ENTER;
116     std::vector<DistributedHardware::DmDeviceInfo> deviceList;
117     if (D_DEV_MGR.GetTrustedDeviceList(FI_PKG_NAME, "", deviceList) != RET_OK) {
118         FI_HILOGE("GetTrustedDeviceList failed");
119         return false;
120     }
121     if (deviceList.empty()) {
122         FI_HILOGE("Trust device list size is invalid");
123         return false;
124     }
125     for (const auto &deviceInfo : deviceList) {
126         if (std::string(deviceInfo.networkId) == networkId) {
127             return true;
128         }
129     }
130     return false;
131 }
132 
OpenSession(const std::string & networkId)133 int32_t DSoftbusAdapterImpl::OpenSession(const std::string &networkId)
134 {
135     CALL_DEBUG_ENTER;
136     std::lock_guard guard(lock_);
137 #ifdef ENABLE_PERFORMANCE_CHECK
138     auto startStamp = std::chrono::steady_clock::now();
139 #endif // ENABLE_PERFORMANCE_CHECK
140     if (!DSoftbusAdapterImpl::CheckDeviceOnline(networkId)) {
141         FI_HILOGE("CheckDeviceOnline failed, networkId:%{public}s", Utility::Anonymize(networkId).c_str());
142         return RET_ERR;
143     }
144     int32_t ret = OpenSessionLocked(networkId);
145 #ifdef ENABLE_PERFORMANCE_CHECK
146     auto openSessionDuration = std::chrono::duration_cast<std::chrono::milliseconds>(
147         std::chrono::steady_clock::now() - startStamp).count();
148     FI_HILOGI("[PERF] OpenSessionLocked ret:%{public}d, elapsed: %{public}lld ms", ret, openSessionDuration);
149 #endif // ENABLE_PERFORMANCE_CHECK
150     return ret;
151 }
152 
CloseSession(const std::string & networkId)153 void DSoftbusAdapterImpl::CloseSession(const std::string &networkId)
154 {
155     CALL_INFO_TRACE;
156     std::lock_guard guard(lock_);
157     if (auto iter = sessions_.find(networkId); iter != sessions_.end()) {
158         ::Shutdown(iter->second.socket_);
159         sessions_.erase(iter);
160         FI_HILOGI("Shutdown session(%{public}d, %{public}s)", iter->second.socket_,
161             Utility::Anonymize(networkId).c_str());
162     }
163 }
164 
CloseAllSessions()165 void DSoftbusAdapterImpl::CloseAllSessions()
166 {
167     CALL_INFO_TRACE;
168     std::lock_guard guard(lock_);
169     CloseAllSessionsLocked();
170 }
171 
FindConnection(const std::string & networkId)172 int32_t DSoftbusAdapterImpl::FindConnection(const std::string &networkId)
173 {
174     CALL_DEBUG_ENTER;
175     auto iter = sessions_.find(networkId);
176     return (iter != sessions_.end() ? iter->second.socket_ : -1);
177 }
178 
SendPacket(const std::string & networkId,NetPacket & packet)179 int32_t DSoftbusAdapterImpl::SendPacket(const std::string &networkId, NetPacket &packet)
180 {
181     CALL_DEBUG_ENTER;
182     std::lock_guard guard(lock_);
183     int32_t socket = FindConnection(networkId);
184     if (socket < 0) {
185         FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
186         return RET_ERR;
187     }
188     StreamBuffer buffer;
189     if (!packet.MakeData(buffer)) {
190         FI_HILOGE("Failed to buffer packet");
191         return RET_ERR;
192     }
193     if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
194         FI_HILOGE("Packet is too large");
195         return RET_ERR;
196     }
197     int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size());
198     if (ret != SOFTBUS_OK) {
199         FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
200         return RET_ERR;
201     }
202     return RET_OK;
203 }
204 
SendParcel(const std::string & networkId,Parcel & parcel)205 int32_t DSoftbusAdapterImpl::SendParcel(const std::string &networkId, Parcel &parcel)
206 {
207     CALL_DEBUG_ENTER;
208     std::lock_guard guard(lock_);
209     int32_t socket = FindConnection(networkId);
210     if (socket < 0) {
211         FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(networkId).c_str());
212         return RET_ERR;
213     }
214     int32_t ret = ::SendBytes(socket, reinterpret_cast<const void*>(parcel.GetData()), parcel.GetDataSize());
215     if (ret != SOFTBUS_OK) {
216         FI_HILOGE("DSOFTBUS::SendBytes fail, error:%{public}d", ret);
217         return RET_ERR;
218     }
219     return RET_OK;
220 }
221 
BroadcastPacket(NetPacket & packet)222 int32_t DSoftbusAdapterImpl::BroadcastPacket(NetPacket &packet)
223 {
224     CALL_INFO_TRACE;
225     std::lock_guard guard(lock_);
226     if (sessions_.empty()) {
227         FI_HILOGE("No session connected");
228         return RET_ERR;
229     }
230     StreamBuffer buffer;
231     if (!packet.MakeData(buffer)) {
232         FI_HILOGE("Failed to buffer packet");
233         return RET_ERR;
234     }
235     if (buffer.Size() > MAX_PACKET_BUF_SIZE) {
236         FI_HILOGE("Packet is too large");
237         return RET_ERR;
238     }
239     for (const auto &elem : sessions_) {
240         int32_t socket = elem.second.socket_;
241         if (socket < 0) {
242             FI_HILOGE("Node \'%{public}s\' is not connected", Utility::Anonymize(elem.first).c_str());
243             continue;
244         }
245         if (int32_t ret = ::SendBytes(socket, buffer.Data(), buffer.Size()); ret != SOFTBUS_OK) {
246             FI_HILOGE("DSOFTBUS::SendBytes fail (%{public}d)", ret);
247             continue;
248         }
249         FI_HILOGI("BroadcastPacket to networkId:%{public}s success", Utility::Anonymize(elem.first).c_str());
250     }
251     return RET_OK;
252 }
253 
HasSessionExisted(const std::string & networkId)254 bool DSoftbusAdapterImpl::HasSessionExisted(const std::string &networkId)
255 {
256     CALL_DEBUG_ENTER;
257     auto iter = sessions_.find(networkId);
258     return (iter != sessions_.end() && iter->second.socket_ != INVALID_SOCKET);
259 }
260 
OnBindLink(int32_t socket,PeerSocketInfo info)261 static void OnBindLink(int32_t socket, PeerSocketInfo info)
262 {
263     DSoftbusAdapterImpl::GetInstance()->OnBind(socket, info);
264 }
265 
OnShutdownLink(int32_t socket,ShutdownReason reason)266 static void OnShutdownLink(int32_t socket, ShutdownReason reason)
267 {
268     DSoftbusAdapterImpl::GetInstance()->OnShutdown(socket, reason);
269 }
270 
OnBytesAvailable(int32_t socket,const void * data,uint32_t dataLen)271 static void OnBytesAvailable(int32_t socket, const void *data, uint32_t dataLen)
272 {
273     DSoftbusAdapterImpl::GetInstance()->OnBytes(socket, data, dataLen);
274 }
275 
OnBind(int32_t socket,PeerSocketInfo info)276 void DSoftbusAdapterImpl::OnBind(int32_t socket, PeerSocketInfo info)
277 {
278     CALL_INFO_TRACE;
279     std::lock_guard guard(lock_);
280     std::string networkId = info.networkId;
281     FI_HILOGI("Bind session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
282     if (auto iter = sessions_.find(networkId); iter != sessions_.cend()) {
283         if (iter->second.socket_ == socket) {
284             FI_HILOGI("(%{public}d, %{public}s) has bound", iter->second.socket_,
285                 Utility::Anonymize(networkId).c_str());
286             return;
287         }
288         FI_HILOGI("(%{public}d, %{public}s) need erase", iter->second.socket_, Utility::Anonymize(networkId).c_str());
289         sessions_.erase(iter);
290     }
291     ConfigTcpAlive(socket);
292     sessions_.emplace(networkId, Session(socket));
293 
294     for (const auto &item : observers_) {
295         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
296         if (observer != nullptr) {
297             FI_HILOGD("Notify binding (%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
298             observer->OnBind(networkId);
299         }
300     }
301 }
302 
OnShutdown(int32_t socket,ShutdownReason reason)303 void DSoftbusAdapterImpl::OnShutdown(int32_t socket, ShutdownReason reason)
304 {
305     CALL_INFO_TRACE;
306     std::lock_guard guard(lock_);
307     auto iter = std::find_if(sessions_.cbegin(), sessions_.cend(),
308         [socket](const auto &item) {
309             return (item.second.socket_ == socket);
310         });
311     if (iter == sessions_.cend()) {
312         FI_HILOGD("Session(%{public}d) is not bound", socket);
313         return;
314     }
315     std::string networkId = iter->first;
316     sessions_.erase(iter);
317     FI_HILOGI("Shutdown session(%{public}d, %{public}s)", socket, Utility::Anonymize(networkId).c_str());
318 
319     for (const auto &item : observers_) {
320         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
321         if (observer != nullptr) {
322             FI_HILOGD("Notify shutdown of session(%{public}d, %{public}s)",
323                 socket, Utility::Anonymize(networkId).c_str());
324             observer->OnShutdown(networkId);
325         }
326     }
327 }
328 
OnBytes(int32_t socket,const void * data,uint32_t dataLen)329 void DSoftbusAdapterImpl::OnBytes(int32_t socket, const void *data, uint32_t dataLen)
330 {
331     CALL_DEBUG_ENTER;
332     std::lock_guard guard(lock_);
333     auto iter = std::find_if(sessions_.begin(), sessions_.end(),
334         [socket](const auto &item) {
335             return (item.second.socket_ == socket);
336         });
337     if (iter == sessions_.end()) {
338         FI_HILOGE("Invalid socket: %{public}d", socket);
339         return;
340     }
341     const std::string networkId = iter->first;
342 
343     if (*reinterpret_cast<const uint32_t*>(data) < static_cast<uint32_t>(MessageId::MAX_MESSAGE_ID)) {
344         CircleStreamBuffer &circleBuffer = iter->second.buffer_;
345 
346         if (!circleBuffer.Write(reinterpret_cast<const char*>(data), dataLen)) {
347             FI_HILOGE("Failed to write buffer");
348         }
349         HandleSessionData(networkId, circleBuffer);
350     } else {
351         HandleRawData(networkId, data, dataLen);
352     }
353 }
354 
InitSocket(SocketInfo info,int32_t socketType,int32_t & socket)355 int32_t DSoftbusAdapterImpl::InitSocket(SocketInfo info, int32_t socketType, int32_t &socket)
356 {
357     CALL_INFO_TRACE;
358     socket = ::Socket(info);
359     if (socket < 0) {
360         FI_HILOGE("DSOFTBUS::Socket failed");
361         return RET_ERR;
362     }
363     QosTV socketQos[] {
364         { .qos = QOS_TYPE_MIN_BW, .value = MIN_BW },
365         { .qos = QOS_TYPE_MAX_LATENCY, .value = LATENCY },
366         { .qos = QOS_TYPE_MIN_LATENCY, .value = LATENCY },
367     };
368     ISocketListener listener {
369         .OnBind = OnBindLink,
370         .OnShutdown = OnShutdownLink,
371         .OnBytes = OnBytesAvailable,
372     };
373     int32_t ret { -1 };
374 
375     if (socketType == SOCKET_SERVER) {
376         ret = ::Listen(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
377         if (ret != 0) {
378             FI_HILOGE("DSOFTBUS::Listen failed");
379         }
380     } else if (socketType == SOCKET_CLIENT) {
381         ret = ::Bind(socket, socketQos, sizeof(socketQos) / sizeof(socketQos[0]), &listener);
382         if (ret != 0) {
383             FI_HILOGE("DSOFTBUS::Bind failed");
384         }
385     }
386     if (ret != 0) {
387         ::Shutdown(socket);
388         socket = -1;
389         return ret;
390     }
391     return RET_OK;
392 }
393 
SetupServer()394 int32_t DSoftbusAdapterImpl::SetupServer()
395 {
396     CALL_INFO_TRACE;
397     if (socketFd_ > 0) {
398         return RET_OK;
399     }
400     char name[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
401     char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
402     FI_HILOGI("Server session name: \'%{public}s\'", name);
403     FI_HILOGI("Package name: \'%{public}s\'", pkgName);
404     SocketInfo info {
405         .name = name,
406         .pkgName = pkgName,
407         .dataType = DATA_TYPE_BYTES
408     };
409     int32_t ret = InitSocket(info, SOCKET_SERVER, socketFd_);
410     if (ret != RET_OK) {
411         FI_HILOGE("Failed to setup server");
412         return ret;
413     }
414     return RET_OK;
415 }
416 
ShutdownServer()417 void DSoftbusAdapterImpl::ShutdownServer()
418 {
419     CALL_INFO_TRACE;
420     CloseAllSessionsLocked();
421     if (socketFd_ > 0) {
422         ::Shutdown(socketFd_);
423         socketFd_ = -1;
424     }
425 }
426 
OpenSessionLocked(const std::string & networkId)427 int32_t DSoftbusAdapterImpl::OpenSessionLocked(const std::string &networkId)
428 {
429     CALL_DEBUG_ENTER;
430     if (sessions_.find(networkId) != sessions_.end()) {
431         FI_HILOGD("InputSoftbus session has already opened");
432         return RET_OK;
433     }
434     std::string sessionName = CLIENT_SESSION_NAME + networkId.substr(0, BIND_STRING_LENGTH);
435     char name[DEVICE_NAME_SIZE_MAX] {};
436     if (strcpy_s(name, sizeof(name), sessionName.c_str()) != EOK) {
437         FI_HILOGE("Invalid name:%{public}s", sessionName.c_str());
438         return RET_ERR;
439     }
440     char peerName[DEVICE_NAME_SIZE_MAX] { SERVER_SESSION_NAME };
441     char peerNetworkId[PKG_NAME_SIZE_MAX] {};
442     if (strcpy_s(peerNetworkId, sizeof(peerNetworkId), networkId.c_str()) != EOK) {
443         FI_HILOGE("Invalid peerNetworkId:%{public}s", Utility::Anonymize(networkId).c_str());
444         return RET_ERR;
445     }
446     char pkgName[PKG_NAME_SIZE_MAX] { FI_PKG_NAME };
447     FI_HILOGI("Client session name: \'%{public}s\'", name);
448     FI_HILOGI("Peer name: \'%{public}s\'", peerName);
449     FI_HILOGI("Peer network id: \'%{public}s\'", Utility::Anonymize(peerNetworkId).c_str());
450     FI_HILOGI("Package name: \'%{public}s\'", pkgName);
451     SocketInfo info {
452         .name = name,
453         .peerName = peerName,
454         .peerNetworkId = peerNetworkId,
455         .pkgName = pkgName,
456         .dataType = DATA_TYPE_BYTES
457     };
458     int32_t socket { -1 };
459 
460     int32_t ret = InitSocket(info, SOCKET_CLIENT, socket);
461     if (ret != RET_OK) {
462         FI_HILOGE("Failed to bind %{public}s", Utility::Anonymize(networkId).c_str());
463         return ret;
464     }
465     ConfigTcpAlive(socket);
466 
467     sessions_.emplace(networkId, Session(socket));
468     OnConnectedLocked(networkId);
469     return RET_OK;
470 }
471 
OnConnectedLocked(const std::string & networkId)472 void DSoftbusAdapterImpl::OnConnectedLocked(const std::string &networkId)
473 {
474     CALL_INFO_TRACE;
475     for (const auto &item : observers_) {
476         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
477         CHKPC(observer);
478         FI_HILOGI("Notify connected to networkId:%{public}s", Utility::Anonymize(networkId).c_str());
479         observer->OnConnected(networkId);
480     }
481 }
482 
CloseAllSessionsLocked()483 void DSoftbusAdapterImpl::CloseAllSessionsLocked()
484 {
485     std::for_each(sessions_.begin(), sessions_.end(), [](const auto &item) {
486         ::Shutdown(item.second.socket_);
487         FI_HILOGI("Shutdown connection with \'%{public}s\'", Utility::Anonymize(item.first).c_str());
488     });
489     sessions_.clear();
490 }
491 
ConfigTcpAlive(int32_t socket)492 void DSoftbusAdapterImpl::ConfigTcpAlive(int32_t socket)
493 {
494     CALL_DEBUG_ENTER;
495     if (socket < 0) {
496         FI_HILOGW("Config tcp alive, invalid sessionId");
497         return;
498     }
499     int32_t handle { -1 };
500     int32_t result = GetSessionHandle(socket, &handle);
501     if (result != RET_OK) {
502         FI_HILOGE("Failed to get the session handle, socketId:%{public}d, handle:%{public}d", socket, handle);
503         return;
504     }
505     int32_t keepAliveTimeout { 10 };
506     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPIDLE, &keepAliveTimeout, sizeof(keepAliveTimeout));
507     if (result != RET_OK) {
508         FI_HILOGE("Config tcp alive, setsockopt set idle failed, result:%{public}d", result);
509         return;
510     }
511     int32_t keepAliveCount { 5 };
512     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPCNT, &keepAliveCount, sizeof(keepAliveCount));
513     if (result != RET_OK) {
514         FI_HILOGE("Config tcp alive, setsockopt set cnt failed");
515         return;
516     }
517     int32_t interval { 1 };
518     result = setsockopt(handle, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
519     if (result != RET_OK) {
520         FI_HILOGE("Config tcp alive, setsockopt set intvl failed");
521         return;
522     }
523     int32_t enable { 1 };
524     result = setsockopt(handle, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
525     if (result != RET_OK) {
526         FI_HILOGE("Config tcp alive, setsockopt enable alive failed");
527         return;
528     }
529     int32_t TimeoutMs { 15000 };
530     result = setsockopt(handle, IPPROTO_TCP, TCP_USER_TIMEOUT, &TimeoutMs, sizeof(TimeoutMs));
531     if (result != RET_OK) {
532         FI_HILOGE("Failed to enable setsockopt for timeout, %{public}d", result);
533         return;
534     }
535 }
536 
HandleSessionData(const std::string & networkId,CircleStreamBuffer & circleBuffer)537 void DSoftbusAdapterImpl::HandleSessionData(const std::string &networkId, CircleStreamBuffer &circleBuffer)
538 {
539     CALL_DEBUG_ENTER;
540     while (circleBuffer.ResidualSize() >= static_cast<int32_t>(sizeof(PackHead))) {
541         const char *buf = circleBuffer.ReadBuf();
542         const PackHead *head = reinterpret_cast<const PackHead *>(buf);
543 
544         if ((head->size < 0) || (static_cast<size_t>(head->size) > MAX_PACKET_BUF_SIZE)) {
545             FI_HILOGE("Corrupted net packet");
546             break;
547         }
548         if ((head->size + static_cast<int32_t>(sizeof(PackHead))) > circleBuffer.ResidualSize()) {
549             FI_HILOGI("Incomplete package, package size:%{public}d, residual size:%{public}d",
550                 (head->size + static_cast<int32_t>(sizeof(PackHead))), circleBuffer.ResidualSize());
551             break;
552         }
553         NetPacket packet(head->idMsg);
554 
555         if ((head->size > 0) && !packet.Write(&buf[sizeof(PackHead)], head->size)) {
556             FI_HILOGE("Failed to fill packet, PacketSize:%{public}d", head->size);
557             break;
558         }
559         circleBuffer.SeekReadPos(packet.GetPacketLength());
560         HandlePacket(networkId, packet);
561     }
562 }
563 
HandlePacket(const std::string & networkId,NetPacket & packet)564 void DSoftbusAdapterImpl::HandlePacket(const std::string &networkId, NetPacket &packet)
565 {
566     CALL_DEBUG_ENTER;
567     for (const auto &item : observers_) {
568         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
569         if ((observer != nullptr) &&
570             observer->OnPacket(networkId, packet)) {
571             return;
572         }
573     }
574 }
575 
HandleRawData(const std::string & networkId,const void * data,uint32_t dataLen)576 void DSoftbusAdapterImpl::HandleRawData(const std::string &networkId, const void *data, uint32_t dataLen)
577 {
578     CALL_DEBUG_ENTER;
579     for (const auto &item : observers_) {
580         std::shared_ptr<IDSoftbusObserver> observer = item.Lock();
581         if ((observer != nullptr) &&
582             observer->OnRawData(networkId, data, dataLen)) {
583             return;
584         }
585     }
586 }
587 } // namespace DeviceStatus
588 } // namespace Msdp
589 } // namespace OHOS
590