1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod builder;
15 mod operator;
16 
17 use std::time::Instant;
18 
19 pub use builder::DownloaderBuilder;
20 use builder::WantsBody;
21 use operator::Console;
22 pub use operator::DownloadOperator;
23 
24 use crate::async_impl::Response;
25 use crate::error::HttpClientError;
26 use crate::util::{SpeedLimit, Timeout};
27 
28 /// A downloader that can help you download the response body.
29 ///
30 /// A `Downloader` provides a template method for downloading the body and
31 /// needs to use a structure that implements [`DownloadOperator`] trait to read
32 /// the body.
33 ///
34 /// The `DownloadOperator` trait provides two kinds of methods - [`download`]
35 /// and [`progress`], where:
36 ///
37 /// - `download` methods are responsible for reading and copying the body to
38 /// certain places.
39 ///
40 /// - `progress` methods are responsible for progress display.
41 ///
42 /// You only need to provide a structure that implements the `DownloadOperator`
43 /// trait to complete the download process.
44 ///
/// A default structure `Console`, which implements `DownloadOperator`, is
/// provided to show download messages on the console. You can use
/// `Downloader::console` to build a `Downloader` that is based on it.
48 ///
49 /// [`DownloadOperator`]: DownloadOperator
50 /// [`download`]: DownloadOperator::download
51 /// [`progress`]: DownloadOperator::progress
52 ///
53 /// # Examples
54 ///
55 /// `Console`:
56 /// ```no_run
57 /// # use ylong_http_client::async_impl::{Downloader, HttpBody, Response};
58 ///
59 /// # async fn download_and_show_progress_on_console(response: Response) {
60 /// // Creates a default `Downloader` that show progress on console.
61 /// let mut downloader = Downloader::console(response);
62 /// let _ = downloader.download().await;
63 /// # }
64 /// ```
65 ///
66 /// `Custom`:
67 /// ```no_run
68 /// # use std::pin::Pin;
69 /// # use std::task::{Context, Poll};
70 /// # use ylong_http_client::async_impl::{Downloader, DownloadOperator, HttpBody, Response};
71 /// # use ylong_http_client::{HttpClientError, SpeedLimit, Timeout};
72 ///
73 /// # async fn download_and_show_progress(response: Response) {
74 /// // Customizes your own `DownloadOperator`.
75 /// struct MyDownloadOperator;
76 ///
77 /// impl DownloadOperator for MyDownloadOperator {
78 ///     fn poll_download(
79 ///         self: Pin<&mut Self>,
80 ///         cx: &mut Context<'_>,
81 ///         data: &[u8],
82 ///     ) -> Poll<Result<usize, HttpClientError>> {
83 ///         todo!()
84 ///     }
85 ///
86 ///     fn poll_progress(
87 ///         self: Pin<&mut Self>,
88 ///         cx: &mut Context<'_>,
89 ///         downloaded: u64,
90 ///         total: Option<u64>,
91 ///     ) -> Poll<Result<(), HttpClientError>> {
92 ///         // Writes your customize method.
93 ///         todo!()
94 ///     }
95 /// }
96 ///
97 /// // Creates a default `Downloader` based on `MyDownloadOperator`.
98 /// // Configures your downloader by using `DownloaderBuilder`.
99 /// let mut downloader = Downloader::builder()
100 ///     .body(response)
101 ///     .operator(MyDownloadOperator)
102 ///     .timeout(Timeout::none())
103 ///     .speed_limit(SpeedLimit::none())
104 ///     .build();
105 /// let _ = downloader.download().await;
106 /// # }
107 /// ```
pub struct Downloader<T> {
    /// Receives the body data and the progress notifications while
    /// downloading (used through the `DownloadOperator` methods).
    operator: T,
    /// The response whose headers are inspected and whose body is read.
    body: Response,
    /// Download settings: overall timeout and speed limit.
    config: DownloadConfig,
    /// Bookkeeping for the download (start time, byte counters). `download`
    /// fills this in when it is still `None` and reuses it otherwise.
    info: Option<DownloadInfo>,
}
114 
115 impl Downloader<()> {
116     /// Creates a `Downloader` that based on a default `DownloadOperator` which
117     /// show progress on console.
118     ///
119     /// # Examples
120     ///
121     /// ```no_run
122     /// # use ylong_http_client::async_impl::{Downloader, HttpBody, Response};
123     ///
124     /// # async fn download_and_show_progress_on_console(response: Response) {
125     /// // Creates a default `Downloader` that show progress on console.
126     /// let mut downloader = Downloader::console(response);
127     /// let _ = downloader.download().await;
128     /// # }
129     /// ```
console(response: Response) -> Downloader<Console>130     pub fn console(response: Response) -> Downloader<Console> {
131         Self::builder().body(response).console().build()
132     }
133 
134     /// Creates a `DownloaderBuilder` and configures downloader step by step.
135     ///
136     /// # Examples
137     ///
138     /// ```
139     /// # use ylong_http_client::async_impl::Downloader;
140     ///
141     /// let builder = Downloader::builder();
142     /// ```
builder() -> DownloaderBuilder<WantsBody>143     pub fn builder() -> DownloaderBuilder<WantsBody> {
144         DownloaderBuilder::new()
145     }
146 }
147 
148 impl<T: DownloadOperator + Unpin> Downloader<T> {
149     /// Starts downloading that uses this `Downloader`'s configurations.
150     ///
151     /// The download and progress methods of the `DownloadOperator` will be
152     /// called multiple times until the download is complete.
153     ///
154     /// # Examples
155     ///
156     /// ```
157     /// # use ylong_http_client::async_impl::{Downloader, HttpBody, Response};
158     ///
159     /// # async fn download_response_body(response: Response) {
160     /// let mut downloader = Downloader::console(response);
161     /// let _result = downloader.download().await;
162     /// # }
163     /// ```
download(&mut self) -> Result<(), HttpClientError>164     pub async fn download(&mut self) -> Result<(), HttpClientError> {
165         // Construct new download info, or reuse previous info.
166         if self.info.is_none() {
167             let content_length = self
168                 .body
169                 .headers()
170                 .get("Content")
171                 .and_then(|v| v.to_string().ok())
172                 .and_then(|v| v.parse::<u64>().ok());
173             self.info = Some(DownloadInfo::new(content_length));
174         }
175         self.limited_download().await
176     }
177 
178     // Downloads response body with speed limitation.
179     // TODO: Speed Limit.
limited_download(&mut self) -> Result<(), HttpClientError>180     async fn limited_download(&mut self) -> Result<(), HttpClientError> {
181         self.show_progress().await?;
182         self.check_timeout()?;
183 
184         let mut buf = [0; 16 * 1024];
185 
186         loop {
187             let data_size = match self.body.data(&mut buf).await? {
188                 0 => {
189                     self.show_progress().await?;
190                     return Ok(());
191                 }
192                 size => size,
193             };
194 
195             let data = &buf[..data_size];
196             let mut size = 0;
197             while size != data.len() {
198                 self.check_timeout()?;
199                 size += self.operator.download(&data[size..]).await?;
200                 self.info.as_mut().unwrap().downloaded_bytes += data.len() as u64;
201                 self.show_progress().await?;
202             }
203         }
204     }
205 
check_timeout(&mut self) -> Result<(), HttpClientError>206     fn check_timeout(&mut self) -> Result<(), HttpClientError> {
207         if let Some(timeout) = self.config.timeout.inner() {
208             let now = Instant::now();
209             if now.duration_since(self.info.as_mut().unwrap().start_time) >= timeout {
210                 return err_from_io!(Timeout, std::io::ErrorKind::TimedOut.into());
211             }
212         }
213         Ok(())
214     }
215 
show_progress(&mut self) -> Result<(), HttpClientError>216     async fn show_progress(&mut self) -> Result<(), HttpClientError> {
217         let info = self.info.as_mut().unwrap();
218         self.operator
219             .progress(info.downloaded_bytes, info.total_bytes)
220             .await
221     }
222 }
223 
/// Bookkeeping for one download.
struct DownloadInfo {
    /// Moment at which this record was created.
    pub(crate) start_time: Instant,
    /// Number of body bytes handed over so far.
    pub(crate) downloaded_bytes: u64,
    /// Expected total size of the body, if known.
    pub(crate) total_bytes: Option<u64>,
}

impl DownloadInfo {
    /// Creates a record that starts now, with nothing downloaded yet and the
    /// given expected total size.
    fn new(total_bytes: Option<u64>) -> Self {
        let start_time = Instant::now();
        Self {
            start_time,
            downloaded_bytes: 0,
            total_bytes,
        }
    }
}
239 
240 struct DownloadConfig {
241     pub(crate) timeout: Timeout,
242     pub(crate) speed_limit: SpeedLimit,
243 }
244 
245 impl Default for DownloadConfig {
default() -> Self246     fn default() -> Self {
247         Self {
248             timeout: Timeout::none(),
249             speed_limit: SpeedLimit::none(),
250         }
251     }
252 }
253 
#[cfg(all(test, feature = "ylong_base"))]
mod ut_downloader {
    use std::sync::Arc;

    use ylong_http::h1::ResponseDecoder;
    use ylong_http::response::Response;

    use crate::async_impl::conn::StreamData;
    use crate::async_impl::interceptor::IdleInterceptor;
    use crate::async_impl::{Downloader, HttpBody, Response as AdapterResponse};
    use crate::util::normalizer::BodyLength;

    // Lets a plain byte slice act as the underlying stream in tests.
    impl StreamData for &[u8] {
        fn shutdown(&self) {
            println!("Shutdown")
        }
    }

    /// UT test cases for `Downloader::download`.
    ///
    /// # Brief
    /// 1. Creates a `Downloader`.
    /// 2. Calls `download` method.
    /// 3. Checks if the result is correct.
    #[test]
    fn ut_download() {
        let task = ylong_runtime::spawn(async move {
            download().await;
        });
        ylong_runtime::block_on(task).unwrap();
    }

    // Builds a response with a chunked body from in-memory bytes and checks
    // that downloading it through the console downloader succeeds.
    async fn download() {
        let head_bytes = "HTTP/1.1 304 \r\nAge: \t 270646 \t \t\r\nLocation: \t example3.com:80 \t \t\r\nDate: \t Mon, 19 Dec 2022 01:46:59 GMT \t \t\r\nEtag:\t \"3147526947+gzip\" \t \t\r\n\r\n".as_bytes();
        let empty_stream = Box::new("".as_bytes());
        let chunked_payload = "\
            5\r\n\
            hello\r\n\
            C ; type = text ;end = !\r\n\
            hello world!\r\n\
            000; message = last\r\n\
            \r\n\
            ";
        let body = HttpBody::new(
            Arc::new(IdleInterceptor),
            BodyLength::Chunk,
            empty_stream,
            chunked_payload.as_bytes(),
        )
        .unwrap();
        let mut decoder = ResponseDecoder::new();
        let decoded = decoder.decode(head_bytes).unwrap().unwrap();
        let raw_response = Response::from_raw_parts(decoded.0, body);
        let mut downloader = Downloader::console(AdapterResponse::new(raw_response));
        let outcome = downloader.download().await;
        assert!(outcome.is_ok());
    }
}
312