1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io; 15 use std::io::IoSlice; 16 use std::ops::DerefMut; 17 use std::pin::Pin; 18 use std::task::{Context, Poll}; 19 20 use crate::io::write_task::{FlushTask, ShutdownTask, WriteAllTask, WriteTask, WriteVectoredTask}; 21 22 /// Async version of the `std::io::Write` trait. Provides all necessary writing 23 /// methods in an asynchronous style. 24 pub trait AsyncWrite { 25 /// Attempts to write bytes from buffer into an I/O source. 26 /// 27 /// If succeeds, this method will return `Poll::Ready(Ok(n))` where `n` 28 /// indicates the number of bytes that have been successfully written. 29 /// It's guaranteed that `n <= buf.len()`. 30 /// 31 /// If returns `Poll::Ready(Ok(0))`, one of the two scenarios below might 32 /// have occurred 33 /// 1. The underlying stream has been shut down and no longer accepts 34 /// any bytes. 35 /// 2. The buf passed in is empty 36 /// 37 /// If `Poll::Pending` is returned, it means that the output stream is 38 /// currently not ready for writing. In this case, this task will be put 39 /// to sleep until the underlying stream becomes writable or closed. poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>40 fn poll_write( 41 self: Pin<&mut Self>, 42 cx: &mut Context<'_>, 43 buf: &[u8], 44 ) -> Poll<io::Result<usize>>; 45 46 /// Attempts to write bytes from a slice of buffers into an I/O source. 47 /// 48 /// This default implementation writes the first none empty buffer, or 49 /// writes an empty one if all buffers are empty. poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>50 fn poll_write_vectored( 51 self: Pin<&mut Self>, 52 cx: &mut Context<'_>, 53 bufs: &[IoSlice<'_>], 54 ) -> Poll<io::Result<usize>> { 55 let buf = bufs 56 .iter() 57 .find(|b| !b.is_empty()) 58 .map_or(&[][..], |b| &**b); 59 self.poll_write(cx, buf) 60 } 61 62 /// Indicates whether this AsyncWrite implementation has an efficient 63 /// `write_vectored`. The default implementation is not. is_write_vectored(&self) -> bool64 fn is_write_vectored(&self) -> bool { 65 false 66 } 67 68 /// Attempts to flush the I/O source, ensuring that any buffered data has 69 /// been sent to their destination. 70 /// 71 /// If succeeds, `Poll::Ready(Ok(()))` will be returned 72 /// 73 /// If `Poll::Pending` is returned, it means the stream cannot be flushed 74 /// immediately. The task will continue once its waker receives a 75 /// notification indicating the stream is ready. poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>76 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>; 77 78 /// Attempts to shut down the writer, returns `Poll::Ready(Ok(()))` when the 79 /// underlying I/O connection is completely closed and therefore safe to 80 /// drop. 81 /// 82 /// This method is designed for asynchronous shutdown of the I/O connection. 83 /// For protocols like TLS or TCP, this is the place to do a last flush 84 /// of data and gracefully turn off the connection. 85 /// 86 /// If `Poll::Ready(Err(e))` is returned, it indicates a fatal error has 87 /// been occurred during the shutdown procedure. It typically means the 88 /// I/O source is already broken. 89 /// 90 /// If `Poll::Pending` is returned, it indicates the I/O connection is not 91 /// ready to shut down immediately, it may have another final data to be 92 /// flushed. This task will be continued once the waker receives a ready 93 /// notification from the connection. poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>94 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>; 95 } 96 97 macro_rules! async_write_deref { 98 () => { 99 /// A default poll_write implementation for an object that could be deref to an 100 /// AsyncWrite object. 101 fn poll_write( 102 mut self: Pin<&mut Self>, 103 cx: &mut Context<'_>, 104 buf: &[u8], 105 ) -> Poll<Result<usize, io::Error>> { 106 Pin::new(&mut **self).poll_write(cx, buf) 107 } 108 109 /// A default poll_write_vectored implementation for an object that could be 110 /// deref to an AsyncWrite object. 111 fn poll_write_vectored( 112 mut self: Pin<&mut Self>, 113 cx: &mut Context<'_>, 114 bufs: &[IoSlice<'_>], 115 ) -> Poll<Result<usize, io::Error>> { 116 Pin::new(&mut **self).poll_write_vectored(cx, bufs) 117 } 118 119 /// A default is_write_vectored implementation for an object that could be deref 120 /// to an AsyncWrite object. 121 fn is_write_vectored(&self) -> bool { 122 (**self).is_write_vectored() 123 } 124 125 /// A default poll_flush implementation for an object that could be deref to an 126 /// AsyncWrite object. 127 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 128 Pin::new(&mut **self).poll_flush(cx) 129 } 130 131 /// A default poll_shutdown implementation for an object that could be deref to 132 /// an AsyncWrite object. 133 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 134 Pin::new(&mut **self).poll_shutdown(cx) 135 } 136 }; 137 } 138 139 impl<T: AsyncWrite + Unpin + ?Sized> AsyncWrite for Box<T> { 140 async_write_deref!(); 141 } 142 143 impl<T: AsyncWrite + Unpin + ?Sized> AsyncWrite for &mut T { 144 async_write_deref!(); 145 } 146 147 impl<T> AsyncWrite for Pin<T> 148 where 149 T: DerefMut<Target = dyn AsyncWrite> + Unpin, 150 { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>151 fn poll_write( 152 self: Pin<&mut Self>, 153 cx: &mut Context<'_>, 154 buf: &[u8], 155 ) -> Poll<Result<usize, io::Error>> { 156 Pin::as_mut(self.get_mut()).poll_write(cx, buf) 157 } 158 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize, io::Error>>159 fn poll_write_vectored( 160 self: Pin<&mut Self>, 161 cx: &mut Context<'_>, 162 bufs: &[IoSlice<'_>], 163 ) -> Poll<Result<usize, io::Error>> { 164 Pin::as_mut(self.get_mut()).poll_write_vectored(cx, bufs) 165 } 166 is_write_vectored(&self) -> bool167 fn is_write_vectored(&self) -> bool { 168 (**self).is_write_vectored() 169 } 170 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>171 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 172 Pin::as_mut(self.get_mut()).poll_flush(cx) 173 } 174 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>175 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 176 Pin::as_mut(self.get_mut()).poll_shutdown(cx) 177 } 178 } 179 180 /// An external trait that is automatically implemented for any object that has 181 /// the AsyncWrite trait. Provides std-like writing methods such as `write`, 182 /// `write_vectored`, 'write_all'. Every method in this trait returns a future 183 /// object. Awaits on the future will complete the task but it doesn't guarantee 184 /// whether the task will finished immediately or asynchronously. 185 pub trait AsyncWriteExt: AsyncWrite { 186 /// Writes data from the buffer into the I/O source. 187 /// 188 /// On success, `Ok(n)` will be returned, where `n` indicates the number of 189 /// bytes that have been successfully written into the buffer. It 190 /// guarantees `0 <= n < buf.len()`, and if `n == 0`, then one of the 191 /// two scenarios below might have been occurred. 192 /// 1. The underlying I/O no longer accepts any bytes. 193 /// 2. The length of the buffer passed in is 0. 194 /// 195 /// `Err(e)` will be returned when encounters a fatal error during the write 196 /// procedure. This method should not write anything into the buffer if 197 /// an error has occurred. 198 /// 199 /// Not writing the entire buffer into the I/O is not an error. 200 /// # Examples 201 /// ```no run 202 /// let mut io = File::create("foo.txt").await?; 203 /// let buf = [1, 2, 3]; 204 /// let n = io.write(&buf).await?; 205 /// ``` write<'a>(&'a mut self, buf: &'a [u8]) -> WriteTask<'a, Self> where Self: Unpin,206 fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteTask<'a, Self> 207 where 208 Self: Unpin, 209 { 210 WriteTask::new(self, buf) 211 } 212 213 /// Writes data from a slice of buffers into the I/O source. 214 /// 215 /// Data is copied from each buffer in order, with the final buffer 216 /// read from possibly being only partially consumed. This method must 217 /// behave as a call to [`write`] with the buffers concatenated would. 218 /// 219 /// Return values of this method are the same as [`write`]. 220 /// 221 /// # Examples 222 /// ```no run 223 /// let mut data1 = [1, 2, 3]; 224 /// let mut data2 = [4, 5, 6]; 225 /// let slice1 = IoSlice::new(&mut data1); 226 /// let slice2 = IoSlice::new(&mut data2); 227 /// let mut io = Filre::create("foo.txt").await?; 228 /// let n = io.write_vectored(&[slice1, slice2]).await?; 229 /// ``` write_vectored<'a, 'b>( &'a mut self, bufs: &'a [IoSlice<'b>], ) -> WriteVectoredTask<'a, 'b, Self> where Self: Unpin,230 fn write_vectored<'a, 'b>( 231 &'a mut self, 232 bufs: &'a [IoSlice<'b>], 233 ) -> WriteVectoredTask<'a, 'b, Self> 234 where 235 Self: Unpin, 236 { 237 WriteVectoredTask::new(self, bufs) 238 } 239 240 /// Writes all data from the buffer into the I/O source. 241 /// 242 /// On success, `Ok(())` will be returned, indicating all data from the 243 /// buffer has been written into the I/O. 244 /// 245 /// If a write error occurs during the process, this method will finish 246 /// immediately, the number of bytes that has been written is 247 /// unspecified. 248 /// 249 /// # Examples 250 /// ```no run 251 /// let mut io = File::create("foo.txt").await?; 252 /// let buf = [0; 16384]; 253 /// let n = io.read_to_end(&buf).await?; 254 /// ``` write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllTask<'a, Self> where Self: Unpin,255 fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllTask<'a, Self> 256 where 257 Self: Unpin, 258 { 259 WriteAllTask::new(self, buf) 260 } 261 262 /// Flushes the stream to ensure that all data reach the destination. 263 /// 264 /// `Err(e)` will be returned when the I/O error occurring or EOF being 265 /// reached. 266 /// 267 /// # Examples 268 /// ```no run 269 /// let mut io = File::create("foo.txt").await?; 270 /// let buf = [1, 2, 3]; 271 /// let n = io.write(&buf).await?; 272 /// io.flush().await?; 273 /// ``` flush(&mut self) -> FlushTask<'_, Self> where Self: Unpin,274 fn flush(&mut self) -> FlushTask<'_, Self> 275 where 276 Self: Unpin, 277 { 278 FlushTask::new(self) 279 } 280 281 /// Shuts down the stream. 282 /// 283 /// # Examples 284 /// ```no run 285 /// let mut io = File::create("foo.txt").await?; 286 /// let buf = [1, 2, 3]; 287 /// let n = io.write(&buf).await?; 288 /// io.shutdown().await?; 289 /// ``` shutdown(&mut self) -> ShutdownTask<'_, Self> where Self: Unpin,290 fn shutdown(&mut self) -> ShutdownTask<'_, Self> 291 where 292 Self: Unpin, 293 { 294 ShutdownTask::new(self) 295 } 296 297 /// Creates a "by reference" adaptor for this instance of `AsyncRead`. 298 /// 299 /// The returned adapter also implements `AsyncRead` and will simply borrow 300 /// this current reader. 301 /// 302 /// # Examples 303 /// 304 /// ```no run 305 /// use std::io; 306 /// 307 /// use ylong_runtime::fs::File; 308 /// use ylong_runtime::io::AsyncWriteExt; 309 /// 310 /// async fn async_io() -> io::Result<()> { 311 /// let mut buffer = File::create("foo.txt").await?; 312 /// 313 /// let reference = buffer.by_ref(); 314 /// 315 /// // we can use reference just like our original buffer 316 /// reference.write_all(b"some bytes").await?; 317 /// Ok(()) 318 /// } 319 /// ``` by_ref(&mut self) -> &mut Self where Self: Sized,320 fn by_ref(&mut self) -> &mut Self 321 where 322 Self: Sized, 323 { 324 self 325 } 326 } 327 328 impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {} 329