1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::future::Future;
15 use std::mem::MaybeUninit;
16 use std::pin::Pin;
17 use std::slice::from_raw_parts_mut;
18 use std::string::FromUtf8Error;
19 use std::task::{Context, Poll};
20 use std::{io, mem};
21
22 use crate::futures::poll_fn;
23 use crate::io::async_buf_read::AsyncBufRead;
24 use crate::io::async_read::AsyncRead;
25 use crate::io::poll_ready;
26 use crate::io::read_buf::ReadBuf;
27
/// Moves the reader out of `$self.reader`, leaving `None` in its place.
///
/// The caller is expected to store the reader back when its poll ends in
/// `Poll::Pending`.
///
/// # Panics
/// Panics with "read: poll after finished" when the slot is already empty.
macro_rules! take_reader {
    ($self: expr) => {
        $self
            .reader
            .take()
            .unwrap_or_else(|| panic!("read: poll after finished"))
    };
}
36
/// Future that performs one read from the source into a caller-provided
/// byte slice.
///
/// Returned by [`crate::io::AsyncReadExt::read`]
pub struct ReadTask<'a, R: ?Sized> {
    // Wrapped in `Option` so the reader can be moved out while polling.
    reader: Option<&'a mut R>,
    // Destination slice supplied by the caller.
    buf: &'a mut [u8],
}

impl<'a, R: ?Sized> ReadTask<'a, R> {
    /// Builds a task that reads from `reader` into `buf`.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadTask<'a, R> {
        let reader = Some(reader);
        Self { reader, buf }
    }
}
54
55 impl<'a, R> Future for ReadTask<'a, R>
56 where
57 R: AsyncRead + Unpin,
58 {
59 type Output = io::Result<usize>;
60
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>61 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62 let mut reader = take_reader!(self);
63
64 let mut buf = ReadBuf::new(self.buf);
65 match Pin::new(&mut reader).poll_read(cx, &mut buf) {
66 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
67 Poll::Ready(_) => Poll::Ready(Ok(buf.filled_len())),
68 Poll::Pending => {
69 self.reader = Some(reader);
70 Poll::Pending
71 }
72 }
73 }
74 }
75
/// Future that reads everything the source yields and appends it to a
/// vector.
///
/// Returned by [`crate::io::AsyncReadExt::read_to_end`]
pub struct ReadToEndTask<'a, R: ?Sized> {
    reader: &'a mut R,
    // Vector that receives the data.
    buf: &'a mut Vec<u8>,
    // Running count of bytes appended by this task; starts at zero.
    r_len: usize,
}

impl<'a, R: ?Sized> ReadToEndTask<'a, R> {
    /// Builds a task that appends the data read from `reader` to `buf`.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEndTask<'a, R> {
        let r_len = 0;
        Self { reader, buf, r_len }
    }
}
95 const PROBE_SIZE: usize = 32;
96
poll_read_to_end<R: AsyncRead + Unpin>( buf: &mut Vec<u8>, mut reader: &mut R, read_len: &mut usize, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>>97 fn poll_read_to_end<R: AsyncRead + Unpin>(
98 buf: &mut Vec<u8>,
99 mut reader: &mut R,
100 read_len: &mut usize,
101 cx: &mut Context<'_>,
102 ) -> Poll<io::Result<usize>> {
103 loop {
104 // Allocate spaces to read, if the remaining capacity is larger than 32
105 // bytes, this will do nothing.
106 buf.try_reserve(PROBE_SIZE)
107 .map_err(|_| io::ErrorKind::OutOfMemory)?;
108 let len = buf.len();
109 let mut read_buf = ReadBuf::uninit(unsafe {
110 from_raw_parts_mut(buf.as_mut_ptr().cast::<MaybeUninit<u8>>(), buf.capacity())
111 });
112 read_buf.assume_init(len);
113 read_buf.set_filled(len);
114
115 let poll = Pin::new(&mut reader).poll_read(cx, &mut read_buf);
116 let new_len = read_buf.filled_len();
117 match poll {
118 Poll::Pending => return Poll::Pending,
119 Poll::Ready(Ok(())) if (new_len - len) == 0 => {
120 return Poll::Ready(Ok(mem::replace(read_len, 0)))
121 }
122 Poll::Ready(Ok(())) => {
123 *read_len += new_len - len;
124 unsafe {
125 buf.set_len(new_len);
126 }
127 }
128 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
129 }
130 }
131 }
132
133 impl<'a, R> Future for ReadToEndTask<'a, R>
134 where
135 R: AsyncRead + Unpin,
136 {
137 type Output = io::Result<usize>;
138
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>139 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
140 let me = self.get_mut();
141 let (buf, reader, read_len) = (&mut me.buf, &mut me.reader, &mut me.r_len);
142 poll_read_to_end(buf, *reader, read_len, cx)
143 }
144 }
145
/// Future that reads everything the source yields and appends it to a
/// `String`.
///
/// Returned by [`crate::io::AsyncReadExt::read_to_string`]
pub struct ReadToStringTask<'a, R: ?Sized> {
    reader: &'a mut R,
    // Working buffer; starts out holding the bytes taken from `output`.
    buf: Vec<u8>,
    // Destination string; left empty while the task owns its bytes.
    output: &'a mut String,
    // Running count of bytes read by this task; starts at zero.
    r_len: usize,
}

impl<'a, R: ?Sized> ReadToStringTask<'a, R> {
    /// Builds a task that appends the data read from `reader` to `dst`.
    ///
    /// The current content of `dst` is moved into the task's byte buffer, so
    /// `dst` is empty until the task writes the result back.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, dst: &'a mut String) -> ReadToStringTask<'a, R> {
        let buf = mem::take(dst).into_bytes();
        Self {
            reader,
            buf,
            output: dst,
            r_len: 0,
        }
    }
}
167
/// Combines the outcome of a raw read with the outcome of converting the
/// collected bytes to UTF-8, and stores the resulting text in `output`.
///
/// # Parameters
/// * `io_res` - result of the read; `Ok` carries the number of bytes read.
/// * `str_res` - conversion of the whole buffer to a `String`.
/// * `read_len` - number of bytes read before the I/O error; only used when
///   both the read and the conversion failed.
/// * `output` - string that receives the text.
///
/// # Returns
/// * read ok, text valid: `output` gets the text, returns `Ok(bytes)`.
/// * read ok, text invalid: the trailing `bytes` bytes are dropped, `output`
///   gets the rest, returns an `InvalidData` error.
/// * read failed, text valid: `output` gets the text, returns the I/O error.
/// * read failed, text invalid: the trailing `read_len` bytes are dropped,
///   `output` gets the rest, returns the I/O error.
///
/// A count larger than the buffer no longer panics on subtraction overflow;
/// the buffer is simply truncated to empty.
///
/// # Panics
/// Panics if the bytes left after truncation are still not valid UTF-8.
fn io_string_result(
    io_res: io::Result<usize>,
    str_res: Result<String, FromUtf8Error>,
    read_len: usize,
    output: &mut String,
) -> Poll<io::Result<usize>> {
    match (io_res, str_res) {
        (Ok(bytes), Ok(string)) => {
            *output = string;
            Poll::Ready(Ok(bytes))
        }
        (Ok(bytes), Err(trans_err)) => {
            *output = restore_original(trans_err.into_bytes(), bytes);
            Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid utf-8 data",
            )))
        }
        (Err(io_err), Ok(string)) => {
            *output = string;
            Poll::Ready(Err(io_err))
        }
        (Err(io_err), Err(trans_err)) => {
            *output = restore_original(trans_err.into_bytes(), read_len);
            Poll::Ready(Err(io_err))
        }
    }
}

/// Drops the trailing `appended` bytes from `bytes` and converts what is left
/// back into a `String`.
fn restore_original(mut bytes: Vec<u8>, appended: usize) -> String {
    // Saturate so an oversized count truncates to empty instead of underflowing.
    let original_len = bytes.len().saturating_sub(appended);
    bytes.truncate(original_len);
    String::from_utf8(bytes).unwrap_or_else(|e| panic!("Invalid utf-8 data, error: {e}"))
}
204
205 impl<'a, R> Future for ReadToStringTask<'a, R>
206 where
207 R: AsyncRead + Unpin,
208 {
209 type Output = io::Result<usize>;
210
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>211 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
212 let me = self.get_mut();
213 let (buf, output, reader, read_len) =
214 (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
215 let res = poll_ready!(poll_read_to_end(buf, *reader, read_len, cx));
216 let trans = String::from_utf8(mem::take(buf));
217
218 io_string_result(res, trans, *read_len, output)
219 }
220 }
221
222 /// A future for reading exact amount of bytes from the source into a vector.
223 ///
224 /// Returned by [`crate::io::AsyncReadExt::read_exact`]
225 pub struct ReadExactTask<'a, R: ?Sized> {
226 reader: Option<&'a mut R>,
227 buf: ReadBuf<'a>,
228 }
229
230 impl<'a, R: ?Sized> ReadExactTask<'a, R> {
231 #[inline(always)]
new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadExactTask<'a, R>232 pub(crate) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> ReadExactTask<'a, R> {
233 ReadExactTask {
234 reader: Some(reader),
235 buf: ReadBuf::new(buf),
236 }
237 }
238 }
239
240 impl<'a, R> Future for ReadExactTask<'a, R>
241 where
242 R: AsyncRead + Unpin,
243 {
244 type Output = io::Result<()>;
245
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>246 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
247 let mut reader = take_reader!(self);
248 let this = self.get_mut();
249
250 loop {
251 let remain = this.buf.remaining();
252 if remain == 0 {
253 return Poll::Ready(Ok(()));
254 }
255 let _ = match Pin::new(&mut reader).poll_read(cx, &mut this.buf) {
256 Poll::Pending => {
257 this.reader = Some(reader);
258 return Poll::Pending;
259 }
260 x => x?,
261 };
262 if this.buf.remaining() == remain {
263 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
264 }
265 }
266 }
267 }
268
/// Future that copies data from the source into a vector up to and including
/// the first occurrence of a delimiter byte.
///
/// Returned by [`crate::io::AsyncBufReadExt::read_until`]
pub struct ReadUtilTask<'a, R: ?Sized> {
    reader: &'a mut R,
    // Running count of bytes copied by this task; starts at zero.
    r_len: usize,
    // Byte that ends the read.
    delim: u8,
    // Vector that receives the data.
    buf: &'a mut Vec<u8>,
}

impl<'a, R: ?Sized> ReadUtilTask<'a, R> {
    /// Builds a task that reads from `reader` into `buf` until `delim` is
    /// seen.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, delim: u8, buf: &'a mut Vec<u8>) -> ReadUtilTask<'a, R> {
        let r_len = 0;
        Self {
            reader,
            r_len,
            delim,
            buf,
        }
    }
}
291
poll_read_until<R: AsyncBufRead + Unpin>( buf: &mut Vec<u8>, mut reader: &mut R, delim: u8, read_len: &mut usize, cx: &mut Context<'_>, ) -> Poll<io::Result<usize>>292 fn poll_read_until<R: AsyncBufRead + Unpin>(
293 buf: &mut Vec<u8>,
294 mut reader: &mut R,
295 delim: u8,
296 read_len: &mut usize,
297 cx: &mut Context<'_>,
298 ) -> Poll<io::Result<usize>> {
299 loop {
300 let (done, used) = {
301 let available = poll_ready!(Pin::new(&mut reader).poll_fill_buf(cx))?;
302
303 let ret = available.iter().position(|&val| val == delim);
304
305 match ret {
306 None => {
307 buf.extend_from_slice(available);
308 (false, available.len())
309 }
310 Some(i) => {
311 buf.extend_from_slice(&available[..=i]);
312 (true, i + 1)
313 }
314 }
315 };
316 Pin::new(&mut reader).consume(used);
317 *read_len += used;
318 if done || used == 0 {
319 return Poll::Ready(Ok(mem::replace(read_len, 0)));
320 }
321 }
322 }
323
324 impl<'a, R> Future for ReadUtilTask<'a, R>
325 where
326 R: AsyncBufRead + Unpin,
327 {
328 type Output = io::Result<usize>;
329
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>330 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
331 let me = self.get_mut();
332 let (buf, reader, delim, read_len) = (&mut me.buf, &mut me.reader, me.delim, &mut me.r_len);
333 poll_read_until(buf, *reader, delim, read_len, cx)
334 }
335 }
336
/// Future that reads data from the source up to and including the next
/// `\n` and appends it to a `String`.
///
/// Returned by [`crate::io::AsyncBufReadExt::read_line`]
pub struct ReadLineTask<'a, R: ?Sized> {
    reader: &'a mut R,
    // Running count of bytes read by this task; starts at zero.
    r_len: usize,
    // Working buffer; starts out holding the bytes taken from `output`.
    buf: Vec<u8>,
    // Destination string; left empty while the task owns its bytes.
    output: &'a mut String,
}

impl<'a, R: ?Sized> ReadLineTask<'a, R> {
    /// Builds a task that appends one line read from `reader` to `buf`.
    ///
    /// The current content of `buf` is moved into the task's byte buffer, so
    /// the string is empty until the task writes the result back.
    #[inline(always)]
    pub(crate) fn new(reader: &'a mut R, buf: &'a mut String) -> ReadLineTask<'a, R> {
        let bytes = mem::take(buf).into_bytes();
        Self {
            reader,
            r_len: 0,
            buf: bytes,
            output: buf,
        }
    }
}
359
360 impl<'a, R> Future for ReadLineTask<'a, R>
361 where
362 R: AsyncBufRead + Unpin,
363 {
364 type Output = io::Result<usize>;
365
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>366 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367 let me = self.get_mut();
368 let (buf, output, reader, read_len) =
369 (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
370 let res = poll_ready!(poll_read_until(buf, *reader, b'\n', read_len, cx));
371 let trans = String::from_utf8(mem::take(buf));
372
373 io_string_result(res, trans, *read_len, output)
374 }
375 }
376
377 /// A future for reading every data from the source into a vector and splitting
378 /// it into segments by a delimiter.
379 ///
380 /// Returned by [`crate::io::AsyncBufReadExt::split`]
381 pub struct SplitTask<R> {
382 reader: R,
383 delim: u8,
384 buf: Vec<u8>,
385 r_len: usize,
386 }
387
388 impl<R> SplitTask<R>
389 where
390 R: AsyncBufRead + Unpin,
391 {
new(reader: R, delim: u8) -> SplitTask<R>392 pub(crate) fn new(reader: R, delim: u8) -> SplitTask<R> {
393 SplitTask {
394 reader,
395 delim,
396 buf: Vec::new(),
397 r_len: 0,
398 }
399 }
400
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Option<Vec<u8>>>>401 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Option<Vec<u8>>>> {
402 let me = self.get_mut();
403 let (buf, reader, read_len, delim) = (&mut me.buf, &mut me.reader, &mut me.r_len, me.delim);
404 let res = poll_ready!(poll_read_until(buf, reader, delim, read_len, cx))?;
405
406 if buf.is_empty() && res == 0 {
407 return Poll::Ready(Ok(None));
408 }
409
410 if buf.last() == Some(&delim) {
411 buf.pop();
412 }
413 Poll::Ready(Ok(Some(mem::take(buf))))
414 }
415
next(&mut self) -> io::Result<Option<Vec<u8>>>416 pub async fn next(&mut self) -> io::Result<Option<Vec<u8>>> {
417 poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
418 }
419 }
420
421 /// A future for reading every data from the source into a vector and splitting
422 /// it into segments by row.
423 ///
424 /// Returned by [`crate::io::AsyncBufReadExt::split`]
425 pub struct LinesTask<R> {
426 reader: R,
427 buf: Vec<u8>,
428 output: String,
429 r_len: usize,
430 }
431
432 impl<R> LinesTask<R>
433 where
434 R: AsyncBufRead,
435 {
new(reader: R) -> LinesTask<R>436 pub(crate) fn new(reader: R) -> LinesTask<R> {
437 LinesTask {
438 reader,
439 buf: Vec::new(),
440 output: String::new(),
441 r_len: 0,
442 }
443 }
444 }
445
446 impl<R> LinesTask<R>
447 where
448 R: AsyncBufRead + Unpin,
449 {
poll_next_line( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<io::Result<Option<String>>>450 fn poll_next_line(
451 self: Pin<&mut Self>,
452 cx: &mut Context<'_>,
453 ) -> Poll<io::Result<Option<String>>> {
454 let me = self.get_mut();
455 let (buf, output, reader, read_len) =
456 (&mut me.buf, &mut me.output, &mut me.reader, &mut me.r_len);
457 let io_res = poll_ready!(poll_read_until(buf, reader, b'\n', read_len, cx));
458 let str_res = String::from_utf8(mem::take(buf));
459
460 let res = poll_ready!(io_string_result(io_res, str_res, *read_len, output))?;
461
462 if output.is_empty() && res == 0 {
463 return Poll::Ready(Ok(None));
464 }
465
466 if output.ends_with('\n') {
467 output.pop();
468 if output.ends_with('\r') {
469 output.pop();
470 }
471 }
472 Poll::Ready(Ok(Some(mem::take(output))))
473 }
474
next_line(&mut self) -> io::Result<Option<String>>475 pub async fn next_line(&mut self) -> io::Result<Option<String>> {
476 poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
477 }
478 }
479
#[cfg(all(test, feature = "fs"))]
mod test {
    use crate::fs::{remove_file, File};
    use crate::io::async_read::AsyncReadExt;
    use crate::io::async_write::AsyncWriteExt;
    use crate::io::AsyncBufReader;

    /// UT test cases for `io_string_result()`.
    ///
    /// # Brief
    /// 1. Create a file and write bytes that are not valid UTF-8 to it.
    /// 2. Open the file again and wrap it in an `AsyncBufReader`.
    /// 3. Call `read_to_string()`, which goes through `io_string_result()`
    ///    to turn the content into a `String`.
    /// 4. Check that the call fails with `InvalidData`, then remove the file.
    #[test]
    fn ut_io_string_result() {
        let handle = crate::spawn(async move {
            let file_path = "foo.txt";

            let mut writer = File::create(file_path).await.unwrap();
            let invalid_utf8 = [0, 159, 146, 150];
            let written = writer.write(&invalid_utf8).await.unwrap();
            assert_eq!(written, 4);

            let source = File::open(file_path).await.unwrap();
            let mut reader = AsyncBufReader::new(source);
            let mut text = String::new();
            let read_res = reader.read_to_string(&mut text).await;
            assert!(read_res.is_err());
            assert_eq!(
                read_res.unwrap_err().kind(),
                std::io::ErrorKind::InvalidData
            );

            let removed = remove_file(file_path).await;
            assert!(removed.is_ok());
        });
        crate::block_on(handle).expect("failed to block on");
    }
}
518