1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Streams operations utils.
15 
16 use std::cmp::{min, Ordering};
17 use std::collections::{HashMap, HashSet, VecDeque};
18 use std::task::{Context, Poll};
19 
20 use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload};
21 
22 use crate::runtime::UnboundedSender;
23 use crate::util::dispatcher::http2::DispatchErrorKind;
24 use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow};
25 use crate::util::h2::data_ref::BodyDataRef;
26 
27 pub(crate) const INITIAL_MAX_SEND_STREAM_ID: u32 = u32::MAX >> 1;
28 pub(crate) const INITIAL_MAX_RECV_STREAM_ID: u32 = u32::MAX >> 1;
29 const INITIAL_LATEST_REMOTE_ID: u32 = 0;
30 const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100;
31 
32 pub(crate) enum FrameRecvState {
33     OK,
34     Ignore,
35     Err(H2Error),
36 }
37 
38 pub(crate) enum DataReadState {
39     Closed,
40     // Wait for poll_read or wait for window.
41     Pending,
42     Ready(Frame),
43     Finish(Frame),
44 }
45 
46 pub(crate) enum StreamEndState {
47     OK,
48     Ignore,
49     Err(H2Error),
50 }
51 
52 //                              +--------+
53 //                      send PP |        | recv PP
54 //                     ,--------|  idle  |--------.
55 //                    /         |        |         \
56 //                   v          +--------+          v
57 //            +----------+          |           +----------+
58 //            |          |          | send H /  |          |
59 //     ,------| reserved |          | recv H    | reserved |------.
60 //     |      | (local)  |          |           | (remote) |      |
61 //     |      +----------+          v           +----------+      |
62 //     |          |             +--------+             |          |
63 //     |          |     recv ES |        | send ES     |          |
64 //     |   send H |     ,-------|  open  |-------.     | recv H   |
65 //     |          |    /        |        |        \    |          |
66 //     |          v   v         +--------+         v   v          |
67 //     |      +----------+          |           +----------+      |
68 //     |      |   half   |          |           |   half   |      |
69 //     |      |  closed  |          | send R /  |  closed  |      |
70 //     |      | (remote) |          | recv R    | (local)  |      |
71 //     |      +----------+          |           +----------+      |
72 //     |           |                |                 |           |
73 //     |           | send ES /      |       recv ES / |           |
74 //     |           | send R /       v        send R / |           |
75 //     |           | recv R     +--------+   recv R   |           |
76 //     | send R /  `----------->|        |<-----------'  send R / |
77 //     | recv R                 | closed |               recv R   |
78 //     `----------------------->|        |<----------------------'
79 //                              +--------+
80 #[derive(Copy, Clone, Debug)]
81 pub(crate) enum H2StreamState {
82     Idle,
83     // When response does not depend on request,
84     // the server can send response directly without waiting for the request to finish receiving.
85     // Therefore, the sending and receiving states of the client have their own states
86     Open {
87         send: ActiveState,
88         recv: ActiveState,
89     },
90     #[allow(dead_code)]
91     ReservedRemote,
92     // After the request is sent, the state is waiting for the response to be received.
93     LocalHalfClosed(ActiveState),
94     // When the response is received but the request is not fully sent,
95     // this indicates the status of the request being sent
96     RemoteHalfClosed(ActiveState),
97     Closed(CloseReason),
98 }
99 
100 #[derive(Copy, Clone, Debug)]
101 pub(crate) enum CloseReason {
102     LocalRst,
103     RemoteRst,
104     RemoteGoAway,
105     LocalGoAway,
106     EndStream,
107 }
108 
109 #[derive(Copy, Clone, Debug)]
110 pub(crate) enum ActiveState {
111     WaitHeaders,
112     WaitData,
113 }
114 
115 pub(crate) struct Stream {
116     pub(crate) recv_window: RecvWindow,
117     pub(crate) send_window: SendWindow,
118     pub(crate) state: H2StreamState,
119     pub(crate) header: Option<Frame>,
120     pub(crate) data: BodyDataRef,
121 }
122 
123 pub(crate) struct RequestWrapper {
124     pub(crate) header: Frame,
125     pub(crate) data: BodyDataRef,
126 }
127 
128 pub(crate) struct Streams {
129     // Records the received goaway last_stream_id.
130     pub(crate) max_send_id: u32,
131     // Records the send goaway last_stream_id.
132     pub(crate) max_recv_id: u32,
133     // Currently the client doesn't support push promise, so this value is always 0.
134     pub(crate) latest_remote_id: u32,
135     pub(crate) stream_recv_window_size: u32,
136     pub(crate) stream_send_window_size: u32,
137     max_concurrent_streams: u32,
138     current_concurrent_streams: u32,
139     flow_control: FlowControl,
140     pending_concurrency: VecDeque<u32>,
141     pending_stream_window: HashSet<u32>,
142     pending_conn_window: VecDeque<u32>,
143     pending_send: VecDeque<u32>,
144     window_updating_streams: VecDeque<u32>,
145     pub(crate) stream_map: HashMap<u32, Stream>,
146 }
147 
148 macro_rules! change_stream_state {
149     (Idle: $eos: expr, $state: expr) => {
150         $state = if $eos {
151             H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders)
152         } else {
153             H2StreamState::Open {
154                 send: ActiveState::WaitHeaders,
155                 recv: ActiveState::WaitData,
156             }
157         };
158     };
159     (Open: $eos: expr, $state: expr, $send: expr) => {
160         $state = if $eos {
161             H2StreamState::RemoteHalfClosed($send.clone())
162         } else {
163             H2StreamState::Open {
164                 send: $send.clone(),
165                 recv: ActiveState::WaitData,
166             }
167         };
168     };
169     (HalfClosed: $eos: expr, $state: expr) => {
170         $state = if $eos {
171             H2StreamState::Closed(CloseReason::EndStream)
172         } else {
173             H2StreamState::LocalHalfClosed(ActiveState::WaitData)
174         };
175     };
176 }
177 
178 impl Streams {
new( recv_window_size: u32, send_window_size: u32, flow_control: FlowControl, ) -> Self179     pub(crate) fn new(
180         recv_window_size: u32,
181         send_window_size: u32,
182         flow_control: FlowControl,
183     ) -> Self {
184         Self {
185             max_send_id: INITIAL_MAX_SEND_STREAM_ID,
186             max_recv_id: INITIAL_MAX_RECV_STREAM_ID,
187             latest_remote_id: INITIAL_LATEST_REMOTE_ID,
188             max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS,
189             current_concurrent_streams: 0,
190             stream_recv_window_size: recv_window_size,
191             stream_send_window_size: send_window_size,
192             flow_control,
193             pending_concurrency: VecDeque::new(),
194             pending_stream_window: HashSet::new(),
195             pending_conn_window: VecDeque::new(),
196             pending_send: VecDeque::new(),
197             window_updating_streams: VecDeque::new(),
198             stream_map: HashMap::new(),
199         }
200     }
201 
decrease_current_concurrency(&mut self)202     pub(crate) fn decrease_current_concurrency(&mut self) {
203         self.current_concurrent_streams -= 1;
204     }
205 
increase_current_concurrency(&mut self)206     pub(crate) fn increase_current_concurrency(&mut self) {
207         self.current_concurrent_streams += 1;
208     }
209 
reach_max_concurrency(&mut self) -> bool210     pub(crate) fn reach_max_concurrency(&mut self) -> bool {
211         self.current_concurrent_streams >= self.max_concurrent_streams
212     }
213 
apply_max_concurrent_streams(&mut self, num: u32)214     pub(crate) fn apply_max_concurrent_streams(&mut self, num: u32) {
215         self.max_concurrent_streams = num;
216     }
217 
apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error>218     pub(crate) fn apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> {
219         let current = self.stream_send_window_size;
220         self.stream_send_window_size = size;
221 
222         match current.cmp(&size) {
223             Ordering::Less => {
224                 let excess = size - current;
225                 for (_id, stream) in self.stream_map.iter_mut() {
226                     stream.send_window.increase_size(excess)?;
227                 }
228                 for id in self.pending_stream_window.iter() {
229                     self.pending_send.push_back(*id);
230                 }
231                 self.pending_stream_window.clear();
232             }
233             Ordering::Greater => {
234                 let excess = current - size;
235                 for (_id, stream) in self.stream_map.iter_mut() {
236                     stream.send_window.reduce_size(excess);
237                 }
238             }
239             Ordering::Equal => {}
240         }
241         Ok(())
242     }
243 
apply_recv_initial_window_size(&mut self, size: u32)244     pub(crate) fn apply_recv_initial_window_size(&mut self, size: u32) {
245         let current = self.stream_recv_window_size;
246         self.stream_recv_window_size = size;
247         match current.cmp(&size) {
248             Ordering::Less => {
249                 for (_id, stream) in self.stream_map.iter_mut() {
250                     let extra = size - current;
251                     stream.recv_window.increase_notification(extra);
252                     stream.recv_window.increase_actual(extra);
253                 }
254             }
255             Ordering::Greater => {
256                 for (_id, stream) in self.stream_map.iter_mut() {
257                     stream.recv_window.reduce_notification(current - size);
258                 }
259             }
260             Ordering::Equal => {}
261         }
262     }
263 
release_stream_recv_window(&mut self, id: u32, size: u32) -> Result<(), H2Error>264     pub(crate) fn release_stream_recv_window(&mut self, id: u32, size: u32) -> Result<(), H2Error> {
265         if let Some(stream) = self.stream_map.get_mut(&id) {
266             if stream.recv_window.notification_available() < size {
267                 return Err(H2Error::StreamError(id, ErrorCode::FlowControlError));
268             }
269             stream.recv_window.recv_data(size);
270             if stream.recv_window.unreleased_size().is_some() {
271                 self.window_updating_streams.push_back(id);
272             }
273         }
274         Ok(())
275     }
276 
release_conn_recv_window(&mut self, size: u32) -> Result<(), H2Error>277     pub(crate) fn release_conn_recv_window(&mut self, size: u32) -> Result<(), H2Error> {
278         if self.flow_control.recv_notification_size_available() < size {
279             return Err(H2Error::ConnectionError(ErrorCode::FlowControlError));
280         }
281         self.flow_control.recv_data(size);
282         Ok(())
283     }
284 
is_closed(&self) -> bool285     pub(crate) fn is_closed(&self) -> bool {
286         for (_id, stream) in self.stream_map.iter() {
287             match stream.state {
288                 H2StreamState::Closed(_) => {}
289                 _ => {
290                     return false;
291                 }
292             }
293         }
294         true
295     }
296 
stream_state(&self, id: u32) -> Option<H2StreamState>297     pub(crate) fn stream_state(&self, id: u32) -> Option<H2StreamState> {
298         self.stream_map.get(&id).map(|stream| stream.state)
299     }
300 
insert(&mut self, id: u32, request: RequestWrapper)301     pub(crate) fn insert(&mut self, id: u32, request: RequestWrapper) {
302         let send_window = SendWindow::new(self.stream_send_window_size as i32);
303         let recv_window = RecvWindow::new(self.stream_recv_window_size as i32);
304 
305         let stream = Stream::new(recv_window, send_window, request.header, request.data);
306         self.stream_map.insert(id, stream);
307     }
308 
push_back_pending_send(&mut self, id: u32)309     pub(crate) fn push_back_pending_send(&mut self, id: u32) {
310         self.pending_send.push_back(id);
311     }
312 
push_pending_concurrency(&mut self, id: u32)313     pub(crate) fn push_pending_concurrency(&mut self, id: u32) {
314         self.pending_concurrency.push_back(id);
315     }
316 
next_pending_stream(&mut self) -> Option<u32>317     pub(crate) fn next_pending_stream(&mut self) -> Option<u32> {
318         self.pending_send.pop_front()
319     }
320 
pending_stream_num(&self) -> usize321     pub(crate) fn pending_stream_num(&self) -> usize {
322         self.pending_send.len()
323     }
324 
try_consume_pending_concurrency(&mut self)325     pub(crate) fn try_consume_pending_concurrency(&mut self) {
326         while !self.reach_max_concurrency() {
327             match self.pending_concurrency.pop_front() {
328                 None => {
329                     return;
330                 }
331                 Some(id) => {
332                     self.increase_current_concurrency();
333                     self.push_back_pending_send(id);
334                 }
335             }
336         }
337     }
338 
increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error>339     pub(crate) fn increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error> {
340         self.flow_control.increase_send_size(size)
341     }
342 
reassign_conn_send_window(&mut self)343     pub(crate) fn reassign_conn_send_window(&mut self) {
344         // Since the data structure of the body is a stream,
345         // the size of a body cannot be obtained,
346         // so all streams in pending_conn_window are added to the pending_send queue
347         // again.
348         loop {
349             match self.pending_conn_window.pop_front() {
350                 None => break,
351                 Some(id) => {
352                     self.push_back_pending_send(id);
353                 }
354             }
355         }
356     }
357 
reassign_stream_send_window( &mut self, id: u32, size: u32, ) -> Result<(), H2Error>358     pub(crate) fn reassign_stream_send_window(
359         &mut self,
360         id: u32,
361         size: u32,
362     ) -> Result<(), H2Error> {
363         if let Some(stream) = self.stream_map.get_mut(&id) {
364             stream.send_window.increase_size(size)?;
365         }
366         if self.pending_stream_window.take(&id).is_some() {
367             self.pending_send.push_back(id);
368         }
369         Ok(())
370     }
371 
window_update_conn( &mut self, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>372     pub(crate) fn window_update_conn(
373         &mut self,
374         sender: &UnboundedSender<Frame>,
375     ) -> Result<(), DispatchErrorKind> {
376         if let Some(window_update) = self.flow_control.check_conn_recv_window_update() {
377             sender
378                 .send(window_update)
379                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
380         }
381         Ok(())
382     }
383 
window_update_streams( &mut self, sender: &UnboundedSender<Frame>, ) -> Result<(), DispatchErrorKind>384     pub(crate) fn window_update_streams(
385         &mut self,
386         sender: &UnboundedSender<Frame>,
387     ) -> Result<(), DispatchErrorKind> {
388         loop {
389             match self.window_updating_streams.pop_front() {
390                 None => return Ok(()),
391                 Some(id) => {
392                     if let Some(stream) = self.stream_map.get_mut(&id) {
393                         if !stream.is_init_or_active_flow_control() {
394                             return Ok(());
395                         }
396                         if let Some(window_update) = stream.recv_window.check_window_update(id) {
397                             sender
398                                 .send(window_update)
399                                 .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
400                         }
401                     }
402                 }
403             }
404         }
405     }
406 
headers(&mut self, id: u32) -> Result<Option<Frame>, H2Error>407     pub(crate) fn headers(&mut self, id: u32) -> Result<Option<Frame>, H2Error> {
408         match self.stream_map.get_mut(&id) {
409             None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
410             Some(stream) => match stream.state {
411                 H2StreamState::Closed(_) => Ok(None),
412                 _ => Ok(stream.header.take()),
413             },
414         }
415     }
416 
poll_read_body( &mut self, cx: &mut Context<'_>, id: u32, ) -> Result<DataReadState, H2Error>417     pub(crate) fn poll_read_body(
418         &mut self,
419         cx: &mut Context<'_>,
420         id: u32,
421     ) -> Result<DataReadState, H2Error> {
422         // TODO Since the Array length needs to be a constant,
423         // the minimum value is used here, which can be optimized to the MAX_FRAME_SIZE
424         // updated in SETTINGS
425         const DEFAULT_MAX_FRAME_SIZE: usize = 16 * 1024;
426 
427         match self.stream_map.get_mut(&id) {
428             None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),
429             Some(stream) => match stream.state {
430                 H2StreamState::Closed(_) => Ok(DataReadState::Closed),
431                 _ => {
432                     let stream_send_vacant = stream.send_window.size_available() as usize;
433                     if stream_send_vacant == 0 {
434                         self.pending_stream_window.insert(id);
435                         return Ok(DataReadState::Pending);
436                     }
437                     let conn_send_vacant = self.flow_control.send_size_available();
438                     if conn_send_vacant == 0 {
439                         self.pending_conn_window.push_back(id);
440                         return Ok(DataReadState::Pending);
441                     }
442 
443                     let available = min(stream_send_vacant, conn_send_vacant);
444                     let len = min(available, DEFAULT_MAX_FRAME_SIZE);
445 
446                     let mut buf = [0u8; DEFAULT_MAX_FRAME_SIZE];
447                     self.poll_sized_data(cx, id, &mut buf[..len])
448                 }
449             },
450         }
451     }
452 
poll_sized_data( &mut self, cx: &mut Context<'_>, id: u32, buf: &mut [u8], ) -> Result<DataReadState, H2Error>453     fn poll_sized_data(
454         &mut self,
455         cx: &mut Context<'_>,
456         id: u32,
457         buf: &mut [u8],
458     ) -> Result<DataReadState, H2Error> {
459         let stream = if let Some(stream) = self.stream_map.get_mut(&id) {
460             stream
461         } else {
462             return Err(H2Error::ConnectionError(ErrorCode::IntervalError));
463         };
464         match stream.data.poll_read(cx, buf)? {
465             Poll::Ready(size) => {
466                 if size > 0 {
467                     stream.send_window.send_data(size as u32);
468                     self.flow_control.send_data(size as u32);
469                     let data_vec = Vec::from(&buf[..size]);
470                     let flag = FrameFlags::new(0);
471 
472                     Ok(DataReadState::Ready(Frame::new(
473                         id as usize,
474                         flag,
475                         Payload::Data(Data::new(data_vec)),
476                     )))
477                 } else {
478                     let data_vec = vec![];
479                     let mut flag = FrameFlags::new(1);
480                     flag.set_end_stream(true);
481                     Ok(DataReadState::Finish(Frame::new(
482                         id as usize,
483                         flag,
484                         Payload::Data(Data::new(data_vec)),
485                     )))
486                 }
487             }
488             Poll::Pending => {
489                 self.push_back_pending_send(id);
490                 Ok(DataReadState::Pending)
491             }
492         }
493     }
494 
get_go_away_streams(&mut self, last_stream_id: u32) -> Vec<u32>495     pub(crate) fn get_go_away_streams(&mut self, last_stream_id: u32) -> Vec<u32> {
496         let mut ids = vec![];
497         for (id, unsent_stream) in self.stream_map.iter_mut() {
498             if *id >= last_stream_id {
499                 match unsent_stream.state {
500                     // TODO Whether the close state needs to be selected.
501                     H2StreamState::Closed(_) => {}
502                     H2StreamState::Idle => {
503                         unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
504                         unsent_stream.header = None;
505                         unsent_stream.data.clear();
506                     }
507                     _ => {
508                         self.current_concurrent_streams -= 1;
509                         unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway);
510                         unsent_stream.header = None;
511                         unsent_stream.data.clear();
512                     }
513                 };
514                 ids.push(*id);
515             }
516         }
517         ids
518     }
519 
get_all_unclosed_streams(&mut self) -> Vec<u32>520     pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec<u32> {
521         let mut ids = vec![];
522         for (id, stream) in self.stream_map.iter_mut() {
523             match stream.state {
524                 H2StreamState::Closed(_) => {}
525                 _ => {
526                     stream.header = None;
527                     stream.data.clear();
528                     stream.state = H2StreamState::Closed(CloseReason::LocalGoAway);
529                     ids.push(*id);
530                 }
531             }
532         }
533         ids
534     }
535 
clear_streams_states(&mut self)536     pub(crate) fn clear_streams_states(&mut self) {
537         self.window_updating_streams.clear();
538         self.pending_stream_window.clear();
539         self.pending_send.clear();
540         self.pending_conn_window.clear();
541         self.pending_concurrency.clear();
542     }
543 
send_local_reset(&mut self, id: u32) -> StreamEndState544     pub(crate) fn send_local_reset(&mut self, id: u32) -> StreamEndState {
545         return match self.stream_map.get_mut(&id) {
546             None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
547             Some(stream) => match stream.state {
548                 H2StreamState::Closed(
549                     CloseReason::LocalRst
550                     | CloseReason::LocalGoAway
551                     | CloseReason::RemoteRst
552                     | CloseReason::RemoteGoAway,
553                 ) => StreamEndState::Ignore,
554                 H2StreamState::Closed(CloseReason::EndStream) => {
555                     stream.state = H2StreamState::Closed(CloseReason::LocalRst);
556                     StreamEndState::Ignore
557                 }
558                 _ => {
559                     stream.state = H2StreamState::Closed(CloseReason::LocalRst);
560                     stream.header = None;
561                     stream.data.clear();
562                     self.decrease_current_concurrency();
563                     StreamEndState::OK
564                 }
565             },
566         };
567     }
568 
send_headers_frame(&mut self, id: u32, eos: bool) -> FrameRecvState569     pub(crate) fn send_headers_frame(&mut self, id: u32, eos: bool) -> FrameRecvState {
570         match self.stream_map.get_mut(&id) {
571             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
572             Some(stream) => match &stream.state {
573                 H2StreamState::Idle => {
574                     stream.state = if eos {
575                         H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders)
576                     } else {
577                         H2StreamState::Open {
578                             send: ActiveState::WaitData,
579                             recv: ActiveState::WaitHeaders,
580                         }
581                     };
582                 }
583                 H2StreamState::Open {
584                     send: ActiveState::WaitHeaders,
585                     recv,
586                 } => {
587                     stream.state = if eos {
588                         H2StreamState::LocalHalfClosed(*recv)
589                     } else {
590                         H2StreamState::Open {
591                             send: ActiveState::WaitData,
592                             recv: *recv,
593                         }
594                     };
595                 }
596                 H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders) => {
597                     stream.state = if eos {
598                         self.current_concurrent_streams -= 1;
599                         H2StreamState::Closed(CloseReason::EndStream)
600                     } else {
601                         H2StreamState::RemoteHalfClosed(ActiveState::WaitData)
602                     };
603                 }
604                 _ => {
605                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
606                 }
607             },
608         }
609         FrameRecvState::OK
610     }
611 
send_data_frame(&mut self, id: u32, eos: bool) -> FrameRecvState612     pub(crate) fn send_data_frame(&mut self, id: u32, eos: bool) -> FrameRecvState {
613         match self.stream_map.get_mut(&id) {
614             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
615             Some(stream) => match &stream.state {
616                 H2StreamState::Open {
617                     send: ActiveState::WaitData,
618                     recv,
619                 } => {
620                     if eos {
621                         stream.state = H2StreamState::LocalHalfClosed(*recv);
622                     }
623                 }
624                 H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => {
625                     if eos {
626                         self.current_concurrent_streams -= 1;
627                         stream.state = H2StreamState::Closed(CloseReason::EndStream);
628                     }
629                 }
630                 _ => {
631                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
632                 }
633             },
634         }
635         FrameRecvState::OK
636     }
637 
recv_remote_reset(&mut self, id: u32) -> StreamEndState638     pub(crate) fn recv_remote_reset(&mut self, id: u32) -> StreamEndState {
639         if id > self.max_recv_id {
640             return StreamEndState::Ignore;
641         }
642         return match self.stream_map.get_mut(&id) {
643             None => StreamEndState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
644             Some(stream) => match stream.state {
645                 H2StreamState::Closed(..) => StreamEndState::Ignore,
646                 _ => {
647                     stream.state = H2StreamState::Closed(CloseReason::RemoteRst);
648                     stream.header = None;
649                     stream.data.clear();
650                     self.decrease_current_concurrency();
651                     StreamEndState::OK
652                 }
653             },
654         };
655     }
656 
recv_headers(&mut self, id: u32, eos: bool) -> FrameRecvState657     pub(crate) fn recv_headers(&mut self, id: u32, eos: bool) -> FrameRecvState {
658         if id > self.max_recv_id {
659             return FrameRecvState::Ignore;
660         }
661 
662         match self.stream_map.get_mut(&id) {
663             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
664             Some(stream) => match &stream.state {
665                 H2StreamState::Idle => {
666                     change_stream_state!(Idle: eos, stream.state);
667                 }
668                 H2StreamState::ReservedRemote => {
669                     change_stream_state!(HalfClosed: eos, stream.state);
670                     if eos {
671                         self.decrease_current_concurrency();
672                     }
673                 }
674                 H2StreamState::Open {
675                     send,
676                     recv: ActiveState::WaitHeaders,
677                 } => {
678                     change_stream_state!(Open: eos, stream.state, send);
679                 }
680                 H2StreamState::LocalHalfClosed(ActiveState::WaitHeaders) => {
681                     change_stream_state!(HalfClosed: eos, stream.state);
682                     if eos {
683                         self.decrease_current_concurrency();
684                     }
685                 }
686                 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => {
687                     return FrameRecvState::Ignore;
688                 }
689                 _ => {
690                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
691                 }
692             },
693         }
694         FrameRecvState::OK
695     }
696 
recv_data(&mut self, id: u32, eos: bool) -> FrameRecvState697     pub(crate) fn recv_data(&mut self, id: u32, eos: bool) -> FrameRecvState {
698         if id > self.max_recv_id {
699             return FrameRecvState::Ignore;
700         }
701         match self.stream_map.get_mut(&id) {
702             None => return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError)),
703             Some(stream) => match &stream.state {
704                 H2StreamState::Open {
705                     send,
706                     recv: ActiveState::WaitData,
707                 } => {
708                     if eos {
709                         stream.state = H2StreamState::RemoteHalfClosed(*send);
710                     }
711                 }
712                 H2StreamState::LocalHalfClosed(ActiveState::WaitData) => {
713                     if eos {
714                         stream.state = H2StreamState::Closed(CloseReason::EndStream);
715                         self.decrease_current_concurrency();
716                     }
717                 }
718                 H2StreamState::Closed(CloseReason::LocalGoAway | CloseReason::LocalRst) => {
719                     return FrameRecvState::Ignore;
720                 }
721                 _ => {
722                     return FrameRecvState::Err(H2Error::ConnectionError(ErrorCode::ProtocolError));
723                 }
724             },
725         }
726         FrameRecvState::OK
727     }
728 }
729 
730 impl Stream {
new( recv_window: RecvWindow, send_window: SendWindow, headers: Frame, data: BodyDataRef, ) -> Self731     pub(crate) fn new(
732         recv_window: RecvWindow,
733         send_window: SendWindow,
734         headers: Frame,
735         data: BodyDataRef,
736     ) -> Self {
737         Self {
738             recv_window,
739             send_window,
740             state: H2StreamState::Idle,
741             header: Some(headers),
742             data,
743         }
744     }
745 
is_init_or_active_flow_control(&self) -> bool746     pub(crate) fn is_init_or_active_flow_control(&self) -> bool {
747         matches!(
748             self.state,
749             H2StreamState::Idle
750                 | H2StreamState::Open {
751                     recv: ActiveState::WaitData,
752                     ..
753                 }
754                 | H2StreamState::LocalHalfClosed(ActiveState::WaitData)
755         )
756     }
757 }
758