1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io;
15 use std::io::IoSlice;
16 use std::ops::DerefMut;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 
20 use crate::io::write_task::{FlushTask, ShutdownTask, WriteAllTask, WriteTask, WriteVectoredTask};
21 
/// Asynchronous counterpart of the `std::io::Write` trait. It exposes the
/// writing primitives as poll-based methods.
pub trait AsyncWrite {
    /// Tries to write bytes from `buf` into the I/O source.
    ///
    /// On success this returns `Poll::Ready(Ok(n))`, where `n` is the number
    /// of bytes that were written. `n <= buf.len()` is guaranteed.
    ///
    /// A result of `Poll::Ready(Ok(0))` means one of the following happened:
    ///
    /// 1. The underlying stream has been shut down and no longer accepts any
    ///    bytes.
    /// 2. The `buf` passed in is empty.
    ///
    /// `Poll::Pending` means the output stream is currently not ready for
    /// writing. The task is then put to sleep until the underlying stream
    /// becomes writable or closed.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Tries to write bytes from a slice of buffers into the I/O source.
    ///
    /// The default implementation forwards the first non-empty buffer to
    /// `poll_write`. If there is no such buffer, it forwards an empty slice
    /// instead.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        match bufs.iter().find(|slice| !slice.is_empty()) {
            Some(first_non_empty) => self.poll_write(cx, first_non_empty),
            // Nothing to send: still issue one (empty) write.
            None => self.poll_write(cx, &[]),
        }
    }

    /// Reports whether this `AsyncWrite` implementation provides an efficient
    /// `poll_write_vectored`. The default answer is `false`.
    fn is_write_vectored(&self) -> bool {
        false
    }

    /// Tries to flush the I/O source so that any buffered data reaches its
    /// destination.
    ///
    /// On success this returns `Poll::Ready(Ok(()))`.
    ///
    /// `Poll::Pending` means the stream cannot be flushed immediately. The
    /// task continues once its waker is notified that the stream is ready.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;

    /// Tries to shut down the writer. Returns `Poll::Ready(Ok(()))` once the
    /// underlying I/O connection is completely closed and therefore safe to
    /// drop.
    ///
    /// This method exists for asynchronous shutdown of the I/O connection.
    /// For protocols like TLS or TCP, this is the place to perform a final
    /// flush of data and to close the connection gracefully.
    ///
    /// `Poll::Ready(Err(e))` indicates that a fatal error occurred during the
    /// shutdown procedure. It typically means the I/O source is already
    /// broken.
    ///
    /// `Poll::Pending` indicates that the I/O connection cannot be shut down
    /// immediately, for example because some final data still has to be
    /// flushed. The task continues once the waker receives a readiness
    /// notification from the connection.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
96 
// Expands to the five `AsyncWrite` methods for a pointer-like `Self` (such as
// `Box<T>` or `&mut T`) whose target is an `Unpin` `AsyncWrite`. Every method
// simply forwards to the pointed-to value.
macro_rules! async_write_deref {
    () => {
        /// Forwards `poll_write` to the `AsyncWrite` value this pointer
        /// dereferences to.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let target = &mut **self.get_mut();
            Pin::new(target).poll_write(cx, buf)
        }

        /// Forwards `poll_write_vectored` to the `AsyncWrite` value this
        /// pointer dereferences to.
        fn poll_write_vectored(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<io::Result<usize>> {
            let target = &mut **self.get_mut();
            Pin::new(target).poll_write_vectored(cx, bufs)
        }

        /// Forwards `is_write_vectored` to the `AsyncWrite` value this
        /// pointer dereferences to.
        fn is_write_vectored(&self) -> bool {
            AsyncWrite::is_write_vectored(&**self)
        }

        /// Forwards `poll_flush` to the `AsyncWrite` value this pointer
        /// dereferences to.
        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let target = &mut **self.get_mut();
            Pin::new(target).poll_flush(cx)
        }

        /// Forwards `poll_shutdown` to the `AsyncWrite` value this pointer
        /// dereferences to.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let target = &mut **self.get_mut();
            Pin::new(target).poll_shutdown(cx)
        }
    };
}
138 
139 impl<T: AsyncWrite + Unpin + ?Sized> AsyncWrite for Box<T> {
140     async_write_deref!();
141 }
142 
143 impl<T: AsyncWrite + Unpin + ?Sized> AsyncWrite for &mut T {
144     async_write_deref!();
145 }
146 
147 impl<T> AsyncWrite for Pin<T>
148 where
149     T: DerefMut<Target = dyn AsyncWrite> + Unpin,
150 {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>151     fn poll_write(
152         self: Pin<&mut Self>,
153         cx: &mut Context<'_>,
154         buf: &[u8],
155     ) -> Poll<Result<usize, io::Error>> {
156         Pin::as_mut(self.get_mut()).poll_write(cx, buf)
157     }
158 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize, io::Error>>159     fn poll_write_vectored(
160         self: Pin<&mut Self>,
161         cx: &mut Context<'_>,
162         bufs: &[IoSlice<'_>],
163     ) -> Poll<Result<usize, io::Error>> {
164         Pin::as_mut(self.get_mut()).poll_write_vectored(cx, bufs)
165     }
166 
is_write_vectored(&self) -> bool167     fn is_write_vectored(&self) -> bool {
168         (**self).is_write_vectored()
169     }
170 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>171     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
172         Pin::as_mut(self.get_mut()).poll_flush(cx)
173     }
174 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>175     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
176         Pin::as_mut(self.get_mut()).poll_shutdown(cx)
177     }
178 }
179 
180 /// An external trait that is automatically implemented for any object that has
181 /// the AsyncWrite trait. Provides std-like writing methods such as `write`,
182 /// `write_vectored`, 'write_all'. Every method in this trait returns a future
183 /// object. Awaits on the future will complete the task but it doesn't guarantee
184 /// whether the task will finished immediately or asynchronously.
185 pub trait AsyncWriteExt: AsyncWrite {
186     /// Writes data from the buffer into the I/O source.
187     ///
188     /// On success, `Ok(n)` will be returned, where `n` indicates the number of
189     /// bytes that have been successfully written into the buffer. It
190     /// guarantees `0 <= n < buf.len()`, and if `n == 0`, then one of the
191     /// two scenarios below might have been occurred.
192     ///     1. The underlying I/O no longer accepts any bytes.
193     ///     2. The length of the buffer passed in is 0.
194     ///
195     /// `Err(e)` will be returned when encounters a fatal error during the write
196     /// procedure. This method should not write anything into the buffer if
197     /// an error has occurred.
198     ///
199     /// Not writing the entire buffer into the I/O is not an error.
200     /// # Examples
201     /// ```no run
202     /// let mut io = File::create("foo.txt").await?;
203     /// let buf = [1, 2, 3];
204     /// let n = io.write(&buf).await?;
205     /// ```
write<'a>(&'a mut self, buf: &'a [u8]) -> WriteTask<'a, Self> where Self: Unpin,206     fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteTask<'a, Self>
207     where
208         Self: Unpin,
209     {
210         WriteTask::new(self, buf)
211     }
212 
213     /// Writes data from a slice of buffers into the I/O source.
214     ///
215     /// Data is copied from each buffer in order, with the final buffer
216     /// read from possibly being only partially consumed. This method must
217     /// behave as a call to [`write`] with the buffers concatenated would.
218     ///
219     /// Return values of this method are the same as [`write`].
220     ///
221     /// # Examples
222     /// ```no run
223     /// let mut data1 = [1, 2, 3];
224     /// let mut data2 = [4, 5, 6];
225     /// let slice1 = IoSlice::new(&mut data1);
226     /// let slice2 = IoSlice::new(&mut data2);
227     /// let mut io = Filre::create("foo.txt").await?;
228     /// let n = io.write_vectored(&[slice1, slice2]).await?;
229     /// ```
write_vectored<'a, 'b>( &'a mut self, bufs: &'a [IoSlice<'b>], ) -> WriteVectoredTask<'a, 'b, Self> where Self: Unpin,230     fn write_vectored<'a, 'b>(
231         &'a mut self,
232         bufs: &'a [IoSlice<'b>],
233     ) -> WriteVectoredTask<'a, 'b, Self>
234     where
235         Self: Unpin,
236     {
237         WriteVectoredTask::new(self, bufs)
238     }
239 
240     /// Writes all data from the buffer into the I/O source.
241     ///
242     /// On success, `Ok(())` will be returned, indicating all data from the
243     /// buffer has been written into the I/O.
244     ///
245     /// If a write error occurs during the process, this method will finish
246     /// immediately, the number of bytes that has been written is
247     /// unspecified.
248     ///
249     /// # Examples
250     /// ```no run
251     /// let mut io = File::create("foo.txt").await?;
252     /// let buf = [0; 16384];
253     /// let n = io.read_to_end(&buf).await?;
254     /// ```
write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllTask<'a, Self> where Self: Unpin,255     fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllTask<'a, Self>
256     where
257         Self: Unpin,
258     {
259         WriteAllTask::new(self, buf)
260     }
261 
262     /// Flushes the stream to ensure that all data reach the destination.
263     ///
264     /// `Err(e)` will be returned when the I/O error occurring or EOF being
265     /// reached.
266     ///
267     /// # Examples
268     /// ```no run
269     /// let mut io = File::create("foo.txt").await?;
270     /// let buf = [1, 2, 3];
271     /// let n = io.write(&buf).await?;
272     /// io.flush().await?;
273     /// ```
flush(&mut self) -> FlushTask<'_, Self> where Self: Unpin,274     fn flush(&mut self) -> FlushTask<'_, Self>
275     where
276         Self: Unpin,
277     {
278         FlushTask::new(self)
279     }
280 
281     /// Shuts down the stream.
282     ///
283     /// # Examples
284     /// ```no run
285     /// let mut io = File::create("foo.txt").await?;
286     /// let buf = [1, 2, 3];
287     /// let n = io.write(&buf).await?;
288     /// io.shutdown().await?;
289     /// ```
shutdown(&mut self) -> ShutdownTask<'_, Self> where Self: Unpin,290     fn shutdown(&mut self) -> ShutdownTask<'_, Self>
291     where
292         Self: Unpin,
293     {
294         ShutdownTask::new(self)
295     }
296 
297     /// Creates a "by reference" adaptor for this instance of `AsyncRead`.
298     ///
299     /// The returned adapter also implements `AsyncRead` and will simply borrow
300     /// this current reader.
301     ///
302     /// # Examples
303     ///
304     /// ```no run
305     /// use std::io;
306     ///
307     /// use ylong_runtime::fs::File;
308     /// use ylong_runtime::io::AsyncWriteExt;
309     ///
310     /// async fn async_io() -> io::Result<()> {
311     ///     let mut buffer = File::create("foo.txt").await?;
312     ///
313     ///     let reference = buffer.by_ref();
314     ///
315     ///     // we can use reference just like our original buffer
316     ///     reference.write_all(b"some bytes").await?;
317     ///     Ok(())
318     /// }
319     /// ```
by_ref(&mut self) -> &mut Self where Self: Sized,320     fn by_ref(&mut self) -> &mut Self
321     where
322         Self: Sized,
323     {
324         self
325     }
326 }
327 
328 impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {}
329