1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::pin::Pin;
15 use std::sync::Arc;
16 use std::task::{Context, Poll};
17
18 use ylong_http::body::async_impl::Body;
19 use ylong_http::body::{ChunkBody, TextBody};
20 use ylong_http::h1::{RequestEncoder, ResponseDecoder};
21 use ylong_http::request::uri::Scheme;
22 use ylong_http::response::ResponsePart;
23 use ylong_http::version::Version;
24
25 use super::StreamData;
26 use crate::async_impl::connector::ConnInfo;
27 use crate::async_impl::interceptor::Interceptors;
28 use crate::async_impl::request::Message;
29 use crate::async_impl::{HttpBody, Request, Response};
30 use crate::error::HttpClientError;
31 use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
32 use crate::util::dispatcher::http1::Http1Conn;
33 use crate::util::normalizer::BodyLengthParser;
34
/// Size in bytes (16 KiB) of the scratch buffer allocated once per call to
/// `request`; the same buffer is reused for encoding the request head,
/// staging the request body and reading the response head.
const TEMP_BUF_SIZE: usize = 16 * 1024;
36
request<S>( mut conn: Http1Conn<S>, mut message: Message, ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,37 pub(crate) async fn request<S>(
38 mut conn: Http1Conn<S>,
39 mut message: Message,
40 ) -> Result<Response, HttpClientError>
41 where
42 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
43 {
44 message
45 .interceptor
46 .intercept_connection(conn.raw_mut().conn_detail())?;
47 message
48 .interceptor
49 .intercept_request(message.request.ref_mut())?;
50 let mut buf = vec![0u8; TEMP_BUF_SIZE];
51
52 encode_request_part(
53 message.request.ref_mut(),
54 &message.interceptor,
55 &mut conn,
56 &mut buf,
57 )
58 .await?;
59 encode_various_body(message.request.ref_mut(), &mut conn, &mut buf).await?;
60
61 // Decodes response part.
62 let (part, pre) = {
63 let mut decoder = ResponseDecoder::new();
64 loop {
65 let size = match conn.raw_mut().read(buf.as_mut_slice()).await {
66 Ok(0) => {
67 conn.shutdown();
68 return err_from_msg!(Request, "Tcp closed");
69 }
70 Ok(size) => size,
71 Err(e) => {
72 conn.shutdown();
73 return err_from_io!(Request, e);
74 }
75 };
76
77 message.interceptor.intercept_output(&buf[..size])?;
78 match decoder.decode(&buf[..size]) {
79 Ok(None) => {}
80 Ok(Some((part, rem))) => break (part, rem),
81 Err(e) => {
82 conn.shutdown();
83 return err_from_other!(Request, e);
84 }
85 }
86 }
87 };
88
89 decode_response(message, part, conn, pre)
90 }
91
encode_various_body<S>( request: &mut Request, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,92 async fn encode_various_body<S>(
93 request: &mut Request,
94 conn: &mut Http1Conn<S>,
95 buf: &mut [u8],
96 ) -> Result<(), HttpClientError>
97 where
98 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
99 {
100 let content_length = request
101 .part()
102 .headers
103 .get("Content-Length")
104 .and_then(|v| v.to_string().ok())
105 .and_then(|v| v.parse::<u64>().ok())
106 .is_some();
107
108 let transfer_encoding = request
109 .part()
110 .headers
111 .get("Transfer-Encoding")
112 .and_then(|v| v.to_string().ok())
113 .map(|v| v.contains("chunked"))
114 .unwrap_or(false);
115
116 let body = request.body_mut();
117
118 match (content_length, transfer_encoding) {
119 (_, true) => {
120 let body = ChunkBody::from_async_reader(body);
121 encode_body(conn, body, buf).await?;
122 }
123 (true, false) => {
124 let body = TextBody::from_async_reader(body);
125 encode_body(conn, body, buf).await?;
126 }
127 (false, false) => {
128 let body = TextBody::from_async_reader(body);
129 encode_body(conn, body, buf).await?;
130 }
131 };
132 Ok(())
133 }
134
encode_request_part<S>( request: &Request, interceptor: &Arc<Interceptors>, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,135 async fn encode_request_part<S>(
136 request: &Request,
137 interceptor: &Arc<Interceptors>,
138 conn: &mut Http1Conn<S>,
139 buf: &mut [u8],
140 ) -> Result<(), HttpClientError>
141 where
142 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
143 {
144 // Encodes and sends Request-line and Headers(non-body fields).
145 let mut part_encoder = RequestEncoder::new(request.part().clone());
146 if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) {
147 part_encoder.absolute_uri(true);
148 }
149 loop {
150 match part_encoder.encode(&mut buf[..]) {
151 Ok(0) => break,
152 Ok(written) => {
153 interceptor.intercept_input(&buf[..written])?;
154 // RequestEncoder writes `buf` as much as possible.
155 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
156 conn.shutdown();
157 return err_from_io!(Request, e);
158 }
159 }
160 Err(e) => {
161 conn.shutdown();
162 return err_from_other!(Request, e);
163 }
164 }
165 }
166 Ok(())
167 }
168
decode_response<S>( mut message: Message, part: ResponsePart, conn: Http1Conn<S>, pre: &[u8], ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,169 fn decode_response<S>(
170 mut message: Message,
171 part: ResponsePart,
172 conn: Http1Conn<S>,
173 pre: &[u8],
174 ) -> Result<Response, HttpClientError>
175 where
176 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
177 {
178 // The shutdown function only sets the current connection to the closed state
179 // and does not release the connection immediately.
180 // Instead, the connection will be completely closed
181 // when the body has finished reading or when the body is released.
182 match part.headers.get("Connection") {
183 None => {
184 if part.version == Version::HTTP1_0 {
185 conn.shutdown()
186 }
187 }
188 Some(value) => {
189 if part.version == Version::HTTP1_0 {
190 if value
191 .to_string()
192 .ok()
193 .and_then(|v| v.find("keep-alive"))
194 .is_none()
195 {
196 conn.shutdown()
197 }
198 } else if value
199 .to_string()
200 .ok()
201 .and_then(|v| v.find("close"))
202 .is_some()
203 {
204 conn.shutdown()
205 }
206 }
207 }
208
209 let length = match BodyLengthParser::new(message.request.ref_mut().method(), &part).parse() {
210 Ok(length) => length,
211 Err(e) => {
212 conn.shutdown();
213 return Err(e);
214 }
215 };
216
217 let body = HttpBody::new(message.interceptor, length, Box::new(conn), pre)?;
218 Ok(Response::new(
219 ylong_http::response::Response::from_raw_parts(part, body),
220 ))
221 }
222
encode_body<S, T>( conn: &mut Http1Conn<S>, mut body: T, buf: &mut [u8], ) -> Result<(), HttpClientError> where T: Body, S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,223 async fn encode_body<S, T>(
224 conn: &mut Http1Conn<S>,
225 mut body: T,
226 buf: &mut [u8],
227 ) -> Result<(), HttpClientError>
228 where
229 T: Body,
230 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
231 {
232 // Encodes Request Body.
233 let mut written = 0;
234 let mut end_body = false;
235 while !end_body {
236 if written < buf.len() {
237 let result = body.data(&mut buf[written..]).await;
238 let (read, end) = read_body_result::<S, T>(conn, result)?;
239 written += read;
240 end_body = end;
241 }
242 if written == buf.len() || end_body {
243 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
244 conn.shutdown();
245 return err_from_io!(BodyTransfer, e);
246 }
247 written = 0;
248 }
249 }
250 Ok(())
251 }
252
read_body_result<S, T>( conn: &mut Http1Conn<S>, result: Result<usize, T::Error>, ) -> Result<(usize, bool), HttpClientError> where T: Body,253 fn read_body_result<S, T>(
254 conn: &mut Http1Conn<S>,
255 result: Result<usize, T::Error>,
256 ) -> Result<(usize, bool), HttpClientError>
257 where
258 T: Body,
259 {
260 let mut curr = 0;
261 let mut end_body = false;
262 match result {
263 Ok(0) => end_body = true,
264 Ok(size) => curr = size,
265 Err(e) => {
266 conn.shutdown();
267
268 let error = e.into();
269 // When using `Uploader`, here we can get `UserAborted` error.
270 return if error.source().is_some() {
271 Err(HttpClientError::user_aborted())
272 } else {
273 err_from_other!(BodyTransfer, error)
274 };
275 }
276 }
277 Ok((curr, end_body))
278 }
279
280 impl<S: AsyncRead + Unpin> AsyncRead for Http1Conn<S> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>281 fn poll_read(
282 mut self: Pin<&mut Self>,
283 cx: &mut Context<'_>,
284 buf: &mut ReadBuf<'_>,
285 ) -> Poll<std::io::Result<()>> {
286 Pin::new(self.raw_mut()).poll_read(cx, buf)
287 }
288 }
289
290 impl<S: AsyncRead + Unpin> StreamData for Http1Conn<S> {
shutdown(&self)291 fn shutdown(&self) {
292 Self::shutdown(self)
293 }
294 }
295