1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use octets::{Octets, OctetsMut}; 15 16 use crate::h3::frame_new::{Headers, Payload}; 17 // use crate::h3::octets::WriteVarint; 18 use crate::h3::qpack::table::DynamicTable; 19 use crate::h3::qpack::{DecoderInst, QpackEncoder}; 20 use crate::h3::{frame_new, Frame}; 21 22 #[derive(PartialEq, Debug)] 23 enum FrameEncoderState { 24 // The initial state for the frame encoder. 25 Idle, 26 FrameComplete, 27 PayloadComplete, 28 // Header Frame 29 EncodingHeadersFrame, 30 EncodingHeadersPayload, 31 // Data Frame 32 EncodingDataFrame, 33 EncodingDataPaylaod, 34 // CancelPush Frame 35 EncodingCancelPushFrame, 36 EncodingCancelPushPayload, 37 // Settings Frame 38 EncodingSettingsFrame, 39 EncodingSettingsPayload, 40 // PushPromise Frame 41 EncodingPushPromiseFrame, 42 EncodingPushPromisePayload, 43 // Goaway Frame 44 EncodingGoawayFrame, 45 EncodingGoawayPayload, 46 // MaxPushId Frame 47 EncodingMaxPushIdFrame, 48 EncodingMaxPushIdPayload, 49 } 50 51 pub struct FrameEncoder<'a> { 52 qpack_encoder: QpackEncoder<'a>, 53 stream_id: usize, 54 // other frames 55 current_frame: Option<Frame>, 56 state: FrameEncoderState, 57 encoded_bytes: usize, 58 buf_offset: usize, 59 payload_offset: usize, 60 } 61 62 impl<'a> FrameEncoder<'a> { 63 /// Create a FrameEncoder 64 /// note: user should give the qpack's dynamic table, which is shared with 65 /// Decoder. new( table: &'a mut DynamicTable, qpack_all_post: bool, qpack_drain_index: usize, stream_id: usize, ) -> Self66 pub(crate) fn new( 67 table: &'a mut DynamicTable, 68 qpack_all_post: bool, 69 qpack_drain_index: usize, 70 stream_id: usize, 71 ) -> Self { 72 Self { 73 qpack_encoder: QpackEncoder::new(table, stream_id, qpack_all_post, qpack_drain_index), 74 stream_id, 75 current_frame: None, 76 state: FrameEncoderState::Idle, 77 encoded_bytes: 0, 78 buf_offset: 0, 79 payload_offset: 0, 80 } 81 } 82 83 /// Sets the current frame to be encoded by the `FrameEncoder`. The state of 84 /// the encoder is updated based on the payload type of the frame. set_frame(&mut self, frame: Frame)85 pub fn set_frame(&mut self, frame: Frame) { 86 self.current_frame = Some(frame); 87 // Reset the encoded bytes counter 88 self.encoded_bytes = 0; 89 // set frame state 90 match &self.current_frame { 91 Some(frame) => match frame.frame_type() { 92 &frame_new::HEADERS_FRAME_TYPE_ID => { 93 if let Payload::Headers(h) = frame.payload() { 94 // todo! header压缩 95 self.qpack_encoder.set_parts(h.get_part()); 96 // complete output in one go. 97 let payload_size = 98 self.qpack_encoder.encode(&mut self.header_payload_buffer); 99 self.remaining_header_payload = payload_size; 100 self.state = FrameEncoderState::EncodingHeadersFrame; 101 } 102 } 103 &frame_new::DATA_FRAME_TYPE_ID => self.state = FrameEncoderState::EncodingDataFrame, 104 &frame_new::CANCEL_PUSH_FRAME_TYPE_ID => { 105 self.state = FrameEncoderState::EncodingCancelPushFrame 106 } 107 &frame_new::SETTINGS_FRAME_TYPE_ID => { 108 self.state = FrameEncoderState::EncodingSettingsFrame 109 } 110 &frame_new::PUSH_PROMISE_FRAME_TYPE_ID => { 111 self.state = FrameEncoderState::EncodingPushPromiseFrame 112 } 113 &frame_new::GOAWAY_FRAME_TYPE_ID => { 114 self.state = FrameEncoderState::EncodingGoawayFrame 115 } 116 &frame_new::MAX_PUSH_FRAME_TYPE_ID => { 117 self.state = FrameEncoderState::EncodingMaxPushIdFrame 118 } 119 _ => {} 120 }, 121 None => self.state = FrameEncoderState::Idle, 122 } 123 } 124 encode_payload(&self, buf: &mut OctetsMut, data: &[u8], start: usize) -> usize125 fn encode_payload(&self, buf: &mut OctetsMut, data: &[u8], start: usize) -> usize { 126 let data_len = data.len(); 127 let remaining_data_bytes = data_len.saturating_sub(start); 128 let bytes_to_write = remaining_data_bytes.min(buf.len()); 129 // use unwrap, because data len must be smaller than buf len 130 buf.put_bytes(&data[start..start + bytes_to_write]).unwrap(); 131 bytes_to_write 132 } 133 encode_frame(&self, frame_ref: Option<&Frame>, buf: &mut [u8]) -> Result<usize, Err>134 fn encode_frame(&self, frame_ref: Option<&Frame>, buf: &mut [u8]) -> Result<usize, Err> { 135 if let Some(frame) = frame_ref { 136 let mut octet_buf = OctetsMut::with_slice(buf); 137 octet_buf.put_varint(frame.frame_type().clone())?; 138 octet_buf.put_varint(frame.frame_len().clone())?; 139 let size = octet_buf.off(); 140 Ok(size) 141 } else { 142 Err(FrameEncoderErr::NoCurrentFrame) 143 } 144 } 145 encode(&mut self, buf: &mut [u8]) -> Result<usize, Err>146 pub fn encode(&mut self, buf: &mut [u8]) -> Result<usize, Err> { 147 let mut written_bytes = 0; 148 149 while written_bytes < buf.len() { 150 match self.state { 151 FrameEncoderState::Idle 152 | FrameEncoderState::PayloadComplete 153 | FrameEncoderState::FrameComplete => { 154 break; 155 } 156 FrameEncoderState::EncodingHeadersFrame => { 157 match self.encode_frame(self.current_frame.as_ref(), buf) { 158 Ok(size) => { 159 self.encoded_bytes += size; 160 self.state = FrameEncoderState::EncodingHeadersPayload; 161 } 162 Err(_) => Err(FrameEncoderErr::NoCurrentFrame), 163 } 164 } 165 FrameEncoderState::EncodingHeadersPayload => { 166 if let Some(frame) = self.current_frame.as_ref() { 167 if let Payload::Headers(h) = frame.payload() { 168 let buf_remain = &mut buf[self.encoded_bytes..]; 169 let size_remain = buf_remain.len(); 170 let mut octet_buf = OctetsMut::with_slice(buf_remain); 171 if (h.get_headers().len() - self.payload_offset) < size_remain { 172 self.encode_payload( 173 &mut octet_buf, 174 h.get_headers(), 175 self.payload_offset, 176 ); 177 self.payload_offset = 0; 178 self.state = FrameEncoderState::PayloadComplete; 179 } else { 180 let writen_bytes = self.encode_payload( 181 &mut octet_buf, 182 h.get_headers(), 183 self.payload_offset, 184 ); 185 self.payload_offset += writen_bytes; 186 self.encoded_bytes += writen_bytes; 187 } 188 let size = octet_buf.off(); 189 Ok(size) 190 } else { 191 Err(FrameEncoderErr) 192 } 193 } else { 194 Err(FrameEncoderErr::NoCurrentFrame) 195 } 196 } 197 198 FrameEncoderState::EncodingDataFrame => { 199 match self.encode_frame(self.current_frame.as_ref(), buf) { 200 Ok(size) => { 201 self.encoded_bytes += size; 202 self.state = FrameEncoderState::EncodingDataPaylaod; 203 } 204 Err(_) => Err(FrameEncoderErr::NoCurrentFrame), 205 } 206 } 207 FrameEncoderState::EncodingDataPaylaod => { 208 if let Some(frame) = self.current_frame.as_ref() { 209 if let Payload::Data(d) = frame.payload() { 210 let buf_remain = &mut buf[self.encoded_bytes..]; 211 let size_remain = buf_remain.len(); 212 let mut octet_buf = OctetsMut::with_slice(buf_remain); 213 if (d.data().len() - self.payload_offset) < size_remain { 214 self.encode_payload(&mut octet_buf, d.data(), self.payload_offset); 215 self.payload_offset = 0; 216 self.state = FrameEncoderState::PayloadComplete; 217 } else { 218 let writen_bytes = self.encode_payload( 219 &mut octet_buf, 220 d.data(), 221 self.payload_offset, 222 ); 223 self.payload_offset += writen_bytes; 224 self.encoded_bytes += writen_bytes; 225 } 226 let size = octet_buf.off(); 227 Ok(size) 228 } else { 229 Err(FrameEncoderErr) 230 } 231 } else { 232 Err(FrameEncoderErr::NoCurrentFrame) 233 } 234 } 235 236 FrameEncoderState::EncodingCancelPushFrame => { 237 match self.encode_frame(self.current_frame.as_ref(), buf) { 238 Ok(size) => { 239 self.encoded_bytes += size; 240 self.state = FrameEncoderState::EncodingCancelPushPayload; 241 } 242 Err(_) => Err(FrameEncoderErr::NoCurrentFrame), 243 } 244 } 245 FrameEncoderState::EncodingCancelPushPayload => { 246 if let Some(frame) = self.current_frame.as_ref() { 247 if let Payload::CancelPush(cp) = frame.payload() { 248 let buf_remain = &mut buf[self.encoded_bytes..]; 249 let mut octet_buf = OctetsMut::with_slice(buf_remain); 250 octet_buf.put_varint(cp.get_push_id().clone())?; 251 self.state = FrameEncoderState::PayloadComplete; 252 let size = octet_buf.off(); 253 Ok(size) 254 } else { 255 Err(FrameEncoderErr) 256 } 257 } else { 258 Err(FrameEncoderErr::NoCurrentFrame) 259 } 260 } 261 262 FrameEncoderState::EncodingSettingsFrame => { 263 match self.encode_frame(self.current_frame.as_ref(), buf) { 264 Ok(size) => { 265 self.encoded_bytes += size; 266 self.state = FrameEncoderState::EncodingSettingsPayload; 267 } 268 Err(_) => Err(FrameEncoderErr::NoCurrentFrame), 269 } 270 } 271 FrameEncoderState::EncodingSettingsPayload => { 272 if let Some(frame) = self.current_frame.as_ref() { 273 if let Payload::Settings(s) = frame.payload() { 274 let buf_remain = &mut buf[self.encoded_bytes..]; 275 let mut octet_buf = OctetsMut::with_slice(buf_remain); 276 if let Some(val) = s.get_max_fied_section_size() { 277 octet_buf.put_varint(frame_new::SETTINGS_MAX_FIELD_SECTION_SIZE)?; 278 octet_buf.put_varint(val.clone())?; 279 } 280 281 if let Some(val) = s.get_qpack_max_table_capacity() { 282 octet_buf 283 .put_varint(frame_new::SETTINGS_QPACK_MAX_TABLE_CAPACITY)?; 284 octet_buf.put_varint(val.clone())?; 285 } 286 287 if let Some(val) = s.get_qpack_block_stream() { 288 octet_buf.put_varint(frame_new::SETTINGS_QPACK_BLOCKED_STREAMS)?; 289 octet_buf.put_varint(val.clone())?; 290 } 291 292 if let Some(val) = s.get_connect_protocol_enabled() { 293 octet_buf 294 .put_varint(frame_new::SETTINGS_ENABLE_CONNECT_PROTOCOL)?; 295 octet_buf.put_varint(val.clone())?; 296 } 297 298 if let Some(val) = s.get_h3_datagram() { 299 octet_buf.put_varint(frame_new::SETTINGS_H3_DATAGRAM_00)?; 300 octet_buf.put_varint(val.clone())?; 301 octet_buf.put_varint(frame_new::SETTINGS_H3_DATAGRAM)?; 302 octet_buf.put_varint(val.clone())?; 303 } 304 305 if octet_buf.off() == 0 { 306 Err(FrameEncoderErr::NoCurrentFrame) 307 } 308 self.encoded_bytes += octet_buf.off(); 309 self.state = FrameEncoderState::PayloadComplete; 310 Ok(octet_buf.off()) 311 } 312 } 313 } 314 315 FrameEncoderState::EncodingPushPromiseFrame => { 316 match self.encode_frame(self.current_frame.as_ref(), buf) { 317 Ok(size) => { 318 self.encoded_bytes += size; 319 self.state = FrameEncoderState::EncodingPushPromisePayload; 320 } 321 Err(_) => Err(FrameEncoderErr::NoCurrentFrame), 322 } 323 } 324 // todo! 325 FrameEncoderState::EncodingPushPromisePayload => {} 326 327 FrameEncoderState::EncodingGoawayFrame => { 328 match self.encode_frame(self.current_frame.as_ref(), buf) { 329 Ok(size) => { 330 self.encoded_bytes += size; 331 self.state = FrameEncoderState::EncodingGoawayPayload; 332 } 333 Err(_) => Err(FrameEncoderErr::NoCurrentFrame), 334 } 335 } 336 FrameEncoderState::EncodingGoawayPayload => { 337 if let Some(frame) = self.current_frame.as_ref() { 338 if let Payload::Goaway(g) = frame.payload() { 339 let buf_remain = &mut buf[self.encoded_bytes..]; 340 let mut octet_buf = OctetsMut::with_slice(buf_remain); 341 octet_buf.put_varint(g.get_id().clone())?; 342 self.state = FrameEncoderState::PayloadComplete; 343 let size = octet_buf.off(); 344 Ok(size) 345 } else { 346 Err(FrameEncoderErr) 347 } 348 } else { 349 Err(FrameEncoderErr::NoCurrentFrame) 350 } 351 } 352 353 FrameEncoderState::EncodingMaxPushIdFrame => { 354 match self.encode_frame(self.current_frame.as_ref(), buf) { 355 Ok(size) => { 356 self.encoded_bytes += size; 357 self.state = FrameEncoderState::EncodingMaxPushIdPayload; 358 } 359 Err(_) => Err(FrameEncoderErr::NoCurrentFrame), 360 } 361 } 362 FrameEncoderState::EncodingMaxPushIdPayload => { 363 if let Some(frame) = self.current_frame.as_ref() { 364 if let Payload::MaxPushId(max) = frame.payload() { 365 let buf_remain = &mut buf[self.encoded_bytes..]; 366 let mut octet_buf = OctetsMut::with_slice(buf_remain); 367 octet_buf.put_varint(max.get_id().clone())?; 368 self.state = FrameEncoderState::PayloadComplete; 369 let size = octet_buf.off(); 370 Ok(size) 371 } else { 372 Err(FrameEncoderErr) 373 } 374 } else { 375 Err(FrameEncoderErr::NoCurrentFrame) 376 } 377 } 378 _ => {} 379 } 380 } 381 Ok(written_bytes) 382 } 383 384 /// Encoder can modify size of the dynamic table, initial size of the table 385 /// is 0. the size can also be updated from decoder update_dyn_size(&mut self, max_size: usize)386 pub(crate) fn update_dyn_size(&mut self, max_size: usize) { 387 let cur_qpack = self.qpack_encoder.set_capacity( 388 max_size, 389 &mut self.qpack_encoder_buffer[self.remaining_qpack_payload..], 390 ); 391 self.remaining_qpack_payload += cur_qpack; 392 } 393 394 /// User call `encode_header` to encode a header. encode_header(&mut self, headers: &Headers)395 pub fn encode_header(&mut self, headers: &Headers) { 396 self.qpack_encoder.set_parts(headers.get_parts()); 397 let (cur_qpack, cur_header, _) = self.qpack_encoder.encode( 398 &mut self.qpack_encoder_buffer[self.remaining_qpack_payload..], 399 &mut self.header_payload_buffer[self.remaining_header_payload..], 400 ); 401 self.remaining_header_payload += cur_header; 402 self.remaining_qpack_payload += cur_qpack; 403 } 404 405 /// User must call `finish_encode_header` to end a batch of `encode_header`, 406 /// so as to add prefix to this stream. finish_encode_header(&mut self)407 pub fn finish_encode_header(&mut self) { 408 let (cur_qpack, cur_header, mut prefix) = self.qpack_encoder.encode( 409 &mut self.qpack_encoder_buffer[self.remaining_qpack_payload..], 410 &mut self.header_payload_buffer[self.remaining_header_payload..], 411 ); 412 self.remaining_header_payload += cur_header; 413 self.remaining_qpack_payload += cur_qpack; 414 if let Some((prefix_buf, cur_prefix)) = prefix { 415 self.header_payload_buffer 416 .copy_within(0..self.remaining_header_payload, cur_prefix); 417 self.header_payload_buffer[..cur_prefix].copy_from_slice(&prefix_buf[..cur_prefix]); 418 self.remaining_header_payload += cur_prefix; 419 } 420 } 421 422 /// User call `decode_ins` to decode peer's qpack_decoder_stream. decode_ins(&mut self, buf: &[u8])423 pub fn decode_ins(&mut self, buf: &[u8]) { 424 match self.qpack_encoder.decode_ins(buf) { 425 Ok(Some(DecoderInst::StreamCancel)) => { 426 // todo: cancel this stream. 427 } 428 _ => {} 429 } 430 } 431 } 432 433 #[cfg(test)] 434 mod ut_headers_encode { 435 use crate::h3::encoder::FrameEncoder; 436 use crate::h3::frame::Headers; 437 use crate::h3::parts::Parts; 438 use crate::h3::qpack::table::{DynamicTable, Field}; 439 use crate::test_util::decode; 440 441 /// `s_res`: header stream after encoding by QPACK. 442 /// `q_res`: QPACK stream after encoding by QPACK. 443 #[test] 444 /// The encoder sends an encoded field section containing a literal 445 /// representation of a field with a static name reference. literal_field_line_with_name_reference()446 fn literal_field_line_with_name_reference() { 447 let mut table = DynamicTable::with_empty(); 448 let mut f_encoder = FrameEncoder::new(&mut table, false, 0, 0); 449 450 let s_res = decode("0000510b2f696e6465782e68746d6c").unwrap(); 451 let headers = [(Field::Path, String::from("/index.html"))]; 452 453 for (field, value) in headers.iter() { 454 let mut part = Parts::new(); 455 println!("encoding: HEADER: {:?} , VALUE: {:?}", field, value); 456 part.update(field.clone(), value.clone()); 457 let header = Headers::new(part.clone()); 458 f_encoder.encode_header(&header); 459 } 460 f_encoder.finish_encode_header(); 461 println!( 462 "header_payload_buffer: {:?}", 463 f_encoder.header_payload_buffer[..f_encoder.remaining_header_payload].to_vec() 464 ); 465 assert_eq!( 466 s_res, 467 f_encoder.header_payload_buffer[..f_encoder.remaining_header_payload].to_vec() 468 ); 469 } 470 471 #[test] 472 /// The encoder sets the dynamic table capacity, inserts a header with a 473 /// dynamic name reference, then sends a potentially blocking, encoded 474 /// field section referencing this new entry. The decoder acknowledges 475 /// processing the encoded field section, which implicitly acknowledges 476 /// all dynamic table insertions up to the Required Insert Count. dynamic_table()477 fn dynamic_table() { 478 let mut table = DynamicTable::with_empty(); 479 480 let mut f_encoder = FrameEncoder::new(&mut table, true, 0, 0); 481 482 f_encoder.update_dyn_size(220); 483 let s_res = decode("03811011").unwrap(); 484 let q_res = 485 decode("3fbd01c00f7777772e6578616d706c652e636f6dc10c2f73616d706c652f70617468").unwrap(); 486 let headers = [ 487 (Field::Authority, String::from("www.example.com")), 488 (Field::Path, String::from("/sample/path")), 489 ]; 490 for (field, value) in headers.iter() { 491 println!("encoding: HEADER: {:?} , VALUE: {:?}", field, value); 492 let mut part = Parts::new(); 493 part.update(field.clone(), value.clone()); 494 let header = Headers::new(part.clone()); 495 f_encoder.encode_header(&header); 496 } 497 f_encoder.finish_encode_header(); 498 println!( 499 "header_payload_buffer: {:?}", 500 f_encoder.header_payload_buffer[..f_encoder.remaining_header_payload].to_vec() 501 ); 502 println!( 503 "qpack_encoder_buffer: {:?}", 504 f_encoder.qpack_encoder_buffer[..f_encoder.remaining_qpack_payload].to_vec() 505 ); 506 assert_eq!( 507 s_res, 508 f_encoder.header_payload_buffer[..f_encoder.remaining_header_payload].to_vec() 509 ); 510 assert_eq!( 511 q_res, 512 f_encoder.qpack_encoder_buffer[..f_encoder.remaining_qpack_payload].to_vec() 513 ); 514 } 515 } 516