1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::future::Future;
15 use std::mem::MaybeUninit;
16 use std::pin::Pin;
17 use std::slice::from_raw_parts_mut;
18 use std::string::FromUtf8Error;
19 use std::task::{Context, Poll};
20 use std::{io, mem};
21 
22 use crate::futures::poll_fn;
23 use crate::io::async_buf_read::AsyncBufRead;
24 use crate::io::async_read::AsyncRead;
25 use crate::io::poll_ready;
26 use crate::io::read_buf::ReadBuf;
27 
// Moves the reader out of `$self.reader`, leaving `None` in its place.
//
// Panics with "read: poll after finished" when the slot is already empty,
// i.e. the reader was taken by an earlier poll and never put back.
macro_rules! take_reader {
    ($self: expr) => {
        $self
            .reader
            .take()
            .unwrap_or_else(|| panic!("read: poll after finished"))
    };
}
36 
/// A future that reads whatever data is currently available from the source
/// into a byte slice.
///
/// Returned by [`crate::io::AsyncReadExt::read`]
pub struct ReadTask<'a, R: ?Sized> {
    /// Borrowed source. Kept in an `Option` so that `poll` can move it out
    /// and store it back while the read is still pending.
    reader: Option<&'a mut R>,
    /// Destination slice that receives the bytes.
    buf: &'a mut [u8],
}

impl<'a, R: ?Sized> ReadTask<'a, R> {
    /// Creates a task that will read from `reader` into `buf`.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadTask<'a, R> {
        let reader = Some(reader);
        Self { reader, buf }
    }
}
54 
55 impl<'a, R> Future for ReadTask<'a, R>
56 where
57     R: AsyncRead + Unpin,
58 {
59     type Output = io::Result<usize>;
60 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>61     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62         let mut reader = take_reader!(self);
63 
64         let mut buf = ReadBuf::new(self.buf);
65         match Pin::new(&mut reader).poll_read(cx, &mut buf) {
66             Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
67             Poll::Ready(_) => Poll::Ready(Ok(buf.filled_len())),
68             Poll::Pending => {
69                 self.reader = Some(reader);
70                 Poll::Pending
71             }
72         }
73     }
74 }
75 
/// A future that reads all remaining data from the source and appends it to a
/// vector.
///
/// Returned by [`crate::io::AsyncReadExt::read_to_end`]
pub struct ReadToEndTask<'a, R: ?Sized> {
    /// Borrowed source to read from.
    reader: &'a mut R,
    /// Destination vector; new bytes are appended after its current content.
    buf: &'a mut Vec<u8>,
    /// Number of bytes appended so far by this task.
    r_len: usize,
}

impl<'a, R: ?Sized> ReadToEndTask<'a, R> {
    /// Creates a task that will append everything read from `reader` to `buf`.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEndTask<'a, R> {
        let r_len = 0;
        Self { reader, buf, r_len }
    }
}
/// Minimum number of spare bytes reserved in the vector before each read
/// attempt.
const PROBE_SIZE: usize = 32;

/// Reads from `reader` until a read yields no new bytes, appending everything
/// to `buf`.
///
/// `read_len` accumulates the number of bytes appended across calls, so the
/// count survives a `Poll::Pending` return. On completion the accumulated
/// count is returned and `read_len` is reset to 0. On error `read_len` is left
/// untouched and the bytes appended so far stay in `buf`.
///
/// # Errors
/// Returns `io::ErrorKind::OutOfMemory` when reserving space fails, or the
/// error reported by the reader.
fn poll_read_to_end<R: AsyncRead + Unpin>(
    buf: &mut Vec<u8>,
    mut reader: &mut R,
    read_len: &mut usize,
    cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
    loop {
        // Make sure there is room to read into. If the spare capacity is
        // already at least `PROBE_SIZE` bytes, this does nothing.
        buf.try_reserve(PROBE_SIZE)
            .map_err(|_| io::ErrorKind::OutOfMemory)?;
        let len = buf.len();
        // SAFETY: the pointer comes from the vector's own allocation and the
        // length is its capacity, so the slice stays inside that allocation.
        // The elements are viewed as `MaybeUninit<u8>`, so the bytes past
        // `len` are not required to be initialized. `buf` is not accessed
        // again until after the last use of `read_buf` in this iteration.
        let mut read_buf = ReadBuf::uninit(unsafe {
            from_raw_parts_mut(buf.as_mut_ptr().cast::<MaybeUninit<u8>>(), buf.capacity())
        });
        // The first `len` bytes are the vector's existing content: mark them
        // as initialized and already filled so the reader appends after them.
        read_buf.assume_init(len);
        read_buf.set_filled(len);

        let poll = Pin::new(&mut reader).poll_read(cx, &mut read_buf);
        let new_len = read_buf.filled_len();
        match poll {
            Poll::Pending => return Poll::Pending,
            // A successful read that added nothing is treated as end of data.
            Poll::Ready(Ok(())) if (new_len - len) == 0 => {
                return Poll::Ready(Ok(mem::replace(read_len, 0)))
            }
            Poll::Ready(Ok(())) => {
                *read_len += new_len - len;
                // SAFETY: `read_buf` spans exactly `buf.capacity()` bytes, so
                // its filled length cannot exceed the capacity.
                // NOTE(review): this also relies on `ReadBuf` only reporting
                // initialized bytes as filled — confirm against `ReadBuf`.
                unsafe {
                    buf.set_len(new_len);
                }
            }
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
        }
    }
}
132 
133 impl<'a, R> Future for ReadToEndTask<'a, R>
134 where
135     R: AsyncRead + Unpin,
136 {
137     type Output = io::Result<usize>;
138 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>139     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
140         let me = self.get_mut();
141         let (buf, reader, read_len) = (&mut me.buf, &mut me.reader, &mut me.r_len);
142         poll_read_to_end(buf, *reader, read_len, cx)
143     }
144 }
145 
/// A future that reads all remaining data from the source and appends it to a
/// `String`.
///
/// Returned by [`crate::io::AsyncReadExt::read_to_string`]
pub struct ReadToStringTask<'a, R: ?Sized> {
    /// Borrowed source to read from.
    reader: &'a mut R,
    /// Working byte buffer. Starts as the bytes taken out of the destination
    /// string; newly read bytes are appended to it.
    buf: Vec<u8>,
    /// Destination string, left empty while the task is in progress.
    output: &'a mut String,
    /// Number of bytes appended so far by this task.
    r_len: usize,
}

impl<'a, R: ?Sized> ReadToStringTask<'a, R> {
    /// Creates a task that will append everything read from `reader` to `dst`.
    ///
    /// The current content of `dst` is moved into the task's byte buffer, so
    /// `dst` is empty until the task resolves.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, dst: &'a mut String) -> ReadToStringTask<'a, R> {
        let buf = mem::take(dst).into_bytes();
        Self {
            reader,
            buf,
            output: dst,
            r_len: 0,
        }
    }
}
167 
/// Merges the outcome of a read with the outcome of converting the collected
/// bytes to UTF-8, and stores the resulting text in `output`.
///
/// * `io_res` - result of the read; on success, the number of bytes the read
///   appended to the buffer.
/// * `str_res` - result of converting the whole buffer (previous content
///   followed by the appended bytes) into a `String`.
/// * `read_len` - number of bytes appended before the read failed; only used
///   when `io_res` is an error.
/// * `output` - destination string.
///
/// Behavior per combination:
/// * read ok, valid UTF-8: `output` receives the full string, returns the
///   byte count.
/// * read ok, invalid UTF-8: the appended bytes are discarded, `output`
///   receives the previous content, returns `io::ErrorKind::InvalidData`.
/// * read failed, valid UTF-8: `output` receives the full string, returns the
///   I/O error.
/// * read failed, invalid UTF-8: the appended bytes are discarded, `output`
///   receives the previous content, returns the I/O error.
///
/// This function never panics: if the reported count is larger than the
/// buffer, or the roll-back does not land on valid UTF-8, `output` falls back
/// to the longest valid UTF-8 prefix of what remains.
fn io_string_result(
    io_res: io::Result<usize>,
    str_res: Result<String, FromUtf8Error>,
    read_len: usize,
    output: &mut String,
) -> Poll<io::Result<usize>> {
    // Drops the last `appended` bytes from the rejected buffer and turns the
    // remainder back into a `String`.
    fn roll_back(err: FromUtf8Error, appended: usize) -> String {
        let mut bytes = err.into_bytes();
        // A stale or inconsistent count must not underflow; at worst the
        // whole buffer is discarded.
        let keep = bytes.len().saturating_sub(appended);
        bytes.truncate(keep);
        String::from_utf8(bytes).unwrap_or_else(|e| {
            // The cut did not land on valid text: keep the valid prefix only.
            let valid = e.utf8_error().valid_up_to();
            let mut bytes = e.into_bytes();
            bytes.truncate(valid);
            String::from_utf8(bytes).unwrap_or_default()
        })
    }

    match (io_res, str_res) {
        (Ok(bytes), Ok(string)) => {
            *output = string;
            Poll::Ready(Ok(bytes))
        }
        (Ok(bytes), Err(trans_err)) => {
            *output = roll_back(trans_err, bytes);
            Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid utf-8 data",
            )))
        }
        (Err(io_err), Ok(string)) => {
            *output = string;
            Poll::Ready(Err(io_err))
        }
        (Err(io_err), Err(trans_err)) => {
            *output = roll_back(trans_err, read_len);
            Poll::Ready(Err(io_err))
        }
    }
}
204 
205 impl<'a, R> Future for ReadToStringTask<'a, R>
206 where
207     R: AsyncRead + Unpin,
208 {
209     type Output = io::Result<usize>;
210 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>211     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212         let me = self.get_mut();
213         let (buf, output, reader, read_len) =
214             (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
215         let res = poll_ready!(poll_read_to_end(buf, *reader, read_len, cx));
216         let trans = String::from_utf8(mem::take(buf));
217 
218         io_string_result(res, trans, *read_len, output)
219     }
220 }
221 
222 /// A future for reading exact amount of bytes from the source into a vector.
223 ///
224 /// Returned by [`crate::io::AsyncReadExt::read_exact`]
225 pub struct ReadExactTask<'a, R: ?Sized> {
226     reader: Option<&'a mut R>,
227     buf: ReadBuf<'a>,
228 }
229 
230 impl<'a, R: ?Sized> ReadExactTask<'a, R> {
231     #[inline(always)]
new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadExactTask<'a, R>232     pub(crate) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadExactTask<'a, R> {
233         ReadExactTask {
234             reader: Some(reader),
235             buf: ReadBuf::new(buf),
236         }
237     }
238 }
239 
240 impl<'a, R> Future for ReadExactTask<'a, R>
241 where
242     R: AsyncRead + Unpin,
243 {
244     type Output = io::Result<()>;
245 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>246     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
247         let mut reader = take_reader!(self);
248         let this = self.get_mut();
249 
250         loop {
251             let remain = this.buf.remaining();
252             if remain == 0 {
253                 return Poll::Ready(Ok(()));
254             }
255             let _ = match Pin::new(&mut reader).poll_read(cx, &mut this.buf) {
256                 Poll::Pending => {
257                     this.reader = Some(reader);
258                     return Poll::Pending;
259                 }
260                 x => x?,
261             };
262             if this.buf.remaining() == remain {
263                 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
264             }
265         }
266     }
267 }
268 
/// A future that reads data from the source into a vector until the given
/// delimiter byte is found or the source has no more data.
///
/// Returned by [`crate::io::AsyncBufReadExt::read_until`]
pub struct ReadUtilTask<'a, R: ?Sized> {
    /// Borrowed source to read from.
    reader: &'a mut R,
    /// Number of bytes appended so far by this task.
    r_len: usize,
    /// Byte that terminates the read; it is included in the output.
    delim: u8,
    /// Destination vector; new bytes are appended after its current content.
    buf: &'a mut Vec<u8>,
}

impl<'a, R: ?Sized> ReadUtilTask<'a, R> {
    /// Creates a task that will append bytes from `reader` to `buf` up to and
    /// including the first `delim`.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, delim: u8, buf: &'a mut Vec<u8>) -> ReadUtilTask<'a, R> {
        let r_len = 0;
        Self {
            reader,
            r_len,
            delim,
            buf,
        }
    }
}
291 
poll_read_until<R: AsyncBufRead + Unpin>( buf: &mut Vec<u8>, mut reader: &mut R, delim: u8, read_len: &mut usize, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>>292 fn poll_read_until<R: AsyncBufRead + Unpin>(
293     buf: &mut Vec<u8>,
294     mut reader: &mut R,
295     delim: u8,
296     read_len: &mut usize,
297     cx: &mut Context<'_>,
298 ) -> Poll<io::Result<usize>> {
299     loop {
300         let (done, used) = {
301             let available = poll_ready!(Pin::new(&mut reader).poll_fill_buf(cx))?;
302 
303             let ret = available.iter().position(|&val| val == delim);
304 
305             match ret {
306                 None => {
307                     buf.extend_from_slice(available);
308                     (false, available.len())
309                 }
310                 Some(i) => {
311                     buf.extend_from_slice(&available[..=i]);
312                     (true, i + 1)
313                 }
314             }
315         };
316         Pin::new(&mut reader).consume(used);
317         *read_len += used;
318         if done || used == 0 {
319             return Poll::Ready(Ok(mem::replace(read_len, 0)));
320         }
321     }
322 }
323 
324 impl<'a, R> Future for ReadUtilTask<'a, R>
325 where
326     R: AsyncBufRead + Unpin,
327 {
328     type Output = io::Result<usize>;
329 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>330     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
331         let me = self.get_mut();
332         let (buf, reader, delim, read_len) = (&mut me.buf, &mut me.reader, me.delim, &mut me.r_len);
333         poll_read_until(buf, *reader, delim, read_len, cx)
334     }
335 }
336 
/// A future that reads one line from the source and appends it to a `String`.
///
/// Bytes are collected until `b'\n'` has been read or the source has no more
/// data, then validated as UTF-8 before being stored in the destination
/// string.
///
/// Returned by [`crate::io::AsyncBufReadExt::read_line`]
// NOTE(review): the previous doc comment was a copy of `ReadUtilTask`'s and
// linked to `read_until`; `read_line` is assumed from this task's behavior —
// confirm the link target against the trait.
pub struct ReadLineTask<'a, R: ?Sized> {
    /// Borrowed source to read from.
    reader: &'a mut R,
    /// Number of bytes appended so far by this task.
    r_len: usize,
    /// Working byte buffer. Starts as the bytes taken out of the destination
    /// string; newly read bytes are appended to it.
    buf: Vec<u8>,
    /// Destination string, left empty while the task is in progress.
    output: &'a mut String,
}

impl<'a, R: ?Sized> ReadLineTask<'a, R> {
    /// Creates a task that will append one line read from `reader` to `buf`.
    ///
    /// The current content of `buf` is moved into the task's byte buffer, so
    /// `buf` is empty until the task resolves.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, buf: &'a mut String) -> ReadLineTask<'a, R> {
        let bytes = mem::take(buf).into_bytes();
        Self {
            reader,
            r_len: 0,
            buf: bytes,
            output: buf,
        }
    }
}
359 
360 impl<'a, R> Future for ReadLineTask<'a, R>
361 where
362     R: AsyncBufRead + Unpin,
363 {
364     type Output = io::Result<usize>;
365 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>366     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367         let me = self.get_mut();
368         let (buf, output, reader, read_len) =
369             (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
370         let res = poll_ready!(poll_read_until(buf, *reader, b'\n', read_len, cx));
371         let trans = String::from_utf8(mem::take(buf));
372 
373         io_string_result(res, trans, *read_len, output)
374     }
375 }
376 
/// Splits the content of a buffered source into byte segments separated by a
/// delimiter byte.
///
/// Each call to `next` yields one segment with its trailing delimiter
/// removed.
///
/// Returned by [`crate::io::AsyncBufReadExt::split`]
pub struct SplitTask<R> {
    /// Owned source the segments are read from.
    reader: R,
    /// Byte that separates segments.
    delim: u8,
    /// Bytes collected for the segment currently being read.
    buf: Vec<u8>,
    /// Number of bytes read so far for the current segment.
    r_len: usize,
}
387 
388 impl<R> SplitTask<R>
389 where
390     R: AsyncBufRead + Unpin,
391 {
new(reader: R, delim: u8) -> SplitTask<R>392     pub(crate) fn new(reader: R, delim: u8) -> SplitTask<R> {
393         SplitTask {
394             reader,
395             delim,
396             buf: Vec::new(),
397             r_len: 0,
398         }
399     }
400 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Option<Vec<u8>>>>401     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Option<Vec<u8>>>> {
402         let me = self.get_mut();
403         let (buf, reader, read_len, delim) = (&mut me.buf, &mut me.reader, &mut me.r_len, me.delim);
404         let res = poll_ready!(poll_read_until(buf, reader, delim, read_len, cx))?;
405 
406         if buf.is_empty() && res == 0 {
407             return Poll::Ready(Ok(None));
408         }
409 
410         if buf.last() == Some(&delim) {
411             buf.pop();
412         }
413         Poll::Ready(Ok(Some(mem::take(buf))))
414     }
415 
next(&mut self) -> io::Result<Option<Vec<u8>>>416     pub async fn next(&mut self) -> io::Result<Option<Vec<u8>>> {
417         poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
418     }
419 }
420 
/// Splits the content of a buffered source into lines of text.
///
/// Each call to `next_line` yields one line with its trailing `"\n"` or
/// `"\r\n"` removed.
///
/// Returned by [`crate::io::AsyncBufReadExt::lines`]
// NOTE(review): the previous doc comment linked to `split`, which is the
// documented source of `SplitTask`; `lines` is assumed from this type's
// behavior — confirm the link target against the trait.
pub struct LinesTask<R> {
    /// Owned source the lines are read from.
    reader: R,
    /// Bytes collected for the line currently being read.
    buf: Vec<u8>,
    /// Holds the converted text of the current line until it is handed out.
    output: String,
    /// Number of bytes read so far for the current line.
    r_len: usize,
}
431 
432 impl<R> LinesTask<R>
433 where
434     R: AsyncBufRead,
435 {
new(reader: R) -> LinesTask<R>436     pub(crate) fn new(reader: R) -> LinesTask<R> {
437         LinesTask {
438             reader,
439             buf: Vec::new(),
440             output: String::new(),
441             r_len: 0,
442         }
443     }
444 }
445 
446 impl<R> LinesTask<R>
447 where
448     R: AsyncBufRead + Unpin,
449 {
poll_next_line( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<io::Result<Option<String>>>450     fn poll_next_line(
451         self: Pin<&mut Self>,
452         cx: &mut Context<'_>,
453     ) -> Poll<io::Result<Option<String>>> {
454         let me = self.get_mut();
455         let (buf, output, reader, read_len) =
456             (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
457         let io_res = poll_ready!(poll_read_until(buf, reader, b'\n', read_len, cx));
458         let str_res = String::from_utf8(mem::take(buf));
459 
460         let res = poll_ready!(io_string_result(io_res, str_res, *read_len, output))?;
461 
462         if output.is_empty() && res == 0 {
463             return Poll::Ready(Ok(None));
464         }
465 
466         if output.ends_with('\n') {
467             output.pop();
468             if output.ends_with('\r') {
469                 output.pop();
470             }
471         }
472         Poll::Ready(Ok(Some(mem::take(output))))
473     }
474 
next_line(&mut self) -> io::Result<Option<String>>475     pub async fn next_line(&mut self) -> io::Result<Option<String>> {
476         poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
477     }
478 }
479 
#[cfg(all(test, feature = "fs"))]
mod test {
    use crate::fs::{remove_file, File};
    use crate::io::async_read::AsyncReadExt;
    use crate::io::async_write::AsyncWriteExt;
    use crate::io::AsyncBufReader;

    /// UT test cases for `io_string_result()`.
    ///
    /// # Brief
    /// 1. Create a file and write bytes that are not valid UTF-8 to it.
    /// 2. Open the file again and wrap it in an `AsyncBufReader`.
    /// 3. Call `read_to_string()` on the reader, so that the bytes go through
    ///    the UTF-8 conversion handled by `io_string_result()`.
    /// 4. Check that the call fails with `ErrorKind::InvalidData`.
    /// 5. Remove the file.
    #[test]
    fn ut_io_string_result() {
        let handle = crate::spawn(async move {
            // NOTE(review): the path is relative to the working directory and
            // fixed, so it can collide with other tests using the same name.
            let file_path = "foo.txt";

            let mut f = File::create(file_path).await.unwrap();
            // 159, 146 and 150 are UTF-8 continuation bytes with no leading
            // byte, so this sequence is not valid UTF-8.
            let buf = [0, 159, 146, 150];
            let n = f.write(&buf).await.unwrap();
            assert_eq!(n, 4);

            let f = File::open(file_path).await.unwrap();
            let mut reader = AsyncBufReader::new(f);
            let mut buf = String::new();
            let res = reader.read_to_string(&mut buf).await;
            assert!(res.is_err());
            assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::InvalidData);

            let res = remove_file(file_path).await;
            assert!(res.is_ok());
        });
        crate::block_on(handle).expect("failed to block on");
    }
}
518