1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 #![cfg(all(
15 feature = "async",
16 feature = "http2",
17 feature = "tokio_base",
18 not(feature = "__tls")
19 ))]
20
21 use std::convert::Infallible;
22 use std::sync::Arc;
23
24 use hyper::body::HttpBody;
25 use tokio::sync::mpsc::{Receiver, Sender};
26 use ylong_http::body::async_impl::Body as RespBody;
27 use ylong_http::response::status::StatusCode;
28 use ylong_http_client::async_impl::{Body, Client, Request};
29
30 pub struct HttpHandle {
31 pub port: u16,
32
33 // This channel allows the server to notify the client when it is up and running.
34 pub server_start: Receiver<()>,
35
36 // This channel allows the client to notify the server when it is ready to shut down.
37 pub client_shutdown: Sender<()>,
38
39 // This channel allows the server to notify the client when it has shut down.
40 pub server_shutdown: Receiver<()>,
41 }
42
server_fn( req: hyper::Request<hyper::Body>, ) -> Result<hyper::Response<hyper::Body>, Infallible>43 async fn server_fn(
44 req: hyper::Request<hyper::Body>,
45 ) -> Result<hyper::Response<hyper::Body>, Infallible> {
46 let (parts, mut body) = req.into_parts();
47 assert_eq!(
48 parts.method.to_string(),
49 "GET",
50 "Assert request method failed"
51 );
52 assert_eq!(
53 format!("{:?}", parts.version),
54 "HTTP/2.0",
55 "Assert request version failed"
56 );
57
58 let mut size = 0;
59 loop {
60 match body.data().await {
61 None => {
62 break;
63 }
64 Some(Ok(bytes)) => {
65 size += bytes.len();
66 }
67 Some(Err(_e)) => {
68 panic!("server read request body data occurs error");
69 }
70 }
71 }
72 assert_eq!(
73 size,
74 10 * 1024 * 1024,
75 "Assert request body data length failed"
76 );
77
78 let body_data = vec![b'q'; 10 * 1024 * 1024];
79 let response = hyper::Response::builder()
80 .version(hyper::Version::HTTP_2)
81 .status(hyper::StatusCode::OK)
82 .body(hyper::Body::from(body_data))
83 .expect("build hyper response failed");
84 Ok(response)
85 }
86
87 #[macro_export]
88 macro_rules! start_http_server {
89 (
90 HTTP;
91 $server_fn: ident
92 ) => {{
93 use std::convert::Infallible;
94
95 use hyper::service::{make_service_fn, service_fn};
96 use tokio::sync::mpsc::channel;
97
98 let (start_tx, start_rx) = channel::<()>(1);
99 let (client_tx, mut client_rx) = channel::<()>(1);
100 let (server_tx, server_rx) = channel::<()>(1);
101
102 let tcp_listener =
103 std::net::TcpListener::bind("127.0.0.1:0").expect("server bind port failed !");
104 let addr = tcp_listener
105 .local_addr()
106 .expect("get server local address failed!");
107 let port = addr.port();
108
109 let server = hyper::Server::from_tcp(tcp_listener)
110 .expect("build hyper server from tcp listener failed !");
111
112 tokio::spawn(async move {
113 let make_svc =
114 make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn($server_fn)) });
115 server
116 .serve(make_svc)
117 .with_graceful_shutdown(async {
118 start_tx
119 .send(())
120 .await
121 .expect("Start channel (Client-Half) be closed unexpectedly");
122 client_rx
123 .recv()
124 .await
125 .expect("Client channel (Client-Half) be closed unexpectedly");
126 })
127 .await
128 .expect("Start server failed");
129 server_tx
130 .send(())
131 .await
132 .expect("Server channel (Client-Half) be closed unexpectedly");
133 });
134
135 HttpHandle {
136 port,
137 server_start: start_rx,
138 client_shutdown: client_tx,
139 server_shutdown: server_rx,
140 }
141 }};
142 }
143
144 #[test]
sdv_async_h2_client_send_request()145 fn sdv_async_h2_client_send_request() {
146 let rt = tokio::runtime::Builder::new_multi_thread()
147 .worker_threads(4)
148 .enable_all()
149 .build()
150 .expect("init runtime failed !");
151
152 let (tx, rx) = std::sync::mpsc::channel();
153
154 rt.block_on(async move {
155 let mut handle = start_http_server!(HTTP; server_fn);
156 handle
157 .server_start
158 .recv()
159 .await
160 .expect("recv server start msg failed !");
161 tx.send(handle)
162 .expect("send Handle out the server coroutine failed !");
163 });
164
165 let mut handle = rx.recv().expect("recv Handle failed !");
166
167 let body_date = vec![b'q'; 10 * 1024 * 1024];
168
169 let client = Client::builder()
170 .http2_prior_knowledge()
171 .set_stream_recv_window_size(100 * 1024)
172 .build()
173 .expect("Build Client failed.");
174
175 let request = Request::builder()
176 .version("HTTP/2.0")
177 .url(format!("{}:{}", "127.0.0.1", handle.port).as_str())
178 .method("GET")
179 .body(Body::slice(body_date))
180 .expect("Client build Request failed.");
181
182 rt.block_on(async move {
183 let mut response = client.request(request).await.expect("get response failed");
184 assert_eq!(response.status(), StatusCode::OK);
185
186 let mut buf = [0u8; 4096];
187 let mut size = 0;
188 loop {
189 let read = response
190 .body_mut()
191 .data(&mut buf[..])
192 .await
193 .expect("Response body read failed");
194 if read == 0 {
195 break;
196 }
197 size += read;
198 }
199 assert_eq!(
200 size,
201 10 * 1024 * 1024,
202 "Assert response body data length failed"
203 );
204
205 handle
206 .client_shutdown
207 .send(())
208 .await
209 .expect("send client shutdown");
210 handle
211 .server_shutdown
212 .recv()
213 .await
214 .expect("server shutdown");
215 })
216 }
217
218 #[test]
sdv_async_h2_client_send_request_concurrently()219 fn sdv_async_h2_client_send_request_concurrently() {
220 let rt = tokio::runtime::Builder::new_multi_thread()
221 .worker_threads(4)
222 .enable_all()
223 .build()
224 .expect("Build Runtime failed.");
225
226 let client = Client::builder()
227 .http2_prior_knowledge()
228 .set_stream_recv_window_size(100 * 1024)
229 .build()
230 .expect("Build Client failed.");
231
232 let (tx, rx) = std::sync::mpsc::channel();
233
234 rt.block_on(async move {
235 let mut handle = start_http_server!(HTTP; server_fn);
236 handle
237 .server_start
238 .recv()
239 .await
240 .expect("recv server start msg failed !");
241 tx.send(handle)
242 .expect("send Handle out the server coroutine failed !");
243 });
244
245 let mut handle = rx.recv().expect("recv Handle failed !");
246
247 let client_interface = Arc::new(client);
248 let mut shut_downs = vec![];
249
250 for _i in 0..5 {
251 let client = client_interface.clone();
252 let handle = rt.spawn(async move {
253 let body_date = vec![b'q'; 1024 * 1024 * 10];
254
255 let request = Request::builder()
256 .version("HTTP/2.0")
257 .url(format!("{}:{}", "127.0.0.1", handle.port).as_str())
258 .method("GET")
259 .body(Body::slice(body_date))
260 .expect("Client build Request failed.");
261
262 let mut response = client.request(request).await.expect("Get Response failed.");
263 let mut buf = [0u8; 4096];
264 let mut size = 0;
265
266 loop {
267 let read = response
268 .body_mut()
269 .data(&mut buf[..])
270 .await
271 .expect("Response body read failed");
272 if read == 0 {
273 break;
274 }
275 size += read;
276 }
277 assert_eq!(
278 size,
279 10 * 1024 * 1024,
280 "Assert response body data length failed"
281 );
282 });
283
284 shut_downs.push(handle);
285 }
286
287 for shut_down in shut_downs {
288 rt.block_on(shut_down)
289 .expect("Runtime wait for server shutdown failed");
290 }
291
292 rt.block_on(async move {
293 handle
294 .client_shutdown
295 .send(())
296 .await
297 .expect("send client shutdown");
298 handle
299 .server_shutdown
300 .recv()
301 .await
302 .expect("server shutdown");
303 });
304 }
305