// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for managing HTTP/2 streams: per-stream state, flow-control windows and send queues.
use std::cmp::{min, Ordering};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload};
use crate::runtime::UnboundedSender;
use crate::util::dispatcher::http2::DispatchErrorKind;
use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow};
use crate::util::h2::data_ref::BodyDataRef;
/// Initial value of `Streams::max_send_id`: `u32::MAX >> 1`, i.e. 2^31 - 1.
pub(crate) const INITIAL_MAX_SEND_STREAM_ID: u32 = u32::MAX >> 1;
/// Initial value of `Streams::max_recv_id`: `u32::MAX >> 1`, i.e. 2^31 - 1.
pub(crate) const INITIAL_MAX_RECV_STREAM_ID: u32 = u32::MAX >> 1;
/// Initial value of `Streams::latest_remote_id`.
const INITIAL_LATEST_REMOTE_ID: u32 = 0;
/// Concurrency limit `Streams::new` starts with; `apply_max_concurrent_streams`
/// overwrites it later.
const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100;
/// Outcome of handling a received frame.
// NOTE(review): the code producing and consuming this enum is outside this part
// of the file; the variant notes below follow the variant names — confirm.
pub(crate) enum FrameRecvState {
// Presumably: the frame was accepted.
OK,
// Presumably: the frame is to be skipped without raising an error.
Ignore,
// Handling failed; carries the HTTP/2 error to report.
Err(H2Error),
}
/// Outcome of trying to read the next piece of body data for a stream.
// NOTE(review): the producer of this enum is outside this part of the file; the
// notes on `Closed`, `Ready` and `Finish` follow the variant names — confirm.
pub(crate) enum DataReadState {
// Presumably: the stream can no longer send data.
Closed,
// Wait for poll_read or wait for window.
Pending,
// Presumably: a frame is ready to send and more data may follow.
Ready(Frame),
// Presumably: the carried frame is the last one for this body.
Finish(Frame),
}
/// Outcome of ending a stream; same shape as `FrameRecvState`.
// NOTE(review): the code producing and consuming this enum is outside this part
// of the file; the variant notes below follow the variant names — confirm.
pub(crate) enum StreamEndState {
// Presumably: the stream was ended normally.
OK,
// Presumably: nothing had to be done.
Ignore,
// Ending the stream failed; carries the HTTP/2 error to report.
Err(H2Error),
}
// Stream state machine (RFC 7540, Section 5.1):
//
//                              +--------+
//                      send PP |        | recv PP
//                     ,--------|  idle  |--------.
//                    /         |        |         \
//                   v          +--------+          v
//            +----------+          |           +----------+
//            |          |          | send H /  |          |
//     ,------| reserved |          | recv H    | reserved |------.
//     |      | (local)  |          |           | (remote) |      |
//     |      +----------+          v           +----------+      |
//     |          |             +--------+             |          |
//     |          |     recv ES |        | send ES     |          |
//     |   send H |     ,-------|  open  |-------.     | recv H   |
//     |          |    /        |        |        \    |          |
//     |          v   v         +--------+         v   v          |
//     |      +----------+          |           +----------+      |
//     |      |   half   |          |           |   half   |      |
//     |      |  closed  |          | send R /  |  closed  |      |
//     |      | (remote) |          | recv R    | (local)  |      |
//     |      +----------+          |           +----------+      |
//     |           |                |                 |           |
//     |           | send ES /      |       recv ES / |           |
//     |           | send R /       v        send R / |           |
//     |           | recv R     +--------+   recv R   |           |
//     | send R /  `----------->|        |<-----------'  send R / |
//     | recv R                 | closed |               recv R   |
//     `----------------------->|        |<----------------------'
//                              +--------+
//
// H: HEADERS frame, PP: PUSH_PROMISE frame, ES: END_STREAM flag, R: RST_STREAM frame.
/// State of a single HTTP/2 stream as tracked by the client.
#[derive(Copy, Clone, Debug)]
pub(crate) enum H2StreamState {
// NOTE(review): presumably the state a new stream starts in — confirm against `Stream::new`.
Idle,
// When the response does not depend on the request, the server can send the
// response directly without waiting for the whole request to be received.
// Therefore the client tracks the sending and receiving sides separately.
Open {
send: ActiveState,
recv: ActiveState,
},
// Entered by receiving a PUSH_PROMISE according to the state diagram. The comment
// on `Streams::latest_remote_id` says the client does not support push promise,
// which is presumably why this variant needs `dead_code`.
#[allow(dead_code)]
ReservedRemote,
// After the request is sent, the state is waiting for the response to be received.
LocalHalfClosed(ActiveState),
// When the response has been received but the request is not fully sent,
// this carries the progress of the request that is still being sent.
RemoteHalfClosed(ActiveState),
// The stream is finished; the payload records why.
Closed(CloseReason),
}
/// Why a stream ended up in `H2StreamState::Closed`.
// NOTE(review): only `EndStream` is constructed in this part of the file (by
// `change_stream_state!`); the other notes follow the variant names — confirm.
#[derive(Copy, Clone, Debug)]
pub(crate) enum CloseReason {
// Presumably: this endpoint reset the stream.
LocalRst,
// Presumably: the peer reset the stream.
RemoteRst,
// Presumably: closed because of a GOAWAY received from the peer.
RemoteGoAway,
// Presumably: closed because of a GOAWAY sent by this endpoint.
LocalGoAway,
// Closed after the end of the stream was reached.
EndStream,
}
/// Progress of one direction (send or recv) of a stream that is not closed.
// NOTE(review): variant notes follow the names and their use in
// `change_stream_state!` — confirm against the code that reads them.
#[derive(Copy, Clone, Debug)]
pub(crate) enum ActiveState {
// Presumably: the header frame of this direction has not been handled yet.
WaitHeaders,
// Presumably: headers are done and body data is expected next.
WaitData,
}
/// Per-stream bookkeeping stored in `Streams::stream_map`.
pub(crate) struct Stream {
// Flow-control window for data received on this stream.
pub(crate) recv_window: RecvWindow,
// Flow-control window for data sent on this stream.
pub(crate) send_window: SendWindow,
// Current position in the stream state machine.
pub(crate) state: H2StreamState,
// Header frame handed over by `Streams::insert` (from `RequestWrapper::header`).
// NOTE(review): presumably set to `None` once the header has been sent — confirm.
pub(crate) header: Option,
// Request body handle handed over by `Streams::insert` (from `RequestWrapper::data`).
pub(crate) data: BodyDataRef,
}
/// A request as handed to `Streams::insert`: its header frame plus a handle to its body.
pub(crate) struct RequestWrapper {
// Header frame of the request.
pub(crate) header: Frame,
// Handle to the body data of the request.
pub(crate) data: BodyDataRef,
}
/// Connection-wide stream registry: every known stream, the flow-control settings
/// applied to new streams, and the queues that decide which stream is handled next.
pub(crate) struct Streams {
// Records the received goaway last_stream_id.
pub(crate) max_send_id: u32,
// Records the send goaway last_stream_id.
pub(crate) max_recv_id: u32,
// Currently the client doesn't support push promise, so this value is always 0.
pub(crate) latest_remote_id: u32,
// Receive window size given to streams created by `insert`.
pub(crate) stream_recv_window_size: u32,
// Send window size given to streams created by `insert`.
pub(crate) stream_send_window_size: u32,
// Upper bound compared against `current_concurrent_streams`.
max_concurrent_streams: u32,
// Number of streams currently counted against `max_concurrent_streams`.
current_concurrent_streams: u32,
// Connection-level flow-control state.
flow_control: FlowControl,
// Stream ids waiting for a concurrency slot; drained by `try_consume_pending_concurrency`.
pending_concurrency: VecDeque,
// Stream ids moved to `pending_send` when a stream send window grows.
// NOTE(review): presumably these are streams blocked on their own send window — confirm.
pending_stream_window: HashSet,
// Stream ids moved to `pending_send` by `reassign_conn_send_window`.
// NOTE(review): presumably these are streams blocked on the connection send window — confirm.
pending_conn_window: VecDeque,
// Stream ids ready to be handled next; popped by `next_pending_stream`.
pending_send: VecDeque,
// Stream ids whose receive window reported an unreleased size; drained by
// `window_update_streams`.
window_updating_streams: VecDeque,
// All streams known to this connection, keyed by stream id.
pub(crate) stream_map: HashMap,
}
// Assigns the next `H2StreamState` to `$state`. The arm is chosen by a tag naming
// the current state (`Idle`, `Open` or `HalfClosed`) and the result by `$eos`.
// NOTE(review): `$eos` is presumably the END_STREAM flag of a frame received from
// the peer — confirm at the call sites, which are outside this part of the file.
macro_rules! change_stream_state {
// From idle: `$eos` gives `RemoteHalfClosed(WaitHeaders)`; otherwise the stream
// becomes `Open` with send waiting for headers and recv waiting for data.
(Idle: $eos: expr, $state: expr) => {
$state = if $eos {
H2StreamState::RemoteHalfClosed(ActiveState::WaitHeaders)
} else {
H2StreamState::Open {
send: ActiveState::WaitHeaders,
recv: ActiveState::WaitData,
}
};
};
// From open: the current send progress `$send` is carried over unchanged;
// `$eos` gives `RemoteHalfClosed`, otherwise recv is set to `WaitData`.
(Open: $eos: expr, $state: expr, $send: expr) => {
$state = if $eos {
H2StreamState::RemoteHalfClosed($send.clone())
} else {
H2StreamState::Open {
send: $send.clone(),
recv: ActiveState::WaitData,
}
};
};
// From half-closed: `$eos` closes the stream with `CloseReason::EndStream`;
// otherwise it becomes `LocalHalfClosed(WaitData)`.
(HalfClosed: $eos: expr, $state: expr) => {
$state = if $eos {
H2StreamState::Closed(CloseReason::EndStream)
} else {
H2StreamState::LocalHalfClosed(ActiveState::WaitData)
};
};
}
impl Streams {
/// Creates an empty stream registry.
///
/// `recv_window_size` and `send_window_size` become the per-stream window sizes
/// used for streams created later by `insert`; `flow_control` is stored as the
/// connection-level flow-control state. All queues and the stream map start
/// empty, the concurrency counter starts at zero and the limits start at their
/// module-level defaults.
pub(crate) fn new(
    recv_window_size: u32,
    send_window_size: u32,
    flow_control: FlowControl,
) -> Self {
    Self {
        // Stream id bookkeeping.
        max_send_id: INITIAL_MAX_SEND_STREAM_ID,
        max_recv_id: INITIAL_MAX_RECV_STREAM_ID,
        latest_remote_id: INITIAL_LATEST_REMOTE_ID,
        // Flow-control configuration.
        stream_recv_window_size: recv_window_size,
        stream_send_window_size: send_window_size,
        flow_control,
        // Concurrency accounting.
        max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS,
        current_concurrent_streams: 0,
        // Scheduling queues and the stream table.
        pending_concurrency: VecDeque::new(),
        pending_stream_window: HashSet::new(),
        pending_conn_window: VecDeque::new(),
        pending_send: VecDeque::new(),
        window_updating_streams: VecDeque::new(),
        stream_map: HashMap::new(),
    }
}
/// Releases one concurrency slot.
///
/// The counter is unsigned, so an unbalanced call at zero used to wrap to
/// `u32::MAX` in builds without overflow checks, after which
/// `reach_max_concurrency` would report the limit as reached forever. The
/// counter now saturates at zero; debug builds still flag the unbalanced call.
pub(crate) fn decrease_current_concurrency(&mut self) {
    debug_assert!(
        self.current_concurrent_streams > 0,
        "stream concurrency counter underflow"
    );
    self.current_concurrent_streams = self.current_concurrent_streams.saturating_sub(1);
}
/// Counts one more stream against the concurrency limit.
pub(crate) fn increase_current_concurrency(&mut self) {
self.current_concurrent_streams += 1;
}
/// Returns `true` when the number of counted streams has reached (or exceeds)
/// the current concurrency limit.
///
/// This only reads state, so it borrows `self` immutably; callers holding a
/// mutable reference can still call it unchanged.
pub(crate) fn reach_max_concurrency(&self) -> bool {
    self.current_concurrent_streams >= self.max_concurrent_streams
}
/// Replaces the concurrency limit with `num`. The current counter is left
/// untouched, so it may temporarily exceed a lowered limit.
pub(crate) fn apply_max_concurrent_streams(&mut self, num: u32) {
self.max_concurrent_streams = num;
}
/// Applies a new initial send window size to this connection.
///
/// `size` is stored for streams created later, and the send window of every
/// existing stream is shifted by the difference to the previous value. When
/// the window grows, every stream id in `pending_stream_window` is moved to
/// `pending_send`.
///
/// # Errors
///
/// Returns the error of the first `increase_size` call that fails. Streams
/// visited before the failure keep their enlarged window.
pub(crate) fn apply_send_initial_window_size(&mut self, size: u32) -> Result<(), H2Error> {
    let previous = self.stream_send_window_size;
    self.stream_send_window_size = size;
    match size.cmp(&previous) {
        Ordering::Equal => {}
        Ordering::Greater => {
            let growth = size - previous;
            for stream in self.stream_map.values_mut() {
                stream.send_window.increase_size(growth)?;
            }
            // Every stream queued on its stream window gets another chance to send.
            self.pending_send.extend(self.pending_stream_window.drain());
        }
        Ordering::Less => {
            let shrink = previous - size;
            for stream in self.stream_map.values_mut() {
                stream.send_window.reduce_size(shrink);
            }
        }
    }
    Ok(())
}
/// Applies a new initial receive window size to this connection.
///
/// `size` is stored for streams created later. For every existing stream, a
/// larger size raises both the notification and the actual receive window by
/// the difference; a smaller size reduces only the notification window.
pub(crate) fn apply_recv_initial_window_size(&mut self, size: u32) {
    let previous = self.stream_recv_window_size;
    self.stream_recv_window_size = size;
    match size.cmp(&previous) {
        Ordering::Equal => {}
        Ordering::Greater => {
            let growth = size - previous;
            for stream in self.stream_map.values_mut() {
                stream.recv_window.increase_notification(growth);
                stream.recv_window.increase_actual(growth);
            }
        }
        Ordering::Less => {
            let shrink = previous - size;
            for stream in self.stream_map.values_mut() {
                stream.recv_window.reduce_notification(shrink);
            }
        }
    }
}
/// Accounts for `size` bytes of data received on stream `id`.
///
/// Unknown stream ids are ignored. When the receive window afterwards reports
/// an unreleased size, the stream is queued in `window_updating_streams`.
///
/// # Errors
///
/// Returns a stream-level `FlowControlError` when `size` exceeds the window
/// currently available on the stream.
pub(crate) fn release_stream_recv_window(&mut self, id: u32, size: u32) -> Result<(), H2Error> {
    let stream = match self.stream_map.get_mut(&id) {
        Some(stream) => stream,
        None => return Ok(()),
    };
    if stream.recv_window.notification_available() < size {
        return Err(H2Error::StreamError(id, ErrorCode::FlowControlError));
    }
    stream.recv_window.recv_data(size);
    let needs_update = stream.recv_window.unreleased_size().is_some();
    if needs_update {
        self.window_updating_streams.push_back(id);
    }
    Ok(())
}
/// Accounts for `size` bytes of data received on the connection.
///
/// # Errors
///
/// Returns a connection-level `FlowControlError` when `size` exceeds the
/// receive window currently available on the connection; nothing is recorded
/// in that case.
pub(crate) fn release_conn_recv_window(&mut self, size: u32) -> Result<(), H2Error> {
    let available = self.flow_control.recv_notification_size_available();
    if available < size {
        Err(H2Error::ConnectionError(ErrorCode::FlowControlError))
    } else {
        self.flow_control.recv_data(size);
        Ok(())
    }
}
/// Returns `true` when every stream in `stream_map` is in a `Closed` state.
/// An empty map counts as closed.
pub(crate) fn is_closed(&self) -> bool {
    self.stream_map
        .values()
        .all(|stream| matches!(stream.state, H2StreamState::Closed(_)))
}
/// Returns a copy of the state of stream `id`, or `None` when the id is unknown.
pub(crate) fn stream_state(&self, id: u32) -> Option {
    let stream = self.stream_map.get(&id)?;
    Some(stream.state)
}
/// Registers a new stream under `id`, built from the request's header frame and
/// body handle, with windows sized from the current per-stream settings.
/// An existing entry with the same id is replaced.
pub(crate) fn insert(&mut self, id: u32, request: RequestWrapper) {
    // NOTE(review): the `as i32` casts wrap for sizes above `i32::MAX`; this
    // assumes the configured sizes never exceed that — confirm where they are set.
    let send = SendWindow::new(self.stream_send_window_size as i32);
    let recv = RecvWindow::new(self.stream_recv_window_size as i32);
    self.stream_map
        .insert(id, Stream::new(recv, send, request.header, request.data));
}
/// Appends stream `id` to the back of the `pending_send` queue.
pub(crate) fn push_back_pending_send(&mut self, id: u32) {
self.pending_send.push_back(id);
}
/// Appends stream `id` to the back of the `pending_concurrency` queue, where it
/// waits until `try_consume_pending_concurrency` finds a free slot.
pub(crate) fn push_pending_concurrency(&mut self, id: u32) {
self.pending_concurrency.push_back(id);
}
/// Removes and returns the stream id at the front of `pending_send`, or `None`
/// when the queue is empty.
pub(crate) fn next_pending_stream(&mut self) -> Option {
self.pending_send.pop_front()
}
/// Returns the number of stream ids currently queued in `pending_send`.
pub(crate) fn pending_stream_num(&self) -> usize {
self.pending_send.len()
}
/// Moves stream ids from `pending_concurrency` to `pending_send`, in order,
/// counting each one against the concurrency limit. Stops when the limit is
/// reached or no stream is waiting.
pub(crate) fn try_consume_pending_concurrency(&mut self) {
    loop {
        if self.reach_max_concurrency() {
            break;
        }
        let id = match self.pending_concurrency.pop_front() {
            Some(id) => id,
            None => break,
        };
        self.increase_current_concurrency();
        self.push_back_pending_send(id);
    }
}
/// Enlarges the connection-level send window by `size`.
///
/// # Errors
///
/// Forwards whatever error `FlowControl::increase_send_size` reports.
pub(crate) fn increase_conn_send_window(&mut self, size: u32) -> Result<(), H2Error> {
self.flow_control.increase_send_size(size)
}
/// Moves every stream id from `pending_conn_window` to the back of
/// `pending_send`, preserving their order.
pub(crate) fn reassign_conn_send_window(&mut self) {
    // A body is a stream, so its total size is not known up front. All streams
    // in pending_conn_window are therefore simply queued for sending again.
    while let Some(id) = self.pending_conn_window.pop_front() {
        self.push_back_pending_send(id);
    }
}
/// Enlarges the send window of stream `id` by `size` and, when the stream was
/// queued in `pending_stream_window`, moves it to `pending_send`.
///
/// An unknown `id` leaves all windows untouched, but the id is still moved
/// between the queues if it was present.
///
/// # Errors
///
/// Returns the error reported by the stream window's `increase_size`; the
/// queues are not changed in that case.
pub(crate) fn reassign_stream_send_window(
    &mut self,
    id: u32,
    size: u32,
) -> Result<(), H2Error> {
    if let Some(stream) = self.stream_map.get_mut(&id) {
        stream.send_window.increase_size(size)?;
    }
    let was_waiting = self.pending_stream_window.remove(&id);
    if was_waiting {
        self.pending_send.push_back(id);
    }
    Ok(())
}
/// Sends a connection-level window update through `sender` when the
/// flow-control state reports that one is due; otherwise does nothing.
///
/// # Errors
///
/// Returns `DispatchErrorKind::ChannelClosed` when the frame cannot be sent.
pub(crate) fn window_update_conn(
    &mut self,
    sender: &UnboundedSender,
) -> Result<(), DispatchErrorKind> {
    let frame = match self.flow_control.check_conn_recv_window_update() {
        Some(frame) => frame,
        None => return Ok(()),
    };
    sender
        .send(frame)
        .map_err(|_| DispatchErrorKind::ChannelClosed)?;
    Ok(())
}
/// Drains `window_updating_streams` and sends a stream-level window update
/// through `sender` for every queued stream whose receive window reports one.
///
/// Stream ids no longer present in `stream_map`, and streams for which
/// `is_init_or_active_flow_control()` is false, are skipped. Skipping one such
/// stream no longer ends the whole pass: previously the function returned as
/// soon as it met one, leaving the streams queued behind it without their
/// window update until some later call.
///
/// # Errors
///
/// Returns `DispatchErrorKind::ChannelClosed` when a frame cannot be sent. Ids
/// still queued at that point stay in `window_updating_streams`.
pub(crate) fn window_update_streams(
    &mut self,
    sender: &UnboundedSender,
) -> Result<(), DispatchErrorKind> {
    while let Some(id) = self.window_updating_streams.pop_front() {
        let stream = match self.stream_map.get_mut(&id) {
            Some(stream) => stream,
            None => continue,
        };
        // NOTE(review): presumably false once the stream is no longer subject
        // to flow control (e.g. closed) — confirm against `Stream`.
        if !stream.is_init_or_active_flow_control() {
            continue;
        }
        if let Some(window_update) = stream.recv_window.check_window_update(id) {
            sender
                .send(window_update)
                .map_err(|_e| DispatchErrorKind::ChannelClosed)?;
        }
    }
    Ok(())
}
pub(crate) fn headers(&mut self, id: u32) -> Result