1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use octets::{Octets, OctetsMut};
15 
16 use crate::h3::frame_new::{Headers, Payload};
17 // use crate::h3::octets::WriteVarint;
18 use crate::h3::qpack::table::DynamicTable;
19 use crate::h3::qpack::{DecoderInst, QpackEncoder};
20 use crate::h3::{frame_new, Frame};
21 
22 #[derive(PartialEq, Debug)]
23 enum FrameEncoderState {
24     // The initial state for the frame encoder.
25     Idle,
26     FrameComplete,
27     PayloadComplete,
28     // Header Frame
29     EncodingHeadersFrame,
30     EncodingHeadersPayload,
31     // Data Frame
32     EncodingDataFrame,
33     EncodingDataPaylaod,
34     // CancelPush Frame
35     EncodingCancelPushFrame,
36     EncodingCancelPushPayload,
37     // Settings Frame
38     EncodingSettingsFrame,
39     EncodingSettingsPayload,
40     // PushPromise Frame
41     EncodingPushPromiseFrame,
42     EncodingPushPromisePayload,
43     // Goaway Frame
44     EncodingGoawayFrame,
45     EncodingGoawayPayload,
46     // MaxPushId Frame
47     EncodingMaxPushIdFrame,
48     EncodingMaxPushIdPayload,
49 }
50 
51 pub struct FrameEncoder<'a> {
52     qpack_encoder: QpackEncoder<'a>,
53     stream_id: usize,
54     // other frames
55     current_frame: Option<Frame>,
56     state: FrameEncoderState,
57     encoded_bytes: usize,
58     buf_offset: usize,
59     payload_offset: usize,
60 }
61 
62 impl<'a> FrameEncoder<'a> {
63     /// Create a FrameEncoder
64     /// note: user should give the qpack's dynamic table, which is shared with
65     /// Decoder.
new( table: &'a mut DynamicTable, qpack_all_post: bool, qpack_drain_index: usize, stream_id: usize, ) -> Self66     pub(crate) fn new(
67         table: &'a mut DynamicTable,
68         qpack_all_post: bool,
69         qpack_drain_index: usize,
70         stream_id: usize,
71     ) -> Self {
72         Self {
73             qpack_encoder: QpackEncoder::new(table, stream_id, qpack_all_post, qpack_drain_index),
74             stream_id,
75             current_frame: None,
76             state: FrameEncoderState::Idle,
77             encoded_bytes: 0,
78             buf_offset: 0,
79             payload_offset: 0,
80         }
81     }
82 
83     /// Sets the current frame to be encoded by the `FrameEncoder`. The state of
84     /// the encoder is updated based on the payload type of the frame.
set_frame(&mut self, frame: Frame)85     pub fn set_frame(&mut self, frame: Frame) {
86         self.current_frame = Some(frame);
87         // Reset the encoded bytes counter
88         self.encoded_bytes = 0;
89         // set frame state
90         match &self.current_frame {
91             Some(frame) => match frame.frame_type() {
92                 &frame_new::HEADERS_FRAME_TYPE_ID => {
93                     if let Payload::Headers(h) = frame.payload() {
94                         // todo! header压缩
95                         self.qpack_encoder.set_parts(h.get_part());
96                         // complete output in one go.
97                         let payload_size =
98                             self.qpack_encoder.encode(&mut self.header_payload_buffer);
99                         self.remaining_header_payload = payload_size;
100                         self.state = FrameEncoderState::EncodingHeadersFrame;
101                     }
102                 }
103                 &frame_new::DATA_FRAME_TYPE_ID => self.state = FrameEncoderState::EncodingDataFrame,
104                 &frame_new::CANCEL_PUSH_FRAME_TYPE_ID => {
105                     self.state = FrameEncoderState::EncodingCancelPushFrame
106                 }
107                 &frame_new::SETTINGS_FRAME_TYPE_ID => {
108                     self.state = FrameEncoderState::EncodingSettingsFrame
109                 }
110                 &frame_new::PUSH_PROMISE_FRAME_TYPE_ID => {
111                     self.state = FrameEncoderState::EncodingPushPromiseFrame
112                 }
113                 &frame_new::GOAWAY_FRAME_TYPE_ID => {
114                     self.state = FrameEncoderState::EncodingGoawayFrame
115                 }
116                 &frame_new::MAX_PUSH_FRAME_TYPE_ID => {
117                     self.state = FrameEncoderState::EncodingMaxPushIdFrame
118                 }
119                 _ => {}
120             },
121             None => self.state = FrameEncoderState::Idle,
122         }
123     }
124 
encode_payload(&self, buf: &mut OctetsMut, data: &[u8], start: usize) -> usize125     fn encode_payload(&self, buf: &mut OctetsMut, data: &[u8], start: usize) -> usize {
126         let data_len = data.len();
127         let remaining_data_bytes = data_len.saturating_sub(start);
128         let bytes_to_write = remaining_data_bytes.min(buf.len());
129         // use unwrap, because data len must be smaller than buf len
130         buf.put_bytes(&data[start..start + bytes_to_write]).unwrap();
131         bytes_to_write
132     }
133 
encode_frame(&self, frame_ref: Option<&Frame>, buf: &mut [u8]) -> Result<usize, Err>134     fn encode_frame(&self, frame_ref: Option<&Frame>, buf: &mut [u8]) -> Result<usize, Err> {
135         if let Some(frame) = frame_ref {
136             let mut octet_buf = OctetsMut::with_slice(buf);
137             octet_buf.put_varint(frame.frame_type().clone())?;
138             octet_buf.put_varint(frame.frame_len().clone())?;
139             let size = octet_buf.off();
140             Ok(size)
141         } else {
142             Err(FrameEncoderErr::NoCurrentFrame)
143         }
144     }
145 
encode(&mut self, buf: &mut [u8]) -> Result<usize, Err>146     pub fn encode(&mut self, buf: &mut [u8]) -> Result<usize, Err> {
147         let mut written_bytes = 0;
148 
149         while written_bytes < buf.len() {
150             match self.state {
151                 FrameEncoderState::Idle
152                 | FrameEncoderState::PayloadComplete
153                 | FrameEncoderState::FrameComplete => {
154                     break;
155                 }
156                 FrameEncoderState::EncodingHeadersFrame => {
157                     match self.encode_frame(self.current_frame.as_ref(), buf) {
158                         Ok(size) => {
159                             self.encoded_bytes += size;
160                             self.state = FrameEncoderState::EncodingHeadersPayload;
161                         }
162                         Err(_) => Err(FrameEncoderErr::NoCurrentFrame),
163                     }
164                 }
165                 FrameEncoderState::EncodingHeadersPayload => {
166                     if let Some(frame) = self.current_frame.as_ref() {
167                         if let Payload::Headers(h) = frame.payload() {
168                             let buf_remain = &mut buf[self.encoded_bytes..];
169                             let size_remain = buf_remain.len();
170                             let mut octet_buf = OctetsMut::with_slice(buf_remain);
171                             if (h.get_headers().len() - self.payload_offset) < size_remain {
172                                 self.encode_payload(
173                                     &mut octet_buf,
174                                     h.get_headers(),
175                                     self.payload_offset,
176                                 );
177                                 self.payload_offset = 0;
178                                 self.state = FrameEncoderState::PayloadComplete;
179                             } else {
180                                 let writen_bytes = self.encode_payload(
181                                     &mut octet_buf,
182                                     h.get_headers(),
183                                     self.payload_offset,
184                                 );
185                                 self.payload_offset += writen_bytes;
186                                 self.encoded_bytes += writen_bytes;
187                             }
188                             let size = octet_buf.off();
189                             Ok(size)
190                         } else {
191                             Err(FrameEncoderErr)
192                         }
193                     } else {
194                         Err(FrameEncoderErr::NoCurrentFrame)
195                     }
196                 }
197 
198                 FrameEncoderState::EncodingDataFrame => {
199                     match self.encode_frame(self.current_frame.as_ref(), buf) {
200                         Ok(size) => {
201                             self.encoded_bytes += size;
202                             self.state = FrameEncoderState::EncodingDataPaylaod;
203                         }
204                         Err(_) => Err(FrameEncoderErr::NoCurrentFrame),
205                     }
206                 }
207                 FrameEncoderState::EncodingDataPaylaod => {
208                     if let Some(frame) = self.current_frame.as_ref() {
209                         if let Payload::Data(d) = frame.payload() {
210                             let buf_remain = &mut buf[self.encoded_bytes..];
211                             let size_remain = buf_remain.len();
212                             let mut octet_buf = OctetsMut::with_slice(buf_remain);
213                             if (d.data().len() - self.payload_offset) < size_remain {
214                                 self.encode_payload(&mut octet_buf, d.data(), self.payload_offset);
215                                 self.payload_offset = 0;
216                                 self.state = FrameEncoderState::PayloadComplete;
217                             } else {
218                                 let writen_bytes = self.encode_payload(
219                                     &mut octet_buf,
220                                     d.data(),
221                                     self.payload_offset,
222                                 );
223                                 self.payload_offset += writen_bytes;
224                                 self.encoded_bytes += writen_bytes;
225                             }
226                             let size = octet_buf.off();
227                             Ok(size)
228                         } else {
229                             Err(FrameEncoderErr)
230                         }
231                     } else {
232                         Err(FrameEncoderErr::NoCurrentFrame)
233                     }
234                 }
235 
236                 FrameEncoderState::EncodingCancelPushFrame => {
237                     match self.encode_frame(self.current_frame.as_ref(), buf) {
238                         Ok(size) => {
239                             self.encoded_bytes += size;
240                             self.state = FrameEncoderState::EncodingCancelPushPayload;
241                         }
242                         Err(_) => Err(FrameEncoderErr::NoCurrentFrame),
243                     }
244                 }
245                 FrameEncoderState::EncodingCancelPushPayload => {
246                     if let Some(frame) = self.current_frame.as_ref() {
247                         if let Payload::CancelPush(cp) = frame.payload() {
248                             let buf_remain = &mut buf[self.encoded_bytes..];
249                             let mut octet_buf = OctetsMut::with_slice(buf_remain);
250                             octet_buf.put_varint(cp.get_push_id().clone())?;
251                             self.state = FrameEncoderState::PayloadComplete;
252                             let size = octet_buf.off();
253                             Ok(size)
254                         } else {
255                             Err(FrameEncoderErr)
256                         }
257                     } else {
258                         Err(FrameEncoderErr::NoCurrentFrame)
259                     }
260                 }
261 
262                 FrameEncoderState::EncodingSettingsFrame => {
263                     match self.encode_frame(self.current_frame.as_ref(), buf) {
264                         Ok(size) => {
265                             self.encoded_bytes += size;
266                             self.state = FrameEncoderState::EncodingSettingsPayload;
267                         }
268                         Err(_) => Err(FrameEncoderErr::NoCurrentFrame),
269                     }
270                 }
271                 FrameEncoderState::EncodingSettingsPayload => {
272                     if let Some(frame) = self.current_frame.as_ref() {
273                         if let Payload::Settings(s) = frame.payload() {
274                             let buf_remain = &mut buf[self.encoded_bytes..];
275                             let mut octet_buf = OctetsMut::with_slice(buf_remain);
276                             if let Some(val) = s.get_max_fied_section_size() {
277                                 octet_buf.put_varint(frame_new::SETTINGS_MAX_FIELD_SECTION_SIZE)?;
278                                 octet_buf.put_varint(val.clone())?;
279                             }
280 
281                             if let Some(val) = s.get_qpack_max_table_capacity() {
282                                 octet_buf
283                                     .put_varint(frame_new::SETTINGS_QPACK_MAX_TABLE_CAPACITY)?;
284                                 octet_buf.put_varint(val.clone())?;
285                             }
286 
287                             if let Some(val) = s.get_qpack_block_stream() {
288                                 octet_buf.put_varint(frame_new::SETTINGS_QPACK_BLOCKED_STREAMS)?;
289                                 octet_buf.put_varint(val.clone())?;
290                             }
291 
292                             if let Some(val) = s.get_connect_protocol_enabled() {
293                                 octet_buf
294                                     .put_varint(frame_new::SETTINGS_ENABLE_CONNECT_PROTOCOL)?;
295                                 octet_buf.put_varint(val.clone())?;
296                             }
297 
298                             if let Some(val) = s.get_h3_datagram() {
299                                 octet_buf.put_varint(frame_new::SETTINGS_H3_DATAGRAM_00)?;
300                                 octet_buf.put_varint(val.clone())?;
301                                 octet_buf.put_varint(frame_new::SETTINGS_H3_DATAGRAM)?;
302                                 octet_buf.put_varint(val.clone())?;
303                             }
304 
305                             if octet_buf.off() == 0 {
306                                 Err(FrameEncoderErr::NoCurrentFrame)
307                             }
308                             self.encoded_bytes += octet_buf.off();
309                             self.state = FrameEncoderState::PayloadComplete;
310                             Ok(octet_buf.off())
311                         }
312                     }
313                 }
314 
315                 FrameEncoderState::EncodingPushPromiseFrame => {
316                     match self.encode_frame(self.current_frame.as_ref(), buf) {
317                         Ok(size) => {
318                             self.encoded_bytes += size;
319                             self.state = FrameEncoderState::EncodingPushPromisePayload;
320                         }
321                         Err(_) => Err(FrameEncoderErr::NoCurrentFrame),
322                     }
323                 }
324                 // todo!
325                 FrameEncoderState::EncodingPushPromisePayload => {}
326 
327                 FrameEncoderState::EncodingGoawayFrame => {
328                     match self.encode_frame(self.current_frame.as_ref(), buf) {
329                         Ok(size) => {
330                             self.encoded_bytes += size;
331                             self.state = FrameEncoderState::EncodingGoawayPayload;
332                         }
333                         Err(_) => Err(FrameEncoderErr::NoCurrentFrame),
334                     }
335                 }
336                 FrameEncoderState::EncodingGoawayPayload => {
337                     if let Some(frame) = self.current_frame.as_ref() {
338                         if let Payload::Goaway(g) = frame.payload() {
339                             let buf_remain = &mut buf[self.encoded_bytes..];
340                             let mut octet_buf = OctetsMut::with_slice(buf_remain);
341                             octet_buf.put_varint(g.get_id().clone())?;
342                             self.state = FrameEncoderState::PayloadComplete;
343                             let size = octet_buf.off();
344                             Ok(size)
345                         } else {
346                             Err(FrameEncoderErr)
347                         }
348                     } else {
349                         Err(FrameEncoderErr::NoCurrentFrame)
350                     }
351                 }
352 
353                 FrameEncoderState::EncodingMaxPushIdFrame => {
354                     match self.encode_frame(self.current_frame.as_ref(), buf) {
355                         Ok(size) => {
356                             self.encoded_bytes += size;
357                             self.state = FrameEncoderState::EncodingMaxPushIdPayload;
358                         }
359                         Err(_) => Err(FrameEncoderErr::NoCurrentFrame),
360                     }
361                 }
362                 FrameEncoderState::EncodingMaxPushIdPayload => {
363                     if let Some(frame) = self.current_frame.as_ref() {
364                         if let Payload::MaxPushId(max) = frame.payload() {
365                             let buf_remain = &mut buf[self.encoded_bytes..];
366                             let mut octet_buf = OctetsMut::with_slice(buf_remain);
367                             octet_buf.put_varint(max.get_id().clone())?;
368                             self.state = FrameEncoderState::PayloadComplete;
369                             let size = octet_buf.off();
370                             Ok(size)
371                         } else {
372                             Err(FrameEncoderErr)
373                         }
374                     } else {
375                         Err(FrameEncoderErr::NoCurrentFrame)
376                     }
377                 }
378                 _ => {}
379             }
380         }
381         Ok(written_bytes)
382     }
383 
384     /// Encoder can modify size of the dynamic table, initial size of the table
385     /// is 0. the size can also be updated from decoder
update_dyn_size(&mut self, max_size: usize)386     pub(crate) fn update_dyn_size(&mut self, max_size: usize) {
387         let cur_qpack = self.qpack_encoder.set_capacity(
388             max_size,
389             &mut self.qpack_encoder_buffer[self.remaining_qpack_payload..],
390         );
391         self.remaining_qpack_payload += cur_qpack;
392     }
393 
394     /// User call `encode_header` to encode a header.
encode_header(&mut self, headers: &Headers)395     pub fn encode_header(&mut self, headers: &Headers) {
396         self.qpack_encoder.set_parts(headers.get_parts());
397         let (cur_qpack, cur_header, _) = self.qpack_encoder.encode(
398             &mut self.qpack_encoder_buffer[self.remaining_qpack_payload..],
399             &mut self.header_payload_buffer[self.remaining_header_payload..],
400         );
401         self.remaining_header_payload += cur_header;
402         self.remaining_qpack_payload += cur_qpack;
403     }
404 
405     /// User must call `finish_encode_header` to end a batch of `encode_header`,
406     /// so as to add prefix to this stream.
finish_encode_header(&mut self)407     pub fn finish_encode_header(&mut self) {
408         let (cur_qpack, cur_header, mut prefix) = self.qpack_encoder.encode(
409             &mut self.qpack_encoder_buffer[self.remaining_qpack_payload..],
410             &mut self.header_payload_buffer[self.remaining_header_payload..],
411         );
412         self.remaining_header_payload += cur_header;
413         self.remaining_qpack_payload += cur_qpack;
414         if let Some((prefix_buf, cur_prefix)) = prefix {
415             self.header_payload_buffer
416                 .copy_within(0..self.remaining_header_payload, cur_prefix);
417             self.header_payload_buffer[..cur_prefix].copy_from_slice(&prefix_buf[..cur_prefix]);
418             self.remaining_header_payload += cur_prefix;
419         }
420     }
421 
422     /// User call `decode_ins` to decode peer's qpack_decoder_stream.
decode_ins(&mut self, buf: &[u8])423     pub fn decode_ins(&mut self, buf: &[u8]) {
424         match self.qpack_encoder.decode_ins(buf) {
425             Ok(Some(DecoderInst::StreamCancel)) => {
426                 // todo: cancel this stream.
427             }
428             _ => {}
429         }
430     }
431 }
432 
433 #[cfg(test)]
434 mod ut_headers_encode {
435     use crate::h3::encoder::FrameEncoder;
436     use crate::h3::frame::Headers;
437     use crate::h3::parts::Parts;
438     use crate::h3::qpack::table::{DynamicTable, Field};
439     use crate::test_util::decode;
440 
441     /// `s_res`: header stream after encoding by QPACK.
442     /// `q_res`: QPACK stream after encoding by QPACK.
443     #[test]
444     /// The encoder sends an encoded field section containing a literal
445     /// representation of a field with a static name reference.
literal_field_line_with_name_reference()446     fn literal_field_line_with_name_reference() {
447         let mut table = DynamicTable::with_empty();
448         let mut f_encoder = FrameEncoder::new(&mut table, false, 0, 0);
449 
450         let s_res = decode("0000510b2f696e6465782e68746d6c").unwrap();
451         let headers = [(Field::Path, String::from("/index.html"))];
452 
453         for (field, value) in headers.iter() {
454             let mut part = Parts::new();
455             println!("encoding: HEADER: {:?} , VALUE: {:?}", field, value);
456             part.update(field.clone(), value.clone());
457             let header = Headers::new(part.clone());
458             f_encoder.encode_header(&header);
459         }
460         f_encoder.finish_encode_header();
461         println!(
462             "header_payload_buffer: {:?}",
463             f_encoder.header_payload_buffer[..f_encoder.remaining_header_payload].to_vec()
464         );
465         assert_eq!(
466             s_res,
467             f_encoder.header_payload_buffer[..f_encoder.remaining_header_payload].to_vec()
468         );
469     }
470 
471     #[test]
472     /// The encoder sets the dynamic table capacity, inserts a header with a
473     /// dynamic name reference, then sends a potentially blocking, encoded
474     /// field section referencing this new entry. The decoder acknowledges
475     /// processing the encoded field section, which implicitly acknowledges
476     /// all dynamic table insertions up to the Required Insert Count.
dynamic_table()477     fn dynamic_table() {
478         let mut table = DynamicTable::with_empty();
479 
480         let mut f_encoder = FrameEncoder::new(&mut table, true, 0, 0);
481 
482         f_encoder.update_dyn_size(220);
483         let s_res = decode("03811011").unwrap();
484         let q_res =
485             decode("3fbd01c00f7777772e6578616d706c652e636f6dc10c2f73616d706c652f70617468").unwrap();
486         let headers = [
487             (Field::Authority, String::from("www.example.com")),
488             (Field::Path, String::from("/sample/path")),
489         ];
490         for (field, value) in headers.iter() {
491             println!("encoding: HEADER: {:?} , VALUE: {:?}", field, value);
492             let mut part = Parts::new();
493             part.update(field.clone(), value.clone());
494             let header = Headers::new(part.clone());
495             f_encoder.encode_header(&header);
496         }
497         f_encoder.finish_encode_header();
498         println!(
499             "header_payload_buffer: {:?}",
500             f_encoder.header_payload_buffer[..f_encoder.remaining_header_payload].to_vec()
501         );
502         println!(
503             "qpack_encoder_buffer: {:?}",
504             f_encoder.qpack_encoder_buffer[..f_encoder.remaining_qpack_payload].to_vec()
505         );
506         assert_eq!(
507             s_res,
508             f_encoder.header_payload_buffer[..f_encoder.remaining_header_payload].to_vec()
509         );
510         assert_eq!(
511             q_res,
512             f_encoder.qpack_encoder_buffer[..f_encoder.remaining_qpack_payload].to_vec()
513         );
514     }
515 }
516