1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "epoll_multi_driver.h"
17 
18 #include "netstack_log.h"
19 #include "request_info.h"
20 
21 namespace OHOS::NetStack::HttpOverCurl {
22 
// Capacity of the on-stack epoll_event buffer filled by a single Step() call.
static constexpr size_t MAX_EPOLL_EVENTS{10};
24 
EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo * >> & incomingQueue)25 EpollMultiDriver::EpollMultiDriver(const std::shared_ptr<HttpOverCurl::ThreadSafeStorage<RequestInfo *>> &incomingQueue)
26     : incomingQueue_(incomingQueue)
27 {
28     Initialize();
29 }
30 
Initialize()31 void EpollMultiDriver::Initialize()
32 {
33     timeoutTimer_.RegisterForPolling(poller_);
34     incomingQueue_->GetSyncEvent().RegisterForPolling(poller_);
35     multi_ = curl_multi_init();
36     if (!multi_) {
37         NETSTACK_LOGE("Failed to initialize curl_multi handle");
38         return;
39     }
40 
41     static auto socketCallback = +[](CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) {
42         auto instance = static_cast<EpollMultiDriver *>(userp);
43         return instance->MultiSocketCallback(s, action, static_cast<CurlSocketContext *>(socketp));
44     };
45     curl_multi_setopt(multi_, CURLMOPT_SOCKETDATA, this);
46     curl_multi_setopt(multi_, CURLMOPT_SOCKETFUNCTION, socketCallback);
47 
48     static auto timerCallback = +[](CURLM *multi, long timeout_ms, void *userp) {
49         auto instance = static_cast<EpollMultiDriver *>(userp);
50         instance->MultiTimeoutCallback(timeout_ms);
51     };
52     curl_multi_setopt(multi_, CURLMOPT_TIMERDATA, this);
53     curl_multi_setopt(multi_, CURLMOPT_TIMERFUNCTION, timerCallback);
54 }
55 
~EpollMultiDriver()56 EpollMultiDriver::~EpollMultiDriver()
57 {
58     if (multi_) {
59         curl_multi_cleanup(multi_);
60         multi_ = nullptr;
61     }
62 }
63 
Step(int waitEventsTimeoutMs)64 void EpollMultiDriver::Step(int waitEventsTimeoutMs)
65 {
66     epoll_event events[MAX_EPOLL_EVENTS];
67     int eventsToHandle = poller_.Wait(events, MAX_EPOLL_EVENTS, waitEventsTimeoutMs);
68     if (eventsToHandle == -1) {
69         return;
70     }
71 
72     for (int idx = 0; idx < eventsToHandle; ++idx) {
73         if (incomingQueue_->GetSyncEvent().IsItYours(events[idx].data.fd)) {
74             IncomingRequestCallback();
75         } else if (timeoutTimer_.IsItYours(events[idx].data.fd)) {
76             EpollTimerCallback();
77         } else { // curl socket event
78             EpollSocketCallback(events[idx].data.fd);
79         }
80     }
81 }
82 
IncomingRequestCallback()83 void EpollMultiDriver::IncomingRequestCallback()
84 {
85     auto requestsToAdd = incomingQueue_->Flush();
86     for (auto &request : requestsToAdd) {
87         ongoingRequests_[request->easyHandle] = request;
88         auto ret = curl_multi_add_handle(multi_, request->easyHandle);
89         if (ret != CURLM_OK) {
90             NETSTACK_LOGE("curl_multi_add_handle err, ret = %{public}d %{public}s", ret, curl_multi_strerror(ret));
91             continue;
92         }
93 
94         if (request->startedCallback) {
95             request->startedCallback(request->easyHandle, request->opaqueData);
96         }
97     }
98 }
99 
100 // Update the timer after curl_multi library does its thing. Curl will
101 // inform us through this callback what it wants the new timeout to be,
102 // after it does some work.
MultiTimeoutCallback(long timeoutMs)103 int EpollMultiDriver::MultiTimeoutCallback(long timeoutMs)
104 {
105     if (timeoutMs > 0) {
106         timeoutTimer_.SetTimeoutMs(timeoutMs);
107     } else if (timeoutMs == 0) {
108         // libcurl wants us to timeout now, however setting both fields of
109         // new_value.it_value to zero disarms the timer. The closest we can
110         // do is to schedule the timer to fire in 1 ns.
111         timeoutTimer_.SetTimeoutNs(1);
112     }
113 
114     return 0;
115 }
116 
117 // Called by main loop when our timeout expires
EpollTimerCallback()118 void EpollMultiDriver::EpollTimerCallback()
119 {
120     timeoutTimer_.ResetEvent();
121     auto rc = curl_multi_socket_action(multi_, CURL_SOCKET_TIMEOUT, 0, &stillRunning);
122     if (rc != CURLM_OK) {
123         NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
124     } else {
125         CheckMultiInfo();
126     }
127 }
128 
CheckMultiInfo()129 __attribute__((no_sanitize("cfi"))) void EpollMultiDriver::CheckMultiInfo()
130 {
131     CURLMsg *message;
132     int pending;
133 
134     while ((message = curl_multi_info_read(multi_, &pending))) {
135         switch (message->msg) {
136             case CURLMSG_DONE: {
137                 auto easyHandle = message->easy_handle;
138                 curl_multi_remove_handle(multi_, easyHandle);
139                 auto requestInfo = ongoingRequests_[easyHandle];
140                 ongoingRequests_.erase(easyHandle);
141                 if (requestInfo != nullptr && requestInfo->doneCallback) {
142                     requestInfo->doneCallback(message, requestInfo->opaqueData);
143                 }
144                 delete requestInfo;
145                 break;
146             }
147             default:
148                 NETSTACK_LOGD("CURLMSG default");
149                 break;
150         }
151     }
152 }
153 
MultiSocketCallback(curl_socket_t socket,int action,CurlSocketContext * socketContext)154 int EpollMultiDriver::MultiSocketCallback(curl_socket_t socket, int action, CurlSocketContext *socketContext)
155 {
156     switch (action) {
157         case CURL_POLL_IN:
158         case CURL_POLL_OUT:
159         case CURL_POLL_INOUT:
160             if (!socketContext) {
161                 curl_multi_assign(multi_, socket, new CurlSocketContext(poller_, socket, action));
162             } else {
163                 socketContext->Reassign(socket, action);
164             }
165             break;
166         case CURL_POLL_REMOVE:
167             delete socketContext;
168             break;
169         default:
170             NETSTACK_LOGE("Unexpected socket action = %{public}d", action);
171     }
172 
173     return 0;
174 }
175 
CurlPollToEpoll(int action)176 static int CurlPollToEpoll(int action)
177 {
178     int kind = ((action & CURL_POLL_IN) ? EPOLLIN : (EPOLLIN & ~EPOLLIN)) |
179                ((action & CURL_POLL_OUT) ? EPOLLOUT : (EPOLLOUT & ~EPOLLOUT));
180     return kind;
181 }
182 
CurlSocketContext(HttpOverCurl::Epoller & poller,curl_socket_t sockDescriptor,int action)183 EpollMultiDriver::CurlSocketContext::CurlSocketContext(HttpOverCurl::Epoller &poller, curl_socket_t sockDescriptor,
184                                                        int action)
185     : poller_(poller), socketDescriptor_(sockDescriptor)
186 {
187     int kind = CurlPollToEpoll(action);
188     poller_.RegisterMe(socketDescriptor_, kind);
189 }
190 
Reassign(curl_socket_t sockDescriptor,int action)191 void EpollMultiDriver::CurlSocketContext::Reassign(curl_socket_t sockDescriptor, int action)
192 {
193     poller_.UnregisterMe(socketDescriptor_);
194     socketDescriptor_ = sockDescriptor;
195     int kind = CurlPollToEpoll(action);
196     poller_.RegisterMe(socketDescriptor_, kind);
197 }
198 
~CurlSocketContext()199 EpollMultiDriver::CurlSocketContext::~CurlSocketContext()
200 {
201     poller_.UnregisterMe(socketDescriptor_);
202 }
203 
204 // Called by main loop when we get action on a multi socket file descriptor
EpollSocketCallback(int fd)205 void EpollMultiDriver::EpollSocketCallback(int fd)
206 {
207     int action = CURL_CSELECT_IN | CURL_CSELECT_OUT;
208     auto rc = curl_multi_socket_action(multi_, fd, action, &stillRunning);
209     if (rc != CURLM_OK) {
210         NETSTACK_LOGE("curl_multi returned error = %{public}d", rc);
211     } else {
212         CheckMultiInfo();
213     }
214 
215     if (stillRunning <= 0) {
216         timeoutTimer_.Stop();
217     }
218 }
219 
220 } // namespace OHOS::NetStack::HttpOverCurl
221