1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "stream_socket.h"
17
18 #include <cinttypes>
19
20 #include "sensor_errors.h"
21
22 namespace OHOS {
23 namespace Sensors {
24 #ifndef OHOS_BUILD_ENABLE_RUST
25 #undef LOG_TAG
26 #define LOG_TAG "StreamSocket"
27 #endif // OHOS_BUILD_ENABLE_RUST
28
StreamSocket()29 StreamSocket::StreamSocket() {}
30
~StreamSocket()31 StreamSocket::~StreamSocket()
32 {
33 #ifdef OHOS_BUILD_ENABLE_RUST
34 StreamSocketClose(streamSocketPtr_.get());
35 #else
36 Close();
37 #endif // OHOS_BUILD_ENABLE_RUST
38 }
39
40 #ifndef OHOS_BUILD_ENABLE_RUST
OnReadPackets(CircleStreamBuffer & circBuf,StreamSocket::PacketCallBackFun callbackFun)41 void StreamSocket::OnReadPackets(CircleStreamBuffer &circBuf, StreamSocket::PacketCallBackFun callbackFun)
42 {
43 constexpr size_t headSize = sizeof(PackHead);
44 for (size_t i = 0; i < ONCE_PROCESS_NETPACKET_LIMIT; ++i) {
45 const size_t unreadSize = circBuf.UnreadSize();
46 if (unreadSize < headSize) {
47 break;
48 }
49 size_t dataSize = unreadSize - headSize;
50 char *buf = const_cast<char *>(circBuf.ReadBuf());
51 CHKPB(buf);
52 PackHead *head = reinterpret_cast<PackHead *>(buf);
53 CHKPB(head);
54 if (head->size < 0 || head->size > MAX_PACKET_BUF_SIZE) {
55 SEN_HILOGE("Packet header parsing error, and this error cannot be recovered. The buffer will be reset"
56 " head->size:%{public}zu, unreadSize:%{public}zu", head->size, unreadSize);
57 circBuf.Reset();
58 break;
59 }
60 if (head->size > dataSize) {
61 break;
62 }
63 NetPacket pkt(head->idMsg);
64 if ((head->size > 0) && (!pkt.Write(&buf[headSize], head->size))) {
65 SEN_HILOGW("Error writing data in the NetPacket. It will be retried next time. messageid:%{public}d,"
66 "size:%{public}zu", head->idMsg, head->size);
67 break;
68 }
69 if (!circBuf.SeekReadPos(pkt.GetPacketLength())) {
70 SEN_HILOGW("Set read position error, and this error cannot be recovered, and the buffer will be reset"
71 " packetSize:%{public}zu, unreadSize:%{public}zu", pkt.GetPacketLength(), unreadSize);
72 circBuf.Reset();
73 break;
74 }
75 callbackFun(pkt);
76 if (circBuf.IsEmpty()) {
77 circBuf.Reset();
78 break;
79 }
80 }
81 }
82 #endif // OHOS_BUILD_ENABLE_RUST
83
Close()84 void StreamSocket::Close()
85 {
86 #ifdef OHOS_BUILD_ENABLE_RUST
87 StreamSocketClose(streamSocketPtr_.get());
88 #else
89 if (fd_ >= 0) {
90 auto rf = close(fd_);
91 if (rf != 0) {
92 SEN_HILOGE("Socket close failed, rf:%{public}d", rf);
93 }
94 }
95 fd_ = -1;
96 #endif // OHOS_BUILD_ENABLE_RUST
97 }
98
GetFd() const99 int32_t StreamSocket::GetFd() const
100 {
101 #ifdef OHOS_BUILD_ENABLE_RUST
102 return StreamSocketGetFd(streamSocketPtr_.get());
103 #else
104 return fd_;
105 #endif // OHOS_BUILD_ENABLE_RUST
106 }
107 } // namespace Sensors
108 } // namespace OHOS