1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use core::pin::Pin;
15 use core::task::{Context, Poll};
16 use std::future::Future;
17 use std::io::{Cursor, Read};
18 use std::sync::Arc;
19 
20 use ylong_http::body::async_impl::Body;
21 use ylong_http::body::TextBodyDecoder;
22 #[cfg(feature = "http1_1")]
23 use ylong_http::body::{ChunkBodyDecoder, ChunkState};
24 use ylong_http::headers::Headers;
25 
26 use super::conn::StreamData;
27 use crate::async_impl::interceptor::Interceptors;
28 use crate::error::{ErrorKind, HttpClientError};
29 use crate::runtime::{AsyncRead, ReadBuf, Sleep};
30 use crate::util::normalizer::BodyLength;
31 
32 const TRAILER_SIZE: usize = 1024;
33 
34 /// `HttpBody` is the body part of the `Response` returned by `Client::request`.
35 /// `HttpBody` implements `Body` trait, so users can call related methods to get
36 /// body data.
37 ///
38 /// # Examples
39 ///
40 /// ```no_run
41 /// use ylong_http_client::async_impl::{Body, Client, HttpBody, Request};
42 /// use ylong_http_client::HttpClientError;
43 ///
44 /// async fn read_body() -> Result<(), HttpClientError> {
45 ///     let client = Client::new();
46 ///
47 ///     // `HttpBody` is the body part of `response`.
48 ///     let mut response = client
49 ///         .request(Request::builder().body(Body::empty())?)
50 ///         .await?;
51 ///
52 ///     // Users can use `Body::data` to get body data.
53 ///     let mut buf = [0u8; 1024];
54 ///     loop {
55 ///         let size = response.data(&mut buf).await.unwrap();
56 ///         if size == 0 {
57 ///             break;
58 ///         }
59 ///         let _data = &buf[..size];
60 ///         // Deals with the data.
61 ///     }
62 ///     Ok(())
63 /// }
64 /// ```
pub struct HttpBody {
    /// Decoding strategy for this body, selected in `HttpBody::new` from the
    /// `BodyLength` that was passed in.
    kind: Kind,
    /// Optional timer installed through `set_sleep`. Once it completes,
    /// `poll_data` and `poll_trailer` return a `Timeout` error.
    sleep: Option<Pin<Box<Sleep>>>,
}
69 
/// Boxed, type-erased stream that the body readers in this file poll for bytes
/// and shut down when reading ends abnormally.
type BoxStreamData = Box<dyn StreamData + Sync + Send + Unpin>;
71 
72 impl HttpBody {
new( interceptors: Arc<Interceptors>, body_length: BodyLength, io: BoxStreamData, pre: &[u8], ) -> Result<Self, HttpClientError>73     pub(crate) fn new(
74         interceptors: Arc<Interceptors>,
75         body_length: BodyLength,
76         io: BoxStreamData,
77         pre: &[u8],
78     ) -> Result<Self, HttpClientError> {
79         let kind = match body_length {
80             BodyLength::Empty => {
81                 if !pre.is_empty() {
82                     // TODO: Consider the case where BodyLength is empty but pre is not empty.
83                     io.shutdown();
84                     return err_from_msg!(Request, "Body length is 0 but read extra data");
85                 }
86                 Kind::Empty
87             }
88             BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)),
89             BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)),
90 
91             #[cfg(feature = "http1_1")]
92             BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)),
93         };
94         Ok(Self { kind, sleep: None })
95     }
96 
set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>)97     pub(crate) fn set_sleep(&mut self, sleep: Option<Pin<Box<Sleep>>>) {
98         self.sleep = sleep;
99     }
100 }
101 
102 impl Body for HttpBody {
103     type Error = HttpClientError;
104 
poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, Self::Error>>105     fn poll_data(
106         mut self: Pin<&mut Self>,
107         cx: &mut Context<'_>,
108         buf: &mut [u8],
109     ) -> Poll<Result<usize, Self::Error>> {
110         if buf.is_empty() {
111             return Poll::Ready(Ok(0));
112         }
113 
114         if let Some(delay) = self.sleep.as_mut() {
115             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
116                 return Poll::Ready(err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into()));
117             }
118         }
119 
120         match self.kind {
121             Kind::Empty => Poll::Ready(Ok(0)),
122             Kind::Text(ref mut text) => text.data(cx, buf),
123             Kind::UntilClose(ref mut until_close) => until_close.data(cx, buf),
124             #[cfg(feature = "http1_1")]
125             Kind::Chunk(ref mut chunk) => chunk.data(cx, buf),
126         }
127     }
128 
poll_trailer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<Option<Headers>, Self::Error>>129     fn poll_trailer(
130         mut self: Pin<&mut Self>,
131         cx: &mut Context<'_>,
132     ) -> Poll<Result<Option<Headers>, Self::Error>> {
133         // Get trailer data from io
134         if let Some(delay) = self.sleep.as_mut() {
135             if let Poll::Ready(()) = Pin::new(delay).poll(cx) {
136                 return Poll::Ready(err_from_msg!(Timeout, "Request timeout"));
137             }
138         }
139 
140         let mut read_buf = [0_u8; TRAILER_SIZE];
141 
142         match self.kind {
143             #[cfg(feature = "http1_1")]
144             Kind::Chunk(ref mut chunk) => {
145                 match chunk.data(cx, &mut read_buf) {
146                     Poll::Ready(Ok(_)) => {}
147                     Poll::Pending => {
148                         return Poll::Pending;
149                     }
150                     Poll::Ready(Err(e)) => {
151                         return Poll::Ready(Err(e));
152                     }
153                 }
154                 Poll::Ready(Ok(chunk.decoder.get_trailer().map_err(|e| {
155                     HttpClientError::from_error(ErrorKind::BodyDecode, e)
156                 })?))
157             }
158             _ => Poll::Ready(Ok(None)),
159         }
160     }
161 }
162 
163 impl Drop for HttpBody {
drop(&mut self)164     fn drop(&mut self) {
165         let io = match self.kind {
166             Kind::Text(ref mut text) => text.io.as_mut(),
167             #[cfg(feature = "http1_1")]
168             Kind::Chunk(ref mut chunk) => chunk.io.as_mut(),
169             Kind::UntilClose(ref mut until_close) => until_close.io.as_mut(),
170             _ => None,
171         };
172         // If response body is not totally read, shutdown io.
173         if let Some(io) = io {
174             io.shutdown()
175         }
176     }
177 }
178 
// TODO: `TextBodyDecoder` implementation and `ChunkBodyDecoder` implementation.
/// The reading strategy behind an `HttpBody`, one variant per `BodyLength`
/// handled in `HttpBody::new`.
enum Kind {
    /// No body; reads always return 0 bytes.
    Empty,
    /// Body with a known length (`BodyLength::Length`).
    Text(Text),
    /// Chunk-encoded body (`BodyLength::Chunk`), the only kind that can yield
    /// a trailer.
    #[cfg(feature = "http1_1")]
    Chunk(Chunk),
    /// Body that lasts until the stream reports a read with no data
    /// (`BodyLength::UntilClose`).
    UntilClose(UntilClose),
}
187 
/// Reader for a body without a declared length: data is served until the
/// stream completes a read without filling any bytes.
struct UntilClose {
    /// Called through `intercept_output` with every non-empty block read from `io`.
    interceptors: Arc<Interceptors>,
    /// Copy of the `pre` bytes passed to `new`; `None` when they were empty or
    /// have been fully consumed.
    pre: Option<Cursor<Vec<u8>>>,
    /// The stream. It is taken out for each poll and put back only while more
    /// data may follow.
    io: Option<BoxStreamData>,
}
193 
194 impl UntilClose {
new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self195     pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self {
196         Self {
197             interceptors,
198             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
199             io: Some(io),
200         }
201     }
202 
data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>203     fn data(
204         &mut self,
205         cx: &mut Context<'_>,
206         buf: &mut [u8],
207     ) -> Poll<Result<usize, HttpClientError>> {
208         if buf.is_empty() {
209             return Poll::Ready(Ok(0));
210         }
211         let mut read = 0;
212         if let Some(pre) = self.pre.as_mut() {
213             // Here cursor read never failed.
214             let this_read = Read::read(pre, buf).unwrap();
215             if this_read == 0 {
216                 self.pre = None;
217             } else {
218                 read += this_read;
219             }
220         }
221 
222         if !buf[read..].is_empty() {
223             if let Some(io) = self.io.take() {
224                 return self.poll_read_io(cx, io, read, buf);
225             }
226         }
227         Poll::Ready(Ok(read))
228     }
229 
poll_read_io( &mut self, cx: &mut Context<'_>, mut io: BoxStreamData, read: usize, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>230     fn poll_read_io(
231         &mut self,
232         cx: &mut Context<'_>,
233         mut io: BoxStreamData,
234         read: usize,
235         buf: &mut [u8],
236     ) -> Poll<Result<usize, HttpClientError>> {
237         let mut read = read;
238         let mut read_buf = ReadBuf::new(&mut buf[read..]);
239         match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
240             // Disconnected.
241             Poll::Ready(Ok(())) => {
242                 let filled = read_buf.filled().len();
243                 if filled == 0 {
244                     io.shutdown();
245                 } else {
246                     self.interceptors
247                         .intercept_output(&buf[read..(read + filled)])?;
248                     self.io = Some(io);
249                 }
250                 read += filled;
251                 Poll::Ready(Ok(read))
252             }
253             Poll::Pending => {
254                 self.io = Some(io);
255                 if read != 0 {
256                     return Poll::Ready(Ok(read));
257                 }
258                 Poll::Pending
259             }
260             Poll::Ready(Err(e)) => {
261                 // If IO error occurs, shutdowns `io` before return.
262                 io.shutdown();
263                 Poll::Ready(err_from_io!(BodyTransfer, e))
264             }
265         }
266     }
267 }
268 
/// Reader for a body whose length is known up front.
struct Text {
    /// Called through `intercept_output` with every block read from `io`.
    interceptors: Arc<Interceptors>,
    /// Decoder created with the declared length; it reports when the body is
    /// complete and whether surplus bytes followed it.
    decoder: TextBodyDecoder,
    /// Copy of the `pre` bytes passed to `new`; `None` when they were empty or
    /// have been fully consumed.
    pre: Option<Cursor<Vec<u8>>>,
    /// The stream. It is released once the body is complete or an error occurs.
    io: Option<BoxStreamData>,
}
275 
276 impl Text {
new( len: u64, pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>, ) -> Self277     pub(crate) fn new(
278         len: u64,
279         pre: &[u8],
280         io: BoxStreamData,
281         interceptors: Arc<Interceptors>,
282     ) -> Self {
283         Self {
284             interceptors,
285             decoder: TextBodyDecoder::new(len),
286             pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())),
287             io: Some(io),
288         }
289     }
290 }
291 
292 impl Text {
data( &mut self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>>293     fn data(
294         &mut self,
295         cx: &mut Context<'_>,
296         buf: &mut [u8],
297     ) -> Poll<Result<usize, HttpClientError>> {
298         if buf.is_empty() {
299             return Poll::Ready(Ok(0));
300         }
301 
302         let mut read = 0;
303 
304         if let Some(pre) = self.pre.as_mut() {
305             // Here cursor read never failed.
306             let this_read = Read::read(pre, buf).unwrap();
307             if this_read == 0 {
308                 self.pre = None;
309             } else {
310                 read += this_read;
311                 if let Some(result) = self.read_remaining(buf, read) {
312                     return result;
313                 }
314             }
315         }
316 
317         if !buf[read..].is_empty() {
318             if let Some(io) = self.io.take() {
319                 return self.poll_read_io(cx, buf, io, read);
320             }
321         }
322         Poll::Ready(Ok(read))
323     }
324 
read_remaining( &mut self, buf: &mut [u8], read: usize, ) -> Option<Poll<Result<usize, HttpClientError>>>325     fn read_remaining(
326         &mut self,
327         buf: &mut [u8],
328         read: usize,
329     ) -> Option<Poll<Result<usize, HttpClientError>>> {
330         let (text, rem) = self.decoder.decode(&buf[..read]);
331 
332         // Contains redundant `rem`, return error.
333         match (text.is_complete(), rem.is_empty()) {
334             (true, false) => {
335                 if let Some(io) = self.io.take() {
336                     io.shutdown();
337                 };
338                 Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof")))
339             }
340             (true, true) => {
341                 self.io = None;
342                 Some(Poll::Ready(Ok(read)))
343             }
344             // TextBodyDecoder decodes as much as possible here.
345             _ => None,
346         }
347     }
348 
poll_read_io( &mut self, cx: &mut Context<'_>, buf: &mut [u8], mut io: BoxStreamData, read: usize, ) -> Poll<Result<usize, HttpClientError>>349     fn poll_read_io(
350         &mut self,
351         cx: &mut Context<'_>,
352         buf: &mut [u8],
353         mut io: BoxStreamData,
354         read: usize,
355     ) -> Poll<Result<usize, HttpClientError>> {
356         let mut read = read;
357         let mut read_buf = ReadBuf::new(&mut buf[read..]);
358         match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
359             // Disconnected.
360             Poll::Ready(Ok(())) => {
361                 let filled = read_buf.filled().len();
362                 if filled == 0 {
363                     io.shutdown();
364                     return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
365                 }
366                 let (text, rem) = self.decoder.decode(read_buf.filled());
367                 self.interceptors.intercept_output(read_buf.filled())?;
368                 read += filled;
369                 // Contains redundant `rem`, return error.
370                 match (text.is_complete(), rem.is_empty()) {
371                     (true, false) => {
372                         io.shutdown();
373                         Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))
374                     }
375                     (true, true) => Poll::Ready(Ok(read)),
376                     _ => {
377                         self.io = Some(io);
378                         Poll::Ready(Ok(read))
379                     }
380                 }
381             }
382             Poll::Pending => {
383                 self.io = Some(io);
384                 if read != 0 {
385                     return Poll::Ready(Ok(read));
386                 }
387                 Poll::Pending
388             }
389             Poll::Ready(Err(e)) => {
390                 // If IO error occurs, shutdowns `io` before return.
391                 io.shutdown();
392                 Poll::Ready(err_from_io!(BodyDecode, e))
393             }
394         }
395     }
396 }
397 
/// Reader for a chunk-encoded body.
#[cfg(feature = "http1_1")]
struct Chunk {
    /// Called through `intercept_output` after each block read from `io` has
    /// been decoded.
    interceptors: Arc<Interceptors>,
    /// Chunk decoder, created with trailer support enabled; `poll_trailer`
    /// queries it for the trailer.
    decoder: ChunkBodyDecoder,
    /// Copy of the `pre` bytes passed to `new`; `None` when they were empty or
    /// have been fully consumed.
    pre: Option<Cursor<Vec<u8>>>,
    /// The stream. It is released once the final chunk was seen or an error
    /// occurs.
    io: Option<BoxStreamData>,
}
405 
#[cfg(feature = "http1_1")]
impl Chunk {
    /// Creates a chunk reader that serves a copy of `pre` first and then reads
    /// from `io`. The decoder is configured to accept a trailer.
    pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc<Interceptors>) -> Self {
        let pre = if pre.is_empty() {
            None
        } else {
            Some(Cursor::new(pre.to_vec()))
        };

        Self {
            interceptors,
            decoder: ChunkBodyDecoder::new().contains_trailer(true),
            pre,
            io: Some(io),
        }
    }
}
417 
#[cfg(feature = "http1_1")]
impl Chunk {
    /// Reads raw chunk-encoded bytes into `buf`, decodes them in place and
    /// resolves to the number of payload bytes now at the front of `buf`.
    ///
    /// The buffered `pre` bytes are consumed first, then `io`. The call returns
    /// as soon as some payload was produced or the final chunk was seen; `Ok(0)`
    /// is returned once neither `pre` nor `io` is left.
    ///
    /// # Errors
    ///
    /// Returns a `BodyDecode` error when decoding fails, when `io` completes a
    /// read without data before the final chunk, or when `io` reports an error.
    fn data(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, HttpClientError>> {
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        let mut read = 0;

        // Phase 1: drain the buffered `pre` bytes.
        while let Some(pre) = self.pre.as_mut() {
            // Here cursor read never failed.
            let size = Read::read(pre, &mut buf[read..]).unwrap();
            if size == 0 {
                // The cursor is exhausted; the loop ends after this pass unless
                // a return below is taken first.
                self.pre = None;
            }

            // `size` is rebound: from raw bytes read to decoded payload bytes.
            let (size, flag) = self.merge_chunks(&mut buf[read..read + size])?;
            read += size;

            if flag {
                // Return if we find a 0-sized chunk.
                // The stream is released without an explicit shutdown here.
                self.io = None;
                return Poll::Ready(Ok(read));
            } else if read != 0 {
                // Return if we get some data.
                return Poll::Ready(Ok(read));
            }
        }

        // Phase 2: read from io until payload shows up or the body finishes.
        // Here `read` must be 0.
        while let Some(mut io) = self.io.take() {
            let mut read_buf = ReadBuf::new(&mut buf[read..]);
            match Pin::new(&mut io).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let filled = read_buf.filled().len();
                    if filled == 0 {
                        // No data although the final chunk has not been seen.
                        io.shutdown();
                        return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete"));
                    }
                    let (size, flag) = self.merge_chunks(read_buf.filled_mut())?;
                    // NOTE(review): `merge_chunks` has already compacted the
                    // payload to the front of this slice, so the interceptor
                    // receives the merged buffer rather than the bytes as they
                    // arrived — confirm this is intended.
                    self.interceptors.intercept_output(read_buf.filled_mut())?;
                    read += size;
                    if flag {
                        // Return if we find a 0-sized chunk.
                        // `io` is not stored back, so later calls resolve to 0.
                        return Poll::Ready(Ok(read));
                    }
                    self.io = Some(io);
                    if read != 0 {
                        // Return if we get some data.
                        return Poll::Ready(Ok(read));
                    }
                    // Only chunk metadata was consumed so far; poll again.
                }
                Poll::Pending => {
                    self.io = Some(io);
                    return Poll::Pending;
                }
                Poll::Ready(Err(e)) => {
                    // If IO error occurs, shutdowns `io` before return.
                    io.shutdown();
                    return Poll::Ready(err_from_io!(BodyDecode, e));
                }
            }
        }

        Poll::Ready(Ok(read))
    }

    /// Decodes the chunk-encoded bytes in `buf` and compacts the payload of all
    /// decoded chunks to the front of `buf`.
    ///
    /// Returns `(len, finished)`: `len` is the number of payload bytes now at
    /// `buf[..len]`, `finished` is `true` when the end of the chunked body was
    /// reached.
    ///
    /// # Errors
    ///
    /// Returns a `BodyDecode` error when the decoder rejects the input or when
    /// bytes remain after the end of the body.
    fn merge_chunks(&mut self, buf: &mut [u8]) -> Result<(usize, bool), HttpClientError> {
        // Here we need to merge the chunks into one data block and return.
        // The data arrangement in buf is as follows:
        //
        // data in buf:
        // +------+------+------+------+------+------+------+
        // | data | len  | data | len  |  ... | data |  len |
        // +------+------+------+------+------+------+------+
        //
        // We need to merge these data blocks into one block:
        //
        // after merge:
        // +---------------------------+
        // |            data           |
        // +---------------------------+

        let (chunks, junk) = self
            .decoder
            .decode(buf)
            .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?;

        let mut finished = false;
        // Location (start pointer, length) of each payload slice; recorded so
        // the copy below can run after the decoder output is consumed.
        let mut ptrs = Vec::new();

        for chunk in chunks.into_iter() {
            if chunk.trailer().is_some() {
                // A trailer piece: the body ends once the decoder reports `Finish`.
                if chunk.state() == &ChunkState::Finish {
                    finished = true;
                }
            } else {
                // A zero size outside the `MetaSize` state ends the body.
                if chunk.size() == 0 && chunk.state() != &ChunkState::MetaSize {
                    finished = true;
                    break;
                }
                let data = chunk.data();
                ptrs.push((data.as_ptr(), data.len()))
            }
        }

        // Nothing may follow the end of the body.
        if finished && !junk.is_empty() {
            return err_from_msg!(BodyDecode, "Invalid chunk body");
        }

        let start = buf.as_ptr();

        // Move every payload slice forward so they form one contiguous block.
        // The offsets assume each recorded pointer lies inside `buf`.
        let mut idx = 0;
        for (ptr, len) in ptrs.into_iter() {
            let st = ptr as usize - start as usize;
            let ed = st + len;
            buf.copy_within(st..ed, idx);
            idx += len;
        }
        Ok((idx, finished))
    }
}
544 
545 #[cfg(feature = "ylong_base")]
546 #[cfg(test)]
547 mod ut_async_http_body {
548     use std::sync::Arc;
549 
550     use ylong_http::body::async_impl;
551 
552     use crate::async_impl::interceptor::IdleInterceptor;
553     use crate::async_impl::HttpBody;
554     use crate::util::normalizer::BodyLength;
555     use crate::ErrorKind;
556 
557     /// UT test cases for `HttpBody::trailer`.
558     ///
559     /// # Brief
560     /// 1. Creates a `HttpBody` by calling `HttpBody::new`.
561     /// 2. Calls `trailer` to get headers.
562     /// 3. Checks if the test result is correct.
563     #[test]
ut_asnyc_chunk_trailer_1()564     fn ut_asnyc_chunk_trailer_1() {
565         let handle = ylong_runtime::spawn(async move {
566             async_chunk_trailer_1().await;
567             async_chunk_trailer_2().await;
568         });
569         ylong_runtime::block_on(handle).unwrap();
570     }
571 
async_chunk_trailer_1()572     async fn async_chunk_trailer_1() {
573         let box_stream = Box::new("".as_bytes());
574         let chunk_body_bytes = "\
575             5\r\n\
576             hello\r\n\
577             C ; type = text ;end = !\r\n\
578             hello world!\r\n\
579             000; message = last\r\n\
580             accept:text/html\r\n\r\n\
581             ";
582         let mut chunk = HttpBody::new(
583             Arc::new(IdleInterceptor),
584             BodyLength::Chunk,
585             box_stream,
586             chunk_body_bytes.as_bytes(),
587         )
588         .unwrap();
589         let res = async_impl::Body::trailer(&mut chunk)
590             .await
591             .unwrap()
592             .unwrap();
593         assert_eq!(
594             res.get("accept").unwrap().to_string().unwrap(),
595             "text/html".to_string()
596         );
597         let box_stream = Box::new("".as_bytes());
598         let chunk_body_no_trailer_bytes = "\
599             5\r\n\
600             hello\r\n\
601             C ; type = text ;end = !\r\n\
602             hello world!\r\n\
603             0\r\n\r\n\
604             ";
605 
606         let mut chunk = HttpBody::new(
607             Arc::new(IdleInterceptor),
608             BodyLength::Chunk,
609             box_stream,
610             chunk_body_no_trailer_bytes.as_bytes(),
611         )
612         .unwrap();
613 
614         let mut buf = [0u8; 32];
615         // Read body part
616         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
617         assert_eq!(read, 5);
618         assert_eq!(&buf[..read], b"hello");
619         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
620         assert_eq!(read, 12);
621         assert_eq!(&buf[..read], b"hello world!");
622         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
623         assert_eq!(read, 0);
624         assert_eq!(&buf[..read], b"");
625         // try read trailer part
626         let res = async_impl::Body::trailer(&mut chunk).await.unwrap();
627         assert!(res.is_none());
628     }
629 
async_chunk_trailer_2()630     async fn async_chunk_trailer_2() {
631         let box_stream = Box::new("".as_bytes());
632         let chunk_body_bytes = "\
633             5\r\n\
634             hello\r\n\
635             C ; type = text ;end = !\r\n\
636             hello world!\r\n\
637             000; message = last\r\n\
638             Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\
639             ";
640         let mut chunk = HttpBody::new(
641             Arc::new(IdleInterceptor),
642             BodyLength::Chunk,
643             box_stream,
644             chunk_body_bytes.as_bytes(),
645         )
646         .unwrap();
647         let res = async_impl::Body::trailer(&mut chunk)
648             .await
649             .unwrap()
650             .unwrap();
651         assert_eq!(
652             res.get("expires").unwrap().to_string().unwrap(),
653             "Wed, 21 Oct 2015 07:27:00 GMT".to_string()
654         );
655     }
656 
657     /// UT test cases for `Body::data`.
658     ///
659     /// # Brief
660     /// 1. Creates a chunk `HttpBody`.
661     /// 2. Calls `data` method get boxstream.
662     /// 3. Checks if data size is correct.
663     #[test]
ut_asnyc_http_body_chunk2()664     fn ut_asnyc_http_body_chunk2() {
665         let handle = ylong_runtime::spawn(async move {
666             http_body_chunk2().await;
667         });
668         ylong_runtime::block_on(handle).unwrap();
669     }
670 
http_body_chunk2()671     async fn http_body_chunk2() {
672         let box_stream = Box::new(
673             "\
674             5\r\n\
675             hello\r\n\
676             C ; type = text ;end = !\r\n\
677             hello world!\r\n\
678             000; message = last\r\n\
679             accept:text/html\r\n\r\n\
680         "
681             .as_bytes(),
682         );
683         let chunk_body_bytes = "";
684         let mut chunk = HttpBody::new(
685             Arc::new(IdleInterceptor),
686             BodyLength::Chunk,
687             box_stream,
688             chunk_body_bytes.as_bytes(),
689         )
690         .unwrap();
691 
692         let mut buf = [0u8; 32];
693         // Read body part
694         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
695         assert_eq!(read, 5);
696 
697         let box_stream = Box::new("".as_bytes());
698         let chunk_body_no_trailer_bytes = "\
699             5\r\n\
700             hello\r\n\
701             C ; type = text ;end = !\r\n\
702             hello world!\r\n\
703             0\r\n\r\n\
704             ";
705 
706         let mut chunk = HttpBody::new(
707             Arc::new(IdleInterceptor),
708             BodyLength::Chunk,
709             box_stream,
710             chunk_body_no_trailer_bytes.as_bytes(),
711         )
712         .unwrap();
713 
714         let mut buf = [0u8; 32];
715         // Read body part
716         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
717         assert_eq!(read, 5);
718         assert_eq!(&buf[..read], b"hello");
719         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
720         assert_eq!(read, 12);
721         assert_eq!(&buf[..read], b"hello world!");
722         let read = async_impl::Body::data(&mut chunk, &mut buf).await.unwrap();
723         assert_eq!(read, 0);
724         assert_eq!(&buf[..read], b"");
725         let res = async_impl::Body::trailer(&mut chunk).await.unwrap();
726         assert!(res.is_none());
727     }
728 
729     /// UT test cases for `Body::data`.
730     ///
731     /// # Brief
732     /// 1. Creates a empty `HttpBody`.
733     /// 2. Calls `HttpBody::new` to create empty http body.
734     /// 3. Checks if http body is empty.
735     #[test]
http_body_empty_err()736     fn http_body_empty_err() {
737         let box_stream = Box::new("".as_bytes());
738         let content_bytes = "hello";
739 
740         match HttpBody::new(
741             Arc::new(IdleInterceptor),
742             BodyLength::Empty,
743             box_stream,
744             content_bytes.as_bytes(),
745         ) {
746             Ok(_) => (),
747             Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request),
748         }
749     }
750 
751     /// UT test cases for text `HttpBody::new`.
752     ///
753     /// # Brief
754     /// 1. Creates a text `HttpBody`.
755     /// 2. Calls `HttpBody::new` to create text http body.
756     /// 3. Checks if result is correct.
757     #[test]
ut_http_body_text()758     fn ut_http_body_text() {
759         let handle = ylong_runtime::spawn(async move {
760             http_body_text().await;
761         });
762         ylong_runtime::block_on(handle).unwrap();
763     }
764 
http_body_text()765     async fn http_body_text() {
766         let box_stream = Box::new("hello world".as_bytes());
767         let content_bytes = "";
768 
769         let mut text = HttpBody::new(
770             Arc::new(IdleInterceptor),
771             BodyLength::Length(11),
772             box_stream,
773             content_bytes.as_bytes(),
774         )
775         .unwrap();
776 
777         let mut buf = [0u8; 5];
778         // Read body part
779         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
780         assert_eq!(read, 5);
781         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
782         assert_eq!(read, 5);
783         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
784         assert_eq!(read, 1);
785         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
786         assert_eq!(read, 0);
787 
788         let box_stream = Box::new("".as_bytes());
789         let content_bytes = "hello";
790 
791         let mut text = HttpBody::new(
792             Arc::new(IdleInterceptor),
793             BodyLength::Length(5),
794             box_stream,
795             content_bytes.as_bytes(),
796         )
797         .unwrap();
798 
799         let mut buf = [0u8; 32];
800         // Read body part
801         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
802         assert_eq!(read, 5);
803         let read = async_impl::Body::data(&mut text, &mut buf).await.unwrap();
804         assert_eq!(read, 0);
805     }
806 
807     /// UT test cases for until_close `HttpBody::new`.
808     ///
809     /// # Brief
810     /// 1. Creates a until_close `HttpBody`.
811     /// 2. Calls `HttpBody::new` to create until_close http body.
812     /// 3. Checks if result is correct.
813     #[test]
ut_http_body_until_close()814     fn ut_http_body_until_close() {
815         let handle = ylong_runtime::spawn(async move {
816             http_body_until_close().await;
817         });
818         ylong_runtime::block_on(handle).unwrap();
819     }
820 
http_body_until_close()821     async fn http_body_until_close() {
822         let box_stream = Box::new("hello world".as_bytes());
823         let content_bytes = "";
824 
825         let mut until_close = HttpBody::new(
826             Arc::new(IdleInterceptor),
827             BodyLength::UntilClose,
828             box_stream,
829             content_bytes.as_bytes(),
830         )
831         .unwrap();
832 
833         let mut buf = [0u8; 5];
834         // Read body part
835         let read = async_impl::Body::data(&mut until_close, &mut buf)
836             .await
837             .unwrap();
838         assert_eq!(read, 5);
839         let read = async_impl::Body::data(&mut until_close, &mut buf)
840             .await
841             .unwrap();
842         assert_eq!(read, 5);
843         let read = async_impl::Body::data(&mut until_close, &mut buf)
844             .await
845             .unwrap();
846         assert_eq!(read, 1);
847 
848         let box_stream = Box::new("".as_bytes());
849         let content_bytes = "hello";
850 
851         let mut until_close = HttpBody::new(
852             Arc::new(IdleInterceptor),
853             BodyLength::UntilClose,
854             box_stream,
855             content_bytes.as_bytes(),
856         )
857         .unwrap();
858 
859         let mut buf = [0u8; 5];
860         // Read body part
861         let read = async_impl::Body::data(&mut until_close, &mut buf)
862             .await
863             .unwrap();
864         assert_eq!(read, 5);
865         let read = async_impl::Body::data(&mut until_close, &mut buf)
866             .await
867             .unwrap();
868         assert_eq!(read, 0);
869     }
870 }
871