1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::pin::Pin;
15 use std::sync::Arc;
16 use std::task::{Context, Poll};
17 
18 use ylong_http::body::async_impl::Body;
19 use ylong_http::body::{ChunkBody, TextBody};
20 use ylong_http::h1::{RequestEncoder, ResponseDecoder};
21 use ylong_http::request::uri::Scheme;
22 use ylong_http::response::ResponsePart;
23 use ylong_http::version::Version;
24 
25 use super::StreamData;
26 use crate::async_impl::connector::ConnInfo;
27 use crate::async_impl::interceptor::Interceptors;
28 use crate::async_impl::request::Message;
29 use crate::async_impl::{HttpBody, Request, Response};
30 use crate::error::HttpClientError;
31 use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
32 use crate::util::dispatcher::http1::Http1Conn;
33 use crate::util::normalizer::BodyLengthParser;
34 
35 const TEMP_BUF_SIZE: usize = 16 * 1024;
36 
request<S>( mut conn: Http1Conn<S>, mut message: Message, ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,37 pub(crate) async fn request<S>(
38     mut conn: Http1Conn<S>,
39     mut message: Message,
40 ) -> Result<Response, HttpClientError>
41 where
42     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
43 {
44     message
45         .interceptor
46         .intercept_connection(conn.raw_mut().conn_detail())?;
47     message
48         .interceptor
49         .intercept_request(message.request.ref_mut())?;
50     let mut buf = vec![0u8; TEMP_BUF_SIZE];
51 
52     encode_request_part(
53         message.request.ref_mut(),
54         &message.interceptor,
55         &mut conn,
56         &mut buf,
57     )
58     .await?;
59     encode_various_body(message.request.ref_mut(), &mut conn, &mut buf).await?;
60 
61     // Decodes response part.
62     let (part, pre) = {
63         let mut decoder = ResponseDecoder::new();
64         loop {
65             let size = match conn.raw_mut().read(buf.as_mut_slice()).await {
66                 Ok(0) => {
67                     conn.shutdown();
68                     return err_from_msg!(Request, "Tcp closed");
69                 }
70                 Ok(size) => size,
71                 Err(e) => {
72                     conn.shutdown();
73                     return err_from_io!(Request, e);
74                 }
75             };
76 
77             message.interceptor.intercept_output(&buf[..size])?;
78             match decoder.decode(&buf[..size]) {
79                 Ok(None) => {}
80                 Ok(Some((part, rem))) => break (part, rem),
81                 Err(e) => {
82                     conn.shutdown();
83                     return err_from_other!(Request, e);
84                 }
85             }
86         }
87     };
88 
89     decode_response(message, part, conn, pre)
90 }
91 
encode_various_body<S>( request: &mut Request, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,92 async fn encode_various_body<S>(
93     request: &mut Request,
94     conn: &mut Http1Conn<S>,
95     buf: &mut [u8],
96 ) -> Result<(), HttpClientError>
97 where
98     S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
99 {
100     let content_length = request
101         .part()
102         .headers
103         .get("Content-Length")
104         .and_then(|v| v.to_string().ok())
105         .and_then(|v| v.parse::<u64>().ok())
106         .is_some();
107 
108     let transfer_encoding = request
109         .part()
110         .headers
111         .get("Transfer-Encoding")
112         .and_then(|v| v.to_string().ok())
113         .map(|v| v.contains("chunked"))
114         .unwrap_or(false);
115 
116     let body = request.body_mut();
117 
118     match (content_length, transfer_encoding) {
119         (_, true) => {
120             let body = ChunkBody::from_async_reader(body);
121             encode_body(conn, body, buf).await?;
122         }
123         (true, false) => {
124             let body = TextBody::from_async_reader(body);
125             encode_body(conn, body, buf).await?;
126         }
127         (false, false) => {
128             let body = TextBody::from_async_reader(body);
129             encode_body(conn, body, buf).await?;
130         }
131     };
132     Ok(())
133 }
134 
encode_request_part<S>( request: &Request, interceptor: &Arc<Interceptors>, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,135 async fn encode_request_part<S>(
136     request: &Request,
137     interceptor: &Arc<Interceptors>,
138     conn: &mut Http1Conn<S>,
139     buf: &mut [u8],
140 ) -> Result<(), HttpClientError>
141 where
142     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
143 {
144     // Encodes and sends Request-line and Headers(non-body fields).
145     let mut part_encoder = RequestEncoder::new(request.part().clone());
146     if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) {
147         part_encoder.absolute_uri(true);
148     }
149     loop {
150         match part_encoder.encode(&mut buf[..]) {
151             Ok(0) => break,
152             Ok(written) => {
153                 interceptor.intercept_input(&buf[..written])?;
154                 // RequestEncoder writes `buf` as much as possible.
155                 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
156                     conn.shutdown();
157                     return err_from_io!(Request, e);
158                 }
159             }
160             Err(e) => {
161                 conn.shutdown();
162                 return err_from_other!(Request, e);
163             }
164         }
165     }
166     Ok(())
167 }
168 
decode_response<S>( mut message: Message, part: ResponsePart, conn: Http1Conn<S>, pre: &[u8], ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,169 fn decode_response<S>(
170     mut message: Message,
171     part: ResponsePart,
172     conn: Http1Conn<S>,
173     pre: &[u8],
174 ) -> Result<Response, HttpClientError>
175 where
176     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
177 {
178     // The shutdown function only sets the current connection to the closed state
179     // and does not release the connection immediately.
180     // Instead, the connection will be completely closed
181     // when the body has finished reading or when the body is released.
182     match part.headers.get("Connection") {
183         None => {
184             if part.version == Version::HTTP1_0 {
185                 conn.shutdown()
186             }
187         }
188         Some(value) => {
189             if part.version == Version::HTTP1_0 {
190                 if value
191                     .to_string()
192                     .ok()
193                     .and_then(|v| v.find("keep-alive"))
194                     .is_none()
195                 {
196                     conn.shutdown()
197                 }
198             } else if value
199                 .to_string()
200                 .ok()
201                 .and_then(|v| v.find("close"))
202                 .is_some()
203             {
204                 conn.shutdown()
205             }
206         }
207     }
208 
209     let length = match BodyLengthParser::new(message.request.ref_mut().method(), &part).parse() {
210         Ok(length) => length,
211         Err(e) => {
212             conn.shutdown();
213             return Err(e);
214         }
215     };
216 
217     let body = HttpBody::new(message.interceptor, length, Box::new(conn), pre)?;
218     Ok(Response::new(
219         ylong_http::response::Response::from_raw_parts(part, body),
220     ))
221 }
222 
encode_body<S, T>( conn: &mut Http1Conn<S>, mut body: T, buf: &mut [u8], ) -> Result<(), HttpClientError> where T: Body, S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,223 async fn encode_body<S, T>(
224     conn: &mut Http1Conn<S>,
225     mut body: T,
226     buf: &mut [u8],
227 ) -> Result<(), HttpClientError>
228 where
229     T: Body,
230     S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
231 {
232     // Encodes Request Body.
233     let mut written = 0;
234     let mut end_body = false;
235     while !end_body {
236         if written < buf.len() {
237             let result = body.data(&mut buf[written..]).await;
238             let (read, end) = read_body_result::<S, T>(conn, result)?;
239             written += read;
240             end_body = end;
241         }
242         if written == buf.len() || end_body {
243             if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
244                 conn.shutdown();
245                 return err_from_io!(BodyTransfer, e);
246             }
247             written = 0;
248         }
249     }
250     Ok(())
251 }
252 
read_body_result<S, T>( conn: &mut Http1Conn<S>, result: Result<usize, T::Error>, ) -> Result<(usize, bool), HttpClientError> where T: Body,253 fn read_body_result<S, T>(
254     conn: &mut Http1Conn<S>,
255     result: Result<usize, T::Error>,
256 ) -> Result<(usize, bool), HttpClientError>
257 where
258     T: Body,
259 {
260     let mut curr = 0;
261     let mut end_body = false;
262     match result {
263         Ok(0) => end_body = true,
264         Ok(size) => curr = size,
265         Err(e) => {
266             conn.shutdown();
267 
268             let error = e.into();
269             // When using `Uploader`, here we can get `UserAborted` error.
270             return if error.source().is_some() {
271                 Err(HttpClientError::user_aborted())
272             } else {
273                 err_from_other!(BodyTransfer, error)
274             };
275         }
276     }
277     Ok((curr, end_body))
278 }
279 
280 impl<S: AsyncRead + Unpin> AsyncRead for Http1Conn<S> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>281     fn poll_read(
282         mut self: Pin<&mut Self>,
283         cx: &mut Context<'_>,
284         buf: &mut ReadBuf<'_>,
285     ) -> Poll<std::io::Result<()>> {
286         Pin::new(self.raw_mut()).poll_read(cx, buf)
287     }
288 }
289 
290 impl<S: AsyncRead + Unpin> StreamData for Http1Conn<S> {
shutdown(&self)291     fn shutdown(&self) {
292         Self::shutdown(self)
293     }
294 }
295