1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 pub(crate) trait Dispatcher { 15 type Handle; 16 dispatch(&self) -> Option<Self::Handle>17 fn dispatch(&self) -> Option<Self::Handle>; 18 is_shutdown(&self) -> bool19 fn is_shutdown(&self) -> bool; 20 } 21 22 pub(crate) enum ConnDispatcher<S> { 23 #[cfg(feature = "http1_1")] 24 Http1(http1::Http1Dispatcher<S>), 25 26 #[cfg(feature = "http2")] 27 Http2(http2::Http2Dispatcher<S>), 28 } 29 30 impl<S> Dispatcher for ConnDispatcher<S> { 31 type Handle = Conn<S>; 32 dispatch(&self) -> Option<Self::Handle>33 fn dispatch(&self) -> Option<Self::Handle> { 34 match self { 35 #[cfg(feature = "http1_1")] 36 Self::Http1(h1) => h1.dispatch().map(Conn::Http1), 37 38 #[cfg(feature = "http2")] 39 Self::Http2(h2) => h2.dispatch().map(Conn::Http2), 40 } 41 } 42 is_shutdown(&self) -> bool43 fn is_shutdown(&self) -> bool { 44 match self { 45 #[cfg(feature = "http1_1")] 46 Self::Http1(h1) => h1.is_shutdown(), 47 48 #[cfg(feature = "http2")] 49 Self::Http2(h2) => h2.is_shutdown(), 50 } 51 } 52 } 53 54 pub(crate) enum Conn<S> { 55 #[cfg(feature = "http1_1")] 56 Http1(http1::Http1Conn<S>), 57 58 #[cfg(feature = "http2")] 59 Http2(http2::Http2Conn<S>), 60 } 61 62 #[cfg(feature = "http1_1")] 63 pub(crate) mod http1 { 64 use std::cell::UnsafeCell; 65 use std::sync::atomic::{AtomicBool, Ordering}; 66 use std::sync::Arc; 67 68 use super::{ConnDispatcher, Dispatcher}; 69 70 impl<S> ConnDispatcher<S> { http1(io: S) -> Self71 pub(crate) fn http1(io: S) -> Self { 72 Self::Http1(Http1Dispatcher::new(io)) 73 } 74 } 75 76 /// HTTP1-based connection manager, which can dispatch connections to other 77 /// threads according to HTTP1 syntax. 78 pub(crate) struct Http1Dispatcher<S> { 79 inner: Arc<Inner<S>>, 80 } 81 82 pub(crate) struct Inner<S> { 83 pub(crate) io: UnsafeCell<S>, 84 // `occupied` indicates that the connection is occupied. Only one coroutine 85 // can get the handle at the same time. Once the handle is fetched, the flag 86 // position is true. 87 pub(crate) occupied: AtomicBool, 88 // `shutdown` indicates that the connection need to be shut down. 89 pub(crate) shutdown: AtomicBool, 90 } 91 92 unsafe impl<S> Sync for Inner<S> {} 93 94 impl<S> Http1Dispatcher<S> { new(io: S) -> Self95 pub(crate) fn new(io: S) -> Self { 96 Self { 97 inner: Arc::new(Inner { 98 io: UnsafeCell::new(io), 99 occupied: AtomicBool::new(false), 100 shutdown: AtomicBool::new(false), 101 }), 102 } 103 } 104 } 105 106 impl<S> Dispatcher for Http1Dispatcher<S> { 107 type Handle = Http1Conn<S>; 108 dispatch(&self) -> Option<Self::Handle>109 fn dispatch(&self) -> Option<Self::Handle> { 110 self.inner 111 .occupied 112 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) 113 .ok() 114 .map(|_| Http1Conn { 115 inner: self.inner.clone(), 116 }) 117 } 118 is_shutdown(&self) -> bool119 fn is_shutdown(&self) -> bool { 120 self.inner.shutdown.load(Ordering::Relaxed) 121 } 122 } 123 124 /// Handle returned to other threads for I/O operations. 125 pub(crate) struct Http1Conn<S> { 126 pub(crate) inner: Arc<Inner<S>>, 127 } 128 129 impl<S> Http1Conn<S> { raw_mut(&mut self) -> &mut S130 pub(crate) fn raw_mut(&mut self) -> &mut S { 131 // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle 132 // at the same time. 133 unsafe { &mut *self.inner.io.get() } 134 } 135 shutdown(&self)136 pub(crate) fn shutdown(&self) { 137 self.inner.shutdown.store(true, Ordering::Release); 138 } 139 } 140 141 impl<S> Drop for Http1Conn<S> { drop(&mut self)142 fn drop(&mut self) { 143 self.inner.occupied.store(false, Ordering::Release) 144 } 145 } 146 } 147 148 #[cfg(feature = "http2")] 149 pub(crate) mod http2 { 150 use std::collections::HashMap; 151 use std::future::Future; 152 use std::marker::PhantomData; 153 use std::pin::Pin; 154 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; 155 use std::sync::{Arc, Mutex}; 156 use std::task::{Context, Poll}; 157 158 use ylong_http::error::HttpError; 159 use ylong_http::h2::{ 160 ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload, 161 RstStream, Settings, SettingsBuilder, 162 }; 163 164 use crate::runtime::{ 165 bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver, 166 BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf, 167 }; 168 use crate::util::config::H2Config; 169 use crate::util::dispatcher::{ConnDispatcher, Dispatcher}; 170 use crate::util::h2::{ 171 ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData, 172 StreamEndState, Streams, 173 }; 174 use crate::ErrorKind::Request; 175 use crate::{ErrorKind, HttpClientError}; 176 177 const DEFAULT_MAX_STREAM_ID: u32 = u32::MAX >> 1; 178 const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13; 179 const DEFAULT_WINDOW_SIZE: u32 = 65535; 180 181 pub(crate) type ManagerSendFut = 182 Pin<Box<dyn Future<Output = Result<(), SendError<RespMessage>>> + Send + Sync>>; 183 184 pub(crate) enum RespMessage { 185 Output(Frame), 186 OutputExit(DispatchErrorKind), 187 } 188 189 pub(crate) enum OutputMessage { 190 Output(Frame), 191 OutputExit(DispatchErrorKind), 192 } 193 194 pub(crate) struct ReqMessage { 195 pub(crate) id: u32, 196 pub(crate) sender: BoundedSender<RespMessage>, 197 pub(crate) request: RequestWrapper, 198 } 199 200 #[derive(Debug, Eq, PartialEq, Copy, Clone)] 201 pub(crate) enum DispatchErrorKind { 202 H2(H2Error), 203 Io(std::io::ErrorKind), 204 ChannelClosed, 205 Disconnect, 206 } 207 208 // HTTP2-based connection manager, which can dispatch connections to other 209 // threads according to HTTP2 syntax. 210 pub(crate) struct Http2Dispatcher<S> { 211 pub(crate) next_stream_id: StreamId, 212 pub(crate) allowed_cache: usize, 213 pub(crate) sender: UnboundedSender<ReqMessage>, 214 pub(crate) io_shutdown: Arc<AtomicBool>, 215 pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>, 216 pub(crate) _mark: PhantomData<S>, 217 } 218 219 pub(crate) struct Http2Conn<S> { 220 // Handle id 221 pub(crate) id: u32, 222 pub(crate) allow_cached_frames: usize, 223 // Sends frame to StreamController 224 pub(crate) sender: UnboundedSender<ReqMessage>, 225 pub(crate) receiver: RespReceiver, 226 pub(crate) io_shutdown: Arc<AtomicBool>, 227 pub(crate) _mark: PhantomData<S>, 228 } 229 230 pub(crate) struct StreamController { 231 // The connection close flag organizes new stream commits to the current connection when 232 // closed. 233 pub(crate) io_shutdown: Arc<AtomicBool>, 234 // The senders of all connected stream channels of response. 235 pub(crate) senders: HashMap<u32, BoundedSender<RespMessage>>, 236 pub(crate) curr_message: HashMap<u32, ManagerSendFut>, 237 // Stream information on the connection. 238 pub(crate) streams: Streams, 239 // Received GO_AWAY frame. 240 pub(crate) recved_go_away: Option<u32>, 241 // The last GO_AWAY frame sent by the client. 242 pub(crate) go_away_sync: GoAwaySync, 243 } 244 245 #[derive(Default)] 246 pub(crate) struct GoAwaySync { 247 pub(crate) going_away: Option<Goaway>, 248 } 249 250 #[derive(Default)] 251 pub(crate) struct SettingsSync { 252 pub(crate) settings: SettingsState, 253 } 254 255 #[derive(Default, Clone)] 256 pub(crate) enum SettingsState { 257 Acknowledging(Settings), 258 #[default] 259 Synced, 260 } 261 262 pub(crate) struct StreamId { 263 // TODO Determine the maximum value of id. 264 id: AtomicU32, 265 } 266 267 #[derive(Default)] 268 pub(crate) struct RespReceiver { 269 receiver: Option<BoundedReceiver<RespMessage>>, 270 } 271 272 impl<S> ConnDispatcher<S> 273 where 274 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, 275 { http2(config: H2Config, io: S) -> Self276 pub(crate) fn http2(config: H2Config, io: S) -> Self { 277 Self::Http2(Http2Dispatcher::new(config, io)) 278 } 279 } 280 281 impl<S> Http2Dispatcher<S> 282 where 283 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, 284 { new(config: H2Config, io: S) -> Self285 pub(crate) fn new(config: H2Config, io: S) -> Self { 286 let settings = create_initial_settings(&config); 287 288 let mut flow = FlowControl::new(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); 289 flow.setup_recv_window(config.conn_window_size()); 290 291 let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow); 292 let shutdown_flag = Arc::new(AtomicBool::new(false)); 293 let controller = StreamController::new(streams, shutdown_flag.clone()); 294 295 // The id of the client stream, starting from 1 296 let next_stream_id = StreamId { 297 id: AtomicU32::new(1), 298 }; 299 let (input_tx, input_rx) = unbounded_channel(); 300 let (req_tx, req_rx) = unbounded_channel(); 301 302 // Error is not possible, so it is not handled for the time 303 // being. 304 let mut handles = Vec::with_capacity(3); 305 if input_tx.send(settings).is_ok() { 306 Self::launch( 307 config.allowed_cache_frame_size(), 308 config.use_huffman_coding(), 309 controller, 310 (input_tx, input_rx), 311 req_rx, 312 &mut handles, 313 io, 314 ); 315 } 316 Self { 317 next_stream_id, 318 allowed_cache: config.allowed_cache_frame_size(), 319 sender: req_tx, 320 io_shutdown: shutdown_flag, 321 handles, 322 _mark: PhantomData, 323 } 324 } 325 launch( allow_num: usize, use_huffman: bool, controller: StreamController, input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>), req_rx: UnboundedReceiver<ReqMessage>, handles: &mut Vec<crate::runtime::JoinHandle<()>>, io: S, )326 fn launch( 327 allow_num: usize, 328 use_huffman: bool, 329 controller: StreamController, 330 input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>), 331 req_rx: UnboundedReceiver<ReqMessage>, 332 handles: &mut Vec<crate::runtime::JoinHandle<()>>, 333 io: S, 334 ) { 335 let (resp_tx, resp_rx) = bounded_channel(allow_num); 336 let (read, write) = crate::runtime::split(io); 337 let settings_sync = Arc::new(Mutex::new(SettingsSync::default())); 338 let send_settings_sync = settings_sync.clone(); 339 let send = crate::runtime::spawn(async move { 340 let mut writer = write; 341 if async_send_preface(&mut writer).await.is_ok() { 342 let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman); 343 let mut send = 344 SendData::new(encoder, send_settings_sync, writer, input_channel.1); 345 let _ = Pin::new(&mut send).await; 346 } 347 }); 348 handles.push(send); 349 350 let recv_settings_sync = settings_sync.clone(); 351 let recv = crate::runtime::spawn(async move { 352 let decoder = FrameDecoder::new(); 353 let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx); 354 let _ = Pin::new(&mut recv).await; 355 }); 356 handles.push(recv); 357 358 let manager = crate::runtime::spawn(async move { 359 let mut conn_manager = 360 ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller); 361 let _ = Pin::new(&mut conn_manager).await; 362 }); 363 handles.push(manager); 364 } 365 } 366 367 impl<S> Dispatcher for Http2Dispatcher<S> { 368 type Handle = Http2Conn<S>; 369 dispatch(&self) -> Option<Self::Handle>370 fn dispatch(&self) -> Option<Self::Handle> { 371 let id = self.next_stream_id.generate_id(); 372 if id > DEFAULT_MAX_STREAM_ID { 373 return None; 374 } 375 let sender = self.sender.clone(); 376 let handle = Http2Conn::new(id, self.allowed_cache, self.io_shutdown.clone(), sender); 377 Some(handle) 378 } 379 is_shutdown(&self) -> bool380 fn is_shutdown(&self) -> bool { 381 self.io_shutdown.load(Ordering::Relaxed) 382 } 383 } 384 385 impl<S> Drop for Http2Dispatcher<S> { drop(&mut self)386 fn drop(&mut self) { 387 for handle in &self.handles { 388 #[cfg(feature = "ylong_base")] 389 handle.cancel(); 390 #[cfg(feature = "tokio_base")] 391 handle.abort(); 392 } 393 } 394 } 395 396 impl<S> Http2Conn<S> { new( id: u32, allow_cached_num: usize, io_shutdown: Arc<AtomicBool>, sender: UnboundedSender<ReqMessage>, ) -> Self397 pub(crate) fn new( 398 id: u32, 399 allow_cached_num: usize, 400 io_shutdown: Arc<AtomicBool>, 401 sender: UnboundedSender<ReqMessage>, 402 ) -> Self { 403 Self { 404 id, 405 allow_cached_frames: allow_cached_num, 406 sender, 407 receiver: RespReceiver::default(), 408 io_shutdown, 409 _mark: PhantomData, 410 } 411 } 412 send_frame_to_controller( &mut self, request: RequestWrapper, ) -> Result<(), HttpClientError>413 pub(crate) fn send_frame_to_controller( 414 &mut self, 415 request: RequestWrapper, 416 ) -> Result<(), HttpClientError> { 417 let (tx, rx) = bounded_channel::<RespMessage>(self.allow_cached_frames); 418 self.receiver.set_receiver(rx); 419 self.sender 420 .send(ReqMessage { 421 id: self.id, 422 sender: tx, 423 request, 424 }) 425 .map_err(|_| { 426 HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !") 427 }) 428 } 429 } 430 431 impl StreamId { generate_id(&self) -> u32432 fn generate_id(&self) -> u32 { 433 self.id.fetch_add(2, Ordering::Relaxed) 434 } 435 } 436 437 impl StreamController { new(streams: Streams, shutdown: Arc<AtomicBool>) -> Self438 pub(crate) fn new(streams: Streams, shutdown: Arc<AtomicBool>) -> Self { 439 Self { 440 io_shutdown: shutdown, 441 senders: HashMap::new(), 442 curr_message: HashMap::new(), 443 streams, 444 recved_go_away: None, 445 go_away_sync: GoAwaySync::default(), 446 } 447 } 448 shutdown(&self)449 pub(crate) fn shutdown(&self) { 450 self.io_shutdown.store(true, Ordering::Release); 451 } 452 get_unsent_streams( &mut self, last_stream_id: u32, ) -> Result<Vec<u32>, H2Error>453 pub(crate) fn get_unsent_streams( 454 &mut self, 455 last_stream_id: u32, 456 ) -> Result<Vec<u32>, H2Error> { 457 // The last-stream-id in the subsequent GO_AWAY frame 458 // cannot be greater than the last-stream-id in the previous GO_AWAY frame. 459 if self.streams.max_send_id < last_stream_id { 460 return Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 461 } 462 self.streams.max_send_id = last_stream_id; 463 Ok(self.streams.get_go_away_streams(last_stream_id)) 464 } 465 send_message_to_stream( &mut self, cx: &mut Context<'_>, stream_id: u32, message: RespMessage, ) -> Poll<Result<(), H2Error>>466 pub(crate) fn send_message_to_stream( 467 &mut self, 468 cx: &mut Context<'_>, 469 stream_id: u32, 470 message: RespMessage, 471 ) -> Poll<Result<(), H2Error>> { 472 if let Some(sender) = self.senders.get(&stream_id) { 473 // If the client coroutine has exited, this frame is skipped. 474 let mut tx = { 475 let sender = sender.clone(); 476 let ft = async move { sender.send(message).await }; 477 Box::pin(ft) 478 }; 479 480 match tx.as_mut().poll(cx) { 481 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), 482 // The current coroutine sending the request exited prematurely. 483 Poll::Ready(Err(_)) => { 484 self.senders.remove(&stream_id); 485 Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) 486 } 487 Poll::Pending => { 488 self.curr_message.insert(stream_id, tx); 489 Poll::Pending 490 } 491 } 492 } else { 493 Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) 494 } 495 } 496 poll_blocked_message( &mut self, cx: &mut Context<'_>, input_tx: &UnboundedSender<Frame>, ) -> Poll<()>497 pub(crate) fn poll_blocked_message( 498 &mut self, 499 cx: &mut Context<'_>, 500 input_tx: &UnboundedSender<Frame>, 501 ) -> Poll<()> { 502 let keys: Vec<u32> = self.curr_message.keys().cloned().collect(); 503 let mut blocked = false; 504 505 for key in keys { 506 if let Some(mut task) = self.curr_message.remove(&key) { 507 match task.as_mut().poll(cx) { 508 Poll::Ready(Ok(_)) => {} 509 // The current coroutine sending the request exited prematurely. 510 Poll::Ready(Err(_)) => { 511 self.senders.remove(&key); 512 if let Some(state) = self.streams.stream_state(key) { 513 if !matches!(state, H2StreamState::Closed(_)) { 514 if let StreamEndState::OK = self.streams.send_local_reset(key) { 515 let rest_payload = 516 RstStream::new(ErrorCode::NoError.into_code()); 517 let frame = Frame::new( 518 key as usize, 519 FrameFlags::empty(), 520 Payload::RstStream(rest_payload), 521 ); 522 // ignore the send error occurs here in order to finish all 523 // tasks. 524 let _ = input_tx.send(frame); 525 } 526 } 527 } 528 } 529 Poll::Pending => { 530 self.curr_message.insert(key, task); 531 blocked = true; 532 } 533 } 534 } 535 } 536 if blocked { 537 Poll::Pending 538 } else { 539 Poll::Ready(()) 540 } 541 } 542 } 543 544 impl RespReceiver { set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>)545 pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>) { 546 self.receiver = Some(receiver); 547 } 548 recv(&mut self) -> Result<Frame, HttpClientError>549 pub(crate) async fn recv(&mut self) -> Result<Frame, HttpClientError> { 550 match self.receiver { 551 Some(ref mut receiver) => { 552 #[cfg(feature = "tokio_base")] 553 match receiver.recv().await { 554 None => err_from_msg!(Request, "Response Receiver Closed !"), 555 Some(message) => match message { 556 RespMessage::Output(frame) => Ok(frame), 557 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 558 }, 559 } 560 561 #[cfg(feature = "ylong_base")] 562 match receiver.recv().await { 563 Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)), 564 Ok(message) => match message { 565 RespMessage::Output(frame) => Ok(frame), 566 RespMessage::OutputExit(e) => Err(dispatch_client_error(e)), 567 }, 568 } 569 } 570 // this will not happen. 571 None => Err(HttpClientError::from_str( 572 ErrorKind::Request, 573 "Invalid Frame Receiver !", 574 )), 575 } 576 } 577 poll_recv( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<Frame, HttpClientError>>578 pub(crate) fn poll_recv( 579 &mut self, 580 cx: &mut Context<'_>, 581 ) -> Poll<Result<Frame, HttpClientError>> { 582 if let Some(ref mut receiver) = self.receiver { 583 #[cfg(feature = "tokio_base")] 584 match receiver.poll_recv(cx) { 585 Poll::Ready(None) => { 586 Poll::Ready(err_from_msg!(Request, "Error receive response !")) 587 } 588 Poll::Ready(Some(message)) => match message { 589 RespMessage::Output(frame) => Poll::Ready(Ok(frame)), 590 RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))), 591 }, 592 Poll::Pending => Poll::Pending, 593 } 594 595 #[cfg(feature = "ylong_base")] 596 match receiver.poll_recv(cx) { 597 Poll::Ready(Err(e)) => { 598 Poll::Ready(Err(HttpClientError::from_error(ErrorKind::Request, e))) 599 } 600 Poll::Ready(Ok(message)) => match message { 601 RespMessage::Output(frame) => Poll::Ready(Ok(frame)), 602 RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))), 603 }, 604 Poll::Pending => Poll::Pending, 605 } 606 } else { 607 Poll::Ready(err_from_msg!(Request, "Invalid Frame Receiver !")) 608 } 609 } 610 } 611 async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind> where S: AsyncWrite + Unpin,612 async fn async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind> 613 where 614 S: AsyncWrite + Unpin, 615 { 616 const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 617 writer 618 .write_all(PREFACE) 619 .await 620 .map_err(|e| DispatchErrorKind::Io(e.kind())) 621 } 622 create_initial_settings(config: &H2Config) -> Frame623 pub(crate) fn create_initial_settings(config: &H2Config) -> Frame { 624 let settings = SettingsBuilder::new() 625 .max_header_list_size(config.max_header_list_size()) 626 .max_frame_size(config.max_frame_size()) 627 .header_table_size(config.header_table_size()) 628 .enable_push(config.enable_push()) 629 .initial_window_size(config.stream_window_size()) 630 .build(); 631 632 Frame::new(0, FrameFlags::new(0), Payload::Settings(settings)) 633 } 634 635 impl From<std::io::Error> for DispatchErrorKind { from(value: std::io::Error) -> Self636 fn from(value: std::io::Error) -> Self { 637 DispatchErrorKind::Io(value.kind()) 638 } 639 } 640 641 impl From<H2Error> for DispatchErrorKind { from(err: H2Error) -> Self642 fn from(err: H2Error) -> Self { 643 DispatchErrorKind::H2(err) 644 } 645 } 646 dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError647 pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError { 648 match dispatch_error { 649 DispatchErrorKind::H2(e) => HttpClientError::from_error(Request, HttpError::from(e)), 650 DispatchErrorKind::Io(e) => { 651 HttpClientError::from_io_error(Request, std::io::Error::from(e)) 652 } 653 DispatchErrorKind::ChannelClosed => { 654 HttpClientError::from_str(Request, "Coroutine channel closed.") 655 } 656 DispatchErrorKind::Disconnect => { 657 HttpClientError::from_str(Request, "remote peer closed.") 658 } 659 } 660 } 661 } 662 663 #[cfg(test)] 664 mod ut_dispatch { 665 use crate::dispatcher::{ConnDispatcher, Dispatcher}; 666 667 /// UT test cases for `ConnDispatcher::is_shutdown`. 668 /// 669 /// # Brief 670 /// 1. Creates a `ConnDispatcher`. 671 /// 2. Calls `ConnDispatcher::is_shutdown` to get the result. 672 /// 3. Calls `ConnDispatcher::dispatch` to get the result. 673 /// 4. Checks if the result is false. 674 #[test] ut_is_shutdown()675 fn ut_is_shutdown() { 676 let conn = ConnDispatcher::http1(b"Data"); 677 let res = conn.is_shutdown(); 678 assert!(!res); 679 let res = conn.dispatch(); 680 assert!(res.is_some()); 681 } 682 } 683