1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
/// Common interface of the per-connection managers in this module: each one
/// can hand out handles and report whether its connection was shut down.
pub(crate) trait Dispatcher {
    /// Type of the handle given to a caller.
    type Handle;

    /// Tries to obtain a handle; `None` means no handle is issued by this
    /// call.
    fn dispatch(&self) -> Option<Self::Handle>;

    /// Returns the current value of the dispatcher's shutdown indication.
    fn is_shutdown(&self) -> bool;
}
21 
22 pub(crate) enum ConnDispatcher<S> {
23     #[cfg(feature = "http1_1")]
24     Http1(http1::Http1Dispatcher<S>),
25 
26     #[cfg(feature = "http2")]
27     Http2(http2::Http2Dispatcher<S>),
28 }
29 
30 impl<S> Dispatcher for ConnDispatcher<S> {
31     type Handle = Conn<S>;
32 
dispatch(&self) -> Option<Self::Handle>33     fn dispatch(&self) -> Option<Self::Handle> {
34         match self {
35             #[cfg(feature = "http1_1")]
36             Self::Http1(h1) => h1.dispatch().map(Conn::Http1),
37 
38             #[cfg(feature = "http2")]
39             Self::Http2(h2) => h2.dispatch().map(Conn::Http2),
40         }
41     }
42 
is_shutdown(&self) -> bool43     fn is_shutdown(&self) -> bool {
44         match self {
45             #[cfg(feature = "http1_1")]
46             Self::Http1(h1) => h1.is_shutdown(),
47 
48             #[cfg(feature = "http2")]
49             Self::Http2(h2) => h2.is_shutdown(),
50         }
51     }
52 }
53 
54 pub(crate) enum Conn<S> {
55     #[cfg(feature = "http1_1")]
56     Http1(http1::Http1Conn<S>),
57 
58     #[cfg(feature = "http2")]
59     Http2(http2::Http2Conn<S>),
60 }
61 
62 #[cfg(feature = "http1_1")]
63 pub(crate) mod http1 {
64     use std::cell::UnsafeCell;
65     use std::sync::atomic::{AtomicBool, Ordering};
66     use std::sync::Arc;
67 
68     use super::{ConnDispatcher, Dispatcher};
69 
70     impl<S> ConnDispatcher<S> {
http1(io: S) -> Self71         pub(crate) fn http1(io: S) -> Self {
72             Self::Http1(Http1Dispatcher::new(io))
73         }
74     }
75 
76     /// HTTP1-based connection manager, which can dispatch connections to other
77     /// threads according to HTTP1 syntax.
78     pub(crate) struct Http1Dispatcher<S> {
79         inner: Arc<Inner<S>>,
80     }
81 
82     pub(crate) struct Inner<S> {
83         pub(crate) io: UnsafeCell<S>,
84         // `occupied` indicates that the connection is occupied. Only one coroutine
85         // can get the handle at the same time. Once the handle is fetched, the flag
86         // position is true.
87         pub(crate) occupied: AtomicBool,
88         // `shutdown` indicates that the connection need to be shut down.
89         pub(crate) shutdown: AtomicBool,
90     }
91 
92     unsafe impl<S> Sync for Inner<S> {}
93 
94     impl<S> Http1Dispatcher<S> {
new(io: S) -> Self95         pub(crate) fn new(io: S) -> Self {
96             Self {
97                 inner: Arc::new(Inner {
98                     io: UnsafeCell::new(io),
99                     occupied: AtomicBool::new(false),
100                     shutdown: AtomicBool::new(false),
101                 }),
102             }
103         }
104     }
105 
106     impl<S> Dispatcher for Http1Dispatcher<S> {
107         type Handle = Http1Conn<S>;
108 
dispatch(&self) -> Option<Self::Handle>109         fn dispatch(&self) -> Option<Self::Handle> {
110             self.inner
111                 .occupied
112                 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
113                 .ok()
114                 .map(|_| Http1Conn {
115                     inner: self.inner.clone(),
116                 })
117         }
118 
is_shutdown(&self) -> bool119         fn is_shutdown(&self) -> bool {
120             self.inner.shutdown.load(Ordering::Relaxed)
121         }
122     }
123 
124     /// Handle returned to other threads for I/O operations.
125     pub(crate) struct Http1Conn<S> {
126         pub(crate) inner: Arc<Inner<S>>,
127     }
128 
129     impl<S> Http1Conn<S> {
raw_mut(&mut self) -> &mut S130         pub(crate) fn raw_mut(&mut self) -> &mut S {
131             // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle
132             // at the same time.
133             unsafe { &mut *self.inner.io.get() }
134         }
135 
shutdown(&self)136         pub(crate) fn shutdown(&self) {
137             self.inner.shutdown.store(true, Ordering::Release);
138         }
139     }
140 
141     impl<S> Drop for Http1Conn<S> {
drop(&mut self)142         fn drop(&mut self) {
143             self.inner.occupied.store(false, Ordering::Release)
144         }
145     }
146 }
147 
148 #[cfg(feature = "http2")]
149 pub(crate) mod http2 {
150     use std::collections::HashMap;
151     use std::future::Future;
152     use std::marker::PhantomData;
153     use std::pin::Pin;
154     use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
155     use std::sync::{Arc, Mutex};
156     use std::task::{Context, Poll};
157 
158     use ylong_http::error::HttpError;
159     use ylong_http::h2::{
160         ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload,
161         RstStream, Settings, SettingsBuilder,
162     };
163 
164     use crate::runtime::{
165         bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver,
166         BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf,
167     };
168     use crate::util::config::H2Config;
169     use crate::util::dispatcher::{ConnDispatcher, Dispatcher};
170     use crate::util::h2::{
171         ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData,
172         StreamEndState, Streams,
173     };
174     use crate::ErrorKind::Request;
175     use crate::{ErrorKind, HttpClientError};
176 
177     const DEFAULT_MAX_STREAM_ID: u32 = u32::MAX >> 1;
178     const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13;
179     const DEFAULT_WINDOW_SIZE: u32 = 65535;
180 
181     pub(crate) type ManagerSendFut =
182         Pin<Box<dyn Future<Output = Result<(), SendError<RespMessage>>> + Send + Sync>>;
183 
184     pub(crate) enum RespMessage {
185         Output(Frame),
186         OutputExit(DispatchErrorKind),
187     }
188 
189     pub(crate) enum OutputMessage {
190         Output(Frame),
191         OutputExit(DispatchErrorKind),
192     }
193 
194     pub(crate) struct ReqMessage {
195         pub(crate) id: u32,
196         pub(crate) sender: BoundedSender<RespMessage>,
197         pub(crate) request: RequestWrapper,
198     }
199 
200     #[derive(Debug, Eq, PartialEq, Copy, Clone)]
201     pub(crate) enum DispatchErrorKind {
202         H2(H2Error),
203         Io(std::io::ErrorKind),
204         ChannelClosed,
205         Disconnect,
206     }
207 
208     // HTTP2-based connection manager, which can dispatch connections to other
209     // threads according to HTTP2 syntax.
210     pub(crate) struct Http2Dispatcher<S> {
211         pub(crate) next_stream_id: StreamId,
212         pub(crate) allowed_cache: usize,
213         pub(crate) sender: UnboundedSender<ReqMessage>,
214         pub(crate) io_shutdown: Arc<AtomicBool>,
215         pub(crate) handles: Vec<crate::runtime::JoinHandle<()>>,
216         pub(crate) _mark: PhantomData<S>,
217     }
218 
219     pub(crate) struct Http2Conn<S> {
220         // Handle id
221         pub(crate) id: u32,
222         pub(crate) allow_cached_frames: usize,
223         // Sends frame to StreamController
224         pub(crate) sender: UnboundedSender<ReqMessage>,
225         pub(crate) receiver: RespReceiver,
226         pub(crate) io_shutdown: Arc<AtomicBool>,
227         pub(crate) _mark: PhantomData<S>,
228     }
229 
230     pub(crate) struct StreamController {
231         // The connection close flag organizes new stream commits to the current connection when
232         // closed.
233         pub(crate) io_shutdown: Arc<AtomicBool>,
234         // The senders of all connected stream channels of response.
235         pub(crate) senders: HashMap<u32, BoundedSender<RespMessage>>,
236         pub(crate) curr_message: HashMap<u32, ManagerSendFut>,
237         // Stream information on the connection.
238         pub(crate) streams: Streams,
239         // Received GO_AWAY frame.
240         pub(crate) recved_go_away: Option<u32>,
241         // The last GO_AWAY frame sent by the client.
242         pub(crate) go_away_sync: GoAwaySync,
243     }
244 
245     #[derive(Default)]
246     pub(crate) struct GoAwaySync {
247         pub(crate) going_away: Option<Goaway>,
248     }
249 
250     #[derive(Default)]
251     pub(crate) struct SettingsSync {
252         pub(crate) settings: SettingsState,
253     }
254 
255     #[derive(Default, Clone)]
256     pub(crate) enum SettingsState {
257         Acknowledging(Settings),
258         #[default]
259         Synced,
260     }
261 
262     pub(crate) struct StreamId {
263         // TODO Determine the maximum value of id.
264         id: AtomicU32,
265     }
266 
267     #[derive(Default)]
268     pub(crate) struct RespReceiver {
269         receiver: Option<BoundedReceiver<RespMessage>>,
270     }
271 
272     impl<S> ConnDispatcher<S>
273     where
274         S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
275     {
http2(config: H2Config, io: S) -> Self276         pub(crate) fn http2(config: H2Config, io: S) -> Self {
277             Self::Http2(Http2Dispatcher::new(config, io))
278         }
279     }
280 
281     impl<S> Http2Dispatcher<S>
282     where
283         S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
284     {
new(config: H2Config, io: S) -> Self285         pub(crate) fn new(config: H2Config, io: S) -> Self {
286             let settings = create_initial_settings(&config);
287 
288             let mut flow = FlowControl::new(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE);
289             flow.setup_recv_window(config.conn_window_size());
290 
291             let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow);
292             let shutdown_flag = Arc::new(AtomicBool::new(false));
293             let controller = StreamController::new(streams, shutdown_flag.clone());
294 
295             // The id of the client stream, starting from 1
296             let next_stream_id = StreamId {
297                 id: AtomicU32::new(1),
298             };
299             let (input_tx, input_rx) = unbounded_channel();
300             let (req_tx, req_rx) = unbounded_channel();
301 
302             // Error is not possible, so it is not handled for the time
303             // being.
304             let mut handles = Vec::with_capacity(3);
305             if input_tx.send(settings).is_ok() {
306                 Self::launch(
307                     config.allowed_cache_frame_size(),
308                     config.use_huffman_coding(),
309                     controller,
310                     (input_tx, input_rx),
311                     req_rx,
312                     &mut handles,
313                     io,
314                 );
315             }
316             Self {
317                 next_stream_id,
318                 allowed_cache: config.allowed_cache_frame_size(),
319                 sender: req_tx,
320                 io_shutdown: shutdown_flag,
321                 handles,
322                 _mark: PhantomData,
323             }
324         }
325 
launch( allow_num: usize, use_huffman: bool, controller: StreamController, input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>), req_rx: UnboundedReceiver<ReqMessage>, handles: &mut Vec<crate::runtime::JoinHandle<()>>, io: S, )326         fn launch(
327             allow_num: usize,
328             use_huffman: bool,
329             controller: StreamController,
330             input_channel: (UnboundedSender<Frame>, UnboundedReceiver<Frame>),
331             req_rx: UnboundedReceiver<ReqMessage>,
332             handles: &mut Vec<crate::runtime::JoinHandle<()>>,
333             io: S,
334         ) {
335             let (resp_tx, resp_rx) = bounded_channel(allow_num);
336             let (read, write) = crate::runtime::split(io);
337             let settings_sync = Arc::new(Mutex::new(SettingsSync::default()));
338             let send_settings_sync = settings_sync.clone();
339             let send = crate::runtime::spawn(async move {
340                 let mut writer = write;
341                 if async_send_preface(&mut writer).await.is_ok() {
342                     let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman);
343                     let mut send =
344                         SendData::new(encoder, send_settings_sync, writer, input_channel.1);
345                     let _ = Pin::new(&mut send).await;
346                 }
347             });
348             handles.push(send);
349 
350             let recv_settings_sync = settings_sync.clone();
351             let recv = crate::runtime::spawn(async move {
352                 let decoder = FrameDecoder::new();
353                 let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx);
354                 let _ = Pin::new(&mut recv).await;
355             });
356             handles.push(recv);
357 
358             let manager = crate::runtime::spawn(async move {
359                 let mut conn_manager =
360                     ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller);
361                 let _ = Pin::new(&mut conn_manager).await;
362             });
363             handles.push(manager);
364         }
365     }
366 
367     impl<S> Dispatcher for Http2Dispatcher<S> {
368         type Handle = Http2Conn<S>;
369 
dispatch(&self) -> Option<Self::Handle>370         fn dispatch(&self) -> Option<Self::Handle> {
371             let id = self.next_stream_id.generate_id();
372             if id > DEFAULT_MAX_STREAM_ID {
373                 return None;
374             }
375             let sender = self.sender.clone();
376             let handle = Http2Conn::new(id, self.allowed_cache, self.io_shutdown.clone(), sender);
377             Some(handle)
378         }
379 
is_shutdown(&self) -> bool380         fn is_shutdown(&self) -> bool {
381             self.io_shutdown.load(Ordering::Relaxed)
382         }
383     }
384 
385     impl<S> Drop for Http2Dispatcher<S> {
drop(&mut self)386         fn drop(&mut self) {
387             for handle in &self.handles {
388                 #[cfg(feature = "ylong_base")]
389                 handle.cancel();
390                 #[cfg(feature = "tokio_base")]
391                 handle.abort();
392             }
393         }
394     }
395 
396     impl<S> Http2Conn<S> {
new( id: u32, allow_cached_num: usize, io_shutdown: Arc<AtomicBool>, sender: UnboundedSender<ReqMessage>, ) -> Self397         pub(crate) fn new(
398             id: u32,
399             allow_cached_num: usize,
400             io_shutdown: Arc<AtomicBool>,
401             sender: UnboundedSender<ReqMessage>,
402         ) -> Self {
403             Self {
404                 id,
405                 allow_cached_frames: allow_cached_num,
406                 sender,
407                 receiver: RespReceiver::default(),
408                 io_shutdown,
409                 _mark: PhantomData,
410             }
411         }
412 
send_frame_to_controller( &mut self, request: RequestWrapper, ) -> Result<(), HttpClientError>413         pub(crate) fn send_frame_to_controller(
414             &mut self,
415             request: RequestWrapper,
416         ) -> Result<(), HttpClientError> {
417             let (tx, rx) = bounded_channel::<RespMessage>(self.allow_cached_frames);
418             self.receiver.set_receiver(rx);
419             self.sender
420                 .send(ReqMessage {
421                     id: self.id,
422                     sender: tx,
423                     request,
424                 })
425                 .map_err(|_| {
426                     HttpClientError::from_str(ErrorKind::Request, "Request Sender Closed !")
427                 })
428         }
429     }
430 
431     impl StreamId {
generate_id(&self) -> u32432         fn generate_id(&self) -> u32 {
433             self.id.fetch_add(2, Ordering::Relaxed)
434         }
435     }
436 
437     impl StreamController {
new(streams: Streams, shutdown: Arc<AtomicBool>) -> Self438         pub(crate) fn new(streams: Streams, shutdown: Arc<AtomicBool>) -> Self {
439             Self {
440                 io_shutdown: shutdown,
441                 senders: HashMap::new(),
442                 curr_message: HashMap::new(),
443                 streams,
444                 recved_go_away: None,
445                 go_away_sync: GoAwaySync::default(),
446             }
447         }
448 
shutdown(&self)449         pub(crate) fn shutdown(&self) {
450             self.io_shutdown.store(true, Ordering::Release);
451         }
452 
get_unsent_streams( &mut self, last_stream_id: u32, ) -> Result<Vec<u32>, H2Error>453         pub(crate) fn get_unsent_streams(
454             &mut self,
455             last_stream_id: u32,
456         ) -> Result<Vec<u32>, H2Error> {
457             // The last-stream-id in the subsequent GO_AWAY frame
458             // cannot be greater than the last-stream-id in the previous GO_AWAY frame.
459             if self.streams.max_send_id < last_stream_id {
460                 return Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
461             }
462             self.streams.max_send_id = last_stream_id;
463             Ok(self.streams.get_go_away_streams(last_stream_id))
464         }
465 
send_message_to_stream( &mut self, cx: &mut Context<'_>, stream_id: u32, message: RespMessage, ) -> Poll<Result<(), H2Error>>466         pub(crate) fn send_message_to_stream(
467             &mut self,
468             cx: &mut Context<'_>,
469             stream_id: u32,
470             message: RespMessage,
471         ) -> Poll<Result<(), H2Error>> {
472             if let Some(sender) = self.senders.get(&stream_id) {
473                 // If the client coroutine has exited, this frame is skipped.
474                 let mut tx = {
475                     let sender = sender.clone();
476                     let ft = async move { sender.send(message).await };
477                     Box::pin(ft)
478                 };
479 
480                 match tx.as_mut().poll(cx) {
481                     Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
482                     // The current coroutine sending the request exited prematurely.
483                     Poll::Ready(Err(_)) => {
484                         self.senders.remove(&stream_id);
485                         Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError)))
486                     }
487                     Poll::Pending => {
488                         self.curr_message.insert(stream_id, tx);
489                         Poll::Pending
490                     }
491                 }
492             } else {
493                 Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError)))
494             }
495         }
496 
poll_blocked_message( &mut self, cx: &mut Context<'_>, input_tx: &UnboundedSender<Frame>, ) -> Poll<()>497         pub(crate) fn poll_blocked_message(
498             &mut self,
499             cx: &mut Context<'_>,
500             input_tx: &UnboundedSender<Frame>,
501         ) -> Poll<()> {
502             let keys: Vec<u32> = self.curr_message.keys().cloned().collect();
503             let mut blocked = false;
504 
505             for key in keys {
506                 if let Some(mut task) = self.curr_message.remove(&key) {
507                     match task.as_mut().poll(cx) {
508                         Poll::Ready(Ok(_)) => {}
509                         // The current coroutine sending the request exited prematurely.
510                         Poll::Ready(Err(_)) => {
511                             self.senders.remove(&key);
512                             if let Some(state) = self.streams.stream_state(key) {
513                                 if !matches!(state, H2StreamState::Closed(_)) {
514                                     if let StreamEndState::OK = self.streams.send_local_reset(key) {
515                                         let rest_payload =
516                                             RstStream::new(ErrorCode::NoError.into_code());
517                                         let frame = Frame::new(
518                                             key as usize,
519                                             FrameFlags::empty(),
520                                             Payload::RstStream(rest_payload),
521                                         );
522                                         // ignore the send error occurs here in order to finish all
523                                         // tasks.
524                                         let _ = input_tx.send(frame);
525                                     }
526                                 }
527                             }
528                         }
529                         Poll::Pending => {
530                             self.curr_message.insert(key, task);
531                             blocked = true;
532                         }
533                     }
534                 }
535             }
536             if blocked {
537                 Poll::Pending
538             } else {
539                 Poll::Ready(())
540             }
541         }
542     }
543 
544     impl RespReceiver {
set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>)545         pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver<RespMessage>) {
546             self.receiver = Some(receiver);
547         }
548 
recv(&mut self) -> Result<Frame, HttpClientError>549         pub(crate) async fn recv(&mut self) -> Result<Frame, HttpClientError> {
550             match self.receiver {
551                 Some(ref mut receiver) => {
552                     #[cfg(feature = "tokio_base")]
553                     match receiver.recv().await {
554                         None => err_from_msg!(Request, "Response Receiver Closed !"),
555                         Some(message) => match message {
556                             RespMessage::Output(frame) => Ok(frame),
557                             RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
558                         },
559                     }
560 
561                     #[cfg(feature = "ylong_base")]
562                     match receiver.recv().await {
563                         Err(err) => Err(HttpClientError::from_error(ErrorKind::Request, err)),
564                         Ok(message) => match message {
565                             RespMessage::Output(frame) => Ok(frame),
566                             RespMessage::OutputExit(e) => Err(dispatch_client_error(e)),
567                         },
568                     }
569                 }
570                 // this will not happen.
571                 None => Err(HttpClientError::from_str(
572                     ErrorKind::Request,
573                     "Invalid Frame Receiver !",
574                 )),
575             }
576         }
577 
poll_recv( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<Frame, HttpClientError>>578         pub(crate) fn poll_recv(
579             &mut self,
580             cx: &mut Context<'_>,
581         ) -> Poll<Result<Frame, HttpClientError>> {
582             if let Some(ref mut receiver) = self.receiver {
583                 #[cfg(feature = "tokio_base")]
584                 match receiver.poll_recv(cx) {
585                     Poll::Ready(None) => {
586                         Poll::Ready(err_from_msg!(Request, "Error receive response !"))
587                     }
588                     Poll::Ready(Some(message)) => match message {
589                         RespMessage::Output(frame) => Poll::Ready(Ok(frame)),
590                         RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))),
591                     },
592                     Poll::Pending => Poll::Pending,
593                 }
594 
595                 #[cfg(feature = "ylong_base")]
596                 match receiver.poll_recv(cx) {
597                     Poll::Ready(Err(e)) => {
598                         Poll::Ready(Err(HttpClientError::from_error(ErrorKind::Request, e)))
599                     }
600                     Poll::Ready(Ok(message)) => match message {
601                         RespMessage::Output(frame) => Poll::Ready(Ok(frame)),
602                         RespMessage::OutputExit(e) => Poll::Ready(Err(dispatch_client_error(e))),
603                     },
604                     Poll::Pending => Poll::Pending,
605                 }
606             } else {
607                 Poll::Ready(err_from_msg!(Request, "Invalid Frame Receiver !"))
608             }
609         }
610     }
611 
async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind> where S: AsyncWrite + Unpin,612     async fn async_send_preface<S>(writer: &mut WriteHalf<S>) -> Result<(), DispatchErrorKind>
613     where
614         S: AsyncWrite + Unpin,
615     {
616         const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
617         writer
618             .write_all(PREFACE)
619             .await
620             .map_err(|e| DispatchErrorKind::Io(e.kind()))
621     }
622 
create_initial_settings(config: &H2Config) -> Frame623     pub(crate) fn create_initial_settings(config: &H2Config) -> Frame {
624         let settings = SettingsBuilder::new()
625             .max_header_list_size(config.max_header_list_size())
626             .max_frame_size(config.max_frame_size())
627             .header_table_size(config.header_table_size())
628             .enable_push(config.enable_push())
629             .initial_window_size(config.stream_window_size())
630             .build();
631 
632         Frame::new(0, FrameFlags::new(0), Payload::Settings(settings))
633     }
634 
635     impl From<std::io::Error> for DispatchErrorKind {
from(value: std::io::Error) -> Self636         fn from(value: std::io::Error) -> Self {
637             DispatchErrorKind::Io(value.kind())
638         }
639     }
640 
641     impl From<H2Error> for DispatchErrorKind {
from(err: H2Error) -> Self642         fn from(err: H2Error) -> Self {
643             DispatchErrorKind::H2(err)
644         }
645     }
646 
dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError647     pub(crate) fn dispatch_client_error(dispatch_error: DispatchErrorKind) -> HttpClientError {
648         match dispatch_error {
649             DispatchErrorKind::H2(e) => HttpClientError::from_error(Request, HttpError::from(e)),
650             DispatchErrorKind::Io(e) => {
651                 HttpClientError::from_io_error(Request, std::io::Error::from(e))
652             }
653             DispatchErrorKind::ChannelClosed => {
654                 HttpClientError::from_str(Request, "Coroutine channel closed.")
655             }
656             DispatchErrorKind::Disconnect => {
657                 HttpClientError::from_str(Request, "remote peer closed.")
658             }
659         }
660     }
661 }
662 
// The test relies on `ConnDispatcher::http1`, which only exists when the
// `http1_1` feature is enabled, so the module is gated on that feature too.
#[cfg(all(test, feature = "http1_1"))]
mod ut_dispatch {
    use crate::dispatcher::{ConnDispatcher, Dispatcher};

    /// UT test cases for `ConnDispatcher::is_shutdown` and
    /// `ConnDispatcher::dispatch`.
    ///
    /// # Brief
    /// 1. Creates a `ConnDispatcher`.
    /// 2. Calls `ConnDispatcher::is_shutdown` and checks that the result is
    ///    false.
    /// 3. Calls `ConnDispatcher::dispatch` and checks that a handle is
    ///    returned.
    #[test]
    fn ut_is_shutdown() {
        let conn = ConnDispatcher::http1(b"Data");
        let res = conn.is_shutdown();
        assert!(!res);
        let res = conn.dispatch();
        assert!(res.is_some());
    }
}
683