1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "stream_server.h"
17 
18 #include <cinttypes>
19 #include <list>
20 
21 #include <sys/socket.h>
22 
23 #include "devicestatus_service.h"
24 #include "fi_log.h"
25 
26 #undef LOG_TAG
27 #define LOG_TAG "StreamServer"
28 
29 namespace OHOS {
30 namespace Msdp {
31 namespace DeviceStatus {
32 
~StreamServer()33 StreamServer::~StreamServer()
34 {
35     CALL_DEBUG_ENTER;
36     UdsStop();
37 }
38 
UdsStop()39 void StreamServer::UdsStop()
40 {
41     if (epollFd_ != -1) {
42         if (close(epollFd_) < 0) {
43             FI_HILOGE("Close epoll fd failed, error:%{public}s, epollFd_:%{public}d", strerror(errno), epollFd_);
44         }
45         epollFd_ = -1;
46     }
47 
48     for (const auto &item : sessionss_) {
49         item.second->Close();
50     }
51     sessionss_.clear();
52 }
53 
GetClientFd(int32_t pid) const54 int32_t StreamServer::GetClientFd(int32_t pid) const
55 {
56     auto it = idxPids_.find(pid);
57     if (it == idxPids_.end()) {
58         return INVALID_FD;
59     }
60     return it->second;
61 }
62 
GetClientPid(int32_t fd) const63 int32_t StreamServer::GetClientPid(int32_t fd) const
64 {
65     auto it = sessionss_.find(fd);
66     if (it == sessionss_.end()) {
67         return INVALID_PID;
68     }
69     return it->second->GetPid();
70 }
71 
SendMsg(int32_t fd,NetPacket & pkt)72 bool StreamServer::SendMsg(int32_t fd, NetPacket &pkt)
73 {
74     if (fd < 0) {
75         FI_HILOGE("The fd is less than 0");
76         return false;
77     }
78     auto ses = GetSession(fd);
79     if (ses == nullptr) {
80         FI_HILOGE("The fd:%{public}d not found, The message was discarded, errCode:%{public}d",
81             fd, SESSION_NOT_FOUND);
82         return false;
83     }
84     return ses->SendMsg(pkt);
85 }
86 
Multicast(const std::vector<int32_t> & fdList,NetPacket & pkt)87 void StreamServer::Multicast(const std::vector<int32_t> &fdList, NetPacket &pkt)
88 {
89     for (const auto &item : fdList) {
90         SendMsg(item, pkt);
91     }
92 }
93 
AddSocketPairInfo(const std::string & programName,int32_t moduleType,int32_t uid,int32_t pid,int32_t & serverFd,int32_t & toReturnClientFd,int32_t & tokenType)94 int32_t StreamServer::AddSocketPairInfo(const std::string &programName, int32_t moduleType, int32_t uid, int32_t pid,
95     int32_t &serverFd, int32_t &toReturnClientFd, int32_t &tokenType)
96 {
97     CALL_DEBUG_ENTER;
98     int32_t sockFds[2] = { -1 };
99 
100     if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockFds) != 0) {
101         FI_HILOGE("Call socketpair failed, errno:%{public}d", errno);
102         return RET_ERR;
103     }
104     serverFd = sockFds[0];
105     toReturnClientFd = sockFds[1];
106     if (serverFd < 0 || toReturnClientFd < 0) {
107         FI_HILOGE("Call fcntl failed, errno:%{public}d", errno);
108         return RET_ERR;
109     }
110     int32_t setSockOptResult = SetSockOpt(serverFd, toReturnClientFd, tokenType);
111     if (RET_OK != setSockOptResult) {
112         return setSockOptResult;
113     }
114     SessionPtr sess = nullptr;
115     sess = std::make_shared<StreamSession>(programName, moduleType, serverFd, uid, pid);
116     sess->SetTokenType(tokenType);
117     if (!AddSession(sess)) {
118         FI_HILOGE("AddSession fail errCode:%{public}d", ADD_SESSION_FAIL);
119         return CloseFd(serverFd, toReturnClientFd);
120     }
121     if (AddEpoll(EPOLL_EVENT_SOCKET, serverFd) != RET_OK) {
122         FI_HILOGE("epoll_ctl EPOLL_CTL_ADD failed, errCode:%{public}d", EPOLL_MODIFY_FAIL);
123         return CloseFd(serverFd, toReturnClientFd);
124     }
125     OnConnected(sess);
126     return RET_OK;
127 }
128 
SetSockOpt(int32_t & serverFd,int32_t & toReturnClientFd,int32_t & tokenType)129 int32_t StreamServer::SetSockOpt(int32_t &serverFd, int32_t &toReturnClientFd, int32_t &tokenType)
130 {
131     CALL_DEBUG_ENTER;
132     static constexpr size_t bufferSize = 32 * 1024;
133     static constexpr size_t nativeBufferSize = 64 * 1024;
134 
135     if (setsockopt(serverFd, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)) != 0) {
136         FI_HILOGE("setsockopt serverFd failed, errno:%{public}d", errno);
137         return CloseFd(serverFd, toReturnClientFd);
138     }
139     if (setsockopt(serverFd, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)) != 0) {
140         FI_HILOGE("setsockopt serverFd failed, errno:%{public}d", errno);
141         return CloseFd(serverFd, toReturnClientFd);
142     }
143     if (tokenType == TokenType::TOKEN_NATIVE) {
144         if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_SNDBUF, &nativeBufferSize, sizeof(nativeBufferSize)) != 0) {
145             FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
146             return CloseFd(serverFd, toReturnClientFd);
147         }
148         if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_RCVBUF, &nativeBufferSize, sizeof(nativeBufferSize)) != 0) {
149             FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
150             return CloseFd(serverFd, toReturnClientFd);
151         }
152     } else {
153         if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize)) != 0) {
154             FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
155             return CloseFd(serverFd, toReturnClientFd);
156         }
157         if (setsockopt(toReturnClientFd, SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize)) != 0) {
158             FI_HILOGE("setsockopt toReturnClientFd failed, errno:%{public}d", errno);
159             return CloseFd(serverFd, toReturnClientFd);
160         }
161     }
162     return RET_OK;
163 }
164 
CloseFd(int32_t & serverFd,int32_t & toReturnClientFd)165 int32_t StreamServer::CloseFd(int32_t &serverFd, int32_t &toReturnClientFd)
166 {
167     if (close(serverFd) < 0) {
168         FI_HILOGE("Close server fd failed, error:%{public}s, serverFd:%{public}d", strerror(errno), serverFd);
169     }
170     serverFd = -1;
171     if (close(toReturnClientFd) < 0) {
172         FI_HILOGE("Close fd failed, error:%{public}s, toReturnClientFd:%{public}d", strerror(errno), toReturnClientFd);
173     }
174     toReturnClientFd = -1;
175     return RET_ERR;
176 }
177 
SetRecvFun(MsgServerFunCallback fun)178 void StreamServer::SetRecvFun(MsgServerFunCallback fun)
179 {
180     recvFun_ = fun;
181 }
182 
ReleaseSession(int32_t fd,epoll_event & ev)183 void StreamServer::ReleaseSession(int32_t fd, epoll_event &ev)
184 {
185     auto secPtr = GetSession(fd);
186     if (secPtr != nullptr) {
187         OnDisconnected(secPtr);
188         DelSession(fd);
189     }
190     if (ev.data.ptr) {
191         free(ev.data.ptr);
192         ev.data.ptr = nullptr;
193     }
194     if (auto it = circleBufs_.find(fd); it != circleBufs_.end()) {
195         circleBufs_.erase(it);
196     }
197     auto DeviceStatusService = DeviceStatus::DelayedSpSingleton<DeviceStatus::DeviceStatusService>::GetInstance();
198     DeviceStatusService->DelEpoll(EPOLL_EVENT_SOCKET, fd);
199     if (close(fd) < 0) {
200         FI_HILOGE("Close fd failed, error:%{public}s, fd:%{public}d", strerror(errno), fd);
201     }
202 }
203 
OnPacket(int32_t fd,NetPacket & pkt)204 void StreamServer::OnPacket(int32_t fd, NetPacket &pkt)
205 {
206     auto sess = GetSession(fd);
207     CHKPV(sess);
208     recvFun_(sess, pkt);
209 }
210 
OnEpollRecv(int32_t fd,epoll_event & ev)211 void StreamServer::OnEpollRecv(int32_t fd, epoll_event &ev)
212 {
213     if (fd < 0) {
214         FI_HILOGE("Invalid fd:%{public}d", fd);
215         return;
216     }
217     auto& buf = circleBufs_[fd];
218     char szBuf[MAX_PACKET_BUF_SIZE] = { 0 };
219     for (int32_t i = 0; i < MAX_RECV_LIMIT; i++) {
220         ssize_t size = recv(fd, szBuf, MAX_PACKET_BUF_SIZE, MSG_DONTWAIT | MSG_NOSIGNAL);
221         if (size > 0) {
222             if (!buf.Write(szBuf, size)) {
223                 FI_HILOGW("Write data failed, size:%{public}zd", size);
224             }
225             OnReadPackets(buf, [this, fd](NetPacket &pkt) { this->OnPacket(fd, pkt); });
226         } else if (size < 0) {
227             if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
228                 FI_HILOGD("Continue for errno EAGAIN|EINTR|EWOULDBLOCK size:%{public}zd errno:%{public}d",
229                     size, errno);
230                 continue;
231             }
232             FI_HILOGE("Recv return %{public}zd, errno:%{public}d", size, errno);
233             break;
234         } else {
235             FI_HILOGE("The client side disconnect with the server, size:0, errno:%{public}d", errno);
236             ReleaseSession(fd, ev);
237             break;
238         }
239         if (static_cast<size_t>(size) < MAX_PACKET_BUF_SIZE) {
240             break;
241         }
242     }
243 }
244 
OnEpollEvent(epoll_event & ev)245 void StreamServer::OnEpollEvent(epoll_event &ev)
246 {
247     CHKPV(ev.data.ptr);
248     int32_t fd = *static_cast<int32_t*>(ev.data.ptr);
249     if (fd < 0) {
250         FI_HILOGE("The fd less than 0, errCode:%{public}d", PARAM_INPUT_INVALID);
251         return;
252     }
253     if ((ev.events & EPOLLERR) || (ev.events & EPOLLHUP)) {
254         FI_HILOGI("EPOLLERR or EPOLLHUP, fd:%{public}d, ev.events:0x%{public}x", fd, ev.events);
255         ReleaseSession(fd, ev);
256     } else if (ev.events & EPOLLIN) {
257         OnEpollRecv(fd, ev);
258     }
259 }
260 
DumpSession(const std::string & title)261 void StreamServer::DumpSession(const std::string &title)
262 {
263     FI_HILOGD("in %{public}s:%{public}s", __func__, title.c_str());
264     int32_t i = 0;
265     for (auto &[key, value] : sessionss_) {
266         CHKPV(value);
267         i++;
268     }
269 }
270 
GetSession(int32_t fd) const271 SessionPtr StreamServer::GetSession(int32_t fd) const
272 {
273     auto it = sessionss_.find(fd);
274     if (it == sessionss_.end()) {
275         FI_HILOGE("Session not found, fd:%{public}d", fd);
276         return nullptr;
277     }
278     CHKPP(it->second);
279     return it->second->GetSharedPtr();
280 }
281 
GetSessionByPid(int32_t pid) const282 SessionPtr StreamServer::GetSessionByPid(int32_t pid) const
283 {
284     int32_t fd = GetClientFd(pid);
285     if (fd <= 0) {
286         FI_HILOGE("Session not found, pid:%{public}d", pid);
287         return nullptr;
288     }
289     return GetSession(fd);
290 }
291 
AddSession(SessionPtr ses)292 bool StreamServer::AddSession(SessionPtr ses)
293 {
294     CHKPF(ses);
295     FI_HILOGI("pid:%{public}d, fd:%{public}d", ses->GetPid(), ses->GetFd());
296     int32_t fd = ses->GetFd();
297     if (fd < 0) {
298         FI_HILOGE("The fd is less than 0");
299         return false;
300     }
301     int32_t pid = ses->GetPid();
302     if (pid <= 0) {
303         FI_HILOGE("Get process failed");
304         return false;
305     }
306     if (sessionss_.size() > MAX_SESSION_ALARM) {
307         FI_HILOGE("Too many clients, Warning Value:%{public}zu, Current Value:%{public}zu",
308             MAX_SESSION_ALARM, sessionss_.size());
309         return false;
310     }
311     DumpSession("AddSession");
312     idxPids_[pid] = fd;
313     sessionss_[fd] = ses;
314     FI_HILOGI("Add session end");
315     return true;
316 }
317 
DelSession(int32_t fd)318 void StreamServer::DelSession(int32_t fd)
319 {
320     CALL_DEBUG_ENTER;
321     FI_HILOGI("fd:%{public}d", fd);
322     if (fd < 0) {
323         FI_HILOGE("The fd less than 0, errCode:%{public}d", PARAM_INPUT_INVALID);
324         return;
325     }
326     int32_t pid = GetClientPid(fd);
327     if (pid > 0) {
328         idxPids_.erase(pid);
329     }
330     auto it = sessionss_.find(fd);
331     if (it != sessionss_.end()) {
332         NotifySessionDeleted(it->second);
333         sessionss_.erase(it);
334     }
335     DumpSession("DelSession");
336 }
337 
AddSessionDeletedCallback(int32_t pid,std::function<void (SessionPtr)> callback)338 void StreamServer::AddSessionDeletedCallback(int32_t pid, std::function<void(SessionPtr)> callback)
339 {
340     CALL_DEBUG_ENTER;
341     auto it = callbacks_.find(pid);
342     if (it != callbacks_.end()) {
343         FI_HILOGW("Deleted session already exists");
344         return;
345     }
346     callbacks_[pid] = callback;
347 }
348 
NotifySessionDeleted(SessionPtr ses)349 void StreamServer::NotifySessionDeleted(SessionPtr ses)
350 {
351     CALL_DEBUG_ENTER;
352     auto it = callbacks_.find(ses->GetPid());
353     if (it != callbacks_.end()) {
354         it->second(ses);
355         callbacks_.erase(it);
356     }
357 }
358 } // namespace DeviceStatus
359 } // namespace Msdp
360 } // namespace OHOS
361