1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "le_loop.h"
17 
18 #include <errno.h>
19 #include <sys/socket.h>
20 #include "securec.h"
21 
22 #include "le_socket.h"
23 #include "le_task.h"
24 
HandleSendMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_SendMessageComplete complete)25 static LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26     const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27 {
28     EventLoop *loop = (EventLoop *)loopHandle;
29     StreamTask *stream = (StreamTask *)taskHandle;
30     LE_Buffer *buffer = GetFirstBuffer(stream);
31     while (buffer) {
32         int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33         if (ret < 0 || (size_t)ret < buffer->dataSize) {
34             LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
35                     buffer->dataSize, ret, errno);
36         }
37         LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
38         buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
39         if (complete != NULL) {
40             complete(taskHandle, buffer);
41         }
42         FreeBuffer(loopHandle, stream, buffer);
43         buffer = GetFirstBuffer(stream);
44     }
45     if (IsBufferEmpty(stream)) {
46         LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
47         loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
48         return LE_SUCCESS;
49     }
50     return LE_SUCCESS;
51 }
52 
HandleRecvMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_RecvMessage recvMessage,const LE_HandleRecvMsg handleRecvMsg)53 static LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
54     const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)
55 {
56     LE_STATUS status = LE_SUCCESS;
57     LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
58     int readLen = 0;
59     while (1) {
60         if (handleRecvMsg != NULL) {
61             readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0);
62         } else {
63             readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
64         }
65         LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
66         if (readLen < 0) {
67             if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
68                 continue;
69             }
70             status = LE_DIS_CONNECTED;
71             break;
72         } else if (readLen == 0) {
73             // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
74             status = LE_DIS_CONNECTED;
75             break;
76         } else {
77             break;
78         }
79     }
80     if (status != LE_SUCCESS) {
81         FreeBuffer(loopHandle, NULL, buffer);
82         return status;
83     }
84     if (recvMessage) {
85         recvMessage(taskHandle, buffer->data, readLen);
86     }
87     FreeBuffer(loopHandle, NULL, buffer);
88     return status;
89 }
90 
HandleStreamEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)91 static LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
92 {
93     StreamConnectTask *stream = (StreamConnectTask *)handle;
94     LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
95 
96     LE_STATUS status = LE_SUCCESS;
97     if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
98         status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
99     }
100     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
101         status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg);
102     }
103     if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
104         if (stream->disConnectComplete) {
105             stream->disConnectComplete(handle);
106         }
107         LE_CloseStreamTask(loopHandle, handle);
108     }
109     return status;
110 }
111 
HandleClientEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)112 static LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
113 {
114     StreamClientTask *client = (StreamClientTask *)handle;
115     LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
116 
117     LE_STATUS status = LE_SUCCESS;
118     if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
119         LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
120         client->connected = 1;
121         status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
122     }
123     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
124         status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
125     }
126     if (status == LE_DIS_CONNECTED) {
127         if (client->disConnectComplete) {
128             client->disConnectComplete(handle);
129         }
130         client->connected = 0;
131         LE_CloseStreamTask(loopHandle, handle);
132     }
133     return status;
134 }
135 
HandleStreamTaskClose_(const LoopHandle loopHandle,const TaskHandle taskHandle)136 static void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
137 {
138     BaseTask *task = (BaseTask *)taskHandle;
139     DelTask((EventLoop *)loopHandle, task);
140     CloseTask(loopHandle, task);
141     if (task->taskId.fd > 0) {
142         close(task->taskId.fd);
143     }
144 }
145 
DumpStreamServerTaskInfo_(const TaskHandle task)146 static void DumpStreamServerTaskInfo_(const TaskHandle task)
147 {
148     INIT_CHECK(task != NULL, return);
149     BaseTask *baseTask = (BaseTask *)task;
150     StreamServerTask *serverTask = (StreamServerTask *)baseTask;
151     printf("\tfd: %d \n", serverTask->base.taskId.fd);
152     printf("\t  TaskType: %s \n", "ServerTask");
153     if (strlen(serverTask->server) > 0) {
154         printf("\t  Server socket:%s \n", serverTask->server);
155     } else {
156         printf("\t  Server socket:%s \n", "NULL");
157     }
158 }
159 
DumpStreamConnectTaskInfo_(const TaskHandle task)160 static void DumpStreamConnectTaskInfo_(const TaskHandle task)
161 {
162     INIT_CHECK(task != NULL, return);
163     BaseTask *baseTask = (BaseTask *)task;
164     StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
165     TaskHandle taskHandle = (TaskHandle)connectTask;
166     printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
167     printf("\t  TaskType: %s \n", "ConnectTask");
168     printf("\t  ServiceInfo: \n");
169     struct ucred cred = {-1, -1, -1};
170     socklen_t credSize  = sizeof(struct ucred);
171     if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
172         printf("\t    Service Pid: %d \n", cred.pid);
173         printf("\t    Service Uid: %u \n", cred.uid);
174         printf("\t    Service Gid: %u \n", cred.gid);
175     } else {
176         printf("\t    Service Pid: %s \n", "NULL");
177         printf("\t    Service Uid: %s \n", "NULL");
178         printf("\t    Service Gid: %s \n", "NULL");
179     }
180 }
181 
HandleServerEvent_(const LoopHandle loopHandle,const TaskHandle serverTask,uint32_t oper)182 static LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
183 {
184     LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
185     if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
186         return LE_FAILURE;
187     }
188     StreamServerTask *server = (StreamServerTask *)serverTask;
189     LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
190 
191     int ret = server->incommingConnect(loopHandle, serverTask);
192     if (ret != LE_SUCCESS) {
193         LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
194     }
195     EventLoop *loop = (EventLoop *)loopHandle;
196     loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
197     return LE_SUCCESS;
198 }
199 
LE_CreateStreamServer(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamServerInfo * info)200 LE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
201     TaskHandle *taskHandle, const LE_StreamServerInfo *info)
202 {
203     LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
204     LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
205     LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
206         "Invalid parameters incommingConnect %s", info->server);
207 
208     int fd = info->socketId;
209     int ret = 0;
210     if (info->socketId <= 0) {
211         fd = CreateSocket(info->baseInfo.flags, info->server);
212         LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
213     } else {
214         ret = listenSocket(fd, info->baseInfo.flags, info->server);
215         LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
216     }
217 
218     EventLoop *loop = (EventLoop *)loopHandle;
219     StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
220         sizeof(StreamServerTask) + strlen(info->server) + 1);
221     LE_CHECK(task != NULL, close(fd);
222         return LE_NO_MEMORY, "Failed to create task");
223     task->base.handleEvent = HandleServerEvent_;
224     task->base.innerClose = HandleStreamTaskClose_;
225     task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
226     task->incommingConnect = info->incommingConnect;
227     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
228     ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
229     LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
230     *taskHandle = (TaskHandle)task;
231     return LE_SUCCESS;
232 }
233 
LE_CreateStreamClient(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamInfo * info)234 LE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
235     TaskHandle *taskHandle, const LE_StreamInfo *info)
236 {
237     LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
238     LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
239 
240     int fd = CreateSocket(info->baseInfo.flags, info->server);
241     LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
242 
243     StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
244     LE_CHECK(task != NULL, close(fd);
245         return LE_NO_MEMORY, "Failed to create task");
246     task->stream.base.handleEvent = HandleClientEvent_;
247     task->stream.base.innerClose = HandleStreamTaskClose_;
248     OH_ListInit(&task->stream.buffHead);
249     LoopMutexInit(&task->stream.mutex);
250 
251     task->connectComplete = info->connectComplete;
252     task->sendMessageComplete = info->sendMessageComplete;
253     task->recvMessage = info->recvMessage;
254     task->disConnectComplete = info->disConnectComplete;
255     task->handleRecvMsg = info->handleRecvMsg;
256     EventLoop *loop = (EventLoop *)loopHandle;
257     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
258     *taskHandle = (TaskHandle)task;
259     return LE_SUCCESS;
260 }
261 
LE_AcceptStreamClient(const LoopHandle loopHandle,const TaskHandle server,TaskHandle * taskHandle,const LE_StreamInfo * info)262 LE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
263     TaskHandle *taskHandle, const LE_StreamInfo *info)
264 {
265     LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
266     LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
267     LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
268     int fd = -1;
269     if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
270         fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
271         LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
272     }
273     StreamConnectTask *task = (StreamConnectTask *)CreateTask(
274         loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
275     LE_CHECK(task != NULL, close(fd);
276         return LE_NO_MEMORY, "Failed to create task");
277     task->stream.base.handleEvent = HandleStreamEvent_;
278     task->stream.base.innerClose = HandleStreamTaskClose_;
279     task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
280     task->disConnectComplete = info->disConnectComplete;
281     task->sendMessageComplete = info->sendMessageComplete;
282     task->recvMessage = info->recvMessage;
283     task->serverTask = (StreamServerTask *)server;
284     task->handleRecvMsg = info->handleRecvMsg;
285     OH_ListInit(&task->stream.buffHead);
286     LoopMutexInit(&task->stream.mutex);
287     if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
288         EventLoop *loop = (EventLoop *)loopHandle;
289         loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
290     }
291     *taskHandle = (TaskHandle)task;
292     return 0;
293 }
294 
LE_CloseStreamTask(const LoopHandle loopHandle,const TaskHandle taskHandle)295 void LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
296 {
297     LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
298     LE_CloseTask(loopHandle, taskHandle);
299 }
300 
LE_GetSocketFd(const TaskHandle taskHandle)301 int LE_GetSocketFd(const TaskHandle taskHandle)
302 {
303     LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
304     return GetSocketFd(taskHandle);
305 }
306