1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Streams manage coroutine. 15 16 use std::future::Future; 17 use std::pin::Pin; 18 use std::sync::{Arc, Mutex}; 19 use std::task::{Context, Poll}; 20 21 use ylong_http::h2::{ 22 ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting, 23 }; 24 25 use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender}; 26 use crate::util::dispatcher::http2::{ 27 DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync, 28 StreamController, 29 }; 30 use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState}; 31 32 #[derive(Copy, Clone)] 33 enum ManagerState { 34 Send, 35 Receive, 36 Exit(DispatchErrorKind), 37 } 38 39 pub(crate) struct ConnManager { 40 state: ManagerState, 41 next_state: ManagerState, 42 // Synchronize SETTINGS frames sent by the client. 43 settings: Arc<Mutex<SettingsSync>>, 44 // channel transmitter between manager and io input. 45 input_tx: UnboundedSender<Frame>, 46 // channel receiver between manager and io output. 47 resp_rx: BoundedReceiver<OutputMessage>, 48 // channel receiver between manager and stream coroutine. 49 req_rx: UnboundedReceiver<ReqMessage>, 50 controller: StreamController, 51 handshakes: HandShakes, 52 } 53 54 struct HandShakes { 55 local: bool, 56 peer: bool, 57 } 58 59 impl Future for ConnManager { 60 type Output = Result<(), DispatchErrorKind>; 61 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>62 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 63 let manager = self.get_mut(); 64 loop { 65 match manager.state { 66 ManagerState::Send => { 67 if manager.poll_blocked_frames(cx).is_pending() { 68 return Poll::Pending; 69 } 70 } 71 ManagerState::Receive => { 72 // Receives a response frame from io output. 73 match manager.resp_rx.poll_recv(cx) { 74 #[cfg(feature = "tokio_base")] 75 Poll::Ready(Some(message)) => match message { 76 OutputMessage::Output(frame) => { 77 if manager.poll_recv_message(cx, frame)?.is_pending() { 78 return Poll::Pending; 79 } 80 } 81 // io output occurs error. 82 OutputMessage::OutputExit(e) => { 83 // Note error returned immediately. 84 if manager.manage_resp_error(cx, e)?.is_pending() { 85 return Poll::Pending; 86 } 87 } 88 }, 89 #[cfg(feature = "ylong_base")] 90 Poll::Ready(Ok(message)) => match message { 91 OutputMessage::Output(frame) => { 92 if manager.poll_recv_message(cx, frame)?.is_pending() { 93 return Poll::Pending; 94 } 95 } 96 // io output occurs error. 97 OutputMessage::OutputExit(e) => { 98 if manager.manage_resp_error(cx, e)?.is_pending() { 99 return Poll::Pending; 100 } 101 } 102 }, 103 #[cfg(feature = "tokio_base")] 104 Poll::Ready(None) => { 105 return manager.poll_channel_closed_exit(cx); 106 } 107 #[cfg(feature = "ylong_base")] 108 Poll::Ready(Err(_e)) => { 109 return manager.poll_channel_closed_exit(cx); 110 } 111 112 Poll::Pending => { 113 // TODO manage error state. 114 return manager.manage_pending_state(cx); 115 } 116 } 117 } 118 ManagerState::Exit(e) => return Poll::Ready(Err(e)), 119 } 120 } 121 } 122 } 123 124 impl ConnManager { new( settings: Arc<Mutex<SettingsSync>>, input_tx: UnboundedSender<Frame>, resp_rx: BoundedReceiver<OutputMessage>, req_rx: UnboundedReceiver<ReqMessage>, controller: StreamController, ) -> Self125 pub(crate) fn new( 126 settings: Arc<Mutex<SettingsSync>>, 127 input_tx: UnboundedSender<Frame>, 128 resp_rx: BoundedReceiver<OutputMessage>, 129 req_rx: UnboundedReceiver<ReqMessage>, 130 controller: StreamController, 131 ) -> Self { 132 Self { 133 state: ManagerState::Receive, 134 next_state: ManagerState::Receive, 135 settings, 136 input_tx, 137 resp_rx, 138 req_rx, 139 controller, 140 handshakes: HandShakes { 141 local: false, 142 peer: false, 143 }, 144 } 145 } 146 manage_pending_state( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>147 fn manage_pending_state( 148 &mut self, 149 cx: &mut Context<'_>, 150 ) -> Poll<Result<(), DispatchErrorKind>> { 151 // The manager previously accepted a GOAWAY Frame. 152 if let Some(code) = self.controller.recved_go_away { 153 self.poll_deal_with_go_away(code)?; 154 } 155 self.controller.streams.window_update_conn(&self.input_tx)?; 156 self.controller 157 .streams 158 .window_update_streams(&self.input_tx)?; 159 self.poll_recv_request(cx)?; 160 if self.handshakes.local && self.handshakes.peer { 161 self.poll_input_request(cx)?; 162 } 163 Poll::Pending 164 } 165 poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>166 fn poll_recv_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 167 loop { 168 #[cfg(feature = "tokio_base")] 169 match self.req_rx.poll_recv(cx) { 170 Poll::Ready(Some(message)) => { 171 if self.controller.streams.reach_max_concurrency() { 172 self.controller.streams.push_pending_concurrency(message.id); 173 } else { 174 self.controller.streams.increase_current_concurrency(); 175 self.controller.streams.push_back_pending_send(message.id) 176 } 177 self.controller.senders.insert(message.id, message.sender); 178 self.controller.streams.insert(message.id, message.request); 179 } 180 Poll::Ready(None) => { 181 return Err(DispatchErrorKind::ChannelClosed); 182 } 183 Poll::Pending => { 184 break; 185 } 186 } 187 #[cfg(feature = "ylong_base")] 188 match self.req_rx.poll_recv(cx) { 189 Poll::Ready(Ok(message)) => { 190 if self.controller.streams.reach_max_concurrency() { 191 self.controller.streams.push_pending_concurrency(message.id); 192 } else { 193 self.controller.streams.increase_current_concurrency(); 194 self.controller.streams.push_back_pending_send(message.id) 195 } 196 self.controller.senders.insert(message.id, message.sender); 197 self.controller.streams.insert(message.id, message.request); 198 } 199 Poll::Ready(Err(_e)) => { 200 return Err(DispatchErrorKind::ChannelClosed); 201 } 202 Poll::Pending => { 203 break; 204 } 205 } 206 } 207 Ok(()) 208 } 209 poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind>210 fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { 211 self.controller.streams.try_consume_pending_concurrency(); 212 let size = self.controller.streams.pending_stream_num(); 213 let mut index = 0; 214 while index < size { 215 match self.controller.streams.next_pending_stream() { 216 None => { 217 break; 218 } 219 Some(id) => { 220 self.input_stream_frame(cx, id)?; 221 } 222 } 223 index += 1; 224 } 225 Ok(()) 226 } 227 input_stream_frame( &mut self, cx: &mut Context<'_>, id: u32, ) -> Result<(), DispatchErrorKind>228 fn input_stream_frame( 229 &mut self, 230 cx: &mut Context<'_>, 231 id: u32, 232 ) -> Result<(), DispatchErrorKind> { 233 match self.controller.streams.headers(id)? { 234 None => {} 235 Some(header) => { 236 self.poll_send_frame(header)?; 237 } 238 } 239 240 loop { 241 match self.controller.streams.poll_read_body(cx, id)? { 242 DataReadState::Closed => { 243 break; 244 } 245 DataReadState::Pending => { 246 break; 247 } 248 DataReadState::Ready(data) => { 249 self.poll_send_frame(data)?; 250 } 251 DataReadState::Finish(frame) => { 252 self.poll_send_frame(frame)?; 253 break; 254 } 255 } 256 } 257 Ok(()) 258 } 259 poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>260 fn poll_send_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 261 match frame.payload() { 262 Payload::Headers(_) => { 263 if let FrameRecvState::Err(e) = self 264 .controller 265 .streams 266 .send_headers_frame(frame.stream_id() as u32, frame.flags().is_end_stream()) 267 { 268 // Never return FrameRecvState::Ignore case. 269 return Err(e.into()); 270 } 271 } 272 Payload::Data(_) => { 273 if let FrameRecvState::Err(e) = self 274 .controller 275 .streams 276 .send_data_frame(frame.stream_id() as u32, frame.flags().is_end_stream()) 277 { 278 // Never return FrameRecvState::Ignore case. 279 return Err(e.into()); 280 } 281 } 282 _ => {} 283 } 284 285 self.input_tx 286 .send(frame) 287 .map_err(|_e| DispatchErrorKind::ChannelClosed) 288 } 289 poll_recv_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>290 fn poll_recv_frame( 291 &mut self, 292 cx: &mut Context<'_>, 293 frame: Frame, 294 ) -> Poll<Result<(), DispatchErrorKind>> { 295 match frame.payload() { 296 Payload::Settings(_settings) => { 297 self.recv_settings_frame(frame)?; 298 } 299 Payload::Ping(_ping) => { 300 self.recv_ping_frame(frame)?; 301 } 302 Payload::PushPromise(_) => { 303 // TODO The current settings_enable_push setting is fixed to false. 304 return Poll::Ready(Err( 305 H2Error::ConnectionError(ErrorCode::ProtocolError).into() 306 )); 307 } 308 Payload::Goaway(_go_away) => { 309 return self.recv_go_away_frame(cx, frame).map_err(Into::into); 310 } 311 Payload::RstStream(_reset) => { 312 return self.recv_reset_frame(cx, frame).map_err(Into::into); 313 } 314 Payload::Headers(_headers) => { 315 return self.recv_header_frame(cx, frame).map_err(Into::into); 316 } 317 Payload::Data(_data) => { 318 return self.recv_data_frame(cx, frame).map_err(Into::into); 319 } 320 Payload::WindowUpdate(_windows) => { 321 self.recv_window_frame(frame)?; 322 } 323 // Priority is no longer recommended, so keep it compatible but not processed. 324 Payload::Priority(_priority) => {} 325 } 326 Poll::Ready(Ok(())) 327 } 328 recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>329 fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 330 let settings = if let Payload::Settings(settings) = frame.payload() { 331 settings 332 } else { 333 // this will not happen forever. 334 return Ok(()); 335 }; 336 337 if frame.flags().is_ack() { 338 let mut connection = self.settings.lock().unwrap(); 339 340 if let SettingsState::Acknowledging(ref acknowledged) = connection.settings { 341 for setting in acknowledged.get_settings() { 342 if let Setting::InitialWindowSize(size) = setting { 343 self.controller 344 .streams 345 .apply_recv_initial_window_size(*size); 346 } 347 } 348 } 349 connection.settings = SettingsState::Synced; 350 self.handshakes.local = true; 351 Ok(()) 352 } else { 353 for setting in settings.get_settings() { 354 if let Setting::MaxConcurrentStreams(num) = setting { 355 self.controller.streams.apply_max_concurrent_streams(*num); 356 } 357 if let Setting::InitialWindowSize(size) = setting { 358 self.controller 359 .streams 360 .apply_send_initial_window_size(*size)?; 361 } 362 } 363 364 // The reason for copying the payload is to pass information to the io input to 365 // set the frame encoder, and the input will empty the 366 // payload when it is sent 367 let new_settings = Frame::new( 368 frame.stream_id(), 369 FrameFlags::new(0x1), 370 frame.payload().clone(), 371 ); 372 self.input_tx 373 .send(new_settings) 374 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 375 self.handshakes.peer = true; 376 Ok(()) 377 } 378 } 379 recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>380 fn recv_ping_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 381 let ping = if let Payload::Ping(ping) = frame.payload() { 382 ping 383 } else { 384 // this will not happen forever. 385 return Ok(()); 386 }; 387 if frame.flags().is_ack() { 388 // TODO The client does not have the logic to send ping frames. Therefore, the 389 // ack ping is not processed. 390 Ok(()) 391 } else { 392 self.input_tx 393 .send(Ping::ack(ping.clone())) 394 .map_err(|_e| DispatchErrorKind::ChannelClosed) 395 } 396 } 397 recv_go_away_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>398 fn recv_go_away_frame( 399 &mut self, 400 cx: &mut Context<'_>, 401 frame: Frame, 402 ) -> Poll<Result<(), H2Error>> { 403 let go_away = if let Payload::Goaway(goaway) = frame.payload() { 404 goaway 405 } else { 406 // this will not happen forever. 407 return Poll::Ready(Ok(())); 408 }; 409 // Prevents the current connection from generating a new stream. 410 self.controller.shutdown(); 411 self.req_rx.close(); 412 let last_stream_id = go_away.get_last_stream_id(); 413 let streams = self.controller.get_unsent_streams(last_stream_id as u32)?; 414 415 let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?); 416 417 let mut blocked = false; 418 for stream_id in streams { 419 match self.controller.send_message_to_stream( 420 cx, 421 stream_id, 422 RespMessage::OutputExit(error.into()), 423 ) { 424 // ignore error when going away. 425 Poll::Ready(_) => {} 426 Poll::Pending => { 427 blocked = true; 428 } 429 } 430 } 431 // Exit after the allowed stream is complete. 432 self.controller.recved_go_away = Some(go_away.get_error_code()); 433 if blocked { 434 Poll::Pending 435 } else { 436 Poll::Ready(Ok(())) 437 } 438 } 439 recv_reset_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>440 fn recv_reset_frame( 441 &mut self, 442 cx: &mut Context<'_>, 443 frame: Frame, 444 ) -> Poll<Result<(), H2Error>> { 445 match self 446 .controller 447 .streams 448 .recv_remote_reset(frame.stream_id() as u32) 449 { 450 StreamEndState::OK => self.controller.send_message_to_stream( 451 cx, 452 frame.stream_id() as u32, 453 RespMessage::Output(frame), 454 ), 455 StreamEndState::Err(e) => Poll::Ready(Err(e)), 456 StreamEndState::Ignore => Poll::Ready(Ok(())), 457 } 458 } 459 recv_header_frame( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), H2Error>>460 fn recv_header_frame( 461 &mut self, 462 cx: &mut Context<'_>, 463 frame: Frame, 464 ) -> Poll<Result<(), H2Error>> { 465 match self 466 .controller 467 .streams 468 .recv_headers(frame.stream_id() as u32, frame.flags().is_end_stream()) 469 { 470 FrameRecvState::OK => self.controller.send_message_to_stream( 471 cx, 472 frame.stream_id() as u32, 473 RespMessage::Output(frame), 474 ), 475 FrameRecvState::Err(e) => Poll::Ready(Err(e)), 476 FrameRecvState::Ignore => Poll::Ready(Ok(())), 477 } 478 } 479 recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll<Result<(), H2Error>>480 fn recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll<Result<(), H2Error>> { 481 let data = if let Payload::Data(data) = frame.payload() { 482 data 483 } else { 484 // this will not happen forever. 485 return Poll::Ready(Ok(())); 486 }; 487 let id = frame.stream_id() as u32; 488 let len = data.size() as u32; 489 490 self.controller.streams.release_conn_recv_window(len)?; 491 self.controller 492 .streams 493 .release_stream_recv_window(id, len)?; 494 495 match self 496 .controller 497 .streams 498 .recv_data(id, frame.flags().is_end_stream()) 499 { 500 FrameRecvState::OK => self.controller.send_message_to_stream( 501 cx, 502 frame.stream_id() as u32, 503 RespMessage::Output(frame), 504 ), 505 FrameRecvState::Ignore => Poll::Ready(Ok(())), 506 FrameRecvState::Err(e) => Poll::Ready(Err(e)), 507 } 508 } 509 recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind>510 fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { 511 let windows = if let Payload::WindowUpdate(windows) = frame.payload() { 512 windows 513 } else { 514 // this will not happen forever. 515 return Ok(()); 516 }; 517 let id = frame.stream_id(); 518 let increment = windows.get_increment(); 519 if id == 0 { 520 self.controller 521 .streams 522 .increase_conn_send_window(increment)?; 523 self.controller.streams.reassign_conn_send_window(); 524 } else { 525 self.controller 526 .streams 527 .reassign_stream_send_window(id as u32, increment)?; 528 } 529 Ok(()) 530 } 531 manage_resp_error( &mut self, cx: &mut Context<'_>, kind: DispatchErrorKind, ) -> Poll<Result<(), DispatchErrorKind>>532 fn manage_resp_error( 533 &mut self, 534 cx: &mut Context<'_>, 535 kind: DispatchErrorKind, 536 ) -> Poll<Result<(), DispatchErrorKind>> { 537 match kind { 538 DispatchErrorKind::H2(h2) => match h2 { 539 H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code), 540 H2Error::ConnectionError(code) => self.manage_conn_error(cx, code), 541 }, 542 other => { 543 let blocked = self.exit_with_error(cx, other); 544 if blocked { 545 self.state = ManagerState::Send; 546 self.next_state = ManagerState::Exit(other); 547 Poll::Pending 548 } else { 549 Poll::Ready(Err(other)) 550 } 551 } 552 } 553 } 554 manage_stream_error( &mut self, cx: &mut Context<'_>, id: u32, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>555 fn manage_stream_error( 556 &mut self, 557 cx: &mut Context<'_>, 558 id: u32, 559 code: ErrorCode, 560 ) -> Poll<Result<(), DispatchErrorKind>> { 561 let rest_payload = RstStream::new(code.into_code()); 562 let frame = Frame::new( 563 id as usize, 564 FrameFlags::empty(), 565 Payload::RstStream(rest_payload), 566 ); 567 match self.controller.streams.send_local_reset(id) { 568 StreamEndState::OK => { 569 self.input_tx 570 .send(frame) 571 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 572 573 match self.controller.send_message_to_stream( 574 cx, 575 id, 576 RespMessage::OutputExit(DispatchErrorKind::ChannelClosed), 577 ) { 578 Poll::Ready(_) => { 579 // error at the stream level due to early exit of the coroutine in which the 580 // request is located, ignored to avoid manager coroutine exit. 581 Poll::Ready(Ok(())) 582 } 583 Poll::Pending => { 584 self.state = ManagerState::Send; 585 // stream error will not cause manager exit with error(exit state). Takes 586 // effect only if blocked. 587 self.next_state = ManagerState::Receive; 588 Poll::Pending 589 } 590 } 591 } 592 StreamEndState::Ignore => Poll::Ready(Ok(())), 593 StreamEndState::Err(e) => { 594 // This error will never happen. 595 Poll::Ready(Err(e.into())) 596 } 597 } 598 } 599 manage_conn_error( &mut self, cx: &mut Context<'_>, code: ErrorCode, ) -> Poll<Result<(), DispatchErrorKind>>600 fn manage_conn_error( 601 &mut self, 602 cx: &mut Context<'_>, 603 code: ErrorCode, 604 ) -> Poll<Result<(), DispatchErrorKind>> { 605 // last_stream_id is set to 0 to ensure that all pushed streams are 606 // shutdown. 607 let go_away_payload = Goaway::new( 608 code.into_code(), 609 self.controller.streams.latest_remote_id as usize, 610 vec![], 611 ); 612 let frame = Frame::new( 613 0, 614 FrameFlags::empty(), 615 Payload::Goaway(go_away_payload.clone()), 616 ); 617 // Avoid sending the same GO_AWAY frame multiple times. 618 if let Some(ref go_away) = self.controller.go_away_sync.going_away { 619 if go_away.get_error_code() == go_away_payload.get_error_code() 620 && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id() 621 { 622 return Poll::Ready(Ok(())); 623 } 624 } 625 self.controller.go_away_sync.going_away = Some(go_away_payload); 626 self.input_tx 627 .send(frame) 628 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 629 630 let blocked = 631 self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code))); 632 633 if blocked { 634 self.state = ManagerState::Send; 635 self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into()); 636 Poll::Pending 637 } else { 638 // TODO When current client has an error, 639 // it always sends the GO_AWAY frame at the first time and exits directly. 640 // Should we consider letting part of the unfinished stream complete? 641 Poll::Ready(Err(H2Error::ConnectionError(code).into())) 642 } 643 } 644 poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind>645 fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> { 646 // The client that receives GO_AWAY needs to return a GO_AWAY to the server 647 // before closed. The preceding operations before receiving the frame 648 // ensure that the connection is in the closing state. 649 if self.controller.streams.is_closed() { 650 let last_stream_id = self.controller.streams.latest_remote_id as usize; 651 let go_away_payload = Goaway::new(error_code, last_stream_id, vec![]); 652 let frame = Frame::new( 653 0, 654 FrameFlags::empty(), 655 Payload::Goaway(go_away_payload.clone()), 656 ); 657 658 self.send_peer_goaway(frame, go_away_payload, error_code)?; 659 return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into()); 660 } 661 Ok(()) 662 } 663 send_peer_goaway( &mut self, frame: Frame, payload: Goaway, err_code: u32, ) -> Result<(), DispatchErrorKind>664 fn send_peer_goaway( 665 &mut self, 666 frame: Frame, 667 payload: Goaway, 668 err_code: u32, 669 ) -> Result<(), DispatchErrorKind> { 670 match self.controller.go_away_sync.going_away { 671 None => { 672 self.controller.go_away_sync.going_away = Some(payload); 673 self.input_tx 674 .send(frame) 675 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 676 } 677 Some(ref go_away) => { 678 // Whether the same GOAWAY Frame has been sent before. 679 if !(go_away.get_error_code() == err_code 680 && go_away.get_last_stream_id() 681 == self.controller.streams.latest_remote_id as usize) 682 { 683 self.controller.go_away_sync.going_away = Some(payload); 684 self.input_tx 685 .send(frame) 686 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 687 } 688 } 689 } 690 Ok(()) 691 } 692 poll_recv_message( &mut self, cx: &mut Context<'_>, frame: Frame, ) -> Poll<Result<(), DispatchErrorKind>>693 fn poll_recv_message( 694 &mut self, 695 cx: &mut Context<'_>, 696 frame: Frame, 697 ) -> Poll<Result<(), DispatchErrorKind>> { 698 match self.poll_recv_frame(cx, frame) { 699 Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind), 700 Poll::Pending => { 701 self.state = ManagerState::Send; 702 self.next_state = ManagerState::Receive; 703 Poll::Pending 704 } 705 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), 706 } 707 } 708 poll_channel_closed_exit( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), DispatchErrorKind>>709 fn poll_channel_closed_exit( 710 &mut self, 711 cx: &mut Context<'_>, 712 ) -> Poll<Result<(), DispatchErrorKind>> { 713 if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) { 714 self.state = ManagerState::Send; 715 self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed); 716 Poll::Pending 717 } else { 718 Poll::Ready(Err(DispatchErrorKind::ChannelClosed)) 719 } 720 } 721 poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()>722 fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> { 723 match self.controller.poll_blocked_message(cx, &self.input_tx) { 724 Poll::Ready(_) => { 725 self.state = self.next_state; 726 // Reset state. 727 self.next_state = ManagerState::Receive; 728 Poll::Ready(()) 729 } 730 Poll::Pending => Poll::Pending, 731 } 732 } 733 exit_with_error( &mut self, cx: &mut Context<'_>, error: DispatchErrorKind, ) -> bool734 pub(crate) fn exit_with_error( 735 &mut self, 736 cx: &mut Context<'_>, 737 error: DispatchErrorKind, 738 ) -> bool { 739 self.controller.shutdown(); 740 self.req_rx.close(); 741 self.controller.streams.clear_streams_states(); 742 743 let ids = self.controller.streams.get_all_unclosed_streams(); 744 let mut blocked = false; 745 for stream_id in ids { 746 match self.controller.send_message_to_stream( 747 cx, 748 stream_id, 749 RespMessage::OutputExit(error), 750 ) { 751 // ignore error when going away. 752 Poll::Ready(_) => {} 753 Poll::Pending => { 754 blocked = true; 755 } 756 } 757 } 758 blocked 759 } 760 } 761