1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::error::Error; 15 use std::fmt::{Debug, Display, Formatter}; 16 use std::time::{Duration, Instant}; 17 18 use super::Body; 19 use crate::error::HttpClientError; 20 use crate::util::Timeout; 21 use crate::ErrorKind; 22 23 /// A reader used to read all the body data to a specified location and provide 24 /// echo function. 25 /// 26 /// # Examples 27 /// 28 /// ``` 29 /// use ylong_http_client::sync_impl::{BodyProcessError, BodyProcessor, BodyReader, TextBody}; 30 /// 31 /// // Defines a processor, which provides read and echo ability. 32 /// struct Processor { 33 /// vec: Vec<u8>, 34 /// echo: usize, 35 /// } 36 /// 37 /// // Implements `BodyProcessor` trait for `&mut Processor` instead of `Processor` 38 /// // if users want to get the result in struct after reading. 39 /// impl BodyProcessor for &mut Processor { 40 /// fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError> { 41 /// self.vec.extend_from_slice(data); 42 /// Ok(()) 43 /// } 44 /// 45 /// fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError> { 46 /// self.echo += 1; 47 /// Ok(()) 48 /// } 49 /// } 50 /// 51 /// let mut body = TextBody::from_bytes(b"HelloWorld"); 52 /// let mut processor = Processor { 53 /// vec: Vec::new(), 54 /// echo: 0, 55 /// }; 56 /// let _ = BodyReader::new(&mut processor).read_all(&mut body); 57 /// 58 /// // All data is read. 59 /// assert_eq!(processor.vec, b"HelloWorld"); 60 /// // It will be echoed multiple times during the reading process. 61 /// assert_ne!(processor.echo, 0); 62 /// ``` 63 pub struct BodyReader<T: BodyProcessor> { 64 pub(crate) read_timeout: Timeout, 65 pub(crate) processor: T, 66 } 67 68 impl<T: BodyProcessor> BodyReader<T> { 69 /// Creates a new `BodyReader` with the given `Processor`. 70 /// 71 /// # Examples 72 /// 73 /// ``` 74 /// use ylong_http_client::sync_impl::{BodyReader, DefaultBodyProcessor}; 75 /// 76 /// let reader = BodyReader::new(DefaultBodyProcessor::new()); 77 /// ``` new(processor: T) -> Self78 pub fn new(processor: T) -> Self { 79 Self { 80 read_timeout: Timeout::none(), 81 processor, 82 } 83 } 84 85 /// Sets body read timeout. 86 /// 87 /// # Examples 88 /// 89 /// ``` 90 /// use ylong_http_client::sync_impl::{BodyReader, DefaultBodyProcessor}; 91 /// use ylong_http_client::Timeout; 92 /// 93 /// let reader = BodyReader::new(DefaultBodyProcessor::new()).read_timeout(Timeout::none()); 94 /// ``` read_timeout(mut self, timeout: Timeout) -> Self95 pub fn read_timeout(mut self, timeout: Timeout) -> Self { 96 self.read_timeout = timeout; 97 self 98 } 99 100 /// Reads all the body data. During the read process, 101 /// [`BodyProcessor::write`] and [`BodyProcessor::progress`] will be 102 /// called multiple times. 103 /// 104 /// [`BodyProcessor::write`]: BodyProcessor::write 105 /// [`BodyProcessor::progress`]: BodyProcessor::progress 106 /// 107 /// # Examples 108 /// 109 /// ``` 110 /// use ylong_http_client::sync_impl::{BodyProcessor, BodyReader, TextBody}; 111 /// 112 /// let mut body = TextBody::from_bytes(b"HelloWorld"); 113 /// let _ = BodyReader::default().read_all(&mut body); 114 /// ``` read_all<B: Body>(&mut self, body: &mut B) -> Result<(), HttpClientError>115 pub fn read_all<B: Body>(&mut self, body: &mut B) -> Result<(), HttpClientError> { 116 // Use buffers up to 16K in size to read body. 117 const TEMP_BUF_SIZE: usize = 16 * 1024; 118 119 let mut last = Instant::now(); 120 let mut buf = [0u8; TEMP_BUF_SIZE]; 121 let mut written = 0usize; 122 123 loop { 124 let read_len = body 125 .data(&mut buf) 126 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 127 128 if read_len == 0 { 129 self.processor 130 .progress(written) 131 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 132 break; 133 } 134 135 self.processor 136 .write(&buf[..read_len]) 137 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 138 139 written += read_len; 140 141 let now = Instant::now(); 142 if now.duration_since(last) >= Duration::from_secs(1) { 143 self.processor 144 .progress(written) 145 .map_err(|e| HttpClientError::from_error(ErrorKind::BodyDecode, e))?; 146 } 147 last = now; 148 } 149 Ok(()) 150 } 151 } 152 153 impl Default for BodyReader<DefaultBodyProcessor> { default() -> Self154 fn default() -> Self { 155 Self::new(DefaultBodyProcessor::new()) 156 } 157 } 158 159 /// The trait defines methods for processing bodies of HTTP messages. Unlike the 160 /// async version, this is for synchronous usage. 161 pub trait BodyProcessor { 162 /// Writes the body data read each time to the specified location. 163 /// 164 /// This method will be called every time a part of the body data is read. write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>165 fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>; 166 167 /// Informs users how many bytes have been written to the specified location 168 /// at this time. Users can display the progress according to the number of 169 /// bytes written. progress(&mut self, filled: usize) -> Result<(), BodyProcessError>170 fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError>; 171 } 172 173 /// Error occurs when processing body data. 174 #[derive(Debug)] 175 pub struct BodyProcessError; 176 177 impl Display for BodyProcessError { fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result178 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 179 Debug::fmt(self, f) 180 } 181 } 182 183 impl Error for BodyProcessError {} 184 185 /// A default body processor that write data to console directly. 186 pub struct DefaultBodyProcessor; 187 188 impl DefaultBodyProcessor { 189 /// Creates a new `DefaultBodyProcessor`. 190 /// 191 /// # Examples 192 /// 193 /// ``` 194 /// use ylong_http_client::sync_impl::DefaultBodyProcessor; 195 /// 196 /// let processor = DefaultBodyProcessor::new(); 197 /// ``` new() -> Self198 pub fn new() -> Self { 199 Self 200 } 201 } 202 203 impl BodyProcessor for DefaultBodyProcessor { write(&mut self, data: &[u8]) -> Result<(), BodyProcessError>204 fn write(&mut self, data: &[u8]) -> Result<(), BodyProcessError> { 205 println!("{data:?}"); 206 Ok(()) 207 } 208 progress(&mut self, filled: usize) -> Result<(), BodyProcessError>209 fn progress(&mut self, filled: usize) -> Result<(), BodyProcessError> { 210 println!("filled: {filled}"); 211 Ok(()) 212 } 213 } 214 215 impl Default for DefaultBodyProcessor { default() -> Self216 fn default() -> Self { 217 Self::new() 218 } 219 } 220 221 #[cfg(test)] 222 mod ut_syn_reader { 223 use ylong_http::body::TextBody; 224 225 use crate::sync_impl::{BodyReader, DefaultBodyProcessor}; 226 use crate::util::Timeout; 227 228 /// UT test cases for `BodyReader::read_timeout`. 229 /// 230 /// # Brief 231 /// 1. Creates a `BodyReader` with `DefaultBodyProcessor::default` by 232 /// calling `BodyReader::new`. 233 /// 2. Calls `read_timeout`. 234 /// 3. Checks if the result is correct. 235 #[test] ut_body_reader_read_timeout()236 fn ut_body_reader_read_timeout() { 237 let reader = BodyReader::new(DefaultBodyProcessor).read_timeout(Timeout::none()); 238 assert_eq!(reader.read_timeout, Timeout::none()); 239 } 240 241 /// UT test cases for `BodyReader::read_all`. 242 /// 243 /// # Brief 244 /// 1. Creates a `BodyReader` by calling `BodyReader::default`. 245 /// 2. Creates a `TextBody`. 246 /// 3. Calls `read_all` method. 247 /// 4. Checks if the result is corrent. 248 #[test] ut_body_reader_read_all()249 fn ut_body_reader_read_all() { 250 let mut body = TextBody::from_bytes(b"HelloWorld"); 251 let res = BodyReader::default().read_all(&mut body); 252 assert!(res.is_ok()); 253 } 254 } 255