1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io::{Cursor, Read}; 15 16 use ylong_http::body::{ChunkBodyDecoder, ChunkState, TextBodyDecoder}; 17 use ylong_http::headers::Headers; 18 19 use super::Body; 20 use crate::error::{ErrorKind, HttpClientError}; 21 use crate::sync_impl::conn::StreamData; 22 23 /// `HttpBody` is the body part of the `Response` returned by `Client::request`. 24 /// `HttpBody` implements `Body` trait, so users can call related methods to get 25 /// body data. 26 /// 27 /// # Examples 28 /// 29 /// ```no_run 30 /// use ylong_http_client::sync_impl::{Body, Client, EmptyBody, HttpBody, Request}; 31 /// 32 /// let mut client = Client::new(); 33 /// 34 /// // `HttpBody` is the body part of `response`. 35 /// let mut response = client.request(Request::new(EmptyBody)).unwrap(); 36 /// 37 /// // Users can use `Body::data` to get body data. 38 /// let mut buf = [0u8; 1024]; 39 /// loop { 40 /// let size = response.body_mut().data(&mut buf).unwrap(); 41 /// if size == 0 { 42 /// break; 43 /// } 44 /// let _data = &buf[..size]; 45 /// // Deals with the data. 46 /// } 47 /// ``` 48 pub struct HttpBody { 49 kind: Kind, 50 } 51 52 type BoxStreamData = Box<dyn StreamData>; 53 54 impl HttpBody { empty() -> Self55 pub(crate) fn empty() -> Self { 56 Self { kind: Kind::Empty } 57 } 58 text(len: u64, pre: &[u8], io: BoxStreamData) -> Self59 pub(crate) fn text(len: u64, pre: &[u8], io: BoxStreamData) -> Self { 60 Self { 61 kind: Kind::Text(Text::new(len, pre, io)), 62 } 63 } 64 chunk(pre: &[u8], io: BoxStreamData, is_trailer: bool) -> Self65 pub(crate) fn chunk(pre: &[u8], io: BoxStreamData, is_trailer: bool) -> Self { 66 Self { 67 kind: Kind::Chunk(Chunk::new(pre, io, is_trailer)), 68 } 69 } 70 } 71 72 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation. 73 enum Kind { 74 Empty, 75 Text(Text), 76 Chunk(Chunk), 77 } 78 79 struct Text { 80 decoder: TextBodyDecoder, 81 pre: Option<Cursor<Vec<u8>>>, 82 io: Option<BoxStreamData>, 83 } 84 85 impl Text { new(len: u64, pre: &[u8], io: BoxStreamData) -> Self86 pub(crate) fn new(len: u64, pre: &[u8], io: BoxStreamData) -> Self { 87 Self { 88 decoder: TextBodyDecoder::new(len), 89 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 90 io: Some(io), 91 } 92 } 93 } 94 95 impl Body for HttpBody { 96 type Error = HttpClientError; 97 data(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>98 fn data(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { 99 if buf.is_empty() { 100 return Ok(0); 101 } 102 103 match self.kind { 104 Kind::Empty => Ok(0), 105 Kind::Text(ref mut text) => text.data(buf), 106 Kind::Chunk(ref mut chunk) => chunk.data(buf), 107 } 108 } 109 trailer(&mut self) -> Result<Option<Headers>, Self::Error>110 fn trailer(&mut self) -> Result<Option<Headers>, Self::Error> { 111 match self.kind { 112 Kind::Chunk(ref mut chunk) => chunk.decoder.get_trailer().map_err(|_| { 113 HttpClientError::from_str(ErrorKind::BodyDecode, "Get trailer failed") 114 }), 115 _ => Ok(None), 116 } 117 } 118 } 119 120 impl Text { data(&mut self, buf: &mut [u8]) -> Result<usize, HttpClientError>121 fn data(&mut self, buf: &mut [u8]) -> Result<usize, HttpClientError> { 122 if buf.is_empty() { 123 return Ok(0); 124 } 125 126 let mut read = 0; 127 128 if let Some(pre) = self.pre.as_mut() { 129 // Here cursor read never failed. 130 let this_read = pre.read(buf).unwrap(); 131 if this_read == 0 { 132 self.pre = None; 133 } else { 134 read += this_read; 135 let (text, rem) = self.decoder.decode(&buf[..read]); 136 137 match (text.is_complete(), rem.is_empty()) { 138 (true, false) => { 139 if let Some(io) = self.io.take() { 140 io.shutdown(); 141 }; 142 return Err(HttpClientError::from_str(ErrorKind::BodyDecode, "Not Eof")); 143 } 144 (true, true) => { 145 self.io = None; 146 return Ok(read); 147 } 148 _ => {} 149 } 150 } 151 } 152 153 if !buf[read..].is_empty() { 154 if let Some(mut io) = self.io.take() { 155 match io.read(&mut buf[read..]) { 156 // Disconnected. 157 Ok(0) => { 158 io.shutdown(); 159 return Err(HttpClientError::from_str( 160 ErrorKind::BodyDecode, 161 "Response Body Incomplete", 162 )); 163 } 164 Ok(filled) => { 165 let (text, rem) = self.decoder.decode(&buf[read..read + filled]); 166 read += filled; 167 // Contains redundant `rem`, return error. 168 match (text.is_complete(), rem.is_empty()) { 169 (true, false) => { 170 io.shutdown(); 171 return Err(HttpClientError::from_str( 172 ErrorKind::BodyDecode, 173 "Not Eof", 174 )); 175 } 176 (true, true) => return Ok(read), 177 _ => {} 178 } 179 self.io = Some(io); 180 } 181 Err(e) => return Err(HttpClientError::from_error(ErrorKind::BodyTransfer, e)), 182 } 183 } 184 } 185 Ok(read) 186 } 187 } 188 189 struct Chunk { 190 decoder: ChunkBodyDecoder, 191 pre: Option<Cursor<Vec<u8>>>, 192 io: Option<BoxStreamData>, 193 } 194 195 impl Chunk { new(pre: &[u8], io: BoxStreamData, is_trailer: bool) -> Self196 pub(crate) fn new(pre: &[u8], io: BoxStreamData, is_trailer: bool) -> Self { 197 Self { 198 decoder: ChunkBodyDecoder::new().contains_trailer(is_trailer), 199 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 200 io: Some(io), 201 } 202 } 203 } 204 205 impl Chunk { data(&mut self, buf: &mut [u8]) -> Result<usize, HttpClientError>206 fn data(&mut self, buf: &mut [u8]) -> Result<usize, HttpClientError> { 207 if buf.is_empty() { 208 return Ok(0); 209 } 210 211 let mut read = 0; 212 213 while let Some(pre) = self.pre.as_mut() { 214 // Here cursor read never failed. 215 let size = pre.read(&mut buf[read..]).unwrap(); 216 217 if size == 0 { 218 self.pre = None; 219 } 220 221 let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?; 222 read += size; 223 224 if flag { 225 // Return if we find a 0-sized chunk. 226 self.io = None; 227 return Ok(read); 228 } else if read != 0 { 229 // Return if we get some data. 230 return Ok(read); 231 } 232 } 233 234 // Here `read` must be 0. 235 while let Some(mut io) = self.io.take() { 236 match io.read(&mut buf[read..]) { 237 Ok(filled) => { 238 if filled == 0 { 239 io.shutdown(); 240 return Err(HttpClientError::from_str( 241 ErrorKind::BodyDecode, 242 "Response Body Incomplete", 243 )); 244 } 245 let (size, flag) = self.merge_chunks(&mut buf[read..read + filled])?; 246 read += size; 247 if flag { 248 // Return if we find a 0-sized chunk. 249 // Return if we get some data. 250 return Ok(read); 251 } 252 self.io = Some(io); 253 if read != 0 { 254 return Ok(read); 255 } 256 } 257 Err(e) => return Err(HttpClientError::from_error(ErrorKind::BodyTransfer, e)), 258 } 259 } 260 Ok(read) 261 } 262 merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError>263 fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> { 264 // Here we need to merge the chunks into one data block and return. 265 // The data arrangement in buf is as follows: 266 // 267 // data in buf: 268 // +------+------+------+------+------+------+------+ 269 // | data | len | data | len | ... | data | len | 270 // +------+------+------+------+------+------+------+ 271 // 272 // We need to merge these data blocks into one block: 273 // 274 // after merge: 275 // +---------------------------+ 276 // | data | 277 // +---------------------------+ 278 279 let (chunks, junk) = self 280 .decoder 281 .decode(buf) 282 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 283 284 let mut finished = false; 285 let mut ptrs = Vec::new(); 286 for chunk in chunks.into_iter() { 287 if chunk.trailer().is_some() { 288 if chunk.state() == &ChunkState::Finish { 289 finished = true; 290 } 291 } else { 292 if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize { 293 finished = true; 294 break; 295 } 296 let data = chunk.data(); 297 ptrs.push((data.as_ptr(), data.len())) 298 } 299 } 300 301 if finished && !junk.is_empty() { 302 return Err(HttpClientError::from_str( 303 ErrorKind::BodyDecode, 304 "Invalid Chunk Body", 305 )); 306 } 307 308 let start = buf.as_ptr(); 309 310 let mut idx = 0; 311 for (ptr, len) in ptrs.into_iter() { 312 let st = ptr as usize - start as usize; 313 let ed = st + len; 314 buf.copy_within(st..ed, idx); 315 idx += len; 316 } 317 Ok((idx, finished)) 318 } 319 } 320 321 #[cfg(test)] 322 mod ut_syn_http_body { 323 use crate::sync_impl::{Body, HttpBody}; 324 325 /// UT test cases for `HttpBody::empty`. 326 /// 327 /// # Brief 328 /// 1. Creates a `HttpBody` by calling `HttpBody::empty`. 329 /// 2. Calls `data` method. 330 /// 3. Checks if the result is correct. 331 #[test] ut_http_body_empty()332 fn ut_http_body_empty() { 333 let mut body = HttpBody::empty(); 334 let mut buf = []; 335 let data = body.data(&mut buf); 336 assert!(data.is_ok()); 337 assert_eq!(data.unwrap(), 0); 338 } 339 } 340