1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod builder;
15 mod operator;
16 
17 use std::future::Future;
18 use std::pin::Pin;
19 use std::task::{Context, Poll};
20 
21 pub use builder::{UploaderBuilder, WantsReader};
22 pub use operator::{Console, UploadOperator};
23 use ylong_http::body::async_impl::ReusableReader;
24 use ylong_http::body::{MultiPart, MultiPartBase};
25 
26 use crate::runtime::{AsyncRead, ReadBuf};
27 
28 /// An uploader that can help you upload the request body.
29 ///
30 /// An `Uploader` provides a template method for uploading a file or a slice and
31 /// needs to use a structure that implements [`UploadOperator`] trait to read
32 /// the file or the slice and convert it into request body.
33 ///
34 /// The `UploadOperator` trait provides a [`progress`] method which is
35 /// responsible for progress display.
36 ///
37 /// You only need to provide a structure that implements the `UploadOperator`
38 /// trait to complete the upload process.
39 ///
40 /// A default structure `Console` which implements `UploadOperator` is
41 /// provided to show download message on console. You can use
42 /// `Uploader::console` to build a `Uploader` which based on it.
43 ///
44 /// [`UploadOperator`]: UploadOperator
45 /// [`progress`]: UploadOperator::progress
46 ///
47 /// # Examples
48 ///
49 /// `Console`:
50 /// ```no_run
51 /// # use ylong_http_client::async_impl::Uploader;
52 ///
53 /// // Creates a default `Uploader` that show progress on console.
54 /// let mut uploader = Uploader::console("HelloWorld".as_bytes());
55 /// ```
56 ///
57 /// `Custom`:
58 /// ```no_run
59 /// # use std::pin::Pin;
60 /// # use std::task::{Context, Poll};
61 /// # use ylong_http_client::async_impl::{Uploader, UploadOperator, Response};
62 /// # use ylong_http_client::{SpeedLimit, Timeout};
63 /// # use ylong_http_client::HttpClientError;
64 ///
65 /// # async fn upload_and_show_progress() {
66 /// // Customizes your own `UploadOperator`.
67 /// struct MyUploadOperator;
68 ///
69 /// impl UploadOperator for MyUploadOperator {
70 ///     fn poll_progress(
71 ///         self: Pin<&mut Self>,
72 ///         cx: &mut Context<'_>,
73 ///         uploaded: u64,
74 ///         total: Option<u64>,
75 ///     ) -> Poll<Result<(), HttpClientError>> {
76 ///         todo!()
77 ///     }
78 /// }
79 ///
80 /// // Creates a default `Uploader` based on `MyUploadOperator`.
81 /// // Configures your uploader by using `UploaderBuilder`.
82 /// let uploader = Uploader::builder()
83 ///     .reader("HelloWorld".as_bytes())
84 ///     .operator(MyUploadOperator)
85 ///     .build();
86 /// # }
87 /// ```
88 pub struct Uploader<R, T> {
89     reader: R,
90     operator: T,
91     config: UploadConfig,
92     info: Option<UploadInfo>,
93 }
94 
95 impl<R: ReusableReader + Unpin> Uploader<R, Console> {
96     /// Creates an `Uploader` with a `Console` operator which displays process
97     /// on console.
98     ///
99     /// # Examples
100     ///
101     /// ```
102     /// # use ylong_http_client::async_impl::Uploader;
103     ///
104     /// let uploader = Uploader::console("HelloWorld".as_bytes());
105     /// ```
console(reader: R) -> Uploader<R, Console>106     pub fn console(reader: R) -> Uploader<R, Console> {
107         UploaderBuilder::new().reader(reader).console().build()
108     }
109 }
110 
111 impl Uploader<(), ()> {
112     /// Creates an `UploaderBuilder` and configures uploader step by step.
113     ///
114     /// # Examples
115     ///
116     /// ```
117     /// # use ylong_http_client::async_impl::Uploader;
118     ///
119     /// let builder = Uploader::builder();
120     /// ```
builder() -> UploaderBuilder<WantsReader>121     pub fn builder() -> UploaderBuilder<WantsReader> {
122         UploaderBuilder::new()
123     }
124 }
125 
126 impl<R, T> AsyncRead for Uploader<R, T>
127 where
128     R: ReusableReader + Unpin,
129     T: UploadOperator + Unpin,
130 {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>131     fn poll_read(
132         self: Pin<&mut Self>,
133         cx: &mut Context<'_>,
134         buf: &mut ReadBuf<'_>,
135     ) -> Poll<std::io::Result<()>> {
136         let this = self.get_mut();
137 
138         if this.info.is_none() {
139             this.info = Some(UploadInfo::new());
140         }
141 
142         let info = this.info.as_mut().unwrap();
143 
144         match Pin::new(&mut this.operator).poll_progress(
145             cx,
146             info.uploaded_bytes,
147             this.config.total_bytes,
148         ) {
149             Poll::Ready(Ok(())) => {}
150             // TODO: Consider another way to handle error.
151             Poll::Ready(Err(e)) => {
152                 return Poll::Ready(Err(std::io::Error::new(
153                     std::io::ErrorKind::Other,
154                     Box::new(e),
155                 )))
156             }
157             Poll::Pending => return Poll::Pending,
158         }
159 
160         Pin::new(&mut this.reader).poll_read(cx, buf)
161     }
162 }
163 
164 impl<R, T> ReusableReader for Uploader<R, T>
165 where
166     R: ReusableReader + Unpin,
167     T: UploadOperator + Unpin + Sync,
168 {
reuse<'a>( &'a mut self, ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>> where Self: 'a,169     fn reuse<'a>(
170         &'a mut self,
171     ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>>
172     where
173         Self: 'a,
174     {
175         self.info = None;
176         self.reader.reuse()
177     }
178 }
179 
180 impl<T: UploadOperator + Unpin + Sync> MultiPartBase for Uploader<MultiPart, T> {
multipart(&self) -> &MultiPart181     fn multipart(&self) -> &MultiPart {
182         &self.reader
183     }
184 }
185 
186 #[derive(Default)]
187 struct UploadConfig {
188     total_bytes: Option<u64>,
189 }
190 
191 struct UploadInfo {
192     uploaded_bytes: u64,
193 }
194 
195 impl UploadInfo {
new() -> Self196     fn new() -> Self {
197         Self { uploaded_bytes: 0 }
198     }
199 }
200 
201 #[cfg(all(test, feature = "ylong_base"))]
202 mod ut_uploader {
203     use ylong_http::body::{MultiPart, Part};
204     use ylong_runtime::io::AsyncRead;
205 
206     use crate::async_impl::uploader::{Context, Pin, Poll};
207     use crate::async_impl::{UploadOperator, Uploader, UploaderBuilder};
208     use crate::HttpClientError;
209 
210     /// UT test cases for `UploadOperator::data`.
211     ///
212     /// # Brief
213     /// 1. Creates a `Uploader`.
214     /// 2. Calls `data` method.
215     /// 3. Checks if the result is correct.
216 
217     #[test]
ut_upload()218     fn ut_upload() {
219         let handle = ylong_runtime::spawn(async move {
220             upload().await;
221         });
222         ylong_runtime::block_on(handle).unwrap();
223     }
224 
upload()225     async fn upload() {
226         let mut uploader = Uploader::console("HelloWorld".as_bytes());
227         let mut user_slice = [0_u8; 10];
228         let mut output_vec = vec![];
229 
230         let mut size = user_slice.len();
231         while size == user_slice.len() {
232             let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice());
233             ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut uploader).poll_read(cx, &mut buf))
234                 .await
235                 .unwrap();
236             size = buf.filled_len();
237             output_vec.extend_from_slice(&user_slice[..size]);
238         }
239         assert_eq!(&output_vec, b"HelloWorld");
240 
241         let mut user_slice = [0_u8; 12];
242         let multipart = MultiPart::new().part(Part::new().name("name").body("xiaoming"));
243         let mut multi_uploader = UploaderBuilder::default()
244             .multipart(multipart)
245             .console()
246             .build();
247         let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice());
248         ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut multi_uploader).poll_read(cx, &mut buf))
249             .await
250             .unwrap();
251         let size = buf.filled_len();
252         assert_eq!(size, 12);
253     }
254 
255     /// UT test cases for `UploadOperator::progress`.
256     ///
257     /// # Brief
258     /// 1. Creates a `MyUploadOperator`.
259     /// 2. Calls `progress` method.
260     /// 3. Checks if the result is correct.
261     #[test]
ut_upload_op_cov()262     fn ut_upload_op_cov() {
263         let handle = ylong_runtime::spawn(async move {
264             upload_op_cov().await;
265         });
266         ylong_runtime::block_on(handle).unwrap();
267     }
268 
upload_op_cov()269     async fn upload_op_cov() {
270         struct MyUploadOperator;
271         impl UploadOperator for MyUploadOperator {
272             fn poll_progress(
273                 self: Pin<&mut Self>,
274                 _cx: &mut Context<'_>,
275                 uploaded: u64,
276                 total: Option<u64>,
277             ) -> Poll<Result<(), HttpClientError>> {
278                 if uploaded > total.unwrap() {
279                     return Poll::Ready(err_from_msg!(BodyTransfer, "UploadOperator failed"));
280                 }
281                 Poll::Ready(Ok(()))
282             }
283         }
284         let res = MyUploadOperator.progress(10, Some(20)).await;
285         assert!(res.is_ok());
286     }
287 
288     /// UT test cases for `Uploader::builder`.
289     ///
290     /// # Brief
291     /// 1. Creates a `UploaderBuilder` by `Uploader::builder`.
292     /// 2. Checks if the result is correct.
293 
294     #[test]
ut_uploader_builder()295     fn ut_uploader_builder() {
296         let handle = ylong_runtime::spawn(async { upload_and_show_progress().await });
297         ylong_runtime::block_on(handle).unwrap();
298     }
299 
upload_and_show_progress()300     async fn upload_and_show_progress() {
301         // Customizes your own `UploadOperator`.
302         struct MyUploadOperator;
303 
304         impl UploadOperator for MyUploadOperator {
305             fn poll_progress(
306                 self: Pin<&mut Self>,
307                 _cx: &mut Context<'_>,
308                 _uploaded: u64,
309                 _total: Option<u64>,
310             ) -> Poll<Result<(), HttpClientError>> {
311                 Poll::Ready(Err(HttpClientError::user_aborted()))
312             }
313         }
314 
315         // Creates a default `Uploader` based on `MyUploadOperator`.
316         // Configures your uploader by using `UploaderBuilder`.
317         let mut uploader = Uploader::builder()
318             .reader("HelloWorld".as_bytes())
319             .operator(MyUploadOperator)
320             .build();
321 
322         let mut user_slice = [0_u8; 12];
323         let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice());
324         let res =
325             ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut uploader).poll_read(cx, &mut buf))
326                 .await;
327         assert_eq!(
328             format!("{:?}", res.err()),
329             format!(
330                 "{:?}",
331                 Some(std::io::Error::new(
332                     std::io::ErrorKind::Other,
333                     Box::new(HttpClientError::user_aborted())
334                 ))
335             ),
336         );
337     }
338 }
339