1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use core::pin::Pin; 15 use core::task::{Context, Poll}; 16 use std::future::Future; 17 use std::io::{Cursor, Read}; 18 use std::sync::Arc; 19 20 use ylong_http::body::async_impl::Body; 21 use ylong_http::body::TextBodyDecoder; 22 #[cfg(feature = "http1_1")] 23 use ylong_http::body::{ChunkBodyDecoder, ChunkState}; 24 use ylong_http::headers::Headers; 25 26 use super::conn::StreamData; 27 use crate::async_impl::interceptor::Interceptors; 28 use crate::error::{ErrorKind, HttpClientError}; 29 use crate::runtime::{AsyncRead, ReadBuf, Sleep}; 30 use crate::util::normalizer::BodyLength; 31 32 const TRAILER_SIZE: usize = 1024; 33 34 /// `HttpBody` is the body part of the `Response` returned by `Client::request`. 35 /// `HttpBody` implements `Body` trait, so users can call related methods to get 36 /// body data. 37 /// 38 /// # Examples 39 /// 40 /// ```no_run 41 /// use ylong_http_client::async_impl::{Body, Client, HttpBody, Request}; 42 /// use ylong_http_client::HttpClientError; 43 /// 44 /// async fn read_body() -> Result<(), HttpClientError> { 45 /// let client = Client::new(); 46 /// 47 /// // `HttpBody` is the body part of `response`. 48 /// let mut response = client 49 /// .request(Request::builder().body(Body::empty())?) 50 /// .await?; 51 /// 52 /// // Users can use `Body::data` to get body data. 53 /// let mut buf = [0u8; 1024]; 54 /// loop { 55 /// let size = response.data(&mut buf).await.unwrap(); 56 /// if size == 0 { 57 /// break; 58 /// } 59 /// let _data = &buf[..size]; 60 /// // Deals with the data. 61 /// } 62 /// Ok(()) 63 /// } 64 /// ``` 65 pub struct HttpBody { 66 kind: Kind, 67 sleep: Option<Pin<Box<Sleep>>>, 68 } 69 70 type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>; 71 72 impl HttpBody { new( interceptors: Arc<Interceptors>, body_length: BodyLength, io: BoxStreamData, pre: &[u8], ) -> Result<Self, HttpClientError>73 pub(crate) fn new( 74 interceptors: Arc<Interceptors>, 75 body_length: BodyLength, 76 io: BoxStreamData, 77 pre: &[u8], 78 ) -> Result<Self, HttpClientError> { 79 let kind = match body_length { 80 BodyLength::Empty => { 81 if !pre.is_empty() { 82 // TODO: Consider the case where BodyLength is empty but pre is not empty. 83 io.shutdown(); 84 return err_from_msg!(Request, "Body length is 0 but read extra data"); 85 } 86 Kind::Empty 87 } 88 BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)), 89 BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)), 90 91 #[cfg(feature = "http1_1")] 92 BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)), 93 }; 94 Ok(Self { kind, sleep: None }) 95 } 96 set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)97 pub(crate) fn set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) { 98 self.sleep = sleep; 99 } 100 } 101 102 impl Body for HttpBody { 103 type Error = HttpClientError; 104 poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>105 fn poll_data( 106 mut self: Pin<&mut Self>, 107 cx: &mut Context<'_>, 108 buf: &mut [u8], 109 ) -> Poll<Result<usize, Self::Error>> { 110 if buf.is_empty() { 111 return Poll::Ready(Ok(0)); 112 } 113 114 if let Some(delay) = self.sleep.as_mut() { 115 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 116 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into())); 117 } 118 } 119 120 match self.kind { 121 Kind::Empty => Poll::Ready(Ok(0)), 122 Kind::Text(ref mut text) => text.data(cx, buf), 123 Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf), 124 #[cfg(feature = "http1_1")] 125 Kind::Chunk(ref mut chunk) => chunk.data(cx, buf), 126 } 127 } 128 poll_trailer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<Option<Headers>, Self::Error>>129 fn poll_trailer( 130 mut self: Pin<&mut Self>, 131 cx: &mut Context<'_>, 132 ) -> Poll<Result<Option<Headers>, Self::Error>> { 133 // Get trailer data from io 134 if let Some(delay) = self.sleep.as_mut() { 135 if let Poll::Ready(()) = Pin::new(delay).poll(cx) { 136 return Poll::Ready(err_from_msg!(Timeout, "Request timeout")); 137 } 138 } 139 140 let mut read_buf = [0_u8; TRAILER_SIZE]; 141 142 match self.kind { 143 #[cfg(feature = "http1_1")] 144 Kind::Chunk(ref mut chunk) => { 145 match chunk.data(cx, &mut read_buf) { 146 Poll::Ready(Ok(_)) => {} 147 Poll::Pending => { 148 return Poll::Pending; 149 } 150 Poll::Ready(Err(e)) => { 151 return Poll::Ready(Err(e)); 152 } 153 } 154 Poll::Ready(Ok(chunk.decoder.get_trailer().map_err(|e| { 155 HttpClientError::from_error(ErrorKind::BodyDecode, e) 156 })?)) 157 } 158 _ => Poll::Ready(Ok(None)), 159 } 160 } 161 } 162 163 impl Drop for HttpBody { drop(&mut self)164 fn drop(&mut self) { 165 let io = match self.kind { 166 Kind::Text(ref mut text) => text.io.as_mut(), 167 #[cfg(feature = "http1_1")] 168 Kind::Chunk(ref mut chunk) => chunk.io.as_mut(), 169 Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(), 170 _ => None, 171 }; 172 // If response body is not totally read, shutdown io. 173 if let Some(io) = io { 174 io.shutdown() 175 } 176 } 177 } 178 179 // TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation. 180 enum Kind { 181 Empty, 182 Text(Text), 183 #[cfg(feature = "http1_1")] 184 Chunk(Chunk), 185 UntilClose(UntilClose), 186 } 187 188 struct UntilClose { 189 interceptors: Arc<Interceptors>, 190 pre: Option<Cursor<Vec<u8>>>, 191 io: Option<BoxStreamData>, 192 } 193 194 impl UntilClose { new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self195 pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self { 196 Self { 197 interceptors, 198 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 199 io: Some(io), 200 } 201 } 202 data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>203 fn data( 204 &mut self, 205 cx: &mut Context<'_>, 206 buf: &mut [u8], 207 ) -> Poll<Result<usize, HttpClientError>> { 208 if buf.is_empty() { 209 return Poll::Ready(Ok(0)); 210 } 211 let mut read = 0; 212 if let Some(pre) = self.pre.as_mut() { 213 // Here cursor read never failed. 214 let this_read = Read::read(pre, buf).unwrap(); 215 if this_read == 0 { 216 self.pre = None; 217 } else { 218 read += this_read; 219 } 220 } 221 222 if !buf[read..].is_empty() { 223 if let Some(io) = self.io.take() { 224 return self.poll_read_io(cx, io, read, buf); 225 } 226 } 227 Poll::Ready(Ok(read)) 228 } 229 poll_read_io( &mut self, cx: &mut Context<'_>, mut io: BoxStreamData, read: usize, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>230 fn poll_read_io( 231 &mut self, 232 cx: &mut Context<'_>, 233 mut io: BoxStreamData, 234 read: usize, 235 buf: &mut [u8], 236 ) -> Poll<Result<usize, HttpClientError>> { 237 let mut read = read; 238 let mut read_buf = ReadBuf::new(&mut buf[read..]); 239 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 240 // Disconnected. 241 Poll::Ready(Ok(())) => { 242 let filled = read_buf.filled().len(); 243 if filled == 0 { 244 io.shutdown(); 245 } else { 246 self.interceptors 247 .intercept_output(&buf[read..(read + filled)])?; 248 self.io = Some(io); 249 } 250 read += filled; 251 Poll::Ready(Ok(read)) 252 } 253 Poll::Pending => { 254 self.io = Some(io); 255 if read != 0 { 256 return Poll::Ready(Ok(read)); 257 } 258 Poll::Pending 259 } 260 Poll::Ready(Err(e)) => { 261 // If IO error occurs, shutdowns `io` before return. 262 io.shutdown(); 263 Poll::Ready(err_from_io!(BodyTransfer, e)) 264 } 265 } 266 } 267 } 268 269 struct Text { 270 interceptors: Arc<Interceptors>, 271 decoder: TextBodyDecoder, 272 pre: Option<Cursor<Vec<u8>>>, 273 io: Option<BoxStreamData>, 274 } 275 276 impl Text { new( len: u64, pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>, ) -> Self277 pub(crate) fn new( 278 len: u64, 279 pre: &[u8], 280 io: BoxStreamData, 281 interceptors: Arc<Interceptors>, 282 ) -> Self { 283 Self { 284 interceptors, 285 decoder: TextBodyDecoder::new(len), 286 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 287 io: Some(io), 288 } 289 } 290 } 291 292 impl Text { data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>293 fn data( 294 &mut self, 295 cx: &mut Context<'_>, 296 buf: &mut [u8], 297 ) -> Poll<Result<usize, HttpClientError>> { 298 if buf.is_empty() { 299 return Poll::Ready(Ok(0)); 300 } 301 302 let mut read = 0; 303 304 if let Some(pre) = self.pre.as_mut() { 305 // Here cursor read never failed. 306 let this_read = Read::read(pre, buf).unwrap(); 307 if this_read == 0 { 308 self.pre = None; 309 } else { 310 read += this_read; 311 if let Some(result) = self.read_remaining(buf, read) { 312 return result; 313 } 314 } 315 } 316 317 if !buf[read..].is_empty() { 318 if let Some(io) = self.io.take() { 319 return self.poll_read_io(cx, buf, io, read); 320 } 321 } 322 Poll::Ready(Ok(read)) 323 } 324 read_remaining( &mut self, buf: &mut [u8], read: usize, ) -> Option<Poll<Result<usize, HttpClientError>>>325 fn read_remaining( 326 &mut self, 327 buf: &mut [u8], 328 read: usize, 329 ) -> Option<Poll<Result<usize, HttpClientError>>> { 330 let (text, rem) = self.decoder.decode(&buf[..read]); 331 332 // Contains redundant `rem`, return error. 333 match (text.is_complete(), rem.is_empty()) { 334 (true, false) => { 335 if let Some(io) = self.io.take() { 336 io.shutdown(); 337 }; 338 Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))) 339 } 340 (true, true) => { 341 self.io = None; 342 Some(Poll::Ready(Ok(read))) 343 } 344 // TextBodyDecoder decodes as much as possible here. 345 _ => None, 346 } 347 } 348 poll_read_io( &mut self, cx: &mut Context<'_>, buf: &mut [u8], mut io: BoxStreamData, read: usize, ) -> Poll<Result<usize, HttpClientError>>349 fn poll_read_io( 350 &mut self, 351 cx: &mut Context<'_>, 352 buf: &mut [u8], 353 mut io: BoxStreamData, 354 read: usize, 355 ) -> Poll<Result<usize, HttpClientError>> { 356 let mut read = read; 357 let mut read_buf = ReadBuf::new(&mut buf[read..]); 358 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 359 // Disconnected. 360 Poll::Ready(Ok(())) => { 361 let filled = read_buf.filled().len(); 362 if filled == 0 { 363 io.shutdown(); 364 return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); 365 } 366 let (text, rem) = self.decoder.decode(read_buf.filled()); 367 self.interceptors.intercept_output(read_buf.filled())?; 368 read += filled; 369 // Contains redundant `rem`, return error. 370 match (text.is_complete(), rem.is_empty()) { 371 (true, false) => { 372 io.shutdown(); 373 Poll::Ready(err_from_msg!(BodyDecode, "Not eof")) 374 } 375 (true, true) => Poll::Ready(Ok(read)), 376 _ => { 377 self.io = Some(io); 378 Poll::Ready(Ok(read)) 379 } 380 } 381 } 382 Poll::Pending => { 383 self.io = Some(io); 384 if read != 0 { 385 return Poll::Ready(Ok(read)); 386 } 387 Poll::Pending 388 } 389 Poll::Ready(Err(e)) => { 390 // If IO error occurs, shutdowns `io` before return. 391 io.shutdown(); 392 Poll::Ready(err_from_io!(BodyDecode, e)) 393 } 394 } 395 } 396 } 397 398 #[cfg(feature = "http1_1")] 399 struct Chunk { 400 interceptors: Arc<Interceptors>, 401 decoder: ChunkBodyDecoder, 402 pre: Option<Cursor<Vec<u8>>>, 403 io: Option<BoxStreamData>, 404 } 405 406 #[cfg(feature = "http1_1")] 407 impl Chunk { new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self408 pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self { 409 Self { 410 interceptors, 411 decoder: ChunkBodyDecoder::new().contains_trailer(true), 412 pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), 413 io: Some(io), 414 } 415 } 416 } 417 418 #[cfg(feature = "http1_1")] 419 impl Chunk { data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>420 fn data( 421 &mut self, 422 cx: &mut Context<'_>, 423 buf: &mut [u8], 424 ) -> Poll<Result<usize, HttpClientError>> { 425 if buf.is_empty() { 426 return Poll::Ready(Ok(0)); 427 } 428 429 let mut read = 0; 430 431 while let Some(pre) = self.pre.as_mut() { 432 // Here cursor read never failed. 433 let size = Read::read(pre, &mut buf[read..]).unwrap(); 434 if size == 0 { 435 self.pre = None; 436 } 437 438 let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?; 439 read += size; 440 441 if flag { 442 // Return if we find a 0-sized chunk. 443 self.io = None; 444 return Poll::Ready(Ok(read)); 445 } else if read != 0 { 446 // Return if we get some data. 447 return Poll::Ready(Ok(read)); 448 } 449 } 450 451 // Here `read` must be 0. 452 while let Some(mut io) = self.io.take() { 453 let mut read_buf = ReadBuf::new(&mut buf[read..]); 454 match Pin::new(&mut io).poll_read(cx, &mut read_buf) { 455 Poll::Ready(Ok(())) => { 456 let filled = read_buf.filled().len(); 457 if filled == 0 { 458 io.shutdown(); 459 return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); 460 } 461 let (size, flag) = self.merge_chunks(read_buf.filled_mut())?; 462 self.interceptors.intercept_output(read_buf.filled_mut())?; 463 read += size; 464 if flag { 465 // Return if we find a 0-sized chunk. 466 // Return if we get some data. 467 return Poll::Ready(Ok(read)); 468 } 469 self.io = Some(io); 470 if read != 0 { 471 return Poll::Ready(Ok(read)); 472 } 473 } 474 Poll::Pending => { 475 self.io = Some(io); 476 return Poll::Pending; 477 } 478 Poll::Ready(Err(e)) => { 479 // If IO error occurs, shutdowns `io` before return. 480 io.shutdown(); 481 return Poll::Ready(err_from_io!(BodyDecode, e)); 482 } 483 } 484 } 485 486 Poll::Ready(Ok(read)) 487 } 488 merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError>489 fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> { 490 // Here we need to merge the chunks into one data block and return. 491 // The data arrangement in buf is as follows: 492 // 493 // data in buf: 494 // +------+------+------+------+------+------+------+ 495 // | data | len | data | len | ... | data | len | 496 // +------+------+------+------+------+------+------+ 497 // 498 // We need to merge these data blocks into one block: 499 // 500 // after merge: 501 // +---------------------------+ 502 // | data | 503 // +---------------------------+ 504 505 let (chunks, junk) = self 506 .decoder 507 .decode(buf) 508 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 509 510 let mut finished = false; 511 let mut ptrs = Vec::new(); 512 513 for chunk in chunks.into_iter() { 514 if chunk.trailer().is_some() { 515 if chunk.state() == &ChunkState::Finish { 516 finished = true; 517 } 518 } else { 519 if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize { 520 finished = true; 521 break; 522 } 523 let data = chunk.data(); 524 ptrs.push((data.as_ptr(), data.len())) 525 } 526 } 527 528 if finished && !junk.is_empty() { 529 return err_from_msg!(BodyDecode, "Invalid chunk body"); 530 } 531 532 let start = buf.as_ptr(); 533 534 let mut idx = 0; 535 for (ptr, len) in ptrs.into_iter() { 536 let st = ptr as usize - start as usize; 537 let ed = st + len; 538 buf.copy_within(st..ed, idx); 539 idx += len; 540 } 541 Ok((idx, finished)) 542 } 543 } 544 545 #[cfg(feature = "ylong_base")] 546 #[cfg(test)] 547 mod ut_async_http_body { 548 use std::sync::Arc; 549 550 use ylong_http::body::async_impl; 551 552 use crate::async_impl::interceptor::IdleInterceptor; 553 use crate::async_impl::HttpBody; 554 use crate::util::normalizer::BodyLength; 555 use crate::ErrorKind; 556 557 /// UT test cases for `HttpBody::trailer`. 558 /// 559 /// # Brief 560 /// 1. Creates a `HttpBody` by calling `HttpBody::new`. 561 /// 2. Calls `trailer` to get headers. 562 /// 3. Checks if the test result is correct. 563 #[test] ut_asnyc_chunk_trailer_1()564 fn ut_asnyc_chunk_trailer_1() { 565 let handle = ylong_runtime::spawn(async move { 566 async_chunk_trailer_1().await; 567 async_chunk_trailer_2().await; 568 }); 569 ylong_runtime::block_on(handle).unwrap(); 570 } 571 async_chunk_trailer_1()572 async fn async_chunk_trailer_1() { 573 let box_stream = Box::new("".as_bytes()); 574 let chunk_body_bytes = "\ 575 5\r\n\ 576 hello\r\n\ 577 C ; type = text ;end = !\r\n\ 578 hello world!\r\n\ 579 000; message = last\r\n\ 580 accept:text/html\r\n\r\n\ 581 "; 582 let mut chunk = HttpBody::new( 583 Arc::new(IdleInterceptor), 584 BodyLength::Chunk, 585 box_stream, 586 chunk_body_bytes.as_bytes(), 587 ) 588 .unwrap(); 589 let res = async_impl::Body::trailer(&mut chunk) 590 .await 591 .unwrap() 592 .unwrap(); 593 assert_eq!( 594 res.get("accept").unwrap().to_string().unwrap(), 595 "text/html".to_string() 596 ); 597 let box_stream = Box::new("".as_bytes()); 598 let chunk_body_no_trailer_bytes = "\ 599 5\r\n\ 600 hello\r\n\ 601 C ; type = text ;end = !\r\n\ 602 hello world!\r\n\ 603 0\r\n\r\n\ 604 "; 605 606 let mut chunk = HttpBody::new( 607 Arc::new(IdleInterceptor), 608 BodyLength::Chunk, 609 box_stream, 610 chunk_body_no_trailer_bytes.as_bytes(), 611 ) 612 .unwrap(); 613 614 let mut buf = [0u8; 32]; 615 // Read body part 616 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 617 assert_eq!(read, 5); 618 assert_eq!(&buf[..read], b"hello"); 619 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 620 assert_eq!(read, 12); 621 assert_eq!(&buf[..read], b"hello world!"); 622 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 623 assert_eq!(read, 0); 624 assert_eq!(&buf[..read], b""); 625 // try read trailer part 626 let res = async_impl::Body::trailer(&mut chunk).await.unwrap(); 627 assert!(res.is_none()); 628 } 629 async_chunk_trailer_2()630 async fn async_chunk_trailer_2() { 631 let box_stream = Box::new("".as_bytes()); 632 let chunk_body_bytes = "\ 633 5\r\n\ 634 hello\r\n\ 635 C ; type = text ;end = !\r\n\ 636 hello world!\r\n\ 637 000; message = last\r\n\ 638 Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\ 639 "; 640 let mut chunk = HttpBody::new( 641 Arc::new(IdleInterceptor), 642 BodyLength::Chunk, 643 box_stream, 644 chunk_body_bytes.as_bytes(), 645 ) 646 .unwrap(); 647 let res = async_impl::Body::trailer(&mut chunk) 648 .await 649 .unwrap() 650 .unwrap(); 651 assert_eq!( 652 res.get("expires").unwrap().to_string().unwrap(), 653 "Wed, 21 Oct 2015 07:27:00 GMT".to_string() 654 ); 655 } 656 657 /// UT test cases for `Body::data`. 658 /// 659 /// # Brief 660 /// 1. Creates a chunk `HttpBody`. 661 /// 2. Calls `data` method get boxstream. 662 /// 3. Checks if data size is correct. 663 #[test] ut_asnyc_http_body_chunk2()664 fn ut_asnyc_http_body_chunk2() { 665 let handle = ylong_runtime::spawn(async move { 666 http_body_chunk2().await; 667 }); 668 ylong_runtime::block_on(handle).unwrap(); 669 } 670 http_body_chunk2()671 async fn http_body_chunk2() { 672 let box_stream = Box::new( 673 "\ 674 5\r\n\ 675 hello\r\n\ 676 C ; type = text ;end = !\r\n\ 677 hello world!\r\n\ 678 000; message = last\r\n\ 679 accept:text/html\r\n\r\n\ 680 " 681 .as_bytes(), 682 ); 683 let chunk_body_bytes = ""; 684 let mut chunk = HttpBody::new( 685 Arc::new(IdleInterceptor), 686 BodyLength::Chunk, 687 box_stream, 688 chunk_body_bytes.as_bytes(), 689 ) 690 .unwrap(); 691 692 let mut buf = [0u8; 32]; 693 // Read body part 694 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 695 assert_eq!(read, 5); 696 697 let box_stream = Box::new("".as_bytes()); 698 let chunk_body_no_trailer_bytes = "\ 699 5\r\n\ 700 hello\r\n\ 701 C ; type = text ;end = !\r\n\ 702 hello world!\r\n\ 703 0\r\n\r\n\ 704 "; 705 706 let mut chunk = HttpBody::new( 707 Arc::new(IdleInterceptor), 708 BodyLength::Chunk, 709 box_stream, 710 chunk_body_no_trailer_bytes.as_bytes(), 711 ) 712 .unwrap(); 713 714 let mut buf = [0u8; 32]; 715 // Read body part 716 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 717 assert_eq!(read, 5); 718 assert_eq!(&buf[..read], b"hello"); 719 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 720 assert_eq!(read, 12); 721 assert_eq!(&buf[..read], b"hello world!"); 722 let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap(); 723 assert_eq!(read, 0); 724 assert_eq!(&buf[..read], b""); 725 let res = async_impl::Body::trailer(&mut chunk).await.unwrap(); 726 assert!(res.is_none()); 727 } 728 729 /// UT test cases for `Body::data`. 730 /// 731 /// # Brief 732 /// 1. Creates a empty `HttpBody`. 733 /// 2. Calls `HttpBody::new` to create empty http body. 734 /// 3. Checks if http body is empty. 735 #[test] http_body_empty_err()736 fn http_body_empty_err() { 737 let box_stream = Box::new("".as_bytes()); 738 let content_bytes = "hello"; 739 740 match HttpBody::new( 741 Arc::new(IdleInterceptor), 742 BodyLength::Empty, 743 box_stream, 744 content_bytes.as_bytes(), 745 ) { 746 Ok(_) => (), 747 Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request), 748 } 749 } 750 751 /// UT test cases for text `HttpBody::new`. 752 /// 753 /// # Brief 754 /// 1. Creates a text `HttpBody`. 755 /// 2. Calls `HttpBody::new` to create text http body. 756 /// 3. Checks if result is correct. 757 #[test] ut_http_body_text()758 fn ut_http_body_text() { 759 let handle = ylong_runtime::spawn(async move { 760 http_body_text().await; 761 }); 762 ylong_runtime::block_on(handle).unwrap(); 763 } 764 http_body_text()765 async fn http_body_text() { 766 let box_stream = Box::new("hello world".as_bytes()); 767 let content_bytes = ""; 768 769 let mut text = HttpBody::new( 770 Arc::new(IdleInterceptor), 771 BodyLength::Length(11), 772 box_stream, 773 content_bytes.as_bytes(), 774 ) 775 .unwrap(); 776 777 let mut buf = [0u8; 5]; 778 // Read body part 779 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 780 assert_eq!(read, 5); 781 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 782 assert_eq!(read, 5); 783 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 784 assert_eq!(read, 1); 785 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 786 assert_eq!(read, 0); 787 788 let box_stream = Box::new("".as_bytes()); 789 let content_bytes = "hello"; 790 791 let mut text = HttpBody::new( 792 Arc::new(IdleInterceptor), 793 BodyLength::Length(5), 794 box_stream, 795 content_bytes.as_bytes(), 796 ) 797 .unwrap(); 798 799 let mut buf = [0u8; 32]; 800 // Read body part 801 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 802 assert_eq!(read, 5); 803 let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap(); 804 assert_eq!(read, 0); 805 } 806 807 /// UT test cases for until_close `HttpBody::new`. 808 /// 809 /// # Brief 810 /// 1. Creates a until_close `HttpBody`. 811 /// 2. Calls `HttpBody::new` to create until_close http body. 812 /// 3. Checks if result is correct. 813 #[test] ut_http_body_until_close()814 fn ut_http_body_until_close() { 815 let handle = ylong_runtime::spawn(async move { 816 http_body_until_close().await; 817 }); 818 ylong_runtime::block_on(handle).unwrap(); 819 } 820 http_body_until_close()821 async fn http_body_until_close() { 822 let box_stream = Box::new("hello world".as_bytes()); 823 let content_bytes = ""; 824 825 let mut until_close = HttpBody::new( 826 Arc::new(IdleInterceptor), 827 BodyLength::UntilClose, 828 box_stream, 829 content_bytes.as_bytes(), 830 ) 831 .unwrap(); 832 833 let mut buf = [0u8; 5]; 834 // Read body part 835 let read = async_impl::Body::data(&mut until_close, &mut buf) 836 .await 837 .unwrap(); 838 assert_eq!(read, 5); 839 let read = async_impl::Body::data(&mut until_close, &mut buf) 840 .await 841 .unwrap(); 842 assert_eq!(read, 5); 843 let read = async_impl::Body::data(&mut until_close, &mut buf) 844 .await 845 .unwrap(); 846 assert_eq!(read, 1); 847 848 let box_stream = Box::new("".as_bytes()); 849 let content_bytes = "hello"; 850 851 let mut until_close = HttpBody::new( 852 Arc::new(IdleInterceptor), 853 BodyLength::UntilClose, 854 box_stream, 855 content_bytes.as_bytes(), 856 ) 857 .unwrap(); 858 859 let mut buf = [0u8; 5]; 860 // Read body part 861 let read = async_impl::Body::data(&mut until_close, &mut buf) 862 .await 863 .unwrap(); 864 assert_eq!(read, 5); 865 let read = async_impl::Body::data(&mut until_close, &mut buf) 866 .await 867 .unwrap(); 868 assert_eq!(read, 0); 869 } 870 } 871