1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 #![cfg(all(
15     feature = "async",
16     feature = "http2",
17     feature = "tokio_base",
18     not(feature = "__tls")
19 ))]
20 
21 use std::convert::Infallible;
22 use std::sync::Arc;
23 
24 use hyper::body::HttpBody;
25 use tokio::sync::mpsc::{Receiver, Sender};
26 use ylong_http::body::async_impl::Body as RespBody;
27 use ylong_http::response::status::StatusCode;
28 use ylong_http_client::async_impl::{Body, Client, Request};
29 
30 pub struct HttpHandle {
31     pub port: u16,
32 
33     // This channel allows the server to notify the client when it is up and running.
34     pub server_start: Receiver<()>,
35 
36     // This channel allows the client to notify the server when it is ready to shut down.
37     pub client_shutdown: Sender<()>,
38 
39     // This channel allows the server to notify the client when it has shut down.
40     pub server_shutdown: Receiver<()>,
41 }
42 
server_fn( req: hyper::Request<hyper::Body>, ) -> Result<hyper::Response<hyper::Body>, Infallible>43 async fn server_fn(
44     req: hyper::Request<hyper::Body>,
45 ) -> Result<hyper::Response<hyper::Body>, Infallible> {
46     let (parts, mut body) = req.into_parts();
47     assert_eq!(
48         parts.method.to_string(),
49         "GET",
50         "Assert request method failed"
51     );
52     assert_eq!(
53         format!("{:?}", parts.version),
54         "HTTP/2.0",
55         "Assert request version failed"
56     );
57 
58     let mut size = 0;
59     loop {
60         match body.data().await {
61             None => {
62                 break;
63             }
64             Some(Ok(bytes)) => {
65                 size += bytes.len();
66             }
67             Some(Err(_e)) => {
68                 panic!("server read request body data occurs error");
69             }
70         }
71     }
72     assert_eq!(
73         size,
74         10 * 1024 * 1024,
75         "Assert request body data length failed"
76     );
77 
78     let body_data = vec![b'q'; 10 * 1024 * 1024];
79     let response = hyper::Response::builder()
80         .version(hyper::Version::HTTP_2)
81         .status(hyper::StatusCode::OK)
82         .body(hyper::Body::from(body_data))
83         .expect("build hyper response failed");
84     Ok(response)
85 }
86 
87 #[macro_export]
88 macro_rules! start_http_server {
89     (
90         HTTP;
91         $server_fn: ident
92     ) => {{
93         use std::convert::Infallible;
94 
95         use hyper::service::{make_service_fn, service_fn};
96         use tokio::sync::mpsc::channel;
97 
98         let (start_tx, start_rx) = channel::<()>(1);
99         let (client_tx, mut client_rx) = channel::<()>(1);
100         let (server_tx, server_rx) = channel::<()>(1);
101 
102         let tcp_listener =
103             std::net::TcpListener::bind("127.0.0.1:0").expect("server bind port failed !");
104         let addr = tcp_listener
105             .local_addr()
106             .expect("get server local address failed!");
107         let port = addr.port();
108 
109         let server = hyper::Server::from_tcp(tcp_listener)
110             .expect("build hyper server from tcp listener failed !");
111 
112         tokio::spawn(async move {
113             let make_svc =
114                 make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn($server_fn)) });
115             server
116                 .serve(make_svc)
117                 .with_graceful_shutdown(async {
118                     start_tx
119                         .send(())
120                         .await
121                         .expect("Start channel (Client-Half) be closed unexpectedly");
122                     client_rx
123                         .recv()
124                         .await
125                         .expect("Client channel (Client-Half) be closed unexpectedly");
126                 })
127                 .await
128                 .expect("Start server failed");
129             server_tx
130                 .send(())
131                 .await
132                 .expect("Server channel (Client-Half) be closed unexpectedly");
133         });
134 
135         HttpHandle {
136             port,
137             server_start: start_rx,
138             client_shutdown: client_tx,
139             server_shutdown: server_rx,
140         }
141     }};
142 }
143 
144 #[test]
sdv_async_h2_client_send_request()145 fn sdv_async_h2_client_send_request() {
146     let rt = tokio::runtime::Builder::new_multi_thread()
147         .worker_threads(4)
148         .enable_all()
149         .build()
150         .expect("init runtime failed !");
151 
152     let (tx, rx) = std::sync::mpsc::channel();
153 
154     rt.block_on(async move {
155         let mut handle = start_http_server!(HTTP; server_fn);
156         handle
157             .server_start
158             .recv()
159             .await
160             .expect("recv server start msg failed !");
161         tx.send(handle)
162             .expect("send Handle out the server coroutine failed !");
163     });
164 
165     let mut handle = rx.recv().expect("recv Handle failed !");
166 
167     let body_date = vec![b'q'; 10 * 1024 * 1024];
168 
169     let client = Client::builder()
170         .http2_prior_knowledge()
171         .set_stream_recv_window_size(100 * 1024)
172         .build()
173         .expect("Build Client failed.");
174 
175     let request = Request::builder()
176         .version("HTTP/2.0")
177         .url(format!("{}:{}", "127.0.0.1", handle.port).as_str())
178         .method("GET")
179         .body(Body::slice(body_date))
180         .expect("Client build Request failed.");
181 
182     rt.block_on(async move {
183         let mut response = client.request(request).await.expect("get response failed");
184         assert_eq!(response.status(), StatusCode::OK);
185 
186         let mut buf = [0u8; 4096];
187         let mut size = 0;
188         loop {
189             let read = response
190                 .body_mut()
191                 .data(&mut buf[..])
192                 .await
193                 .expect("Response body read failed");
194             if read == 0 {
195                 break;
196             }
197             size += read;
198         }
199         assert_eq!(
200             size,
201             10 * 1024 * 1024,
202             "Assert response body data length failed"
203         );
204 
205         handle
206             .client_shutdown
207             .send(())
208             .await
209             .expect("send client shutdown");
210         handle
211             .server_shutdown
212             .recv()
213             .await
214             .expect("server shutdown");
215     })
216 }
217 
218 #[test]
sdv_async_h2_client_send_request_concurrently()219 fn sdv_async_h2_client_send_request_concurrently() {
220     let rt = tokio::runtime::Builder::new_multi_thread()
221         .worker_threads(4)
222         .enable_all()
223         .build()
224         .expect("Build Runtime failed.");
225 
226     let client = Client::builder()
227         .http2_prior_knowledge()
228         .set_stream_recv_window_size(100 * 1024)
229         .build()
230         .expect("Build Client failed.");
231 
232     let (tx, rx) = std::sync::mpsc::channel();
233 
234     rt.block_on(async move {
235         let mut handle = start_http_server!(HTTP; server_fn);
236         handle
237             .server_start
238             .recv()
239             .await
240             .expect("recv server start msg failed !");
241         tx.send(handle)
242             .expect("send Handle out the server coroutine failed !");
243     });
244 
245     let mut handle = rx.recv().expect("recv Handle failed !");
246 
247     let client_interface = Arc::new(client);
248     let mut shut_downs = vec![];
249 
250     for _i in 0..5 {
251         let client = client_interface.clone();
252         let handle = rt.spawn(async move {
253             let body_date = vec![b'q'; 1024 * 1024 * 10];
254 
255             let request = Request::builder()
256                 .version("HTTP/2.0")
257                 .url(format!("{}:{}", "127.0.0.1", handle.port).as_str())
258                 .method("GET")
259                 .body(Body::slice(body_date))
260                 .expect("Client build Request failed.");
261 
262             let mut response = client.request(request).await.expect("Get Response failed.");
263             let mut buf = [0u8; 4096];
264             let mut size = 0;
265 
266             loop {
267                 let read = response
268                     .body_mut()
269                     .data(&mut buf[..])
270                     .await
271                     .expect("Response body read failed");
272                 if read == 0 {
273                     break;
274                 }
275                 size += read;
276             }
277             assert_eq!(
278                 size,
279                 10 * 1024 * 1024,
280                 "Assert response body data length failed"
281             );
282         });
283 
284         shut_downs.push(handle);
285     }
286 
287     for shut_down in shut_downs {
288         rt.block_on(shut_down)
289             .expect("Runtime wait for server shutdown failed");
290     }
291 
292     rt.block_on(async move {
293         handle
294             .client_shutdown
295             .send(())
296             .await
297             .expect("send client shutdown");
298         handle
299             .server_shutdown
300             .recv()
301             .await
302             .expect("server shutdown");
303     });
304 }
305