1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::fmt::{Debug, Formatter}; 15 use std::io; 16 use std::io::{IoSlice, Read, Write}; 17 use std::net::{Shutdown, SocketAddr}; 18 use std::pin::Pin; 19 use std::sync::Arc; 20 use std::task::{Context, Poll}; 21 use std::time::Duration; 22 23 use ylong_io::Interest; 24 25 use super::split::{SplitReadHalf, SplitWriteHalf}; 26 use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; 27 use crate::net::sys::tcp::split::{BorrowReadHalf, BorrowWriteHalf}; 28 use crate::net::sys::ToSocketAddrs; 29 use crate::net::AsyncSource; 30 31 /// An asynchronous version of [`std::net::TcpStream`] 32 /// 33 /// After creating a `TcpStream` by either connecting to a remote host or 34 /// accepting a connection on a `TcpListener`, data can be transmitted 35 /// asynchronously by reading and writing to it. 36 /// 37 /// 38 /// # Example 39 /// ```rust 40 /// use std::io; 41 /// use std::io::{IoSlice, IoSliceMut}; 42 /// 43 /// use ylong_runtime::io::{AsyncReadExt, AsyncWriteExt}; 44 /// use ylong_runtime::net::TcpStream; 45 /// 46 /// async fn io_func() -> io::Result<()> { 47 /// let addr = "127.0.0.1:8080"; 48 /// let mut stream = TcpStream::connect(addr).await?; 49 /// 50 /// let _ = stream.write(b"hello client").await?; 51 /// let _ = stream 52 /// .write_vectored(&[IoSlice::new(b"hello client")]) 53 /// .await?; 54 /// 55 /// let mut read_buf = [0 as u8; 1024]; 56 /// let _ = stream.read(&mut read_buf).await?; 57 /// let _ = stream.read(&mut read_buf).await?; 58 /// Ok(()) 59 /// } 60 /// ``` 61 pub struct TcpStream { 62 pub(crate) source: AsyncSource<ylong_io::TcpStream>, 63 } 64 65 impl Debug for TcpStream { fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 67 self.source.fmt(f) 68 } 69 } 70 71 impl TcpStream { 72 /// Opens a TCP connection to a remote host asynchronously. 73 /// 74 /// # Note 75 /// 76 /// If there are multiple addresses in SocketAddr, it will attempt to 77 /// connect them in sequence until one of the addrs returns success. If 78 /// all connections fail, it returns the error of the last connection. 79 /// This behavior is consistent with std. 80 /// 81 /// # Panic 82 /// Calling this method outside of a Ylong Runtime could cause panic. 83 /// 84 /// # Example 85 /// ```rust 86 /// use std::io; 87 /// 88 /// use ylong_runtime::net::TcpStream; 89 /// 90 /// async fn io_func() -> io::Result<()> { 91 /// let addr = "127.0.0.1:8080"; 92 /// let mut stream = TcpStream::connect(addr).await?; 93 /// Ok(()) 94 /// } 95 /// ``` connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self>96 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<Self> { 97 let stream = super::super::addr::each_addr(addr, ylong_io::TcpStream::connect).await?; 98 Self::connect_inner(stream).await 99 } 100 connect_inner(stream: ylong_io::TcpStream) -> io::Result<Self>101 async fn connect_inner(stream: ylong_io::TcpStream) -> io::Result<Self> { 102 let stream = TcpStream::new(stream)?; 103 stream 104 .source 105 .async_process( 106 // Wait until the stream is writable 107 Interest::WRITABLE, 108 || Ok(()), 109 ) 110 .await?; 111 112 if let Some(e) = stream.source.take_error()? { 113 return Err(e); 114 } 115 Ok(stream) 116 } 117 118 // Registers the ylong_io::TcpStream's fd to the reactor, and returns async 119 // TcpStream. new(stream: ylong_io::TcpStream) -> io::Result<Self>120 pub(crate) fn new(stream: ylong_io::TcpStream) -> io::Result<Self> { 121 let source = AsyncSource::new(stream, None)?; 122 Ok(TcpStream { source }) 123 } 124 125 /// Waits for the socket to become readable. 126 /// 127 /// This function is usually paired up with [`TcpStream::try_read`] 128 /// 129 /// # Examples 130 /// 131 /// ```no_run 132 /// use std::io; 133 /// 134 /// use ylong_runtime::net::TcpStream; 135 /// 136 /// async fn io_func() -> io::Result<()> { 137 /// let local_addr = "127.0.0.1:8080"; 138 /// let stream = TcpStream::connect(local_addr).await?; 139 /// 140 /// let mut buf = vec![0; 12]; 141 /// 142 /// loop { 143 /// stream.readable().await?; 144 /// match stream.try_read(&mut buf) { 145 /// Ok(_) => break, 146 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} 147 /// Err(e) => panic!("{:?}", e), 148 /// } 149 /// } 150 /// 151 /// Ok(()) 152 /// } 153 /// ``` readable(&self) -> io::Result<()>154 pub async fn readable(&self) -> io::Result<()> { 155 self.source.entry.readiness(Interest::READABLE).await?; 156 Ok(()) 157 } 158 159 /// Attempts to read data from the stream to the provided buffer, returning 160 /// the number of bytes read. 161 /// 162 /// This function is usually paired up with [`TcpStream::readable`] 163 /// 164 /// # Examples 165 /// 166 /// ```no_run 167 /// use std::io; 168 /// 169 /// use ylong_runtime::net::TcpStream; 170 /// 171 /// async fn io_func() -> io::Result<()> { 172 /// let local_addr = "127.0.0.1:8080"; 173 /// let stream = TcpStream::connect(local_addr).await?; 174 /// 175 /// let mut buf = vec![0; 12]; 176 /// 177 /// loop { 178 /// stream.readable().await?; 179 /// match stream.try_read(&mut buf) { 180 /// Ok(_) => break, 181 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} 182 /// Err(e) => panic!("{:?}", e), 183 /// } 184 /// } 185 /// 186 /// Ok(()) 187 /// } 188 /// ``` try_read(&self, buf: &mut [u8]) -> io::Result<usize>189 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { 190 self.source 191 .try_io(Interest::READABLE, || (&*self.source).read(buf)) 192 } 193 194 /// Waits for the socket to become writable. 195 /// 196 /// This function is usually paired up with [`TcpStream::try_write`] 197 /// 198 /// # Examples 199 /// 200 /// ```no_run 201 /// use std::io; 202 /// 203 /// use ylong_runtime::net::TcpStream; 204 /// 205 /// async fn io_func() -> io::Result<()> { 206 /// let local_addr = "127.0.0.1:8080"; 207 /// let stream = TcpStream::connect(local_addr).await?; 208 /// 209 /// loop { 210 /// stream.writable().await?; 211 /// match stream.try_write(b"write message") { 212 /// Ok(_) => break, 213 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} 214 /// Err(e) => panic!("{:?}", e), 215 /// } 216 /// } 217 /// 218 /// Ok(()) 219 /// } 220 /// ``` writable(&self) -> io::Result<()>221 pub async fn writable(&self) -> io::Result<()> { 222 self.source.entry.readiness(Interest::WRITABLE).await?; 223 Ok(()) 224 } 225 226 /// Attempts to write data to the stream, returning the number of bytes 227 /// written. 228 /// 229 /// This function is usually paired up with [`TcpStream::writable`] 230 /// 231 /// # Examples 232 /// 233 /// ```no_run 234 /// use std::io; 235 /// 236 /// use ylong_runtime::net::TcpStream; 237 /// 238 /// async fn io_func() -> io::Result<()> { 239 /// let local_addr = "127.0.0.1:8080"; 240 /// let stream = TcpStream::connect(local_addr).await?; 241 /// 242 /// loop { 243 /// stream.writable().await?; 244 /// match stream.try_write(b"write message") { 245 /// Ok(_) => break, 246 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} 247 /// Err(e) => panic!("{:?}", e), 248 /// } 249 /// } 250 /// 251 /// Ok(()) 252 /// } 253 /// ``` try_write(&self, buf: &[u8]) -> io::Result<usize>254 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { 255 self.source 256 .try_io(Interest::WRITABLE, || (&*self.source).write(buf)) 257 } 258 259 /// Returns the socket address of the remote half of this TCP connection. 260 /// 261 /// # Example 262 /// 263 /// ```no_run 264 /// use std::io; 265 /// use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; 266 /// 267 /// use ylong_runtime::net::TcpStream; 268 /// 269 /// async fn io_func() -> io::Result<()> { 270 /// let addr = "127.0.0.1:1234"; 271 /// let stream = TcpStream::connect(addr) 272 /// .await 273 /// .expect("Couldn't connect to the server..."); 274 /// assert_eq!( 275 /// stream.local_addr().unwrap().ip(), 276 /// IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) 277 /// ); 278 /// Ok(()) 279 /// } 280 /// ``` local_addr(&self) -> io::Result<SocketAddr>281 pub fn local_addr(&self) -> io::Result<SocketAddr> { 282 self.source.local_addr() 283 } 284 285 /// Returns the socket address of the remote half of this TCP connection. 286 /// 287 /// # Example 288 /// 289 /// ```no_run 290 /// use std::io; 291 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; 292 /// 293 /// use ylong_runtime::net::TcpStream; 294 /// 295 /// async fn io_func() -> io::Result<()> { 296 /// let addr = "127.0.0.1:1234"; 297 /// let stream = TcpStream::connect(addr) 298 /// .await 299 /// .expect("Couldn't connect to the server..."); 300 /// assert_eq!( 301 /// stream.peer_addr().unwrap(), 302 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234)) 303 /// ); 304 /// Ok(()) 305 /// } 306 /// ``` peer_addr(&self) -> io::Result<SocketAddr>307 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 308 self.source.peer_addr() 309 } 310 311 /// Sets the value of the `TCP_NODELAY`. 312 /// 313 /// # Examples 314 /// 315 /// ```no_run 316 /// use std::io; 317 /// 318 /// use ylong_runtime::net::TcpStream; 319 /// 320 /// async fn io_func() -> io::Result<()> { 321 /// let addr = "127.0.0.1:1234"; 322 /// let stream = TcpStream::connect(addr) 323 /// .await 324 /// .expect("Couldn't connect to the server..."); 325 /// stream.set_nodelay(true).expect("set_nodelay call failed"); 326 /// Ok(()) 327 /// } 328 /// ``` set_nodelay(&self, nodelay: bool) -> io::Result<()>329 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { 330 self.source.set_nodelay(nodelay) 331 } 332 333 /// Gets the value of the `TCP_NODELAY`. 334 /// 335 /// # Examples 336 /// 337 /// ```no_run 338 /// use std::io; 339 /// 340 /// use ylong_runtime::net::TcpStream; 341 /// 342 /// async fn io_func() -> io::Result<()> { 343 /// let addr = "127.0.0.1:1234"; 344 /// let stream = TcpStream::connect(addr) 345 /// .await 346 /// .expect("Couldn't connect to the server..."); 347 /// stream.set_nodelay(true).expect("set_nodelay call failed"); 348 /// assert_eq!(stream.nodelay().unwrap_or(false), true); 349 /// Ok(()) 350 /// } 351 /// ``` nodelay(&self) -> io::Result<bool>352 pub fn nodelay(&self) -> io::Result<bool> { 353 self.source.nodelay() 354 } 355 356 /// Sets the value of the linger on this socket by setting `SO_LINGER` 357 /// option. 358 /// 359 /// This value controls how the socket close when a stream has unsent data. 360 /// If SO_LINGER is set, the socket will still open for the duration as 361 /// the system attempts to send pending data. Otherwise, the system may 362 /// close the socket immediately, or wait for a default timeout. 363 /// 364 /// # Examples 365 /// 366 /// ```no_run 367 /// use std::io; 368 /// 369 /// use ylong_runtime::net::TcpStream; 370 /// 371 /// async fn io_func() -> io::Result<()> { 372 /// let addr = "127.0.0.1:1234"; 373 /// let stream = TcpStream::connect(addr) 374 /// .await 375 /// .expect("Couldn't connect to the server..."); 376 /// 377 /// stream.set_linger(None).expect("Sets linger fail."); 378 /// Ok(()) 379 /// } 380 /// ``` set_linger(&self, linger: Option<Duration>) -> io::Result<()>381 pub fn set_linger(&self, linger: Option<Duration>) -> io::Result<()> { 382 self.source.set_linger(linger) 383 } 384 385 /// Gets the value of the linger on this socket by getting `SO_LINGER` 386 /// option. 387 /// 388 /// # Examples 389 /// 390 /// ```no_run 391 /// use std::io; 392 /// 393 /// use ylong_runtime::net::TcpStream; 394 /// 395 /// async fn io_func() -> io::Result<()> { 396 /// let addr = "127.0.0.1:1234"; 397 /// let stream = TcpStream::connect(addr) 398 /// .await 399 /// .expect("Couldn't connect to the server..."); 400 /// 401 /// println!("{:?}", stream.linger()); 402 /// Ok(()) 403 /// } 404 /// ``` linger(&self) -> io::Result<Option<Duration>>405 pub fn linger(&self) -> io::Result<Option<Duration>> { 406 self.source.linger() 407 } 408 409 /// Sets the value for the `IP_TTL`. 410 /// 411 /// # Examples 412 /// 413 /// ```no_run 414 /// use std::io; 415 /// 416 /// use ylong_runtime::net::TcpStream; 417 /// 418 /// async fn io_func() -> io::Result<()> { 419 /// let addr = "127.0.0.1:1234"; 420 /// let stream = TcpStream::connect(addr) 421 /// .await 422 /// .expect("Couldn't connect to the server..."); 423 /// stream.set_ttl(100).expect("set_ttl call failed"); 424 /// Ok(()) 425 /// } 426 /// ``` set_ttl(&self, ttl: u32) -> io::Result<()>427 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { 428 self.source.set_ttl(ttl) 429 } 430 431 /// Gets the value of the `IP_TTL`. 432 /// 433 /// # Examples 434 /// 435 /// ```no_run 436 /// use std::io; 437 /// 438 /// use ylong_runtime::net::TcpStream; 439 /// 440 /// async fn io_func() -> io::Result<()> { 441 /// let addr = "127.0.0.1:1234"; 442 /// let stream = TcpStream::connect(addr) 443 /// .await 444 /// .expect("Couldn't connect to the server..."); 445 /// stream.set_ttl(100).expect("set_ttl call failed"); 446 /// assert_eq!(stream.ttl().unwrap_or(0), 100); 447 /// Ok(()) 448 /// } 449 /// ``` ttl(&self) -> io::Result<u32>450 pub fn ttl(&self) -> io::Result<u32> { 451 self.source.ttl() 452 } 453 454 /// Splits a TcpStream into a read half and a write half with reference, 455 /// which can be used to read and write the stream concurrently. 456 /// 457 /// # Example 458 /// 459 /// ```no_run 460 /// use std::io; 461 /// 462 /// use ylong_runtime::net::TcpStream; 463 /// 464 /// async fn io_func() -> io::Result<()> { 465 /// let addr = "127.0.0.1:1234"; 466 /// let mut stream = TcpStream::connect(addr) 467 /// .await 468 /// .expect("Couldn't connect to the server..."); 469 /// let (read, write) = stream.split(); 470 /// Ok(()) 471 /// } 472 /// ``` split(&mut self) -> (BorrowReadHalf, BorrowWriteHalf)473 pub fn split(&mut self) -> (BorrowReadHalf, BorrowWriteHalf) { 474 let read = BorrowReadHalf(self); 475 let write = BorrowWriteHalf(self); 476 (read, write) 477 } 478 479 /// Splits a TcpStream into a read half and a write half, 480 /// which can be used to read and write the stream concurrently. 481 /// 482 /// # Example 483 /// 484 /// ```no_run 485 /// use std::io; 486 /// 487 /// use ylong_runtime::net::TcpStream; 488 /// 489 /// async fn io_func() -> io::Result<()> { 490 /// let addr = "127.0.0.1:1234"; 491 /// let stream = TcpStream::connect(addr) 492 /// .await 493 /// .expect("Couldn't connect to the server..."); 494 /// let (read, write) = stream.into_split(); 495 /// Ok(()) 496 /// } 497 /// ``` into_split(self) -> (SplitReadHalf, SplitWriteHalf)498 pub fn into_split(self) -> (SplitReadHalf, SplitWriteHalf) { 499 let arc = Arc::new(self); 500 let read = SplitReadHalf(Arc::clone(&arc)); 501 let write = SplitWriteHalf(Arc::clone(&arc)); 502 (read, write) 503 } 504 505 /// Receives data on the socket from the remote address to which it is 506 /// connected, without removing that data from the queue. 507 /// On success, returns the number of bytes peeked. 508 /// 509 /// # Example 510 /// 511 /// ```no_run 512 /// use std::io; 513 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; 514 /// 515 /// use ylong_runtime::net::TcpStream; 516 /// 517 /// async fn io_func() -> io::Result<()> { 518 /// let addr = "127.0.0.1:1234"; 519 /// let stream = TcpStream::connect(addr) 520 /// .await 521 /// .expect("Couldn't connect to the server..."); 522 /// let mut buf = [0; 10]; 523 /// let len = stream.peek(&mut buf).await.expect("peek failed"); 524 /// Ok(()) 525 /// } 526 /// ``` peek(&self, buf: &mut [u8]) -> io::Result<usize>527 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { 528 self.source 529 .async_process(Interest::READABLE, || self.source.peek(buf)) 530 .await 531 } 532 533 /// Gets the value of the SO_ERROR. 534 /// 535 /// # Example 536 /// 537 /// ```no_run 538 /// use std::io; 539 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; 540 /// 541 /// use ylong_runtime::net::TcpStream; 542 /// 543 /// async fn io_func() -> io::Result<()> { 544 /// let addr = "127.0.0.1:1234"; 545 /// let stream = TcpStream::connect(addr) 546 /// .await 547 /// .expect("Couldn't connect to the server..."); 548 /// match stream.take_error() { 549 /// Ok(Some(error)) => println!("TcpStream error: {error:?}"), 550 /// Ok(None) => println!("No error"), 551 /// Err(error) => println!("TcpStream.take_error failed: {error:?}"), 552 /// } 553 /// Ok(()) 554 /// } 555 /// ``` take_error(&self) -> io::Result<Option<io::Error>>556 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 557 self.source.take_error() 558 } 559 560 // todo: make this async 561 /// Shutdown TcpStream shutdown(&self, how: Shutdown) -> io::Result<()>562 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { 563 self.source.shutdown(how) 564 } 565 566 /// Sets the owner for this source's fd 567 /// 568 /// # Error 569 /// This method calls libc::fchown, libc::fchown returns the following 570 /// errors [`libc::EBADF`]: The fd argument is not an open file descriptor. 571 /// [`libc::EPERM`]: The effective user ID does not match the owner of the file or the process does not have appropriate privilege and _POSIX_CHOWN_RESTRICTED indicates that such privilege is required. 572 /// [`libc::EROFS`]:The file referred to by fildes resides on a read-only file system. 573 /// [`libc::EINVAL`]: The owner or group ID is not a value supported by the implementation. 574 /// [`libc::EIO`]: A physical I/O error has occurred. 575 /// [`libc::EINTR`]: The fchown() function was interrupted by a signal which was caught. 576 /// 577 /// # Example 578 /// ```no_run 579 /// use std::io; 580 /// 581 /// use ylong_runtime::net::TcpStream; 582 /// 583 /// async fn io_func() -> io::Result<()> { 584 /// let addr = "127.0.0.1:1234"; 585 /// let stream = TcpStream::connect(addr) 586 /// .await 587 /// .expect("Couldn't connect to the server..."); 588 /// stream.fchown(0, 0)?; 589 /// Ok(()) 590 /// } 591 /// ``` 592 #[cfg(target_os = "linux")] fchown(&self, uid: uid_t, gid: gid_t) -> io::Result<()>593 pub fn fchown(&self, uid: uid_t, gid: gid_t) -> io::Result<()> { 594 self.source.fchown(uid, gid) 595 } 596 } 597 598 impl AsyncRead for TcpStream { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>599 fn poll_read( 600 self: Pin<&mut Self>, 601 cx: &mut Context<'_>, 602 buf: &mut ReadBuf<'_>, 603 ) -> Poll<io::Result<()>> { 604 self.source.poll_read(cx, buf) 605 } 606 } 607 608 impl AsyncWrite for TcpStream { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>609 fn poll_write( 610 self: Pin<&mut Self>, 611 cx: &mut Context<'_>, 612 buf: &[u8], 613 ) -> Poll<io::Result<usize>> { 614 self.source.poll_write(cx, buf) 615 } 616 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<io::Result<usize>>617 fn poll_write_vectored( 618 self: Pin<&mut Self>, 619 cx: &mut Context<'_>, 620 bufs: &[IoSlice<'_>], 621 ) -> Poll<io::Result<usize>> { 622 self.source.poll_write_vectored(cx, bufs) 623 } 624 is_write_vectored(&self) -> bool625 fn is_write_vectored(&self) -> bool { 626 true 627 } 628 poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>629 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 630 Poll::Ready(Ok(())) 631 } 632 poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>633 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 634 self.source.shutdown(std::net::Shutdown::Write)?; 635 Poll::Ready(Ok(())) 636 } 637 } 638 639 #[cfg(windows)] 640 use std::os::windows::io::{AsRawSocket, RawSocket}; 641 642 #[cfg(windows)] 643 impl AsRawSocket for TcpStream { as_raw_socket(&self) -> RawSocket644 fn as_raw_socket(&self) -> RawSocket { 645 self.source.as_raw_socket() 646 } 647 } 648 649 #[cfg(unix)] 650 use std::os::fd::{AsRawFd, RawFd}; 651 652 #[cfg(target_os = "linux")] 653 use libc::{gid_t, uid_t}; 654 #[cfg(unix)] 655 use ylong_io::Source; 656 657 #[cfg(unix)] 658 impl AsRawFd for TcpStream { as_raw_fd(&self) -> RawFd659 fn as_raw_fd(&self) -> RawFd { 660 self.source.get_fd() 661 } 662 } 663 664 #[cfg(test)] 665 mod test { 666 use std::net::Ipv4Addr; 667 use std::time::Duration; 668 669 use crate::io::AsyncWriteExt; 670 use crate::net::{TcpListener, TcpStream}; 671 672 const ADDR: &str = "127.0.0.1:0"; 673 674 /// UT test cases for `TcpStream`. 675 /// 676 /// # Brief 677 /// 1. Bind `TcpListener` and wait for `accept()`. 678 /// 2. `TcpStream` connect to listener. 679 /// 3. Call peer_addr(), local_addr(), set_ttl(), ttl(), set_nodelay(), 680 /// nodelay(), take_error(). 681 /// 4. Check result is correct. 682 #[test] ut_tcp_stream_basic()683 fn ut_tcp_stream_basic() { 684 crate::block_on(async { 685 let listener = TcpListener::bind(ADDR).await.unwrap(); 686 let addr = listener.local_addr().unwrap(); 687 688 let handle = crate::spawn(async move { 689 let mut stream = TcpStream::connect(addr).await; 690 while stream.is_err() { 691 stream = TcpStream::connect(addr).await; 692 } 693 let stream = stream.unwrap(); 694 assert_eq!(stream.peer_addr().unwrap(), addr); 695 assert_eq!( 696 stream.local_addr().unwrap().ip(), 697 Ipv4Addr::new(127, 0, 0, 1) 698 ); 699 stream.set_ttl(101).unwrap(); 700 assert_eq!(stream.ttl().unwrap(), 101); 701 stream.set_nodelay(true).unwrap(); 702 assert!(stream.nodelay().unwrap()); 703 assert!(stream.linger().unwrap().is_none()); 704 stream.set_linger(Some(Duration::from_secs(1))).unwrap(); 705 assert_eq!(stream.linger().unwrap(), Some(Duration::from_secs(1))); 706 assert!(stream.take_error().unwrap().is_none()); 707 }); 708 709 listener.accept().await.unwrap(); 710 711 handle.await.unwrap(); 712 }); 713 } 714 715 /// UT test cases for `TcpStream`. 716 /// 717 /// # Brief 718 /// 1. Bind `TcpListener` and wait for `accept()`. 719 /// 2. `TcpStream` connect to listener. 720 /// 3. Call peek() to get. 721 /// 4. Check result is correct. 722 #[test] ut_tcp_stream_peek()723 fn ut_tcp_stream_peek() { 724 crate::block_on(async { 725 let listener = TcpListener::bind(ADDR).await.unwrap(); 726 let addr = listener.local_addr().unwrap(); 727 728 let handle = crate::spawn(async move { 729 let mut stream = TcpStream::connect(addr).await; 730 while stream.is_err() { 731 stream = TcpStream::connect(addr).await; 732 } 733 let stream = stream.unwrap(); 734 let mut buf = [0; 100]; 735 let len = stream.peek(&mut buf).await.expect("peek failed"); 736 let buf = &buf[0..len]; 737 assert_eq!(len, 5); 738 assert_eq!(String::from_utf8_lossy(buf), "hello"); 739 }); 740 741 let (mut stream, _) = listener.accept().await.unwrap(); 742 stream.write(b"hello").await.unwrap(); 743 744 handle.await.unwrap(); 745 }); 746 } 747 748 /// UT test cases for `TcpStream`. 749 /// 750 /// # Brief 751 /// 1. Bind `TcpListener` and wait for `accept()`. 752 /// 2. After accept, try to write buf. 753 /// 2. `TcpStream` connect to listener and try to read buf. 754 /// 4. Check result is correct. 755 #[test] ut_tcp_stream_try()756 fn ut_tcp_stream_try() { 757 crate::block_on(async { 758 let listener = TcpListener::bind(ADDR).await.unwrap(); 759 let addr = listener.local_addr().unwrap(); 760 761 let handle = crate::spawn(async move { 762 let mut stream = TcpStream::connect(addr).await; 763 while stream.is_err() { 764 stream = TcpStream::connect(addr).await; 765 } 766 let stream = stream.unwrap(); 767 let mut buf = vec![0; 5]; 768 stream.readable().await.unwrap(); 769 stream.try_read(&mut buf).unwrap(); 770 assert_eq!(buf, b"hello"); 771 }); 772 773 let (stream, _) = listener.accept().await.unwrap(); 774 stream.writable().await.unwrap(); 775 stream.try_write(b"hello").unwrap(); 776 777 handle.await.unwrap(); 778 }); 779 } 780 } 781