1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::error::Error;
15 use std::fmt::{Debug, Display, Formatter};
16 use std::time::{Duration, Instant};
17 
18 use super::Body;
19 use crate::error::HttpClientError;
20 use crate::util::Timeout;
21 use crate::ErrorKind;
22 
23 /// A reader used to read all the body data to a specified location and provide
24 /// echo function.
25 ///
26 /// # Examples
27 ///
28 /// ```
29 /// use ylong_http_client::sync_impl::{BodyProcessError, BodyProcessor, BodyReader, TextBody};
30 ///
31 /// // Defines a processor, which provides read and echo ability.
32 /// struct Processor {
33 ///     vec: Vec<u8>,
34 ///     echo: usize,
35 /// }
36 ///
37 /// // Implements `BodyProcessor` trait for `&mut Processor` instead of `Processor`
38 /// // if users want to get the result in struct after reading.
39 /// impl BodyProcessor for &mut Processor {
40 ///     fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError> {
41 ///         self.vec.extend_from_slice(data);
42 ///         Ok(())
43 ///     }
44 ///
45 ///     fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError> {
46 ///         self.echo += 1;
47 ///         Ok(())
48 ///     }
49 /// }
50 ///
51 /// let mut body = TextBody::from_bytes(b"HelloWorld");
52 /// let mut processor = Processor {
53 ///     vec: Vec::new(),
54 ///     echo: 0,
55 /// };
56 /// let _ = BodyReader::new(&mut processor).read_all(&mut body);
57 ///
58 /// // All data is read.
59 /// assert_eq!(processor.vec, b"HelloWorld");
60 /// // It will be echoed multiple times during the reading process.
61 /// assert_ne!(processor.echo, 0);
62 /// ```
63 pub struct BodyReader<T: BodyProcessor> {
64     pub(crate) read_timeout: Timeout,
65     pub(crate) processor: T,
66 }
67 
68 impl<T: BodyProcessor> BodyReader<T> {
69     /// Creates a new `BodyReader` with the given `Processor`.
70     ///
71     /// # Examples
72     ///
73     /// ```
74     /// use ylong_http_client::sync_impl::{BodyReader, DefaultBodyProcessor};
75     ///
76     /// let reader = BodyReader::new(DefaultBodyProcessor::new());
77     /// ```
new(processor: T) -> Self78     pub fn new(processor: T) -> Self {
79         Self {
80             read_timeout: Timeout::none(),
81             processor,
82         }
83     }
84 
85     /// Sets body read timeout.
86     ///
87     /// # Examples
88     ///
89     /// ```
90     /// use ylong_http_client::sync_impl::{BodyReader, DefaultBodyProcessor};
91     /// use ylong_http_client::Timeout;
92     ///
93     /// let reader = BodyReader::new(DefaultBodyProcessor::new()).read_timeout(Timeout::none());
94     /// ```
read_timeout(mut self, timeout: Timeout) -> Self95     pub fn read_timeout(mut self, timeout: Timeout) -> Self {
96         self.read_timeout = timeout;
97         self
98     }
99 
100     /// Reads all the body data. During the read process,
101     /// [`BodyProcessor::write`] and [`BodyProcessor::progress`] will be
102     /// called multiple times.
103     ///
104     /// [`BodyProcessor::write`]: BodyProcessor::write
105     /// [`BodyProcessor::progress`]: BodyProcessor::progress
106     ///
107     /// # Examples
108     ///
109     /// ```
110     /// use ylong_http_client::sync_impl::{BodyProcessor, BodyReader, TextBody};
111     ///
112     /// let mut body = TextBody::from_bytes(b"HelloWorld");
113     /// let _ = BodyReader::default().read_all(&mut body);
114     /// ```
read_all<B: Body>(&mut self, body: &mut B) -> Result<(), HttpClientError>115     pub fn read_all<B: Body>(&mut self, body: &mut B) -> Result<(), HttpClientError> {
116         // Use buffers up to 16K in size to read body.
117         const TEMP_BUF_SIZE: usize = 16 * 1024;
118 
119         let mut last = Instant::now();
120         let mut buf = [0u8; TEMP_BUF_SIZE];
121         let mut written = 0usize;
122 
123         loop {
124             let read_len = body
125                 .data(&mut buf)
126                 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?;
127 
128             if read_len == 0 {
129                 self.processor
130                     .progress(written)
131                     .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?;
132                 break;
133             }
134 
135             self.processor
136                 .write(&buf[..read_len])
137                 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?;
138 
139             written += read_len;
140 
141             let now = Instant::now();
142             if now.duration_since(last) >= Duration::from_secs(1) {
143                 self.processor
144                     .progress(written)
145                     .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?;
146             }
147             last = now;
148         }
149         Ok(())
150     }
151 }
152 
153 impl Default for BodyReader<DefaultBodyProcessor> {
default() -> Self154     fn default() -> Self {
155         Self::new(DefaultBodyProcessor::new())
156     }
157 }
158 
159 /// The trait defines methods for processing bodies of HTTP messages. Unlike the
160 /// async version, this is for synchronous usage.
161 pub trait BodyProcessor {
162     /// Writes the body data read each time to the specified location.
163     ///
164     /// This method will be called every time a part of the body data is read.
write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>165     fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>;
166 
167     /// Informs users how many bytes have been written to the specified location
168     /// at this time. Users can display the progress according to the number of
169     /// bytes written.
progress(&mut self, filled: usize) -> Result<(), BodyProcessError>170     fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError>;
171 }
172 
173 /// Error occurs when processing body data.
174 #[derive(Debug)]
175 pub struct BodyProcessError;
176 
177 impl Display for BodyProcessError {
fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result178     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
179         Debug::fmt(self, f)
180     }
181 }
182 
183 impl Error for BodyProcessError {}
184 
185 /// A default body processor that write data to console directly.
186 pub struct DefaultBodyProcessor;
187 
188 impl DefaultBodyProcessor {
189     /// Creates a new `DefaultBodyProcessor`.
190     ///
191     /// # Examples
192     ///
193     /// ```
194     /// use ylong_http_client::sync_impl::DefaultBodyProcessor;
195     ///
196     /// let processor = DefaultBodyProcessor::new();
197     /// ```
new() -> Self198     pub fn new() -> Self {
199         Self
200     }
201 }
202 
203 impl BodyProcessor for DefaultBodyProcessor {
write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>204     fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError> {
205         println!("{data:?}");
206         Ok(())
207     }
208 
progress(&mut self, filled: usize) -> Result<(), BodyProcessError>209     fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError> {
210         println!("filled: {filled}");
211         Ok(())
212     }
213 }
214 
215 impl Default for DefaultBodyProcessor {
default() -> Self216     fn default() -> Self {
217         Self::new()
218     }
219 }
220 
221 #[cfg(test)]
222 mod ut_syn_reader {
223     use ylong_http::body::TextBody;
224 
225     use crate::sync_impl::{BodyReader, DefaultBodyProcessor};
226     use crate::util::Timeout;
227 
228     /// UT test cases for `BodyReader::read_timeout`.
229     ///
230     /// # Brief
231     /// 1. Creates a `BodyReader` with `DefaultBodyProcessor::default` by
232     ///    calling `BodyReader::new`.
233     /// 2. Calls `read_timeout`.
234     /// 3. Checks if the result is correct.
235     #[test]
ut_body_reader_read_timeout()236     fn ut_body_reader_read_timeout() {
237         let reader = BodyReader::new(DefaultBodyProcessor).read_timeout(Timeout::none());
238         assert_eq!(reader.read_timeout, Timeout::none());
239     }
240 
241     /// UT test cases for `BodyReader::read_all`.
242     ///
243     /// # Brief
244     /// 1. Creates a `BodyReader` by calling `BodyReader::default`.
245     /// 2. Creates a `TextBody`.
246     /// 3. Calls `read_all` method.
247     /// 4. Checks if the result is corrent.
248     #[test]
ut_body_reader_read_all()249     fn ut_body_reader_read_all() {
250         let mut body = TextBody::from_bytes(b"HelloWorld");
251         let res = BodyReader::default().read_all(&mut body);
252         assert!(res.is_ok());
253     }
254 }
255