1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod builder; 15 mod operator; 16 17 use std::future::Future; 18 use std::pin::Pin; 19 use std::task::{Context, Poll}; 20 21 pub use builder::{UploaderBuilder, WantsReader}; 22 pub use operator::{Console, UploadOperator}; 23 use ylong_http::body::async_impl::ReusableReader; 24 use ylong_http::body::{MultiPart, MultiPartBase}; 25 26 use crate::runtime::{AsyncRead, ReadBuf}; 27 28 /// An uploader that can help you upload the request body. 29 /// 30 /// An `Uploader` provides a template method for uploading a file or a slice and 31 /// needs to use a structure that implements [`UploadOperator`] trait to read 32 /// the file or the slice and convert it into request body. 33 /// 34 /// The `UploadOperator` trait provides a [`progress`] method which is 35 /// responsible for progress display. 36 /// 37 /// You only need to provide a structure that implements the `UploadOperator` 38 /// trait to complete the upload process. 39 /// 40 /// A default structure `Console` which implements `UploadOperator` is 41 /// provided to show download message on console. You can use 42 /// `Uploader::console` to build a `Uploader` which based on it. 43 /// 44 /// [`UploadOperator`]: UploadOperator 45 /// [`progress`]: UploadOperator::progress 46 /// 47 /// # Examples 48 /// 49 /// `Console`: 50 /// ```no_run 51 /// # use ylong_http_client::async_impl::Uploader; 52 /// 53 /// // Creates a default `Uploader` that show progress on console. 54 /// let mut uploader = Uploader::console("HelloWorld".as_bytes()); 55 /// ``` 56 /// 57 /// `Custom`: 58 /// ```no_run 59 /// # use std::pin::Pin; 60 /// # use std::task::{Context, Poll}; 61 /// # use ylong_http_client::async_impl::{Uploader, UploadOperator, Response}; 62 /// # use ylong_http_client::{SpeedLimit, Timeout}; 63 /// # use ylong_http_client::HttpClientError; 64 /// 65 /// # async fn upload_and_show_progress() { 66 /// // Customizes your own `UploadOperator`. 67 /// struct MyUploadOperator; 68 /// 69 /// impl UploadOperator for MyUploadOperator { 70 /// fn poll_progress( 71 /// self: Pin<&mut Self>, 72 /// cx: &mut Context<'_>, 73 /// uploaded: u64, 74 /// total: Option<u64>, 75 /// ) -> Poll<Result<(), HttpClientError>> { 76 /// todo!() 77 /// } 78 /// } 79 /// 80 /// // Creates a default `Uploader` based on `MyUploadOperator`. 81 /// // Configures your uploader by using `UploaderBuilder`. 82 /// let uploader = Uploader::builder() 83 /// .reader("HelloWorld".as_bytes()) 84 /// .operator(MyUploadOperator) 85 /// .build(); 86 /// # } 87 /// ``` 88 pub struct Uploader<R, T> { 89 reader: R, 90 operator: T, 91 config: UploadConfig, 92 info: Option<UploadInfo>, 93 } 94 95 impl<R: ReusableReader + Unpin> Uploader<R, Console> { 96 /// Creates an `Uploader` with a `Console` operator which displays process 97 /// on console. 98 /// 99 /// # Examples 100 /// 101 /// ``` 102 /// # use ylong_http_client::async_impl::Uploader; 103 /// 104 /// let uploader = Uploader::console("HelloWorld".as_bytes()); 105 /// ``` console(reader: R) -> Uploader<R, Console>106 pub fn console(reader: R) -> Uploader<R, Console> { 107 UploaderBuilder::new().reader(reader).console().build() 108 } 109 } 110 111 impl Uploader<(), ()> { 112 /// Creates an `UploaderBuilder` and configures uploader step by step. 113 /// 114 /// # Examples 115 /// 116 /// ``` 117 /// # use ylong_http_client::async_impl::Uploader; 118 /// 119 /// let builder = Uploader::builder(); 120 /// ``` builder() -> UploaderBuilder<WantsReader>121 pub fn builder() -> UploaderBuilder<WantsReader> { 122 UploaderBuilder::new() 123 } 124 } 125 126 impl<R, T> AsyncRead for Uploader<R, T> 127 where 128 R: ReusableReader + Unpin, 129 T: UploadOperator + Unpin, 130 { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>131 fn poll_read( 132 self: Pin<&mut Self>, 133 cx: &mut Context<'_>, 134 buf: &mut ReadBuf<'_>, 135 ) -> Poll<std::io::Result<()>> { 136 let this = self.get_mut(); 137 138 if this.info.is_none() { 139 this.info = Some(UploadInfo::new()); 140 } 141 142 let info = this.info.as_mut().unwrap(); 143 144 match Pin::new(&mut this.operator).poll_progress( 145 cx, 146 info.uploaded_bytes, 147 this.config.total_bytes, 148 ) { 149 Poll::Ready(Ok(())) => {} 150 // TODO: Consider another way to handle error. 151 Poll::Ready(Err(e)) => { 152 return Poll::Ready(Err(std::io::Error::new( 153 std::io::ErrorKind::Other, 154 Box::new(e), 155 ))) 156 } 157 Poll::Pending => return Poll::Pending, 158 } 159 160 Pin::new(&mut this.reader).poll_read(cx, buf) 161 } 162 } 163 164 impl<R, T> ReusableReader for Uploader<R, T> 165 where 166 R: ReusableReader + Unpin, 167 T: UploadOperator + Unpin + Sync, 168 { reuse<'a>( &'a mut self, ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>> where Self: 'a,169 fn reuse<'a>( 170 &'a mut self, 171 ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + Sync + 'a>> 172 where 173 Self: 'a, 174 { 175 self.info = None; 176 self.reader.reuse() 177 } 178 } 179 180 impl<T: UploadOperator + Unpin + Sync> MultiPartBase for Uploader<MultiPart, T> { multipart(&self) -> &MultiPart181 fn multipart(&self) -> &MultiPart { 182 &self.reader 183 } 184 } 185 186 #[derive(Default)] 187 struct UploadConfig { 188 total_bytes: Option<u64>, 189 } 190 191 struct UploadInfo { 192 uploaded_bytes: u64, 193 } 194 195 impl UploadInfo { new() -> Self196 fn new() -> Self { 197 Self { uploaded_bytes: 0 } 198 } 199 } 200 201 #[cfg(all(test, feature = "ylong_base"))] 202 mod ut_uploader { 203 use ylong_http::body::{MultiPart, Part}; 204 use ylong_runtime::io::AsyncRead; 205 206 use crate::async_impl::uploader::{Context, Pin, Poll}; 207 use crate::async_impl::{UploadOperator, Uploader, UploaderBuilder}; 208 use crate::HttpClientError; 209 210 /// UT test cases for `UploadOperator::data`. 211 /// 212 /// # Brief 213 /// 1. Creates a `Uploader`. 214 /// 2. Calls `data` method. 215 /// 3. Checks if the result is correct. 216 217 #[test] ut_upload()218 fn ut_upload() { 219 let handle = ylong_runtime::spawn(async move { 220 upload().await; 221 }); 222 ylong_runtime::block_on(handle).unwrap(); 223 } 224 upload()225 async fn upload() { 226 let mut uploader = Uploader::console("HelloWorld".as_bytes()); 227 let mut user_slice = [0_u8; 10]; 228 let mut output_vec = vec![]; 229 230 let mut size = user_slice.len(); 231 while size == user_slice.len() { 232 let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice()); 233 ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut uploader).poll_read(cx, &mut buf)) 234 .await 235 .unwrap(); 236 size = buf.filled_len(); 237 output_vec.extend_from_slice(&user_slice[..size]); 238 } 239 assert_eq!(&output_vec, b"HelloWorld"); 240 241 let mut user_slice = [0_u8; 12]; 242 let multipart = MultiPart::new().part(Part::new().name("name").body("xiaoming")); 243 let mut multi_uploader = UploaderBuilder::default() 244 .multipart(multipart) 245 .console() 246 .build(); 247 let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice()); 248 ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut multi_uploader).poll_read(cx, &mut buf)) 249 .await 250 .unwrap(); 251 let size = buf.filled_len(); 252 assert_eq!(size, 12); 253 } 254 255 /// UT test cases for `UploadOperator::progress`. 256 /// 257 /// # Brief 258 /// 1. Creates a `MyUploadOperator`. 259 /// 2. Calls `progress` method. 260 /// 3. Checks if the result is correct. 261 #[test] ut_upload_op_cov()262 fn ut_upload_op_cov() { 263 let handle = ylong_runtime::spawn(async move { 264 upload_op_cov().await; 265 }); 266 ylong_runtime::block_on(handle).unwrap(); 267 } 268 upload_op_cov()269 async fn upload_op_cov() { 270 struct MyUploadOperator; 271 impl UploadOperator for MyUploadOperator { 272 fn poll_progress( 273 self: Pin<&mut Self>, 274 _cx: &mut Context<'_>, 275 uploaded: u64, 276 total: Option<u64>, 277 ) -> Poll<Result<(), HttpClientError>> { 278 if uploaded > total.unwrap() { 279 return Poll::Ready(err_from_msg!(BodyTransfer, "UploadOperator failed")); 280 } 281 Poll::Ready(Ok(())) 282 } 283 } 284 let res = MyUploadOperator.progress(10, Some(20)).await; 285 assert!(res.is_ok()); 286 } 287 288 /// UT test cases for `Uploader::builder`. 289 /// 290 /// # Brief 291 /// 1. Creates a `UploaderBuilder` by `Uploader::builder`. 292 /// 2. Checks if the result is correct. 293 294 #[test] ut_uploader_builder()295 fn ut_uploader_builder() { 296 let handle = ylong_runtime::spawn(async { upload_and_show_progress().await }); 297 ylong_runtime::block_on(handle).unwrap(); 298 } 299 upload_and_show_progress()300 async fn upload_and_show_progress() { 301 // Customizes your own `UploadOperator`. 302 struct MyUploadOperator; 303 304 impl UploadOperator for MyUploadOperator { 305 fn poll_progress( 306 self: Pin<&mut Self>, 307 _cx: &mut Context<'_>, 308 _uploaded: u64, 309 _total: Option<u64>, 310 ) -> Poll<Result<(), HttpClientError>> { 311 Poll::Ready(Err(HttpClientError::user_aborted())) 312 } 313 } 314 315 // Creates a default `Uploader` based on `MyUploadOperator`. 316 // Configures your uploader by using `UploaderBuilder`. 317 let mut uploader = Uploader::builder() 318 .reader("HelloWorld".as_bytes()) 319 .operator(MyUploadOperator) 320 .build(); 321 322 let mut user_slice = [0_u8; 12]; 323 let mut buf = ylong_runtime::io::ReadBuf::new(user_slice.as_mut_slice()); 324 let res = 325 ylong_runtime::futures::poll_fn(|cx| Pin::new(&mut uploader).poll_read(cx, &mut buf)) 326 .await; 327 assert_eq!( 328 format!("{:?}", res.err()), 329 format!( 330 "{:?}", 331 Some(std::io::Error::new( 332 std::io::ErrorKind::Other, 333 Box::new(HttpClientError::user_aborted()) 334 )) 335 ), 336 ); 337 } 338 } 339