1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "le_loop.h"
17
18 #include <errno.h>
19 #include <sys/socket.h>
20 #include "securec.h"
21
22 #include "le_socket.h"
23 #include "le_task.h"
24
HandleSendMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_SendMessageComplete complete)25 static LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26 const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27 {
28 EventLoop *loop = (EventLoop *)loopHandle;
29 StreamTask *stream = (StreamTask *)taskHandle;
30 LE_Buffer *buffer = GetFirstBuffer(stream);
31 while (buffer) {
32 int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33 if (ret < 0 || (size_t)ret < buffer->dataSize) {
34 LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
35 buffer->dataSize, ret, errno);
36 }
37 LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
38 buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
39 if (complete != NULL) {
40 complete(taskHandle, buffer);
41 }
42 FreeBuffer(loopHandle, stream, buffer);
43 buffer = GetFirstBuffer(stream);
44 }
45 if (IsBufferEmpty(stream)) {
46 LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
47 loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
48 return LE_SUCCESS;
49 }
50 return LE_SUCCESS;
51 }
52
HandleRecvMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_RecvMessage recvMessage,const LE_HandleRecvMsg handleRecvMsg)53 static LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
54 const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)
55 {
56 LE_STATUS status = LE_SUCCESS;
57 LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
58 int readLen = 0;
59 while (1) {
60 if (handleRecvMsg != NULL) {
61 readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0);
62 } else {
63 readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
64 }
65 LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
66 if (readLen < 0) {
67 if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
68 continue;
69 }
70 status = LE_DIS_CONNECTED;
71 break;
72 } else if (readLen == 0) {
73 // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
74 status = LE_DIS_CONNECTED;
75 break;
76 } else {
77 break;
78 }
79 }
80 if (status != LE_SUCCESS) {
81 FreeBuffer(loopHandle, NULL, buffer);
82 return status;
83 }
84 if (recvMessage) {
85 recvMessage(taskHandle, buffer->data, readLen);
86 }
87 FreeBuffer(loopHandle, NULL, buffer);
88 return status;
89 }
90
HandleStreamEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)91 static LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
92 {
93 StreamConnectTask *stream = (StreamConnectTask *)handle;
94 LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
95
96 LE_STATUS status = LE_SUCCESS;
97 if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
98 status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
99 }
100 if (LE_TEST_FLAGS(oper, EVENT_READ)) {
101 status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg);
102 }
103 if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
104 if (stream->disConnectComplete) {
105 stream->disConnectComplete(handle);
106 }
107 LE_CloseStreamTask(loopHandle, handle);
108 }
109 return status;
110 }
111
HandleClientEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)112 static LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
113 {
114 StreamClientTask *client = (StreamClientTask *)handle;
115 LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
116
117 LE_STATUS status = LE_SUCCESS;
118 if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
119 LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
120 client->connected = 1;
121 status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
122 }
123 if (LE_TEST_FLAGS(oper, EVENT_READ)) {
124 status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
125 }
126 if (status == LE_DIS_CONNECTED) {
127 if (client->disConnectComplete) {
128 client->disConnectComplete(handle);
129 }
130 client->connected = 0;
131 LE_CloseStreamTask(loopHandle, handle);
132 }
133 return status;
134 }
135
HandleStreamTaskClose_(const LoopHandle loopHandle,const TaskHandle taskHandle)136 static void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
137 {
138 BaseTask *task = (BaseTask *)taskHandle;
139 DelTask((EventLoop *)loopHandle, task);
140 CloseTask(loopHandle, task);
141 if (task->taskId.fd > 0) {
142 close(task->taskId.fd);
143 }
144 }
145
DumpStreamServerTaskInfo_(const TaskHandle task)146 static void DumpStreamServerTaskInfo_(const TaskHandle task)
147 {
148 INIT_CHECK(task != NULL, return);
149 BaseTask *baseTask = (BaseTask *)task;
150 StreamServerTask *serverTask = (StreamServerTask *)baseTask;
151 printf("\tfd: %d \n", serverTask->base.taskId.fd);
152 printf("\t TaskType: %s \n", "ServerTask");
153 if (strlen(serverTask->server) > 0) {
154 printf("\t Server socket:%s \n", serverTask->server);
155 } else {
156 printf("\t Server socket:%s \n", "NULL");
157 }
158 }
159
DumpStreamConnectTaskInfo_(const TaskHandle task)160 static void DumpStreamConnectTaskInfo_(const TaskHandle task)
161 {
162 INIT_CHECK(task != NULL, return);
163 BaseTask *baseTask = (BaseTask *)task;
164 StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
165 TaskHandle taskHandle = (TaskHandle)connectTask;
166 printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
167 printf("\t TaskType: %s \n", "ConnectTask");
168 printf("\t ServiceInfo: \n");
169 struct ucred cred = {-1, -1, -1};
170 socklen_t credSize = sizeof(struct ucred);
171 if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
172 printf("\t Service Pid: %d \n", cred.pid);
173 printf("\t Service Uid: %u \n", cred.uid);
174 printf("\t Service Gid: %u \n", cred.gid);
175 } else {
176 printf("\t Service Pid: %s \n", "NULL");
177 printf("\t Service Uid: %s \n", "NULL");
178 printf("\t Service Gid: %s \n", "NULL");
179 }
180 }
181
HandleServerEvent_(const LoopHandle loopHandle,const TaskHandle serverTask,uint32_t oper)182 static LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
183 {
184 LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
185 if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
186 return LE_FAILURE;
187 }
188 StreamServerTask *server = (StreamServerTask *)serverTask;
189 LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
190
191 int ret = server->incommingConnect(loopHandle, serverTask);
192 if (ret != LE_SUCCESS) {
193 LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
194 }
195 EventLoop *loop = (EventLoop *)loopHandle;
196 loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
197 return LE_SUCCESS;
198 }
199
LE_CreateStreamServer(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamServerInfo * info)200 LE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
201 TaskHandle *taskHandle, const LE_StreamServerInfo *info)
202 {
203 LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
204 LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
205 LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
206 "Invalid parameters incommingConnect %s", info->server);
207
208 int fd = info->socketId;
209 int ret = 0;
210 if (info->socketId <= 0) {
211 fd = CreateSocket(info->baseInfo.flags, info->server);
212 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
213 } else {
214 ret = listenSocket(fd, info->baseInfo.flags, info->server);
215 LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
216 }
217
218 EventLoop *loop = (EventLoop *)loopHandle;
219 StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
220 sizeof(StreamServerTask) + strlen(info->server) + 1);
221 LE_CHECK(task != NULL, close(fd);
222 return LE_NO_MEMORY, "Failed to create task");
223 task->base.handleEvent = HandleServerEvent_;
224 task->base.innerClose = HandleStreamTaskClose_;
225 task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
226 task->incommingConnect = info->incommingConnect;
227 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
228 ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
229 LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
230 *taskHandle = (TaskHandle)task;
231 return LE_SUCCESS;
232 }
233
LE_CreateStreamClient(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamInfo * info)234 LE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
235 TaskHandle *taskHandle, const LE_StreamInfo *info)
236 {
237 LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
238 LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
239
240 int fd = CreateSocket(info->baseInfo.flags, info->server);
241 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
242
243 StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
244 LE_CHECK(task != NULL, close(fd);
245 return LE_NO_MEMORY, "Failed to create task");
246 task->stream.base.handleEvent = HandleClientEvent_;
247 task->stream.base.innerClose = HandleStreamTaskClose_;
248 OH_ListInit(&task->stream.buffHead);
249 LoopMutexInit(&task->stream.mutex);
250
251 task->connectComplete = info->connectComplete;
252 task->sendMessageComplete = info->sendMessageComplete;
253 task->recvMessage = info->recvMessage;
254 task->disConnectComplete = info->disConnectComplete;
255 task->handleRecvMsg = info->handleRecvMsg;
256 EventLoop *loop = (EventLoop *)loopHandle;
257 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
258 *taskHandle = (TaskHandle)task;
259 return LE_SUCCESS;
260 }
261
LE_AcceptStreamClient(const LoopHandle loopHandle,const TaskHandle server,TaskHandle * taskHandle,const LE_StreamInfo * info)262 LE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
263 TaskHandle *taskHandle, const LE_StreamInfo *info)
264 {
265 LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
266 LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
267 LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
268 int fd = -1;
269 if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
270 fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
271 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
272 }
273 StreamConnectTask *task = (StreamConnectTask *)CreateTask(
274 loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
275 LE_CHECK(task != NULL, close(fd);
276 return LE_NO_MEMORY, "Failed to create task");
277 task->stream.base.handleEvent = HandleStreamEvent_;
278 task->stream.base.innerClose = HandleStreamTaskClose_;
279 task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
280 task->disConnectComplete = info->disConnectComplete;
281 task->sendMessageComplete = info->sendMessageComplete;
282 task->recvMessage = info->recvMessage;
283 task->serverTask = (StreamServerTask *)server;
284 task->handleRecvMsg = info->handleRecvMsg;
285 OH_ListInit(&task->stream.buffHead);
286 LoopMutexInit(&task->stream.mutex);
287 if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
288 EventLoop *loop = (EventLoop *)loopHandle;
289 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
290 }
291 *taskHandle = (TaskHandle)task;
292 return 0;
293 }
294
LE_CloseStreamTask(const LoopHandle loopHandle,const TaskHandle taskHandle)295 void LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
296 {
297 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
298 LE_CloseTask(loopHandle, taskHandle);
299 }
300
LE_GetSocketFd(const TaskHandle taskHandle)301 int LE_GetSocketFd(const TaskHandle taskHandle)
302 {
303 LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
304 return GetSocketFd(taskHandle);
305 }
306