1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "epoll_multi_driver.h"
17
18 #include "netstack_log.h"
19 #include "request_info.h"
20
21 namespace OHOS::NetStack::HttpOverCurl {
22
// Capacity of the on-stack epoll_event buffer handed to the poller in a single Step().
static constexpr size_t MAX_EPOLL_EVENTS{10};
24
EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo * >> & incomingQueue)25 EpollMultiDriver::EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo *>> &incomingQueue)
26 : incomingQueue_(incomingQueue)
27 {
28 Initialize();
29 }
30
Initialize()31 void EpollMultiDriver::Initialize()
32 {
33 timeoutTimer_.RegisterForPolling(poller_);
34 incomingQueue_->GetSyncEvent().RegisterForPolling(poller_);
35 multi_ = curl_multi_init();
36 if (!multi_) {
37 NETSTACK_LOGE("Failed to initialize curl_multi handle");
38 return;
39 }
40
41 static auto socketCallback = +[](CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) {
42 auto instance = static_cast<EpollMultiDriver *>(userp);
43 return instance->MultiSocketCallback(s, action, static_cast<CurlSocketContext *>(socketp));
44 };
45 curl_multi_setopt(multi_, CURLMOPT_SOCKETDATA, this);
46 curl_multi_setopt(multi_, CURLMOPT_SOCKETFUNCTION, socketCallback);
47
48 static auto timerCallback = +[](CURLM *multi, long timeout_ms, void *userp) {
49 auto instance = static_cast<EpollMultiDriver *>(userp);
50 instance->MultiTimeoutCallback(timeout_ms);
51 };
52 curl_multi_setopt(multi_, CURLMOPT_TIMERDATA, this);
53 curl_multi_setopt(multi_, CURLMOPT_TIMERFUNCTION, timerCallback);
54 }
55
~EpollMultiDriver()56 EpollMultiDriver::~EpollMultiDriver()
57 {
58 if (multi_) {
59 curl_multi_cleanup(multi_);
60 multi_ = nullptr;
61 }
62 }
63
Step(int waitEventsTimeoutMs)64 void EpollMultiDriver::Step(int waitEventsTimeoutMs)
65 {
66 epoll_event events[MAX_EPOLL_EVENTS];
67 int eventsToHandle = poller_.Wait(events, MAX_EPOLL_EVENTS, waitEventsTimeoutMs);
68 if (eventsToHandle == -1) {
69 return;
70 }
71
72 for (int idx = 0; idx < eventsToHandle; ++idx) {
73 if (incomingQueue_->GetSyncEvent().IsItYours(events[idx].data.fd)) {
74 IncomingRequestCallback();
75 } else if (timeoutTimer_.IsItYours(events[idx].data.fd)) {
76 EpollTimerCallback();
77 } else { // curl socket event
78 EpollSocketCallback(events[idx].data.fd);
79 }
80 }
81 }
82
IncomingRequestCallback()83 void EpollMultiDriver::IncomingRequestCallback()
84 {
85 auto requestsToAdd = incomingQueue_->Flush();
86 for (auto &request : requestsToAdd) {
87 ongoingRequests_[request->easyHandle] = request;
88 auto ret = curl_multi_add_handle(multi_, request->easyHandle);
89 if (ret != CURLM_OK) {
90 NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d %{public}s", ret, curl_multi_strerror(ret));
91 continue;
92 }
93
94 if (request->startedCallback) {
95 request->startedCallback(request->easyHandle, request->opaqueData);
96 }
97 }
98 }
99
100 // Update the timer after curl_multi library does its thing. Curl will
101 // inform us through this callback what it wants the new timeout to be,
102 // after it does some work.
MultiTimeoutCallback(long timeoutMs)103 int EpollMultiDriver::MultiTimeoutCallback(long timeoutMs)
104 {
105 if (timeoutMs > 0) {
106 timeoutTimer_.SetTimeoutMs(timeoutMs);
107 } else if (timeoutMs == 0) {
108 // libcurl wants us to timeout now, however setting both fields of
109 // new_value.it_value to zero disarms the timer. The closest we can
110 // do is to schedule the timer to fire in 1 ns.
111 timeoutTimer_.SetTimeoutNs(1);
112 }
113
114 return 0;
115 }
116
117 // Called by main loop when our timeout expires
EpollTimerCallback()118 void EpollMultiDriver::EpollTimerCallback()
119 {
120 timeoutTimer_.ResetEvent();
121 auto rc = curl_multi_socket_action(multi_, CURL_SOCKET_TIMEOUT, 0, &stillRunning);
122 if (rc != CURLM_OK) {
123 NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
124 } else {
125 CheckMultiInfo();
126 }
127 }
128
CheckMultiInfo()129 __attribute__((no_sanitize("cfi"))) void EpollMultiDriver::CheckMultiInfo()
130 {
131 CURLMsg *message;
132 int pending;
133
134 while ((message = curl_multi_info_read(multi_, &pending))) {
135 switch (message->msg) {
136 case CURLMSG_DONE: {
137 auto easyHandle = message->easy_handle;
138 curl_multi_remove_handle(multi_, easyHandle);
139 auto requestInfo = ongoingRequests_[easyHandle];
140 ongoingRequests_.erase(easyHandle);
141 if (requestInfo != nullptr && requestInfo->doneCallback) {
142 requestInfo->doneCallback(message, requestInfo->opaqueData);
143 }
144 delete requestInfo;
145 break;
146 }
147 default:
148 NETSTACK_LOGD("CURLMSG default");
149 break;
150 }
151 }
152 }
153
MultiSocketCallback(curl_socket_t socket,int action,CurlSocketContext * socketContext)154 int EpollMultiDriver::MultiSocketCallback(curl_socket_t socket, int action, CurlSocketContext *socketContext)
155 {
156 switch (action) {
157 case CURL_POLL_IN:
158 case CURL_POLL_OUT:
159 case CURL_POLL_INOUT:
160 if (!socketContext) {
161 curl_multi_assign(multi_, socket, new CurlSocketContext(poller_, socket, action));
162 } else {
163 socketContext->Reassign(socket, action);
164 }
165 break;
166 case CURL_POLL_REMOVE:
167 delete socketContext;
168 break;
169 default:
170 NETSTACK_LOGE("Unexpected socket action = %{public}d", action);
171 }
172
173 return 0;
174 }
175
CurlPollToEpoll(int action)176 static int CurlPollToEpoll(int action)
177 {
178 int kind = ((action & CURL_POLL_IN) ? EPOLLIN : (EPOLLIN & ~EPOLLIN)) |
179 ((action & CURL_POLL_OUT) ? EPOLLOUT : (EPOLLOUT & ~EPOLLOUT));
180 return kind;
181 }
182
CurlSocketContext(HttpOverCurl::Epoller & poller,curl_socket_t sockDescriptor,int action)183 EpollMultiDriver::CurlSocketContext::CurlSocketContext(HttpOverCurl::Epoller &poller, curl_socket_t sockDescriptor,
184 int action)
185 : poller_(poller), socketDescriptor_(sockDescriptor)
186 {
187 int kind = CurlPollToEpoll(action);
188 poller_.RegisterMe(socketDescriptor_, kind);
189 }
190
Reassign(curl_socket_t sockDescriptor,int action)191 void EpollMultiDriver::CurlSocketContext::Reassign(curl_socket_t sockDescriptor, int action)
192 {
193 poller_.UnregisterMe(socketDescriptor_);
194 socketDescriptor_ = sockDescriptor;
195 int kind = CurlPollToEpoll(action);
196 poller_.RegisterMe(socketDescriptor_, kind);
197 }
198
~CurlSocketContext()199 EpollMultiDriver::CurlSocketContext::~CurlSocketContext()
200 {
201 poller_.UnregisterMe(socketDescriptor_);
202 }
203
204 // Called by main loop when we get action on a multi socket file descriptor
EpollSocketCallback(int fd)205 void EpollMultiDriver::EpollSocketCallback(int fd)
206 {
207 int action = CURL_CSELECT_IN | CURL_CSELECT_OUT;
208 auto rc = curl_multi_socket_action(multi_, fd, action, &stillRunning);
209 if (rc != CURLM_OK) {
210 NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
211 } else {
212 CheckMultiInfo();
213 }
214
215 if (stillRunning <= 0) {
216 timeoutTimer_.Stop();
217 }
218 }
219
220 } // namespace OHOS::NetStack::HttpOverCurl
221