1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::fmt::{Debug, Formatter};
15 use std::io;
16 use std::io::{IoSlice, Read, Write};
17 use std::net::{Shutdown, SocketAddr};
18 use std::pin::Pin;
19 use std::sync::Arc;
20 use std::task::{Context, Poll};
21 use std::time::Duration;
22 
23 use ylong_io::Interest;
24 
25 use super::split::{SplitReadHalf, SplitWriteHalf};
26 use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
27 use crate::net::sys::tcp::split::{BorrowReadHalf, BorrowWriteHalf};
28 use crate::net::sys::ToSocketAddrs;
29 use crate::net::AsyncSource;
30 
/// An asynchronous version of [`std::net::TcpStream`]
///
/// After creating a `TcpStream` by either connecting to a remote host or
/// accepting a connection on a `TcpListener`, data can be transmitted
/// asynchronously by reading and writing to it.
///
/// # Example
/// ```rust
/// use std::io;
/// use std::io::IoSlice;
///
/// use ylong_runtime::io::{AsyncReadExt, AsyncWriteExt};
/// use ylong_runtime::net::TcpStream;
///
/// async fn io_func() -> io::Result<()> {
///     let addr = "127.0.0.1:8080";
///     let mut stream = TcpStream::connect(addr).await?;
///
///     let _ = stream.write(b"hello client").await?;
///     let _ = stream
///         .write_vectored(&[IoSlice::new(b"hello client")])
///         .await?;
///
///     let mut read_buf = [0u8; 1024];
///     let _ = stream.read(&mut read_buf).await?;
///     let _ = stream.read(&mut read_buf).await?;
///     Ok(())
/// }
/// ```
pub struct TcpStream {
    // Async wrapper around the `ylong_io` socket; the methods of this type
    // operate on the socket through this field.
    pub(crate) source: AsyncSource<ylong_io::TcpStream>,
}
64 
impl Debug for TcpStream {
    // The debug representation is whatever `fmt` produces for `source`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.source.fmt(f)
    }
}
70 
71 impl TcpStream {
72     /// Opens a TCP connection to a remote host asynchronously.
73     ///
74     /// # Note
75     ///
76     /// If there are multiple addresses in SocketAddr, it will attempt to
77     /// connect them in sequence until one of the addrs returns success. If
78     /// all connections fail, it returns the error of the last connection.
79     /// This behavior is consistent with std.
80     ///
81     /// # Panic
82     /// Calling this method outside of a Ylong Runtime could cause panic.
83     ///
84     /// # Example
85     /// ```rust
86     /// use std::io;
87     ///
88     /// use ylong_runtime::net::TcpStream;
89     ///
90     /// async fn io_func() -> io::Result<()> {
91     ///     let addr = "127.0.0.1:8080";
92     ///     let mut stream = TcpStream::connect(addr).await?;
93     ///     Ok(())
94     /// }
95     /// ```
connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self>96     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
97         let stream = super::super::addr::each_addr(addr, ylong_io::TcpStream::connect).await?;
98         Self::connect_inner(stream).await
99     }
100 
connect_inner(stream: ylong_io::TcpStream) -> io::Result<Self>101     async fn connect_inner(stream: ylong_io::TcpStream) -> io::Result<Self> {
102         let stream = TcpStream::new(stream)?;
103         stream
104             .source
105             .async_process(
106                 // Wait until the stream is writable
107                 Interest::WRITABLE,
108                 || Ok(()),
109             )
110             .await?;
111 
112         if let Some(e) = stream.source.take_error()? {
113             return Err(e);
114         }
115         Ok(stream)
116     }
117 
118     // Registers the ylong_io::TcpStream's fd to the reactor, and returns async
119     // TcpStream.
new(stream: ylong_io::TcpStream) -> io::Result<Self>120     pub(crate) fn new(stream: ylong_io::TcpStream) -> io::Result<Self> {
121         let source = AsyncSource::new(stream, None)?;
122         Ok(TcpStream { source })
123     }
124 
125     /// Waits for the socket to become readable.
126     ///
127     /// This function is usually paired up with [`TcpStream::try_read`]
128     ///
129     /// # Examples
130     ///
131     /// ```no_run
132     /// use std::io;
133     ///
134     /// use ylong_runtime::net::TcpStream;
135     ///
136     /// async fn io_func() -> io::Result<()> {
137     ///     let local_addr = "127.0.0.1:8080";
138     ///     let stream = TcpStream::connect(local_addr).await?;
139     ///
140     ///     let mut buf = vec![0; 12];
141     ///
142     ///     loop {
143     ///         stream.readable().await?;
144     ///         match stream.try_read(&mut buf) {
145     ///             Ok(_) => break,
146     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
147     ///             Err(e) => panic!("{:?}", e),
148     ///         }
149     ///     }
150     ///
151     ///     Ok(())
152     /// }
153     /// ```
readable(&self) -> io::Result<()>154     pub async fn readable(&self) -> io::Result<()> {
155         self.source.entry.readiness(Interest::READABLE).await?;
156         Ok(())
157     }
158 
159     /// Attempts to read data from the stream to the provided buffer, returning
160     /// the number of bytes read.
161     ///
162     /// This function is usually paired up with [`TcpStream::readable`]
163     ///
164     /// # Examples
165     ///
166     /// ```no_run
167     /// use std::io;
168     ///
169     /// use ylong_runtime::net::TcpStream;
170     ///
171     /// async fn io_func() -> io::Result<()> {
172     ///     let local_addr = "127.0.0.1:8080";
173     ///     let stream = TcpStream::connect(local_addr).await?;
174     ///
175     ///     let mut buf = vec![0; 12];
176     ///
177     ///     loop {
178     ///         stream.readable().await?;
179     ///         match stream.try_read(&mut buf) {
180     ///             Ok(_) => break,
181     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
182     ///             Err(e) => panic!("{:?}", e),
183     ///         }
184     ///     }
185     ///
186     ///     Ok(())
187     /// }
188     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>189     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
190         self.source
191             .try_io(Interest::READABLE, || (&*self.source).read(buf))
192     }
193 
194     /// Waits for the socket to become writable.
195     ///
196     /// This function is usually paired up with [`TcpStream::try_write`]
197     ///
198     /// # Examples
199     ///
200     /// ```no_run
201     /// use std::io;
202     ///
203     /// use ylong_runtime::net::TcpStream;
204     ///
205     /// async fn io_func() -> io::Result<()> {
206     ///     let local_addr = "127.0.0.1:8080";
207     ///     let stream = TcpStream::connect(local_addr).await?;
208     ///
209     ///     loop {
210     ///         stream.writable().await?;
211     ///         match stream.try_write(b"write message") {
212     ///             Ok(_) => break,
213     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
214     ///             Err(e) => panic!("{:?}", e),
215     ///         }
216     ///     }
217     ///
218     ///     Ok(())
219     /// }
220     /// ```
writable(&self) -> io::Result<()>221     pub async fn writable(&self) -> io::Result<()> {
222         self.source.entry.readiness(Interest::WRITABLE).await?;
223         Ok(())
224     }
225 
226     /// Attempts to write data to the stream, returning the number of bytes
227     /// written.
228     ///
229     /// This function is usually paired up with [`TcpStream::writable`]
230     ///
231     /// # Examples
232     ///
233     /// ```no_run
234     /// use std::io;
235     ///
236     /// use ylong_runtime::net::TcpStream;
237     ///
238     /// async fn io_func() -> io::Result<()> {
239     ///     let local_addr = "127.0.0.1:8080";
240     ///     let stream = TcpStream::connect(local_addr).await?;
241     ///
242     ///     loop {
243     ///         stream.writable().await?;
244     ///         match stream.try_write(b"write message") {
245     ///             Ok(_) => break,
246     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
247     ///             Err(e) => panic!("{:?}", e),
248     ///         }
249     ///     }
250     ///
251     ///     Ok(())
252     /// }
253     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>254     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
255         self.source
256             .try_io(Interest::WRITABLE, || (&*self.source).write(buf))
257     }
258 
259     /// Returns the socket address of the remote half of this TCP connection.
260     ///
261     /// # Example
262     ///
263     /// ```no_run
264     /// use std::io;
265     /// use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
266     ///
267     /// use ylong_runtime::net::TcpStream;
268     ///
269     /// async fn io_func() -> io::Result<()> {
270     ///     let addr = "127.0.0.1:1234";
271     ///     let stream = TcpStream::connect(addr)
272     ///         .await
273     ///         .expect("Couldn't connect to the server...");
274     ///     assert_eq!(
275     ///         stream.local_addr().unwrap().ip(),
276     ///         IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
277     ///     );
278     ///     Ok(())
279     /// }
280     /// ```
local_addr(&self) -> io::Result<SocketAddr>281     pub fn local_addr(&self) -> io::Result<SocketAddr> {
282         self.source.local_addr()
283     }
284 
    /// Returns the socket address of the remote half of this TCP connection.
    ///
    /// # Errors
    ///
    /// The error reported by the underlying source is returned unchanged.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use std::io;
    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///     assert_eq!(
    ///         stream.peer_addr().unwrap(),
    ///         SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234))
    ///     );
    ///     Ok(())
    /// }
    /// ```
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        self.source.peer_addr()
    }
310 
    /// Sets the value of the `TCP_NODELAY` option on this socket.
    ///
    /// `nodelay` is the new value of the option; `true` enables it.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io;
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///     stream.set_nodelay(true).expect("set_nodelay call failed");
    ///     Ok(())
    /// }
    /// ```
    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
        self.source.set_nodelay(nodelay)
    }
332 
    /// Gets the value of the `TCP_NODELAY` option on this socket.
    ///
    /// See [`TcpStream::set_nodelay`] for setting this option.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io;
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///     stream.set_nodelay(true).expect("set_nodelay call failed");
    ///     assert!(stream.nodelay().unwrap_or(false));
    ///     Ok(())
    /// }
    /// ```
    pub fn nodelay(&self) -> io::Result<bool> {
        self.source.nodelay()
    }
355 
    /// Sets the value of the linger on this socket by setting the `SO_LINGER`
    /// option.
    ///
    /// This value controls how the socket is closed when the stream still
    /// has unsent data. If `SO_LINGER` is set, the socket stays open for the
    /// given duration while the system attempts to send the pending data.
    /// Otherwise, the system may close the socket immediately, or wait for a
    /// default timeout.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io;
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///
    ///     stream.set_linger(None).expect("Sets linger fail.");
    ///     Ok(())
    /// }
    /// ```
    pub fn set_linger(&self, linger: Option<Duration>) -> io::Result<()> {
        self.source.set_linger(linger)
    }
384 
    /// Gets the value of the linger on this socket by reading the
    /// `SO_LINGER` option.
    ///
    /// See [`TcpStream::set_linger`] for setting this option.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io;
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///
    ///     println!("{:?}", stream.linger());
    ///     Ok(())
    /// }
    /// ```
    pub fn linger(&self) -> io::Result<Option<Duration>> {
        self.source.linger()
    }
408 
    /// Sets the value of the `IP_TTL` option on this socket.
    ///
    /// `ttl` is the new time-to-live value.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io;
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///     stream.set_ttl(100).expect("set_ttl call failed");
    ///     Ok(())
    /// }
    /// ```
    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.source.set_ttl(ttl)
    }
430 
    /// Gets the value of the `IP_TTL` option on this socket.
    ///
    /// See [`TcpStream::set_ttl`] for setting this option.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use std::io;
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///     stream.set_ttl(100).expect("set_ttl call failed");
    ///     assert_eq!(stream.ttl().unwrap_or(0), 100);
    ///     Ok(())
    /// }
    /// ```
    pub fn ttl(&self) -> io::Result<u32> {
        self.source.ttl()
    }
453 
454     /// Splits a TcpStream into a read half and a write half with reference,
455     /// which can be used to read and write the stream concurrently.
456     ///
457     /// # Example
458     ///
459     /// ```no_run
460     /// use std::io;
461     ///
462     /// use ylong_runtime::net::TcpStream;
463     ///
464     /// async fn io_func() -> io::Result<()> {
465     ///     let addr = "127.0.0.1:1234";
466     ///     let mut stream = TcpStream::connect(addr)
467     ///         .await
468     ///         .expect("Couldn't connect to the server...");
469     ///     let (read, write) = stream.split();
470     ///     Ok(())
471     /// }
472     /// ```
split(&mut self) -> (BorrowReadHalf, BorrowWriteHalf)473     pub fn split(&mut self) -> (BorrowReadHalf, BorrowWriteHalf) {
474         let read = BorrowReadHalf(self);
475         let write = BorrowWriteHalf(self);
476         (read, write)
477     }
478 
479     /// Splits a TcpStream into a read half and a write half,
480     /// which can be used to read and write the stream concurrently.
481     ///
482     /// # Example
483     ///
484     /// ```no_run
485     /// use std::io;
486     ///
487     /// use ylong_runtime::net::TcpStream;
488     ///
489     /// async fn io_func() -> io::Result<()> {
490     ///     let addr = "127.0.0.1:1234";
491     ///     let stream = TcpStream::connect(addr)
492     ///         .await
493     ///         .expect("Couldn't connect to the server...");
494     ///     let (read, write) = stream.into_split();
495     ///     Ok(())
496     /// }
497     /// ```
into_split(self) -> (SplitReadHalf, SplitWriteHalf)498     pub fn into_split(self) -> (SplitReadHalf, SplitWriteHalf) {
499         let arc = Arc::new(self);
500         let read = SplitReadHalf(Arc::clone(&arc));
501         let write = SplitWriteHalf(Arc::clone(&arc));
502         (read, write)
503     }
504 
505     /// Receives data on the socket from the remote address to which it is
506     /// connected, without removing that data from the queue.
507     /// On success, returns the number of bytes peeked.
508     ///
509     /// # Example
510     ///
511     /// ```no_run
512     /// use std::io;
513     /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
514     ///
515     /// use ylong_runtime::net::TcpStream;
516     ///
517     /// async fn io_func() -> io::Result<()> {
518     ///     let addr = "127.0.0.1:1234";
519     ///     let stream = TcpStream::connect(addr)
520     ///         .await
521     ///         .expect("Couldn't connect to the server...");
522     ///     let mut buf = [0; 10];
523     ///     let len = stream.peek(&mut buf).await.expect("peek failed");
524     ///     Ok(())
525     /// }
526     /// ```
peek(&self, buf: &mut [u8]) -> io::Result<usize>527     pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
528         self.source
529             .async_process(Interest::READABLE, || self.source.peek(buf))
530             .await
531     }
532 
    /// Gets the value of the `SO_ERROR` option on this socket.
    ///
    /// `Ok(Some(error))` carries the error stored on the socket, `Ok(None)`
    /// means no error is stored, and `Err` means the query itself failed.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use std::io;
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///     match stream.take_error() {
    ///         Ok(Some(error)) => println!("TcpStream error: {error:?}"),
    ///         Ok(None) => println!("No error"),
    ///         Err(error) => println!("TcpStream.take_error failed: {error:?}"),
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.source.take_error()
    }
559 
    // todo: make this async
    /// Shuts down the read half, the write half, or both halves of this
    /// connection, as selected by the given [`Shutdown`] value.
    ///
    /// # Errors
    ///
    /// The error reported by the underlying source is returned unchanged.
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        self.source.shutdown(how)
    }
565 
    /// Sets the owner for this source's fd
    ///
    /// `uid` and `gid` are the user ID and group ID of the new owner.
    ///
    /// # Errors
    /// This method calls `libc::fchown`, which returns the following errors:
    ///
    /// * [`libc::EBADF`]: The fd argument is not an open file descriptor.
    /// * [`libc::EPERM`]: The effective user ID does not match the owner of
    ///   the file or the process does not have appropriate privilege and
    ///   _POSIX_CHOWN_RESTRICTED indicates that such privilege is required.
    /// * [`libc::EROFS`]: The file referred to by fildes resides on a
    ///   read-only file system.
    /// * [`libc::EINVAL`]: The owner or group ID is not a value supported by
    ///   the implementation.
    /// * [`libc::EIO`]: A physical I/O error has occurred.
    /// * [`libc::EINTR`]: The fchown() function was interrupted by a signal
    ///   which was caught.
    ///
    /// # Example
    /// ```no_run
    /// use std::io;
    ///
    /// use ylong_runtime::net::TcpStream;
    ///
    /// async fn io_func() -> io::Result<()> {
    ///     let addr = "127.0.0.1:1234";
    ///     let stream = TcpStream::connect(addr)
    ///         .await
    ///         .expect("Couldn't connect to the server...");
    ///     stream.fchown(0, 0)?;
    ///     Ok(())
    /// }
    /// ```
    #[cfg(target_os = "linux")]
    pub fn fchown(&self, uid: uid_t, gid: gid_t) -> io::Result<()> {
        self.source.fchown(uid, gid)
    }
596 }
597 
impl AsyncRead for TcpStream {
    // Reading is forwarded to the source's `poll_read`, which receives the
    // task context and the destination buffer unchanged.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        self.source.poll_read(cx, buf)
    }
}
607 
608 impl AsyncWrite for TcpStream {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>609     fn poll_write(
610         self: Pin<&mut Self>,
611         cx: &mut Context<'_>,
612         buf: &[u8],
613     ) -> Poll<io::Result<usize>> {
614         self.source.poll_write(cx, buf)
615     }
616 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>617     fn poll_write_vectored(
618         self: Pin<&mut Self>,
619         cx: &mut Context<'_>,
620         bufs: &[IoSlice<'_>],
621     ) -> Poll<io::Result<usize>> {
622         self.source.poll_write_vectored(cx, bufs)
623     }
624 
is_write_vectored(&self) -> bool625     fn is_write_vectored(&self) -> bool {
626         true
627     }
628 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>629     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
630         Poll::Ready(Ok(()))
631     }
632 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>633     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
634         self.source.shutdown(std::net::Shutdown::Write)?;
635         Poll::Ready(Ok(()))
636     }
637 }
638 
639 #[cfg(windows)]
640 use std::os::windows::io::{AsRawSocket, RawSocket};
641 
#[cfg(windows)]
impl AsRawSocket for TcpStream {
    // Exposes the raw socket obtained from the wrapped source.
    fn as_raw_socket(&self) -> RawSocket {
        self.source.as_raw_socket()
    }
}
648 
649 #[cfg(unix)]
650 use std::os::fd::{AsRawFd, RawFd};
651 
652 #[cfg(target_os = "linux")]
653 use libc::{gid_t, uid_t};
654 #[cfg(unix)]
655 use ylong_io::Source;
656 
#[cfg(unix)]
impl AsRawFd for TcpStream {
    // Exposes the fd that the wrapped source returns from `get_fd`.
    fn as_raw_fd(&self) -> RawFd {
        self.source.get_fd()
    }
}
663 
664 #[cfg(test)]
665 mod test {
666     use std::net::Ipv4Addr;
667     use std::time::Duration;
668 
669     use crate::io::AsyncWriteExt;
670     use crate::net::{TcpListener, TcpStream};
671 
    // Loopback address with port 0; every test reads the actually bound
    // address back through `listener.local_addr()`.
    const ADDR: &str = "127.0.0.1:0";
673 
674     /// UT test cases for `TcpStream`.
675     ///
676     /// # Brief
677     /// 1. Bind `TcpListener` and wait for `accept()`.
678     /// 2. `TcpStream` connect to listener.
679     /// 3. Call peer_addr(), local_addr(), set_ttl(), ttl(), set_nodelay(),
680     ///    nodelay(), take_error().
681     /// 4. Check result is correct.
682     #[test]
ut_tcp_stream_basic()683     fn ut_tcp_stream_basic() {
684         crate::block_on(async {
685             let listener = TcpListener::bind(ADDR).await.unwrap();
686             let addr = listener.local_addr().unwrap();
687 
688             let handle = crate::spawn(async move {
689                 let mut stream = TcpStream::connect(addr).await;
690                 while stream.is_err() {
691                     stream = TcpStream::connect(addr).await;
692                 }
693                 let stream = stream.unwrap();
694                 assert_eq!(stream.peer_addr().unwrap(), addr);
695                 assert_eq!(
696                     stream.local_addr().unwrap().ip(),
697                     Ipv4Addr::new(127, 0, 0, 1)
698                 );
699                 stream.set_ttl(101).unwrap();
700                 assert_eq!(stream.ttl().unwrap(), 101);
701                 stream.set_nodelay(true).unwrap();
702                 assert!(stream.nodelay().unwrap());
703                 assert!(stream.linger().unwrap().is_none());
704                 stream.set_linger(Some(Duration::from_secs(1))).unwrap();
705                 assert_eq!(stream.linger().unwrap(), Some(Duration::from_secs(1)));
706                 assert!(stream.take_error().unwrap().is_none());
707             });
708 
709             listener.accept().await.unwrap();
710 
711             handle.await.unwrap();
712         });
713     }
714 
715     /// UT test cases for `TcpStream`.
716     ///
717     /// # Brief
718     /// 1. Bind `TcpListener` and wait for `accept()`.
719     /// 2. `TcpStream` connect to listener.
720     /// 3. Call peek() to get.
721     /// 4. Check result is correct.
722     #[test]
ut_tcp_stream_peek()723     fn ut_tcp_stream_peek() {
724         crate::block_on(async {
725             let listener = TcpListener::bind(ADDR).await.unwrap();
726             let addr = listener.local_addr().unwrap();
727 
728             let handle = crate::spawn(async move {
729                 let mut stream = TcpStream::connect(addr).await;
730                 while stream.is_err() {
731                     stream = TcpStream::connect(addr).await;
732                 }
733                 let stream = stream.unwrap();
734                 let mut buf = [0; 100];
735                 let len = stream.peek(&mut buf).await.expect("peek failed");
736                 let buf = &buf[0..len];
737                 assert_eq!(len, 5);
738                 assert_eq!(String::from_utf8_lossy(buf), "hello");
739             });
740 
741             let (mut stream, _) = listener.accept().await.unwrap();
742             stream.write(b"hello").await.unwrap();
743 
744             handle.await.unwrap();
745         });
746     }
747 
748     /// UT test cases for `TcpStream`.
749     ///
750     /// # Brief
751     /// 1. Bind `TcpListener` and wait for `accept()`.
752     /// 2. After accept, try to write buf.
753     /// 2. `TcpStream` connect to listener and try to read buf.
754     /// 4. Check result is correct.
755     #[test]
ut_tcp_stream_try()756     fn ut_tcp_stream_try() {
757         crate::block_on(async {
758             let listener = TcpListener::bind(ADDR).await.unwrap();
759             let addr = listener.local_addr().unwrap();
760 
761             let handle = crate::spawn(async move {
762                 let mut stream = TcpStream::connect(addr).await;
763                 while stream.is_err() {
764                     stream = TcpStream::connect(addr).await;
765                 }
766                 let stream = stream.unwrap();
767                 let mut buf = vec![0; 5];
768                 stream.readable().await.unwrap();
769                 stream.try_read(&mut buf).unwrap();
770                 assert_eq!(buf, b"hello");
771             });
772 
773             let (stream, _) = listener.accept().await.unwrap();
774             stream.writable().await.unwrap();
775             stream.try_write(b"hello").unwrap();
776 
777             handle.await.unwrap();
778         });
779     }
780 }
781