1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
//! Stream management coroutine for an HTTP/2 connection.
15 
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::sync::{Arc, Mutex};
19 use std::task::{Context, Poll};
20 
21 use ylong_http::h2::{
22     ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting,
23 };
24 
25 use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender};
26 use crate::util::dispatcher::http2::{
27     DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync,
28     StreamController,
29 };
30 use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState};
31 
/// Phase of the `ConnManager` state machine, examined on every iteration of
/// the loop in `ConnManager::poll`.
#[derive(Copy, Clone)]
enum ManagerState {
    /// Blocked messages are being flushed through `poll_blocked_frames`; when
    /// that completes the manager switches to its recorded `next_state`.
    Send,
    /// The manager reads the next message from the io output channel.
    Receive,
    /// The manager completes with the contained error the next time this
    /// state is observed.
    Exit(DispatchErrorKind),
}
38 
/// Connection-level manager that is driven as a `Future`: it receives frames
/// from the io output, forwards them to the owning streams, and pushes
/// request frames to the io input.
pub(crate) struct ConnManager {
    // State examined on the current iteration of the poll loop.
    state: ManagerState,
    // State adopted once the `Send` state has finished flushing blocked
    // messages.
    next_state: ManagerState,
    // Synchronize SETTINGS frames sent by the client.
    settings: Arc<Mutex<SettingsSync>>,
    // channel transmitter between manager and io input.
    input_tx: UnboundedSender<Frame>,
    // channel receiver between manager and io output.
    resp_rx: BoundedReceiver<OutputMessage>,
    // channel receiver between manager and stream coroutine.
    req_rx: UnboundedReceiver<ReqMessage>,
    // Stream table, per-stream senders and GOAWAY bookkeeping.
    controller: StreamController,
    // Progress of the SETTINGS exchange in both directions.
    handshakes: HandShakes,
}
53 
/// Tracks the SETTINGS exchange; pending request frames are only sent once
/// both flags are `true`.
struct HandShakes {
    // Set when the peer's SETTINGS ACK has been received.
    local: bool,
    // Set when the peer's SETTINGS frame has been received and its ACK has
    // been handed to the io input.
    peer: bool,
}
58 
59 impl Future for ConnManager {
60     type Output = Result<(), DispatchErrorKind>;
61 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>62     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63         let manager = self.get_mut();
64         loop {
65             match manager.state {
66                 ManagerState::Send => {
67                     if manager.poll_blocked_frames(cx).is_pending() {
68                         return Poll::Pending;
69                     }
70                 }
71                 ManagerState::Receive => {
72                     // Receives a response frame from io output.
73                     match manager.resp_rx.poll_recv(cx) {
74                         #[cfg(feature = "tokio_base")]
75                         Poll::Ready(Some(message)) => match message {
76                             OutputMessage::Output(frame) => {
77                                 if manager.poll_recv_message(cx, frame)?.is_pending() {
78                                     return Poll::Pending;
79                                 }
80                             }
81                             // io output occurs error.
82                             OutputMessage::OutputExit(e) => {
83                                 // Note error returned immediately.
84                                 if manager.manage_resp_error(cx, e)?.is_pending() {
85                                     return Poll::Pending;
86                                 }
87                             }
88                         },
89                         #[cfg(feature = "ylong_base")]
90                         Poll::Ready(Ok(message)) => match message {
91                             OutputMessage::Output(frame) => {
92                                 if manager.poll_recv_message(cx, frame)?.is_pending() {
93                                     return Poll::Pending;
94                                 }
95                             }
96                             // io output occurs error.
97                             OutputMessage::OutputExit(e) => {
98                                 if manager.manage_resp_error(cx, e)?.is_pending() {
99                                     return Poll::Pending;
100                                 }
101                             }
102                         },
103                         #[cfg(feature = "tokio_base")]
104                         Poll::Ready(None) => {
105                             return manager.poll_channel_closed_exit(cx);
106                         }
107                         #[cfg(feature = "ylong_base")]
108                         Poll::Ready(Err(_e)) => {
109                             return manager.poll_channel_closed_exit(cx);
110                         }
111 
112                         Poll::Pending => {
113                             // TODO manage error state.
114                             return manager.manage_pending_state(cx);
115                         }
116                     }
117                 }
118                 ManagerState::Exit(e) => return Poll::Ready(Err(e)),
119             }
120         }
121     }
122 }
123 
124 impl ConnManager {
new( settings: Arc<Mutex<SettingsSync>>, input_tx: UnboundedSender<Frame>, resp_rx: BoundedReceiver<OutputMessage>, req_rx: UnboundedReceiver<ReqMessage>, controller: StreamController, ) -> Self125     pub(crate) fn new(
126         settings: Arc<Mutex<SettingsSync>>,
127         input_tx: UnboundedSender<Frame>,
128         resp_rx: BoundedReceiver<OutputMessage>,
129         req_rx: UnboundedReceiver<ReqMessage>,
130         controller: StreamController,
131     ) -> Self {
132         Self {
133             state: ManagerState::Receive,
134             next_state: ManagerState::Receive,
135             settings,
136             input_tx,
137             resp_rx,
138             req_rx,
139             controller,
140             handshakes: HandShakes {
141                 local: false,
142                 peer: false,
143             },
144         }
145     }
146 
manage_pending_state( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>147     fn manage_pending_state(
148         &mut self,
149         cx: &mut Context<'_>,
150     ) -> Poll<Result<(), DispatchErrorKind>> {
151         // The manager previously accepted a GOAWAY Frame.
152         if let Some(code) = self.controller.recved_go_away {
153             self.poll_deal_with_go_away(code)?;
154         }
155         self.controller.streams.window_update_conn(&self.input_tx)?;
156         self.controller
157             .streams
158             .window_update_streams(&self.input_tx)?;
159         self.poll_recv_request(cx)?;
160         if self.handshakes.local && self.handshakes.peer {
161             self.poll_input_request(cx)?;
162         }
163         Poll::Pending
164     }
165 
poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>166     fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
167         loop {
168             #[cfg(feature = "tokio_base")]
169             match self.req_rx.poll_recv(cx) {
170                 Poll::Ready(Some(message)) => {
171                     if self.controller.streams.reach_max_concurrency() {
172                         self.controller.streams.push_pending_concurrency(message.id);
173                     } else {
174                         self.controller.streams.increase_current_concurrency();
175                         self.controller.streams.push_back_pending_send(message.id)
176                     }
177                     self.controller.senders.insert(message.id, message.sender);
178                     self.controller.streams.insert(message.id, message.request);
179                 }
180                 Poll::Ready(None) => {
181                     return Err(DispatchErrorKind::ChannelClosed);
182                 }
183                 Poll::Pending => {
184                     break;
185                 }
186             }
187             #[cfg(feature = "ylong_base")]
188             match self.req_rx.poll_recv(cx) {
189                 Poll::Ready(Ok(message)) => {
190                     if self.controller.streams.reach_max_concurrency() {
191                         self.controller.streams.push_pending_concurrency(message.id);
192                     } else {
193                         self.controller.streams.increase_current_concurrency();
194                         self.controller.streams.push_back_pending_send(message.id)
195                     }
196                     self.controller.senders.insert(message.id, message.sender);
197                     self.controller.streams.insert(message.id, message.request);
198                 }
199                 Poll::Ready(Err(_e)) => {
200                     return Err(DispatchErrorKind::ChannelClosed);
201                 }
202                 Poll::Pending => {
203                     break;
204                 }
205             }
206         }
207         Ok(())
208     }
209 
poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>210     fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> {
211         self.controller.streams.try_consume_pending_concurrency();
212         let size = self.controller.streams.pending_stream_num();
213         let mut index = 0;
214         while index < size {
215             match self.controller.streams.next_pending_stream() {
216                 None => {
217                     break;
218                 }
219                 Some(id) => {
220                     self.input_stream_frame(cx, id)?;
221                 }
222             }
223             index += 1;
224         }
225         Ok(())
226     }
227 
input_stream_frame( &mut self, cx: &mut Context<'_>, id: u32, ) -> Result<(), DispatchErrorKind>228     fn input_stream_frame(
229         &mut self,
230         cx: &mut Context<'_>,
231         id: u32,
232     ) -> Result<(), DispatchErrorKind> {
233         match self.controller.streams.headers(id)? {
234             None => {}
235             Some(header) => {
236                 self.poll_send_frame(header)?;
237             }
238         }
239 
240         loop {
241             match self.controller.streams.poll_read_body(cx, id)? {
242                 DataReadState::Closed => {
243                     break;
244                 }
245                 DataReadState::Pending => {
246                     break;
247                 }
248                 DataReadState::Ready(data) => {
249                     self.poll_send_frame(data)?;
250                 }
251                 DataReadState::Finish(frame) => {
252                     self.poll_send_frame(frame)?;
253                     break;
254                 }
255             }
256         }
257         Ok(())
258     }
259 
poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>260     fn poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
261         match frame.payload() {
262             Payload::Headers(_) => {
263                 if let FrameRecvState::Err(e) = self
264                     .controller
265                     .streams
266                     .send_headers_frame(frame.stream_id() as u32, frame.flags().is_end_stream())
267                 {
268                     // Never return FrameRecvState::Ignore case.
269                     return Err(e.into());
270                 }
271             }
272             Payload::Data(_) => {
273                 if let FrameRecvState::Err(e) = self
274                     .controller
275                     .streams
276                     .send_data_frame(frame.stream_id() as u32, frame.flags().is_end_stream())
277                 {
278                     // Never return FrameRecvState::Ignore case.
279                     return Err(e.into());
280                 }
281             }
282             _ => {}
283         }
284 
285         self.input_tx
286             .send(frame)
287             .map_err(|_e| DispatchErrorKind::ChannelClosed)
288     }
289 
poll_recv_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>290     fn poll_recv_frame(
291         &mut self,
292         cx: &mut Context<'_>,
293         frame: Frame,
294     ) -> Poll<Result<(), DispatchErrorKind>> {
295         match frame.payload() {
296             Payload::Settings(_settings) => {
297                 self.recv_settings_frame(frame)?;
298             }
299             Payload::Ping(_ping) => {
300                 self.recv_ping_frame(frame)?;
301             }
302             Payload::PushPromise(_) => {
303                 // TODO The current settings_enable_push setting is fixed to false.
304                 return Poll::Ready(Err(
305                     H2Error::ConnectionError(ErrorCode::ProtocolError).into()
306                 ));
307             }
308             Payload::Goaway(_go_away) => {
309                 return self.recv_go_away_frame(cx, frame).map_err(Into::into);
310             }
311             Payload::RstStream(_reset) => {
312                 return self.recv_reset_frame(cx, frame).map_err(Into::into);
313             }
314             Payload::Headers(_headers) => {
315                 return self.recv_header_frame(cx, frame).map_err(Into::into);
316             }
317             Payload::Data(_data) => {
318                 return self.recv_data_frame(cx, frame).map_err(Into::into);
319             }
320             Payload::WindowUpdate(_windows) => {
321                 self.recv_window_frame(frame)?;
322             }
323             // Priority is no longer recommended, so keep it compatible but not processed.
324             Payload::Priority(_priority) => {}
325         }
326         Poll::Ready(Ok(()))
327     }
328 
recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>329     fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
330         let settings = if let Payload::Settings(settings) = frame.payload() {
331             settings
332         } else {
333             // this will not happen forever.
334             return Ok(());
335         };
336 
337         if frame.flags().is_ack() {
338             let mut connection = self.settings.lock().unwrap();
339 
340             if let SettingsState::Acknowledging(ref acknowledged) = connection.settings {
341                 for setting in acknowledged.get_settings() {
342                     if let Setting::InitialWindowSize(size) = setting {
343                         self.controller
344                             .streams
345                             .apply_recv_initial_window_size(*size);
346                     }
347                 }
348             }
349             connection.settings = SettingsState::Synced;
350             self.handshakes.local = true;
351             Ok(())
352         } else {
353             for setting in settings.get_settings() {
354                 if let Setting::MaxConcurrentStreams(num) = setting {
355                     self.controller.streams.apply_max_concurrent_streams(*num);
356                 }
357                 if let Setting::InitialWindowSize(size) = setting {
358                     self.controller
359                         .streams
360                         .apply_send_initial_window_size(*size)?;
361                 }
362             }
363 
364             // The reason for copying the payload is to pass information to the io input to
365             // set the frame encoder, and the input will empty the
366             // payload when it is sent
367             let new_settings = Frame::new(
368                 frame.stream_id(),
369                 FrameFlags::new(0x1),
370                 frame.payload().clone(),
371             );
372             self.input_tx
373                 .send(new_settings)
374                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
375             self.handshakes.peer = true;
376             Ok(())
377         }
378     }
379 
recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>380     fn recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
381         let ping = if let Payload::Ping(ping) = frame.payload() {
382             ping
383         } else {
384             // this will not happen forever.
385             return Ok(());
386         };
387         if frame.flags().is_ack() {
388             // TODO The client does not have the logic to send ping frames. Therefore, the
389             // ack ping is not processed.
390             Ok(())
391         } else {
392             self.input_tx
393                 .send(Ping::ack(ping.clone()))
394                 .map_err(|_e| DispatchErrorKind::ChannelClosed)
395         }
396     }
397 
recv_go_away_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>398     fn recv_go_away_frame(
399         &mut self,
400         cx: &mut Context<'_>,
401         frame: Frame,
402     ) -> Poll<Result<(), H2Error>> {
403         let go_away = if let Payload::Goaway(goaway) = frame.payload() {
404             goaway
405         } else {
406             // this will not happen forever.
407             return Poll::Ready(Ok(()));
408         };
409         // Prevents the current connection from generating a new stream.
410         self.controller.shutdown();
411         self.req_rx.close();
412         let last_stream_id = go_away.get_last_stream_id();
413         let streams = self.controller.get_unsent_streams(last_stream_id as u32)?;
414 
415         let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?);
416 
417         let mut blocked = false;
418         for stream_id in streams {
419             match self.controller.send_message_to_stream(
420                 cx,
421                 stream_id,
422                 RespMessage::OutputExit(error.into()),
423             ) {
424                 // ignore error when going away.
425                 Poll::Ready(_) => {}
426                 Poll::Pending => {
427                     blocked = true;
428                 }
429             }
430         }
431         // Exit after the allowed stream is complete.
432         self.controller.recved_go_away = Some(go_away.get_error_code());
433         if blocked {
434             Poll::Pending
435         } else {
436             Poll::Ready(Ok(()))
437         }
438     }
439 
recv_reset_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>440     fn recv_reset_frame(
441         &mut self,
442         cx: &mut Context<'_>,
443         frame: Frame,
444     ) -> Poll<Result<(), H2Error>> {
445         match self
446             .controller
447             .streams
448             .recv_remote_reset(frame.stream_id() as u32)
449         {
450             StreamEndState::OK => self.controller.send_message_to_stream(
451                 cx,
452                 frame.stream_id() as u32,
453                 RespMessage::Output(frame),
454             ),
455             StreamEndState::Err(e) => Poll::Ready(Err(e)),
456             StreamEndState::Ignore => Poll::Ready(Ok(())),
457         }
458     }
459 
recv_header_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>460     fn recv_header_frame(
461         &mut self,
462         cx: &mut Context<'_>,
463         frame: Frame,
464     ) -> Poll<Result<(), H2Error>> {
465         match self
466             .controller
467             .streams
468             .recv_headers(frame.stream_id() as u32, frame.flags().is_end_stream())
469         {
470             FrameRecvState::OK => self.controller.send_message_to_stream(
471                 cx,
472                 frame.stream_id() as u32,
473                 RespMessage::Output(frame),
474             ),
475             FrameRecvState::Err(e) => Poll::Ready(Err(e)),
476             FrameRecvState::Ignore => Poll::Ready(Ok(())),
477         }
478     }
479 
recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll<Result<(), H2Error>>480     fn recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll<Result<(), H2Error>> {
481         let data = if let Payload::Data(data) = frame.payload() {
482             data
483         } else {
484             // this will not happen forever.
485             return Poll::Ready(Ok(()));
486         };
487         let id = frame.stream_id() as u32;
488         let len = data.size() as u32;
489 
490         self.controller.streams.release_conn_recv_window(len)?;
491         self.controller
492             .streams
493             .release_stream_recv_window(id, len)?;
494 
495         match self
496             .controller
497             .streams
498             .recv_data(id, frame.flags().is_end_stream())
499         {
500             FrameRecvState::OK => self.controller.send_message_to_stream(
501                 cx,
502                 frame.stream_id() as u32,
503                 RespMessage::Output(frame),
504             ),
505             FrameRecvState::Ignore => Poll::Ready(Ok(())),
506             FrameRecvState::Err(e) => Poll::Ready(Err(e)),
507         }
508     }
509 
recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>510     fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> {
511         let windows = if let Payload::WindowUpdate(windows) = frame.payload() {
512             windows
513         } else {
514             // this will not happen forever.
515             return Ok(());
516         };
517         let id = frame.stream_id();
518         let increment = windows.get_increment();
519         if id == 0 {
520             self.controller
521                 .streams
522                 .increase_conn_send_window(increment)?;
523             self.controller.streams.reassign_conn_send_window();
524         } else {
525             self.controller
526                 .streams
527                 .reassign_stream_send_window(id as u32, increment)?;
528         }
529         Ok(())
530     }
531 
manage_resp_error( &mut self, cx: &mut Context<'_>, kind: DispatchErrorKind, ) -> Poll<Result<(), DispatchErrorKind>>532     fn manage_resp_error(
533         &mut self,
534         cx: &mut Context<'_>,
535         kind: DispatchErrorKind,
536     ) -> Poll<Result<(), DispatchErrorKind>> {
537         match kind {
538             DispatchErrorKind::H2(h2) => match h2 {
539                 H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code),
540                 H2Error::ConnectionError(code) => self.manage_conn_error(cx, code),
541             },
542             other => {
543                 let blocked = self.exit_with_error(cx, other);
544                 if blocked {
545                     self.state = ManagerState::Send;
546                     self.next_state = ManagerState::Exit(other);
547                     Poll::Pending
548                 } else {
549                     Poll::Ready(Err(other))
550                 }
551             }
552         }
553     }
554 
manage_stream_error( &mut self, cx: &mut Context<'_>, id: u32, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>555     fn manage_stream_error(
556         &mut self,
557         cx: &mut Context<'_>,
558         id: u32,
559         code: ErrorCode,
560     ) -> Poll<Result<(), DispatchErrorKind>> {
561         let rest_payload = RstStream::new(code.into_code());
562         let frame = Frame::new(
563             id as usize,
564             FrameFlags::empty(),
565             Payload::RstStream(rest_payload),
566         );
567         match self.controller.streams.send_local_reset(id) {
568             StreamEndState::OK => {
569                 self.input_tx
570                     .send(frame)
571                     .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
572 
573                 match self.controller.send_message_to_stream(
574                     cx,
575                     id,
576                     RespMessage::OutputExit(DispatchErrorKind::ChannelClosed),
577                 ) {
578                     Poll::Ready(_) => {
579                         // error at the stream level due to early exit of the coroutine in which the
580                         // request is located, ignored to avoid manager coroutine exit.
581                         Poll::Ready(Ok(()))
582                     }
583                     Poll::Pending => {
584                         self.state = ManagerState::Send;
585                         // stream error will not cause manager exit with error(exit state). Takes
586                         // effect only if blocked.
587                         self.next_state = ManagerState::Receive;
588                         Poll::Pending
589                     }
590                 }
591             }
592             StreamEndState::Ignore => Poll::Ready(Ok(())),
593             StreamEndState::Err(e) => {
594                 // This error will never happen.
595                 Poll::Ready(Err(e.into()))
596             }
597         }
598     }
599 
manage_conn_error( &mut self, cx: &mut Context<'_>, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>600     fn manage_conn_error(
601         &mut self,
602         cx: &mut Context<'_>,
603         code: ErrorCode,
604     ) -> Poll<Result<(), DispatchErrorKind>> {
605         // last_stream_id is set to 0 to ensure that all pushed streams are
606         // shutdown.
607         let go_away_payload = Goaway::new(
608             code.into_code(),
609             self.controller.streams.latest_remote_id as usize,
610             vec![],
611         );
612         let frame = Frame::new(
613             0,
614             FrameFlags::empty(),
615             Payload::Goaway(go_away_payload.clone()),
616         );
617         // Avoid sending the same GO_AWAY frame multiple times.
618         if let Some(ref go_away) = self.controller.go_away_sync.going_away {
619             if go_away.get_error_code() == go_away_payload.get_error_code()
620                 && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id()
621             {
622                 return Poll::Ready(Ok(()));
623             }
624         }
625         self.controller.go_away_sync.going_away = Some(go_away_payload);
626         self.input_tx
627             .send(frame)
628             .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
629 
630         let blocked =
631             self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code)));
632 
633         if blocked {
634             self.state = ManagerState::Send;
635             self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into());
636             Poll::Pending
637         } else {
638             // TODO When current client has an error,
639             // it always sends the GO_AWAY frame at the first time and exits directly.
640             // Should we consider letting part of the unfinished stream complete?
641             Poll::Ready(Err(H2Error::ConnectionError(code).into()))
642         }
643     }
644 
poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind>645     fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> {
646         // The client that receives GO_AWAY needs to return a GO_AWAY to the server
647         // before closed. The preceding operations before receiving the frame
648         // ensure that the connection is in the closing state.
649         if self.controller.streams.is_closed() {
650             let last_stream_id = self.controller.streams.latest_remote_id as usize;
651             let go_away_payload = Goaway::new(error_code, last_stream_id, vec![]);
652             let frame = Frame::new(
653                 0,
654                 FrameFlags::empty(),
655                 Payload::Goaway(go_away_payload.clone()),
656             );
657 
658             self.send_peer_goaway(frame, go_away_payload, error_code)?;
659             return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into());
660         }
661         Ok(())
662     }
663 
send_peer_goaway( &mut self, frame: Frame, payload: Goaway, err_code: u32, ) -> Result<(), DispatchErrorKind>664     fn send_peer_goaway(
665         &mut self,
666         frame: Frame,
667         payload: Goaway,
668         err_code: u32,
669     ) -> Result<(), DispatchErrorKind> {
670         match self.controller.go_away_sync.going_away {
671             None => {
672                 self.controller.go_away_sync.going_away = Some(payload);
673                 self.input_tx
674                     .send(frame)
675                     .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
676             }
677             Some(ref go_away) => {
678                 // Whether the same GOAWAY Frame has been sent before.
679                 if !(go_away.get_error_code() == err_code
680                     && go_away.get_last_stream_id()
681                         == self.controller.streams.latest_remote_id as usize)
682                 {
683                     self.controller.go_away_sync.going_away = Some(payload);
684                     self.input_tx
685                         .send(frame)
686                         .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
687                 }
688             }
689         }
690         Ok(())
691     }
692 
poll_recv_message( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>693     fn poll_recv_message(
694         &mut self,
695         cx: &mut Context<'_>,
696         frame: Frame,
697     ) -> Poll<Result<(), DispatchErrorKind>> {
698         match self.poll_recv_frame(cx, frame) {
699             Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind),
700             Poll::Pending => {
701                 self.state = ManagerState::Send;
702                 self.next_state = ManagerState::Receive;
703                 Poll::Pending
704             }
705             Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
706         }
707     }
708 
poll_channel_closed_exit( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>709     fn poll_channel_closed_exit(
710         &mut self,
711         cx: &mut Context<'_>,
712     ) -> Poll<Result<(), DispatchErrorKind>> {
713         if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) {
714             self.state = ManagerState::Send;
715             self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed);
716             Poll::Pending
717         } else {
718             Poll::Ready(Err(DispatchErrorKind::ChannelClosed))
719         }
720     }
721 
poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()>722     fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> {
723         match self.controller.poll_blocked_message(cx, &self.input_tx) {
724             Poll::Ready(_) => {
725                 self.state = self.next_state;
726                 // Reset state.
727                 self.next_state = ManagerState::Receive;
728                 Poll::Ready(())
729             }
730             Poll::Pending => Poll::Pending,
731         }
732     }
733 
exit_with_error( &mut self, cx: &mut Context<'_>, error: DispatchErrorKind, ) -> bool734     pub(crate) fn exit_with_error(
735         &mut self,
736         cx: &mut Context<'_>,
737         error: DispatchErrorKind,
738     ) -> bool {
739         self.controller.shutdown();
740         self.req_rx.close();
741         self.controller.streams.clear_streams_states();
742 
743         let ids = self.controller.streams.get_all_unclosed_streams();
744         let mut blocked = false;
745         for stream_id in ids {
746             match self.controller.send_message_to_stream(
747                 cx,
748                 stream_id,
749                 RespMessage::OutputExit(error),
750             ) {
751                 // ignore error when going away.
752                 Poll::Ready(_) => {}
753                 Poll::Pending => {
754                     blocked = true;
755                 }
756             }
757         }
758         blocked
759     }
760 }
761