1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod builder; 15 mod operator; 16 17 use std::time::Instant; 18 19 pub use builder::DownloaderBuilder; 20 use builder::WantsBody; 21 use operator::Console; 22 pub use operator::DownloadOperator; 23 24 use crate::async_impl::Response; 25 use crate::error::HttpClientError; 26 use crate::util::{SpeedLimit, Timeout}; 27 28 /// A downloader that can help you download the response body. 29 /// 30 /// A `Downloader` provides a template method for downloading the body and 31 /// needs to use a structure that implements [`DownloadOperator`] trait to read 32 /// the body. 33 /// 34 /// The `DownloadOperator` trait provides two kinds of methods - [`download`] 35 /// and [`progress`], where: 36 /// 37 /// - `download` methods are responsible for reading and copying the body to 38 /// certain places. 39 /// 40 /// - `progress` methods are responsible for progress display. 41 /// 42 /// You only need to provide a structure that implements the `DownloadOperator` 43 /// trait to complete the download process. 44 /// 45 /// A default structure `Console` which implements `DownloadOperator` is 46 /// provided to show download message on console. You can use 47 /// `Downloader::console` to build a `Downloader` which based on it. 48 /// 49 /// [`DownloadOperator`]: DownloadOperator 50 /// [`download`]: DownloadOperator::download 51 /// [`progress`]: DownloadOperator::progress 52 /// 53 /// # Examples 54 /// 55 /// `Console`: 56 /// ```no_run 57 /// # use ylong_http_client::async_impl::{Downloader, HttpBody, Response}; 58 /// 59 /// # async fn download_and_show_progress_on_console(response: Response) { 60 /// // Creates a default `Downloader` that show progress on console. 61 /// let mut downloader = Downloader::console(response); 62 /// let _ = downloader.download().await; 63 /// # } 64 /// ``` 65 /// 66 /// `Custom`: 67 /// ```no_run 68 /// # use std::pin::Pin; 69 /// # use std::task::{Context, Poll}; 70 /// # use ylong_http_client::async_impl::{Downloader, DownloadOperator, HttpBody, Response}; 71 /// # use ylong_http_client::{HttpClientError, SpeedLimit, Timeout}; 72 /// 73 /// # async fn download_and_show_progress(response: Response) { 74 /// // Customizes your own `DownloadOperator`. 75 /// struct MyDownloadOperator; 76 /// 77 /// impl DownloadOperator for MyDownloadOperator { 78 /// fn poll_download( 79 /// self: Pin<&mut Self>, 80 /// cx: &mut Context<'_>, 81 /// data: &[u8], 82 /// ) -> Poll<Result<usize, HttpClientError>> { 83 /// todo!() 84 /// } 85 /// 86 /// fn poll_progress( 87 /// self: Pin<&mut Self>, 88 /// cx: &mut Context<'_>, 89 /// downloaded: u64, 90 /// total: Option<u64>, 91 /// ) -> Poll<Result<(), HttpClientError>> { 92 /// // Writes your customize method. 93 /// todo!() 94 /// } 95 /// } 96 /// 97 /// // Creates a default `Downloader` based on `MyDownloadOperator`. 98 /// // Configures your downloader by using `DownloaderBuilder`. 99 /// let mut downloader = Downloader::builder() 100 /// .body(response) 101 /// .operator(MyDownloadOperator) 102 /// .timeout(Timeout::none()) 103 /// .speed_limit(SpeedLimit::none()) 104 /// .build(); 105 /// let _ = downloader.download().await; 106 /// # } 107 /// ``` 108 pub struct Downloader<T> { 109 operator: T, 110 body: Response, 111 config: DownloadConfig, 112 info: Option<DownloadInfo>, 113 } 114 115 impl Downloader<()> { 116 /// Creates a `Downloader` that based on a default `DownloadOperator` which 117 /// show progress on console. 118 /// 119 /// # Examples 120 /// 121 /// ```no_run 122 /// # use ylong_http_client::async_impl::{Downloader, HttpBody, Response}; 123 /// 124 /// # async fn download_and_show_progress_on_console(response: Response) { 125 /// // Creates a default `Downloader` that show progress on console. 126 /// let mut downloader = Downloader::console(response); 127 /// let _ = downloader.download().await; 128 /// # } 129 /// ``` console(response: Response) -> Downloader<Console>130 pub fn console(response: Response) -> Downloader<Console> { 131 Self::builder().body(response).console().build() 132 } 133 134 /// Creates a `DownloaderBuilder` and configures downloader step by step. 135 /// 136 /// # Examples 137 /// 138 /// ``` 139 /// # use ylong_http_client::async_impl::Downloader; 140 /// 141 /// let builder = Downloader::builder(); 142 /// ``` builder() -> DownloaderBuilder<WantsBody>143 pub fn builder() -> DownloaderBuilder<WantsBody> { 144 DownloaderBuilder::new() 145 } 146 } 147 148 impl<T: DownloadOperator + Unpin> Downloader<T> { 149 /// Starts downloading that uses this `Downloader`'s configurations. 150 /// 151 /// The download and progress methods of the `DownloadOperator` will be 152 /// called multiple times until the download is complete. 153 /// 154 /// # Examples 155 /// 156 /// ``` 157 /// # use ylong_http_client::async_impl::{Downloader, HttpBody, Response}; 158 /// 159 /// # async fn download_response_body(response: Response) { 160 /// let mut downloader = Downloader::console(response); 161 /// let _result = downloader.download().await; 162 /// # } 163 /// ``` download(&mut self) -> Result<(), HttpClientError>164 pub async fn download(&mut self) -> Result<(), HttpClientError> { 165 // Construct new download info, or reuse previous info. 166 if self.info.is_none() { 167 let content_length = self 168 .body 169 .headers() 170 .get("Content") 171 .and_then(|v| v.to_string().ok()) 172 .and_then(|v| v.parse::<u64>().ok()); 173 self.info = Some(DownloadInfo::new(content_length)); 174 } 175 self.limited_download().await 176 } 177 178 // Downloads response body with speed limitation. 179 // TODO: Speed Limit. limited_download(&mut self) -> Result<(), HttpClientError>180 async fn limited_download(&mut self) -> Result<(), HttpClientError> { 181 self.show_progress().await?; 182 self.check_timeout()?; 183 184 let mut buf = [0; 16 * 1024]; 185 186 loop { 187 let data_size = match self.body.data(&mut buf).await? { 188 0 => { 189 self.show_progress().await?; 190 return Ok(()); 191 } 192 size => size, 193 }; 194 195 let data = &buf[..data_size]; 196 let mut size = 0; 197 while size != data.len() { 198 self.check_timeout()?; 199 size += self.operator.download(&data[size..]).await?; 200 self.info.as_mut().unwrap().downloaded_bytes += data.len() as u64; 201 self.show_progress().await?; 202 } 203 } 204 } 205 check_timeout(&mut self) -> Result<(), HttpClientError>206 fn check_timeout(&mut self) -> Result<(), HttpClientError> { 207 if let Some(timeout) = self.config.timeout.inner() { 208 let now = Instant::now(); 209 if now.duration_since(self.info.as_mut().unwrap().start_time) >= timeout { 210 return err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into()); 211 } 212 } 213 Ok(()) 214 } 215 show_progress(&mut self) -> Result<(), HttpClientError>216 async fn show_progress(&mut self) -> Result<(), HttpClientError> { 217 let info = self.info.as_mut().unwrap(); 218 self.operator 219 .progress(info.downloaded_bytes, info.total_bytes) 220 .await 221 } 222 } 223 224 struct DownloadInfo { 225 pub(crate) start_time: Instant, 226 pub(crate) downloaded_bytes: u64, 227 pub(crate) total_bytes: Option<u64>, 228 } 229 230 impl DownloadInfo { new(total_bytes: Option<u64>) -> Self231 fn new(total_bytes: Option<u64>) -> Self { 232 Self { 233 start_time: Instant::now(), 234 downloaded_bytes: 0, 235 total_bytes, 236 } 237 } 238 } 239 240 struct DownloadConfig { 241 pub(crate) timeout: Timeout, 242 pub(crate) speed_limit: SpeedLimit, 243 } 244 245 impl Default for DownloadConfig { default() -> Self246 fn default() -> Self { 247 Self { 248 timeout: Timeout::none(), 249 speed_limit: SpeedLimit::none(), 250 } 251 } 252 } 253 254 #[cfg(all(test, feature = "ylong_base"))] 255 mod ut_downloader { 256 use std::sync::Arc; 257 258 use ylong_http::h1::ResponseDecoder; 259 use ylong_http::response::Response; 260 261 use crate::async_impl::conn::StreamData; 262 use crate::async_impl::interceptor::IdleInterceptor; 263 use crate::async_impl::{Downloader, HttpBody, Response as adpater_resp}; 264 use crate::util::normalizer::BodyLength; 265 266 impl StreamData for &[u8] { shutdown(&self)267 fn shutdown(&self) { 268 println!("Shutdown") 269 } 270 } 271 272 /// UT test cases for `Downloader::download`. 273 /// 274 /// # Brief 275 /// 1. Creates a `Downloader`. 276 /// 2. Calls `download` method. 277 /// 3. Checks if the result is correct. 278 #[test] ut_download()279 fn ut_download() { 280 let handle = ylong_runtime::spawn(async move { 281 download().await; 282 }); 283 ylong_runtime::block_on(handle).unwrap(); 284 } 285 download()286 async fn download() { 287 let response_str = "HTTP/1.1 304 \r\nAge: \t 270646 \t \t\r\nLocation: \t example3.com:80 \t \t\r\nDate: \t Mon, 19 Dec 2022 01:46:59 GMT \t \t\r\nEtag:\t \"3147526947+gzip\" \t \t\r\n\r\n".as_bytes(); 288 let box_stream = Box::new("".as_bytes()); 289 let chunk_body_bytes = "\ 290 5\r\n\ 291 hello\r\n\ 292 C ; type = text ;end = !\r\n\ 293 hello world!\r\n\ 294 000; message = last\r\n\ 295 \r\n\ 296 "; 297 let chunk = HttpBody::new( 298 Arc::new(IdleInterceptor), 299 BodyLength::Chunk, 300 box_stream, 301 chunk_body_bytes.as_bytes(), 302 ) 303 .unwrap(); 304 let mut decoder = ResponseDecoder::new(); 305 let result = decoder.decode(response_str).unwrap().unwrap(); 306 let response = Response::from_raw_parts(result.0, chunk); 307 let mut downloader = Downloader::console(adpater_resp::new(response)); 308 let res = downloader.download().await; 309 assert!(res.is_ok()); 310 } 311 } 312