1 /*
2  * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "stream_client.h"
17 
18 #include "include/util.h"
19 
20 #undef LOG_TAG
21 #define LOG_TAG "StreamClient"
22 
23 namespace OHOS {
24 namespace Msdp {
25 namespace DeviceStatus {
26 
StreamClient()27 StreamClient::StreamClient()
28 {
29     CALL_DEBUG_ENTER;
30 }
31 
~StreamClient()32 StreamClient::~StreamClient()
33 {
34     CALL_DEBUG_ENTER;
35 }
36 
StartConnect()37 int32_t StreamClient::StartConnect()
38 {
39     CALL_DEBUG_ENTER;
40     if (Socket() < 0) {
41         FI_HILOGE("Socket failed");
42         return RET_ERR;
43     }
44     OnConnected();
45     return RET_OK;
46 }
47 
SendMsg(const char * buf,size_t size) const48 bool StreamClient::SendMsg(const char *buf, size_t size) const
49 {
50     CHKPF(buf);
51     if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) {
52         FI_HILOGE("Stream buffer size out of range");
53         return false;
54     }
55     if (fd_ < 0) {
56         FI_HILOGE("The fd_ is less than 0");
57         return false;
58     }
59 
60     int32_t retryCount = 0;
61     int32_t idx = 0;
62     const int32_t bufSize = static_cast<int32_t>(size);
63     int32_t remSize = bufSize;
64     while (remSize > 0 && retryCount < SEND_RETRY_LIMIT) {
65         retryCount += 1;
66         ssize_t number = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL);
67         if (number < 0) {
68             if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
69                 FI_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno);
70                 continue;
71             }
72             FI_HILOGE("Send return failed, error:%{public}d, fd:%{public}d", errno, fd_);
73             return false;
74         }
75         idx += number;
76         remSize -= number;
77         if (remSize > 0) {
78             usleep(SEND_RETRY_SLEEP_TIME);
79         }
80     }
81     if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) {
82         FI_HILOGE("Too many times to send :%{public}d/%{public}d, size:%{public}d/%{public}d, fd:%{public}d",
83             retryCount, SEND_RETRY_LIMIT, idx, bufSize, fd_);
84         return false;
85     }
86     return true;
87 }
88 
SendMsg(const NetPacket & pkt) const89 bool StreamClient::SendMsg(const NetPacket &pkt) const
90 {
91     if (pkt.ChkRWError()) {
92         FI_HILOGE("Read and write status is error");
93         return false;
94     }
95     StreamBuffer buf;
96     if (!pkt.MakeData(buf)) {
97         FI_HILOGE("Failed to buffer packet");
98         return false;
99     }
100     return SendMsg(buf.Data(), buf.Size());
101 }
102 
StartClient(MsgClientFunCallback fun)103 bool StreamClient::StartClient(MsgClientFunCallback fun)
104 {
105     CALL_DEBUG_ENTER;
106     if (isRunning_ || hasConnected_) {
107         FI_HILOGE("Client is connected or started");
108         return false;
109     }
110     hasClient_ = true;
111     recvFun_ = fun;
112     if (StartConnect() < 0) {
113         FI_HILOGW("Client connection failed, Try again later");
114     }
115     return true;
116 }
117 
Stop()118 void StreamClient::Stop()
119 {
120     CALL_DEBUG_ENTER;
121     hasClient_ = false;
122     Close();
123 }
124 } // namespace DeviceStatus
125 } // namespace Msdp
126 } // namespace OHOS