1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Streams operations utils. 15 16 use std::cmp::{min, Ordering}; 17 use std::collections::{HashMap, HashSet, VecDeque}; 18 use std::task::{Context, Poll}; 19 20 use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload}; 21 22 use crate::runtime::UnboundedSender; 23 use crate::util::dispatcher::http2::DispatchErrorKind; 24 use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow}; 25 use crate::util::h2::data_ref::BodyDataRef; 26 27 pub(crate) const INITIAL_MAX_SEND_STREAM_ID: u32 = u32::MAX >> 1; 28 pub(crate) const INITIAL_MAX_RECV_STREAM_ID: u32 = u32::MAX >> 1; 29 const INITIAL_LATEST_REMOTE_ID: u32 = 0; 30 const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100; 31 32 pub(crate) enum FrameRecvState { 33 OK, 34 Ignore, 35 Err(H2Error), 36 } 37 38 pub(crate) enum DataReadState { 39 Closed, 40 // Wait for poll_read or wait for window. 41 Pending, 42 Ready(Frame), 43 Finish(Frame), 44 } 45 46 pub(crate) enum StreamEndState { 47 OK, 48 Ignore, 49 Err(H2Error), 50 } 51 52 // +--------+ 53 // send PP | | recv PP 54 // ,--------| idle |--------. 55 // / | | \ 56 // v +--------+ v 57 // +----------+ | +----------+ 58 // | | | send H / | | 59 // ,------| reserved | | recv H | reserved |------. 60 // | | (local) | | | (remote) | | 61 // | +----------+ v +----------+ | 62 // | | +--------+ | | 63 // | | recv ES | | send ES | | 64 // | send H | ,-------| open |-------. | recv H | 65 // | | / | | \ | | 66 // | v v +--------+ v v | 67 // | +----------+ | +----------+ | 68 // | | half | | | half | | 69 // | | closed | | send R / | closed | | 70 // | | (remote) | | recv R | (local) | | 71 // | +----------+ | +----------+ | 72 // | | | | | 73 // | | send ES / | recv ES / | | 74 // | | send R / v send R / | | 75 // | | recv R +--------+ recv R | | 76 // | send R / `----------->| |<-----------' send R / | 77 // | recv R | closed | recv R | 78 // `----------------------->| |<----------------------' 79 // +--------+ 80 #[derive(Copy, Clone, Debug)] 81 pub(crate) enum H2StreamState { 82 Idle, 83 // When response does not depend on request, 84 // the server can send response directly without waiting for the request to finish receiving. 85 // Therefore, the sending and receiving states of the client have their own states 86 Open { 87 send: ActiveState, 88 recv: ActiveState, 89 }, 90 #[allow(dead_code)] 91 ReservedRemote, 92 // After the request is sent, the state is waiting for the response to be received. 93 LocalHalfClosed(ActiveState), 94 // When the response is received but the request is not fully sent, 95 // this indicates the status of the request being sent 96 RemoteHalfClosed(ActiveState), 97 Closed(CloseReason), 98 } 99 100 #[derive(Copy, Clone, Debug)] 101 pub(crate) enum CloseReason { 102 LocalRst, 103 RemoteRst, 104 RemoteGoAway, 105 LocalGoAway, 106 EndStream, 107 } 108 109 #[derive(Copy, Clone, Debug)] 110 pub(crate) enum ActiveState { 111 WaitHeaders, 112 WaitData, 113 } 114 115 pub(crate) struct Stream { 116 pub(crate) recv_window: RecvWindow, 117 pub(crate) send_window: SendWindow, 118 pub(crate) state: H2StreamState, 119 pub(crate) header: Option<Frame>, 120 pub(crate) data: BodyDataRef, 121 } 122 123 pub(crate) struct RequestWrapper { 124 pub(crate) header: Frame, 125 pub(crate) data: BodyDataRef, 126 } 127 128 pub(crate) struct Streams { 129 // Records the received goaway last_stream_id. 130 pub(crate) max_send_id: u32, 131 // Records the send goaway last_stream_id. 132 pub(crate) max_recv_id: u32, 133 // Currently the client doesn't support push promise, so this value is always 0. 134 pub(crate) latest_remote_id: u32, 135 pub(crate) stream_recv_window_size: u32, 136 pub(crate) stream_send_window_size: u32, 137 max_concurrent_streams: u32, 138 current_concurrent_streams: u32, 139 flow_control: FlowControl, 140 pending_concurrency: VecDeque<u32>, 141 pending_stream_window: HashSet<u32>, 142 pending_conn_window: VecDeque<u32>, 143 pending_send: VecDeque<u32>, 144 window_updating_streams: VecDeque<u32>, 145 pub(crate) stream_map: HashMap<u32, Stream>, 146 } 147 148 macro_rules! change_stream_state { 149 (Idle: $eos: expr, $state: expr) => { 150 $state = if $eos { 151 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) 152 } else { 153 H2StreamState::Open { 154 send: ActiveState::WaitHeaders, 155 recv: ActiveState::WaitData, 156 } 157 }; 158 }; 159 (Open: $eos: expr, $state: expr, $send: expr) => { 160 $state = if $eos { 161 H2StreamState::RemoteHalfClosed($send.clone()) 162 } else { 163 H2StreamState::Open { 164 send: $send.clone(), 165 recv: ActiveState::WaitData, 166 } 167 }; 168 }; 169 (HalfClosed: $eos: expr, $state: expr) => { 170 $state = if $eos { 171 H2StreamState::Closed(CloseReason::EndStream) 172 } else { 173 H2StreamState::LocalHalfClosed(ActiveState::WaitData) 174 }; 175 }; 176 } 177 178 impl Streams { new( recv_window_size: u32, send_window_size: u32, flow_control: FlowControl, ) -> Self179 pub(crate) fn new( 180 recv_window_size: u32, 181 send_window_size: u32, 182 flow_control: FlowControl, 183 ) -> Self { 184 Self { 185 max_send_id: INITIAL_MAX_SEND_STREAM_ID, 186 max_recv_id: INITIAL_MAX_RECV_STREAM_ID, 187 latest_remote_id: INITIAL_LATEST_REMOTE_ID, 188 max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS, 189 current_concurrent_streams: 0, 190 stream_recv_window_size: recv_window_size, 191 stream_send_window_size: send_window_size, 192 flow_control, 193 pending_concurrency: VecDeque::new(), 194 pending_stream_window: HashSet::new(), 195 pending_conn_window: VecDeque::new(), 196 pending_send: VecDeque::new(), 197 window_updating_streams: VecDeque::new(), 198 stream_map: HashMap::new(), 199 } 200 } 201 decrease_current_concurrency(&mut self)202 pub(crate) fn decrease_current_concurrency(&mut self) { 203 self.current_concurrent_streams -= 1; 204 } 205 increase_current_concurrency(&mut self)206 pub(crate) fn increase_current_concurrency(&mut self) { 207 self.current_concurrent_streams += 1; 208 } 209 reach_max_concurrency(&mut self) -> bool210 pub(crate) fn reach_max_concurrency(&mut self) -> bool { 211 self.current_concurrent_streams >= self.max_concurrent_streams 212 } 213 apply_max_concurrent_streams(&mut self, num: u32)214 pub(crate) fn apply_max_concurrent_streams(&mut self, num: u32) { 215 self.max_concurrent_streams = num; 216 } 217 apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error>218 pub(crate) fn apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> { 219 let current = self.stream_send_window_size; 220 self.stream_send_window_size = size; 221 222 match current.cmp(&size) { 223 Ordering::Less => { 224 let excess = size - current; 225 for (_id, stream) in self.stream_map.iter_mut() { 226 stream.send_window.increase_size(excess)?; 227 } 228 for id in self.pending_stream_window.iter() { 229 self.pending_send.push_back(*id); 230 } 231 self.pending_stream_window.clear(); 232 } 233 Ordering::Greater => { 234 let excess = current - size; 235 for (_id, stream) in self.stream_map.iter_mut() { 236 stream.send_window.reduce_size(excess); 237 } 238 } 239 Ordering::Equal => {} 240 } 241 Ok(()) 242 } 243 apply_recv_initial_window_size(&mut self, size: u32)244 pub(crate) fn apply_recv_initial_window_size(&mut self, size: u32) { 245 let current = self.stream_recv_window_size; 246 self.stream_recv_window_size = size; 247 match current.cmp(&size) { 248 Ordering::Less => { 249 for (_id, stream) in self.stream_map.iter_mut() { 250 let extra = size - current; 251 stream.recv_window.increase_notification(extra); 252 stream.recv_window.increase_actual(extra); 253 } 254 } 255 Ordering::Greater => { 256 for (_id, stream) in self.stream_map.iter_mut() { 257 stream.recv_window.reduce_notification(current - size); 258 } 259 } 260 Ordering::Equal => {} 261 } 262 } 263 release_stream_recv_window(&mut self, id: u32, size: u32) -> Result<(), H2Error>264 pub(crate) fn release_stream_recv_window(&mut self, id: u32, size: u32) -> Result<(), H2Error> { 265 if let Some(stream) = self.stream_map.get_mut(&id) { 266 if stream.recv_window.notification_available() < size { 267 return Err(H2Error::StreamError(id, ErrorCode::FlowControlError)); 268 } 269 stream.recv_window.recv_data(size); 270 if stream.recv_window.unreleased_size().is_some() { 271 self.window_updating_streams.push_back(id); 272 } 273 } 274 Ok(()) 275 } 276 release_conn_recv_window(&mut self, size: u32) -> Result<(), H2Error>277 pub(crate) fn release_conn_recv_window(&mut self, size: u32) -> Result<(), H2Error> { 278 if self.flow_control.recv_notification_size_available() < size { 279 return Err(H2Error::ConnectionError(ErrorCode::FlowControlError)); 280 } 281 self.flow_control.recv_data(size); 282 Ok(()) 283 } 284 is_closed(&self) -> bool285 pub(crate) fn is_closed(&self) -> bool { 286 for (_id, stream) in self.stream_map.iter() { 287 match stream.state { 288 H2StreamState::Closed(_) => {} 289 _ => { 290 return false; 291 } 292 } 293 } 294 true 295 } 296 stream_state(&self, id: u32) -> Option<H2StreamState>297 pub(crate) fn stream_state(&self, id: u32) -> Option<H2StreamState> { 298 self.stream_map.get(&id).map(|stream| stream.state) 299 } 300 insert(&mut self, id: u32, request: RequestWrapper)301 pub(crate) fn insert(&mut self, id: u32, request: RequestWrapper) { 302 let send_window = SendWindow::new(self.stream_send_window_size as i32); 303 let recv_window = RecvWindow::new(self.stream_recv_window_size as i32); 304 305 let stream = Stream::new(recv_window, send_window, request.header, request.data); 306 self.stream_map.insert(id, stream); 307 } 308 push_back_pending_send(&mut self, id: u32)309 pub(crate) fn push_back_pending_send(&mut self, id: u32) { 310 self.pending_send.push_back(id); 311 } 312 push_pending_concurrency(&mut self, id: u32)313 pub(crate) fn push_pending_concurrency(&mut self, id: u32) { 314 self.pending_concurrency.push_back(id); 315 } 316 next_pending_stream(&mut self) -> Option<u32>317 pub(crate) fn next_pending_stream(&mut self) -> Option<u32> { 318 self.pending_send.pop_front() 319 } 320 pending_stream_num(&self) -> usize321 pub(crate) fn pending_stream_num(&self) -> usize { 322 self.pending_send.len() 323 } 324 try_consume_pending_concurrency(&mut self)325 pub(crate) fn try_consume_pending_concurrency(&mut self) { 326 while !self.reach_max_concurrency() { 327 match self.pending_concurrency.pop_front() { 328 None => { 329 return; 330 } 331 Some(id) => { 332 self.increase_current_concurrency(); 333 self.push_back_pending_send(id); 334 } 335 } 336 } 337 } 338 increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error>339 pub(crate) fn increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error> { 340 self.flow_control.increase_send_size(size) 341 } 342 reassign_conn_send_window(&mut self)343 pub(crate) fn reassign_conn_send_window(&mut self) { 344 // Since the data structure of the body is a stream, 345 // the size of a body cannot be obtained, 346 // so all streams in pending_conn_window are added to the pending_send queue 347 // again. 348 loop { 349 match self.pending_conn_window.pop_front() { 350 None => break, 351 Some(id) => { 352 self.push_back_pending_send(id); 353 } 354 } 355 } 356 } 357 reassign_stream_send_window( &mut self, id: u32, size: u32, ) -> Result<(), H2Error>358 pub(crate) fn reassign_stream_send_window( 359 &mut self, 360 id: u32, 361 size: u32, 362 ) -> Result<(), H2Error> { 363 if let Some(stream) = self.stream_map.get_mut(&id) { 364 stream.send_window.increase_size(size)?; 365 } 366 if self.pending_stream_window.take(&id).is_some() { 367 self.pending_send.push_back(id); 368 } 369 Ok(()) 370 } 371 window_update_conn( &mut self, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>372 pub(crate) fn window_update_conn( 373 &mut self, 374 sender: &UnboundedSender<Frame>, 375 ) -> Result<(), DispatchErrorKind> { 376 if let Some(window_update) = self.flow_control.check_conn_recv_window_update() { 377 sender 378 .send(window_update) 379 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 380 } 381 Ok(()) 382 } 383 window_update_streams( &mut self, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>384 pub(crate) fn window_update_streams( 385 &mut self, 386 sender: &UnboundedSender<Frame>, 387 ) -> Result<(), DispatchErrorKind> { 388 loop { 389 match self.window_updating_streams.pop_front() { 390 None => return Ok(()), 391 Some(id) => { 392 if let Some(stream) = self.stream_map.get_mut(&id) { 393 if !stream.is_init_or_active_flow_control() { 394 return Ok(()); 395 } 396 if let Some(window_update) = stream.recv_window.check_window_update(id) { 397 sender 398 .send(window_update) 399 .map_err(|_e| DispatchErrorKind::ChannelClosed)?; 400 } 401 } 402 } 403 } 404 } 405 } 406 headers(&mut self, id: u32) -> Result<Option<Frame>, H2Error>407 pub(crate) fn headers(&mut self, id: u32) -> Result<Option<Frame>, H2Error> { 408 match self.stream_map.get_mut(&id) { 409 None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 410 Some(stream) => match stream.state { 411 H2StreamState::Closed(_) => Ok(None), 412 _ => Ok(stream.header.take()), 413 }, 414 } 415 } 416 poll_read_body( &mut self, cx: &mut Context<'_>, id: u32, ) -> Result<DataReadState, H2Error>417 pub(crate) fn poll_read_body( 418 &mut self, 419 cx: &mut Context<'_>, 420 id: u32, 421 ) -> Result<DataReadState, H2Error> { 422 // TODO Since the Array length needs to be a constant, 423 // the minimum value is used here, which can be optimized to the MAX_FRAME_SIZE 424 // updated in SETTINGS 425 const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024; 426 427 match self.stream_map.get_mut(&id) { 428 None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)), 429 Some(stream) => match stream.state { 430 H2StreamState::Closed(_) => Ok(DataReadState::Closed), 431 _ => { 432 let stream_send_vacant = stream.send_window.size_available() as usize; 433 if stream_send_vacant == 0 { 434 self.pending_stream_window.insert(id); 435 return Ok(DataReadState::Pending); 436 } 437 let conn_send_vacant = self.flow_control.send_size_available(); 438 if conn_send_vacant == 0 { 439 self.pending_conn_window.push_back(id); 440 return Ok(DataReadState::Pending); 441 } 442 443 let available = min(stream_send_vacant, conn_send_vacant); 444 let len = min(available, DEFAULT_MAX_FRAME_SIZE); 445 446 let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE]; 447 self.poll_sized_data(cx, id, &mut buf[..len]) 448 } 449 }, 450 } 451 } 452 poll_sized_data( &mut self, cx: &mut Context<'_>, id: u32, buf: &mut [u8], ) -> Result<DataReadState, H2Error>453 fn poll_sized_data( 454 &mut self, 455 cx: &mut Context<'_>, 456 id: u32, 457 buf: &mut [u8], 458 ) -> Result<DataReadState, H2Error> { 459 let stream = if let Some(stream) = self.stream_map.get_mut(&id) { 460 stream 461 } else { 462 return Err(H2Error::ConnectionError(ErrorCode::IntervalError)); 463 }; 464 match stream.data.poll_read(cx, buf)? { 465 Poll::Ready(size) => { 466 if size > 0 { 467 stream.send_window.send_data(size as u32); 468 self.flow_control.send_data(size as u32); 469 let data_vec = Vec::from(&buf[..size]); 470 let flag = FrameFlags::new(0); 471 472 Ok(DataReadState::Ready(Frame::new( 473 id as usize, 474 flag, 475 Payload::Data(Data::new(data_vec)), 476 ))) 477 } else { 478 let data_vec = vec![]; 479 let mut flag = FrameFlags::new(1); 480 flag.set_end_stream(true); 481 Ok(DataReadState::Finish(Frame::new( 482 id as usize, 483 flag, 484 Payload::Data(Data::new(data_vec)), 485 ))) 486 } 487 } 488 Poll::Pending => { 489 self.push_back_pending_send(id); 490 Ok(DataReadState::Pending) 491 } 492 } 493 } 494 get_go_away_streams(&mut self, last_stream_id: u32) -> Vec<u32>495 pub(crate) fn get_go_away_streams(&mut self, last_stream_id: u32) -> Vec<u32> { 496 let mut ids = vec![]; 497 for (id, unsent_stream) in self.stream_map.iter_mut() { 498 if *id >= last_stream_id { 499 match unsent_stream.state { 500 // TODO Whether the close state needs to be selected. 501 H2StreamState::Closed(_) => {} 502 H2StreamState::Idle => { 503 unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway); 504 unsent_stream.header = None; 505 unsent_stream.data.clear(); 506 } 507 _ => { 508 self.current_concurrent_streams -= 1; 509 unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway); 510 unsent_stream.header = None; 511 unsent_stream.data.clear(); 512 } 513 }; 514 ids.push(*id); 515 } 516 } 517 ids 518 } 519 get_all_unclosed_streams(&mut self) -> Vec<u32>520 pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec<u32> { 521 let mut ids = vec![]; 522 for (id, stream) in self.stream_map.iter_mut() { 523 match stream.state { 524 H2StreamState::Closed(_) => {} 525 _ => { 526 stream.header = None; 527 stream.data.clear(); 528 stream.state = H2StreamState::Closed(CloseReason::LocalGoAway); 529 ids.push(*id); 530 } 531 } 532 } 533 ids 534 } 535 clear_streams_states(&mut self)536 pub(crate) fn clear_streams_states(&mut self) { 537 self.window_updating_streams.clear(); 538 self.pending_stream_window.clear(); 539 self.pending_send.clear(); 540 self.pending_conn_window.clear(); 541 self.pending_concurrency.clear(); 542 } 543 send_local_reset(&mut self, id: u32) -> StreamEndState544 pub(crate) fn send_local_reset(&mut self, id: u32) -> StreamEndState { 545 return match self.stream_map.get_mut(&id) { 546 None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 547 Some(stream) => match stream.state { 548 H2StreamState::Closed( 549 CloseReason::LocalRst 550 | CloseReason::LocalGoAway 551 | CloseReason::RemoteRst 552 | CloseReason::RemoteGoAway, 553 ) => StreamEndState::Ignore, 554 H2StreamState::Closed(CloseReason::EndStream) => { 555 stream.state = H2StreamState::Closed(CloseReason::LocalRst); 556 StreamEndState::Ignore 557 } 558 _ => { 559 stream.state = H2StreamState::Closed(CloseReason::LocalRst); 560 stream.header = None; 561 stream.data.clear(); 562 self.decrease_current_concurrency(); 563 StreamEndState::OK 564 } 565 }, 566 }; 567 } 568 send_headers_frame(&mut self, id: u32, eos: bool) -> FrameRecvState569 pub(crate) fn send_headers_frame(&mut self, id: u32, eos: bool) -> FrameRecvState { 570 match self.stream_map.get_mut(&id) { 571 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 572 Some(stream) => match &stream.state { 573 H2StreamState::Idle => { 574 stream.state = if eos { 575 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) 576 } else { 577 H2StreamState::Open { 578 send: ActiveState::WaitData, 579 recv: ActiveState::WaitHeaders, 580 } 581 }; 582 } 583 H2StreamState::Open { 584 send: ActiveState::WaitHeaders, 585 recv, 586 } => { 587 stream.state = if eos { 588 H2StreamState::LocalHalfClosed(*recv) 589 } else { 590 H2StreamState::Open { 591 send: ActiveState::WaitData, 592 recv: *recv, 593 } 594 }; 595 } 596 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) => { 597 stream.state = if eos { 598 self.current_concurrent_streams -= 1; 599 H2StreamState::Closed(CloseReason::EndStream) 600 } else { 601 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) 602 }; 603 } 604 _ => { 605 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 606 } 607 }, 608 } 609 FrameRecvState::OK 610 } 611 send_data_frame(&mut self, id: u32, eos: bool) -> FrameRecvState612 pub(crate) fn send_data_frame(&mut self, id: u32, eos: bool) -> FrameRecvState { 613 match self.stream_map.get_mut(&id) { 614 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 615 Some(stream) => match &stream.state { 616 H2StreamState::Open { 617 send: ActiveState::WaitData, 618 recv, 619 } => { 620 if eos { 621 stream.state = H2StreamState::LocalHalfClosed(*recv); 622 } 623 } 624 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => { 625 if eos { 626 self.current_concurrent_streams -= 1; 627 stream.state = H2StreamState::Closed(CloseReason::EndStream); 628 } 629 } 630 _ => { 631 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 632 } 633 }, 634 } 635 FrameRecvState::OK 636 } 637 recv_remote_reset(&mut self, id: u32) -> StreamEndState638 pub(crate) fn recv_remote_reset(&mut self, id: u32) -> StreamEndState { 639 if id > self.max_recv_id { 640 return StreamEndState::Ignore; 641 } 642 return match self.stream_map.get_mut(&id) { 643 None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 644 Some(stream) => match stream.state { 645 H2StreamState::Closed(..) => StreamEndState::Ignore, 646 _ => { 647 stream.state = H2StreamState::Closed(CloseReason::RemoteRst); 648 stream.header = None; 649 stream.data.clear(); 650 self.decrease_current_concurrency(); 651 StreamEndState::OK 652 } 653 }, 654 }; 655 } 656 recv_headers(&mut self, id: u32, eos: bool) -> FrameRecvState657 pub(crate) fn recv_headers(&mut self, id: u32, eos: bool) -> FrameRecvState { 658 if id > self.max_recv_id { 659 return FrameRecvState::Ignore; 660 } 661 662 match self.stream_map.get_mut(&id) { 663 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 664 Some(stream) => match &stream.state { 665 H2StreamState::Idle => { 666 change_stream_state!(Idle: eos, stream.state); 667 } 668 H2StreamState::ReservedRemote => { 669 change_stream_state!(HalfClosed: eos, stream.state); 670 if eos { 671 self.decrease_current_concurrency(); 672 } 673 } 674 H2StreamState::Open { 675 send, 676 recv: ActiveState::WaitHeaders, 677 } => { 678 change_stream_state!(Open: eos, stream.state, send); 679 } 680 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) => { 681 change_stream_state!(HalfClosed: eos, stream.state); 682 if eos { 683 self.decrease_current_concurrency(); 684 } 685 } 686 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => { 687 return FrameRecvState::Ignore; 688 } 689 _ => { 690 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 691 } 692 }, 693 } 694 FrameRecvState::OK 695 } 696 recv_data(&mut self, id: u32, eos: bool) -> FrameRecvState697 pub(crate) fn recv_data(&mut self, id: u32, eos: bool) -> FrameRecvState { 698 if id > self.max_recv_id { 699 return FrameRecvState::Ignore; 700 } 701 match self.stream_map.get_mut(&id) { 702 None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)), 703 Some(stream) => match &stream.state { 704 H2StreamState::Open { 705 send, 706 recv: ActiveState::WaitData, 707 } => { 708 if eos { 709 stream.state = H2StreamState::RemoteHalfClosed(*send); 710 } 711 } 712 H2StreamState::LocalHalfClosed(ActiveState::WaitData) => { 713 if eos { 714 stream.state = H2StreamState::Closed(CloseReason::EndStream); 715 self.decrease_current_concurrency(); 716 } 717 } 718 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => { 719 return FrameRecvState::Ignore; 720 } 721 _ => { 722 return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); 723 } 724 }, 725 } 726 FrameRecvState::OK 727 } 728 } 729 730 impl Stream { new( recv_window: RecvWindow, send_window: SendWindow, headers: Frame, data: BodyDataRef, ) -> Self731 pub(crate) fn new( 732 recv_window: RecvWindow, 733 send_window: SendWindow, 734 headers: Frame, 735 data: BodyDataRef, 736 ) -> Self { 737 Self { 738 recv_window, 739 send_window, 740 state: H2StreamState::Idle, 741 header: Some(headers), 742 data, 743 } 744 } 745 is_init_or_active_flow_control(&self) -> bool746 pub(crate) fn is_init_or_active_flow_control(&self) -> bool { 747 matches!( 748 self.state, 749 H2StreamState::Idle 750 | H2StreamState::Open { 751 recv: ActiveState::WaitData, 752 .. 753 } 754 | H2StreamState::LocalHalfClosed(ActiveState::WaitData) 755 ) 756 } 757 } 758