Skip to content

Instantly share code, notes, and snippets.

@keepsimple1
Last active April 6, 2021 14:30
Show Gist options
  • Save keepsimple1/d3ff6b511b95a967026d316c21ea8458 to your computer and use it in GitHub Desktop.
Save keepsimple1/d3ff6b511b95a967026d316c21ea8458 to your computer and use it in GitHub Desktop.
keepsimple1 quiche branch v0.6.0 tag v0.6.0-p10 diffs with baseline quiche 0.6.0
diff --git a/examples/http3-client.rs b/examples/http3-client.rs
index a93d67e..329c243 100644
--- a/examples/http3-client.rs
+++ b/examples/http3-client.rs
@@ -264,6 +264,16 @@ fn main() {
info!("GOAWAY id={}", goaway_id);
},
+ Ok((stream_id, quiche::h3::Event::StopSending { error_code })) => {
+ info!("StopSending received for stream {}, error_code {}",
+ stream_id, error_code);
+ },
+
+ Ok((stream_id, quiche::h3::Event::ResetStream { error_code, final_size })) => {
+ info!("ResetStream received for stream {}, error_code {}, final_size {}",
+ stream_id, error_code, final_size);
+ },
+
Err(quiche::h3::Error::Done) => {
break;
},
diff --git a/examples/http3-server.rs b/examples/http3-server.rs
index 4c41cbb..ba80c6d 100644
--- a/examples/http3-server.rs
+++ b/examples/http3-server.rs
@@ -361,6 +361,16 @@ fn main() {
Ok((_goaway_id, quiche::h3::Event::GoAway)) => (),
+ Ok((stream_id, quiche::h3::Event::StopSending { error_code })) => {
+ info!("StopSending received for stream {}, error_code {}",
+ stream_id, error_code);
+ },
+
+ Ok((stream_id, quiche::h3::Event::ResetStream { error_code, final_size })) => {
+ info!("ResetStream received for stream {}, error_code {}, final_size {}",
+ stream_id, error_code, final_size);
+ },
+
Err(quiche::h3::Error::Done) => {
break;
},
diff --git a/src/h3/ffi.rs b/src/h3/ffi.rs
index 2d278c7..3783413 100644
--- a/src/h3/ffi.rs
+++ b/src/h3/ffi.rs
@@ -115,6 +115,10 @@ pub extern fn quiche_h3_event_type(ev: &h3::Event) -> u32 {
h3::Event::Datagram { .. } => 3,
h3::Event::GoAway { .. } => 4,
+
+ h3::Event::StopSending { .. } => 5,
+
+ h3::Event::ResetStream { .. } => 6,
}
}
diff --git a/src/h3/mod.rs b/src/h3/mod.rs
index 6248688..bc74fe4 100644
--- a/src/h3/mod.rs
+++ b/src/h3/mod.rs
@@ -166,6 +166,14 @@
//! // Peer signalled it is going away, handle it.
//! },
//!
+//! Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => {
+//! // Peer sent STOP_SENDING, handle it.
+//! },
+//!
+//! Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => {
+//! // Peer sent RESET_STREAM, handle it.
+//! },
+//!
//! Err(quiche::h3::Error::Done) => {
//! // Done reading.
//! break;
@@ -174,7 +182,7 @@
//! Err(e) => {
//! // An error occurred, handle it.
//! break;
-//! },
+//! }
//! }
//! }
//! # Ok::<(), quiche::h3::Error>(())
@@ -219,6 +227,14 @@
//! // Peer signalled it is going away, handle it.
//! },
//!
+//! Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => {
+//! // Peer sent STOP_SENDING, handle it.
+//! },
+//!
+//! Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => {
+//! // Peer sent RESET_STREAM, handle it.
+//! },
+//!
//! Err(quiche::h3::Error::Done) => {
//! // Done reading.
//! break;
@@ -266,6 +282,7 @@ use std::collections::HashMap;
use std::collections::VecDeque;
use crate::octets;
+use crate::h3::stream::Type;
/// List of ALPN tokens of supported HTTP/3 versions.
///
@@ -525,6 +542,20 @@ pub enum Event {
/// GOAWAY was received.
GoAway,
+
+ /// STOP_SENDING was received.
+ StopSending {
+ /// Application Protocol Error Code
+ error_code: u64
+ },
+
+ /// RESET_STREAM was received.
+ ResetStream {
+ /// Application Protocol Error Code
+ error_code: u64,
+ /// The final offset of data in this stream
+ final_size: u64,
+ }
}
struct ConnectionSettings {
@@ -1118,6 +1149,29 @@ impl Connection {
}
}
+ // Process STOP_SENDING, and trigger an event for streams of `Request` type
+ while let Some((stream_id, error_code)) = conn.poll_stop_sending() {
+ match self.streams.get(&stream_id) {
+ Some(stream) => match stream.ty() {
+ Some(Type::Request) => return Ok((stream_id,
+ Event::StopSending { error_code })),
+ _ => trace!("StopSending: stream {} is not Request type", stream_id),
+ }
+ None => trace!("StopSending: stream {} does not exists", stream_id),
+ }
+ }
+
+ // Process RESET_STREAM, and trigger an event for streams of `Request` type
+ while let Some((stream_id, error_code, final_size)) = conn.poll_reset_stream() {
+ match self.streams.get(&stream_id) {
+ Some(stream) => match stream.ty() {
+ Some(Type::Request) => return Ok((stream_id, Event::ResetStream { error_code, final_size })),
+ _ => trace!("ResetStream: stream {} is not Request type", stream_id),
+ }
+ None => trace!("ResetStream: stream {} does not exists", stream_id),
+ }
+ }
+
Err(Error::Done)
}
@@ -1281,7 +1335,7 @@ impl Connection {
fn open_grease_stream(&mut self, conn: &mut super::Connection) -> Result<()> {
match self.open_uni_stream(conn, grease_value()) {
Ok(stream_id) => {
- trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
+ info!("{} open GREASE stream {}", conn.trace_id(), stream_id);
conn.stream_send(stream_id, b"GREASE is the word", false)?;
},
@@ -2050,6 +2104,8 @@ mod tests {
use super::testing::*;
+ use crate::Shutdown;
+
#[test]
/// Make sure that random GREASE values is within the specified limit.
fn grease_value_in_varint_limit() {
@@ -3159,6 +3215,53 @@ mod tests {
// Once the server gives flow control credits back, we can send the body.
assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
}
+
+ #[test]
+ /// Tests that `stream_shutdown` `read` will trigger StopSending event on the server,
+ /// and the server will send back RESET_STREAM, trigger ResetStream event on the client.
+ fn stop_sending_and_reset_stream() {
+ let mut s = Session::default().unwrap();
+ s.handshake().unwrap();
+
+ // Send the request
+ let (stream_id, req) = s.send_request(true).unwrap();
+ let ev_headers = Event::Headers {
+ list: req,
+ has_body: false,
+ };
+ assert_eq!(s.poll_server(), Ok((stream_id, ev_headers)));
+ assert_eq!(s.poll_server(), Ok((stream_id, Event::Finished)));
+
+ // Start the response
+ let resp = s.send_response(stream_id, false).unwrap();
+ let body = s.send_body_server(stream_id, false).unwrap();
+
+ let mut recv_buf = vec![0; body.len()];
+ let ev_headers = Event::Headers {
+ list: resp,
+ has_body: true,
+ };
+
+ assert_eq!(s.poll_client(), Ok((stream_id, ev_headers)));
+ assert_eq!(s.poll_client(), Ok((stream_id, Event::Data)));
+ assert_eq!(s.recv_body_client(stream_id, &mut recv_buf), Ok(body.len()));
+
+ // The client shutdown will send STOP_SENDING to the server
+ let error_code = 12345;
+ s.pipe.client.stream_shutdown(stream_id, Shutdown::Read, error_code).unwrap();
+ s.advance().ok();
+
+ // Verify StopSending event received on the server
+ let stop_sending = Event::StopSending { error_code };
+ assert_eq!(s.poll_server(), Ok((stream_id, stop_sending)));
+
+ // Server should respond with a RESET_STREAM
+ // Verify ResetStream event received on the client
+ let client_stream = s.pipe.client.streams.get(stream_id).unwrap();
+ let final_size = client_stream.recv.max_off();
+ let reset_stream = Event::ResetStream { error_code, final_size};
+ assert_eq!(s.poll_client(), Ok((stream_id, reset_stream)));
+ }
}
mod ffi;
diff --git a/src/h3/stream.rs b/src/h3/stream.rs
index f2f8f0c..091cf66 100644
--- a/src/h3/stream.rs
+++ b/src/h3/stream.rs
@@ -121,7 +121,7 @@ pub struct Stream {
/// The write offset in the state buffer, that is, how many bytes have
/// already been read from the transport for the current state. When
- /// it reaches `stream_len` the state can be completed.
+ /// it reaches `state_len` the state can be completed.
state_off: usize,
/// The type of the frame currently being parsed.
@@ -178,6 +178,10 @@ impl Stream {
self.state
}
+ /// Returns the stream's type as currently stored, which may be `None`.
+ ///
+ /// NOTE(review): presumably `None` until `set_ty()` has run; confirm.
+ pub fn ty(&self) -> Option<Type> {
+ self.ty
+ }
+
/// Sets the stream's type and transitions to the next state.
pub fn set_ty(&mut self, ty: Type) -> Result<()> {
assert_eq!(self.state, State::StreamType);
diff --git a/src/lib.rs b/src/lib.rs
index 79c942b..474c44a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -413,6 +413,7 @@ impl std::convert::From<octets::BufferTooShortError> for Error {
/// This should be used when calling [`stream_shutdown()`].
///
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
+#[derive(Debug, PartialEq)]
#[repr(C)]
pub enum Shutdown {
/// Stop receiving stream data.
@@ -2247,6 +2248,49 @@ impl Connection {
in_flight = true;
}
}
+
+ // Create STOP_SENDING frames as needed.
+ for (stream_id, error_code) in self
+ .streams
+ .recv_aborted()
+ .map(|(&k, &v)| (k, v))
+ .collect::<Vec<(u64, u64)>>()
+ {
+ let frame = frame::Frame::StopSending { stream_id, error_code };
+
+ if push_frame_to_pkt!(frames, frame, payload_len, left) {
+ self.streams.mark_recv_aborted(stream_id, false, error_code);
+
+ ack_eliciting = true;
+ in_flight = true;
+ }
+ }
+
+ // Create RESET_STREAM frames as needed
+ for (stream_id, error_code) in self
+ .streams
+ .will_reset()
+ .map(|(&k, &v)| (k, v))
+ .collect::<Vec<(u64, u64)>>()
+ {
+ let stream = match self.streams.get(stream_id) {
+ Some(s) => s,
+ None => {
+ self.streams.mark_will_reset(stream_id, false, error_code);
+ continue;
+ },
+ };
+
+ let final_size = stream.send.off_front();
+ let frame = frame::Frame::ResetStream { stream_id, error_code, final_size };
+
+ if push_frame_to_pkt!(frames, frame, payload_len, left) {
+ self.streams.mark_will_reset(stream_id, false, error_code);
+
+ ack_eliciting = true;
+ in_flight = true;
+ }
+ }
}
// Create CONNECTION_CLOSE frame.
@@ -2880,26 +2924,32 @@ impl Connection {
/// [`stream_recv()`]: struct.Connection.html#method.stream_recv
/// [`stream_send()`]: struct.Connection.html#method.stream_send
pub fn stream_shutdown(
- &mut self, stream_id: u64, direction: Shutdown, _err: u64,
+ &mut self, stream_id: u64, direction: Shutdown, err: u64,
) -> Result<()> {
+ trace!("stream_shutdown: stream {} direction {:?} err {}", stream_id, direction, err);
+
// Get existing stream.
let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
match direction {
- // TODO: send STOP_SENDING
Shutdown::Read => {
stream.recv.shutdown()?;
// Once shutdown, the stream is guaranteed to be non-readable.
self.streams.mark_readable(stream_id, false);
+
+ // Mark for sending STOP_SENDING
+ self.streams.mark_recv_aborted(stream_id, true, err);
},
- // TODO: send RESET_STREAM
Shutdown::Write => {
stream.send.shutdown()?;
// Once shutdown, the stream is guaranteed to be non-writable.
self.streams.mark_writable(stream_id, false);
+
+ // Mark for sending RESET_STREAM
+ self.streams.mark_will_reset(stream_id, true, err);
},
}
@@ -2934,6 +2984,17 @@ impl Connection {
stream.recv.is_fin()
}
+ /// Returns true if the stream's send side has been shut down, or if the stream does not exist.
+ pub fn stream_is_shutdown(&self, stream_id: u64) -> bool {
+ let stream = match self.streams.get(stream_id) {
+ Some(v) => v,
+
+ None => return true,
+ };
+
+ stream.send.is_shutdown()
+ }
+
/// Initializes the stream's application data.
///
/// This can be used by applications to store per-stream information without
@@ -3230,6 +3291,16 @@ impl Connection {
.is_none()
}
+ /// Returns the (stream_id, error_code) of the oldest received STOP_SENDING frame not yet polled, or `None` if there is none.
+ pub fn poll_stop_sending(&mut self) -> Option<(u64, u64)> {
+ self.streams.poll_stop_sending()
+ }
+
+ /// Returns the (stream_id, error_code, final_size) of the oldest received RESET_STREAM frame not yet polled, or `None` if there is none.
+ pub fn poll_reset_stream(&mut self) -> Option<(u64, u64, u64)> {
+ self.streams.poll_reset_stream()
+ }
+
/// Returns the amount of time until the next timeout event.
///
/// Once the given duration has elapsed, the [`on_timeout()`] method should
@@ -3523,6 +3594,7 @@ impl Connection {
self.streams.should_update_max_streams_uni() ||
self.streams.has_flushable() ||
self.streams.has_almost_full() ||
+ self.streams.has_stop_sending() ||
self.streams.has_blocked())
{
return Ok(packet::EPOCH_APPLICATION);
@@ -3592,8 +3664,8 @@ impl Connection {
frame::Frame::ResetStream {
stream_id,
+ error_code,
final_size,
- ..
} => {
// Peer can't send on our unidirectional streams.
if !stream::is_bidi(stream_id) &&
@@ -3602,6 +3674,8 @@ impl Connection {
return Err(Error::InvalidStreamState);
}
+ info!("{} stream {} received RESET_STREAM error_code {} final_size {}", self.trace_id, stream_id, error_code, final_size);
+
// Get existing stream or create a new one, but if the stream
// has already been closed and collected, ignore the frame.
//
@@ -3615,7 +3689,10 @@ impl Connection {
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
- Err(Error::Done) => return Ok(()),
+ Err(Error::Done) => {
+ info!("{} stream {} closed or collected, ignore RESET_STREAM", self.trace_id, stream_id);
+ return Ok(());
+ },
Err(e) => return Err(e),
};
@@ -3625,15 +3702,44 @@ impl Connection {
if self.rx_data > self.max_rx_data {
return Err(Error::FlowControl);
}
+
+ self.streams.mark_reset_stream(stream_id, error_code, final_size);
},
- frame::Frame::StopSending { stream_id, .. } => {
+ frame::Frame::StopSending { stream_id, error_code } => {
// STOP_SENDING on a receive-only stream is a fatal error.
if !stream::is_local(stream_id, self.is_server) &&
!stream::is_bidi(stream_id)
{
+ error!("STOP_SENDING on a receive-only stream is a fatal error");
return Err(Error::InvalidStreamState);
}
+
+ // STOP_SENDING on a locally-initiated stream that
+ // has not yet been created is a fatal error
+ if stream::is_local(stream_id, self.is_server) &&
+ self.streams.get(stream_id).is_none()
+ {
+ error!("STOP_SENDING on a non-existing locally-initiated stream");
+ return Err(Error::InvalidStreamState);
+ }
+
+ info!("{} stream {} received STOP_SENDING error_code {}", self.trace_id, stream_id, error_code);
+
+ // what happens if this stream is in "Data sent" state?
+ match self.get_or_create_stream(stream_id, false) {
+ Ok(_) => {},
+
+ Err(Error::Done) => {
+ info!("{} stream {} closed or collected, ignore STOP_SENDING", self.trace_id, stream_id);
+ return Ok(());
+ },
+
+ Err(e) => return Err(e),
+ }
+
+ self.streams.mark_stop_sending(stream_id, error_code);
+ self.stream_shutdown(stream_id, Shutdown::Write, error_code)?;
},
frame::Frame::Crypto { data } => {
@@ -7746,6 +7852,201 @@ mod tests {
let result2 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result2, Err(Error::Done));
}
+
+ #[test]
+ /// Tests stream_shutdown read marks recv_aborted and sends out STOP_SENDING,
+ /// the server will shutdown its write.
+ fn stream_recv_aborted() {
+ let mut buf = [0; 65535];
+
+ let mut pipe = testing::Pipe::default().unwrap();
+
+ assert_eq!(pipe.handshake(&mut buf), Ok(()));
+
+ // No streams recv_aborted.
+ let mut recv_aborted = pipe.client.streams.recv_aborted();
+ assert_eq!(recv_aborted.next(), None);
+
+ // Client sends some request
+ assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // Server sends some response
+ assert_eq!(
+ pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
+ Ok(15)
+ );
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // Client stream is readable
+ let mut readable = pipe.client.readable();
+ assert_eq!(readable.next(), Some(4));
+
+ // Client drains stream.
+ let mut b = [0; 15];
+ pipe.client.stream_recv(4, &mut b).unwrap();
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // Server is writable
+ let mut writable = pipe.server.writable();
+ assert_eq!(writable.next(), Some(4));
+
+ // Client shuts down Read, hence sends STOP_SENDING
+ let error_code: u64 = 12345;
+ pipe.client.stream_shutdown(4, Shutdown::Read, error_code).unwrap();
+ let mut recv_aborted = pipe.client.streams.recv_aborted();
+ assert_eq!(recv_aborted.next(), Some((&4, &error_code)));
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // Client stream is no longer readable
+ let mut readable = pipe.client.readable();
+ assert_eq!(readable.next(), None);
+
+ // Server receives STOP_SENDING, shuts down Write, no longer writable
+ let mut writable = pipe.server.writable();
+ assert_eq!(writable.next(), None);
+ }
+
+ #[test]
+ /// Tests stream_shutdown write marks will_reset and sends out RESET_STREAM,
+ /// the client will shutdown its read.
+ fn stream_will_reset() {
+ let mut buf = [0; 65535];
+
+ let mut pipe = testing::Pipe::default().unwrap();
+
+ assert_eq!(pipe.handshake(&mut buf), Ok(()));
+
+ // No streams are marked for reset on the server yet.
+ let mut will_reset = pipe.server.streams.will_reset();
+ assert_eq!(will_reset.next(), None);
+
+ // Client sends some request
+ assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // Server sends some response
+ assert_eq!(
+ pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
+ Ok(15)
+ );
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // Client stream is readable
+ let mut readable = pipe.client.readable();
+ assert_eq!(readable.next(), Some(4));
+
+ // Client drains stream.
+ let mut b = [0; 15];
+ pipe.client.stream_recv(4, &mut b).unwrap();
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // Server is writable
+ let mut writable = pipe.server.writable();
+ assert_eq!(writable.next(), Some(4));
+
+ // Server shuts down Write, hence sends RESET_STREAM
+ let error_code: u64 = 12345;
+ pipe.server.stream_shutdown(4, Shutdown::Write, error_code).unwrap();
+ let mut will_reset = pipe.server.streams.will_reset();
+ assert_eq!(will_reset.next(), Some((&4, &error_code)));
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // Server no longer writable
+ let mut writable = pipe.server.writable();
+ assert_eq!(writable.next(), None);
+
+ // Client receives RESET_STREAM, is no longer readable
+ let mut readable = pipe.client.readable();
+ assert_eq!(readable.next(), None);
+ }
+
+ #[test]
+ /// Tests it's okay to send STOP_SENDING to create client-initiated bidirectional stream
+ fn stop_sending_frame_new_stream() {
+ let mut buf = [0; 65535];
+
+ let mut pipe = testing::Pipe::default().unwrap();
+
+ assert_eq!(pipe.handshake(&mut buf), Ok(()));
+
+ let stream_id = 4; // a new stream
+
+ let frames = [frame::Frame::StopSending {
+ stream_id,
+ error_code: 12345,
+ }];
+
+ let pkt_type = packet::Type::Short;
+
+ // STOP_SENDING is okay
+ assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(44));
+
+ // still able to send data to the server
+ assert_eq!(pipe.client.stream_send(stream_id, b"hello", true), Ok(5));
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ // server can read from the stream
+ let mut readable = pipe.server.readable();
+ assert_eq!(readable.next(), Some(stream_id));
+
+ // server cannot write to the stream
+ let mut writable = pipe.server.writable();
+ assert_eq!(writable.next(), None);
+ }
+
+ #[test]
+ /// Tests STOP_SENDING is invalid for receive-only, non-locally-initiated stream
+ fn invalid_stop_sending_frame_recv_only() {
+ let mut buf = [0; 65535];
+
+ let mut pipe = testing::Pipe::default().unwrap();
+
+ assert_eq!(pipe.handshake(&mut buf), Ok(()));
+
+ // Use stream_id 2 for client-initiated uni-directional (receive-only for server)
+ let stream_id = 2;
+ assert_eq!(pipe.client.stream_send(stream_id, b"hello", false), Ok(5));
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+
+ let frames = [frame::Frame::StopSending {
+ stream_id,
+ error_code: 12345,
+ }];
+
+ let pkt_type = packet::Type::Short;
+
+ // invalid: STOP_SENDING on a receive-only stream is a fatal error
+ assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
+ Err(Error::InvalidStreamState));
+
+ // The stream still works
+ assert_eq!(pipe.client.stream_send(stream_id, b"world", true), Ok(5));
+ assert_eq!(pipe.advance(&mut buf), Ok(()));
+ }
+
+ #[test]
+ /// Tests STOP_SENDING is invalid for non-existing, locally-initiated stream
+ fn invalid_stop_sending_frame_non_existing_stream() {
+ let mut buf = [0; 65535];
+
+ let mut pipe = testing::Pipe::default().unwrap();
+
+ assert_eq!(pipe.handshake(&mut buf), Ok(()));
+
+ // Use stream_id 3 for server-initiated stream, non-existing yet
+ let stream_id = 3;
+ let frames = [frame::Frame::StopSending {
+ stream_id,
+ error_code: 12345,
+ }];
+
+ let pkt_type = packet::Type::Short;
+
+ // invalid: STOP_SENDING on a non-existing locally-initiated stream is a fatal error
+ assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
+ Err(Error::InvalidStreamState));
+ }
}
pub use crate::packet::Header;
diff --git a/src/stream.rs b/src/stream.rs
index c2fbf36..10a9e9b 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -113,6 +113,22 @@ pub struct StreamMap {
/// of the map elements represents the offset of the stream at which the
/// blocking occurred.
blocked: HashMap<u64, u64>,
+
+ /// Set of stream IDs corresponding to streams that have shut down recv and need
+ /// to send STOP_SENDING. The value of the map elements represents Application
+ /// Error Code.
+ recv_aborted: HashMap<u64, u64>,
+
+ /// Set of stream IDs corresponding to streams that have shut down send or
+ /// received STOP_SENDING. In both cases, we want to send RESET_STREAM to the peer.
+ /// The value of the map elements represents the error code.
+ will_reset: HashMap<u64, u64>,
+
+ /// Queue of (stream_id, error_code) corresponding to STOP_SENDING received.
+ stop_sending: VecDeque<(u64, u64)>,
+
+ /// Queue of (stream_id, error_code, final_size) corresponding to RESET_STREAM received.
+ reset_stream: VecDeque<(u64, u64, u64)>,
}
impl StreamMap {
@@ -229,6 +245,7 @@ impl StreamMap {
},
};
+ info!("get_or_create: creating stream {}", id);
let s = Stream::new(max_rx_data, max_tx_data, is_bidi(id), local);
v.insert(s)
},
@@ -355,6 +372,38 @@ impl StreamMap {
}
}
+ /// Adds or removes a stream from the set of streams that need to send
+ /// STOP_SENDING.
+ ///
+ /// When `aborted` is true, `error_code` is recorded for the stream,
+ /// replacing any previous value. When it is false the entry is removed
+ /// and `error_code` is ignored.
+ pub fn mark_recv_aborted(&mut self, stream_id: u64, aborted: bool, error_code: u64) {
+ if aborted {
+ self.recv_aborted.insert(stream_id, error_code);
+ } else {
+ self.recv_aborted.remove(&stream_id);
+ }
+ }
+
+ /// Adds or removes a stream from the set of streams that need to send
+ /// RESET_STREAM.
+ ///
+ /// When `reset` is true, `error_code` is recorded for the stream,
+ /// replacing any previous value. When it is false the entry is removed
+ /// and `error_code` is ignored.
+ pub fn mark_will_reset(&mut self, stream_id: u64, reset: bool, error_code: u64) {
+ if reset {
+ self.will_reset.insert(stream_id, error_code);
+ } else {
+ self.will_reset.remove(&stream_id);
+ }
+ }
+
+ /// Appends a received STOP_SENDING (stream ID and error code) to the queue
+ /// drained by `poll_stop_sending()`.
+ ///
+ /// NOTE(review): nothing here bounds the queue; confirm that every user
+ /// of the transport drains it, otherwise it grows with each frame received.
+ pub fn mark_stop_sending(&mut self, stream_id: u64, error_code: u64) {
+ self.stop_sending.push_back((stream_id, error_code));
+ }
+
+ /// Removes and returns the oldest queued (stream_id, error_code) pair, or
+ /// `None` if the STOP_SENDING queue is empty.
+ pub fn poll_stop_sending(&mut self) -> Option<(u64, u64)> {
+ self.stop_sending.pop_front()
+ }
+
+ /// Appends a received RESET_STREAM (stream ID, error code and final size)
+ /// to the queue drained by `poll_reset_stream()`.
+ pub fn mark_reset_stream(&mut self, stream_id: u64, error_code: u64, final_size: u64) {
+ self.reset_stream.push_back((stream_id, error_code, final_size));
+ }
+
+ /// Removes and returns the oldest queued (stream_id, error_code, final_size)
+ /// triple, or `None` if the RESET_STREAM queue is empty.
+ pub fn poll_reset_stream(&mut self) -> Option<(u64, u64, u64)> {
+ self.reset_stream.pop_front()
+ }
+
/// Updates the peer's maximum bidirectional stream count limit.
pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
@@ -421,6 +470,14 @@ impl StreamMap {
StreamIter::from(&self.almost_full)
}
+ /// Creates an iterator over streams that need to send STOP_SENDING,
+ /// yielding each stream ID together with its error code.
+ pub fn recv_aborted(&self) -> hash_map::Iter<u64, u64> {
+ self.recv_aborted.iter()
+ }
+
+ /// Creates an iterator over streams that need to send RESET_STREAM,
+ /// yielding each stream ID together with its error code.
+ pub fn will_reset(&self) -> hash_map::Iter<u64, u64> {
+ self.will_reset.iter()
+ }
+
/// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
self.blocked.iter()
@@ -442,6 +499,10 @@ impl StreamMap {
!self.blocked.is_empty()
}
+ /// Returns true if there are streams that still need to send a
+ /// STOP_SENDING or a RESET_STREAM frame.
+ ///
+ /// Pending resets are included so that shutting down the send side of a
+ /// stream is enough, on its own, for this check to report pending work.
+ pub fn has_stop_sending(&self) -> bool {
+ !self.recv_aborted.is_empty() || !self.will_reset.is_empty()
+ }
+
/// Returns true if the max bidirectional streams count needs to be updated
/// by sending a MAX_STREAMS frame to the peer.
pub fn should_update_max_streams_bidi(&self) -> bool {
@@ -922,6 +983,7 @@ impl SendBuf {
if self.shutdown {
// Since we won't write any more data anyway, pretend that we sent
// all data that was passed in.
+ error!("Pretending sending {} bytes even the stream is shutdown.", data.len());
return Ok(data.len());
}
@@ -1100,6 +1162,11 @@ impl SendBuf {
false
}
+ /// Returns true if the SendBuf is shutdown
+ pub fn is_shutdown(&self) -> bool {
+ self.shutdown
+ }
+
/// Returns true if the send-side of the stream is complete.
///
/// This happens when the stream's send final size is known, and the peer
diff --git a/tools/apps/src/lib.rs b/tools/apps/src/lib.rs
index 3f72fc9..52fd10e 100644
--- a/tools/apps/src/lib.rs
+++ b/tools/apps/src/lib.rs
@@ -1255,6 +1255,23 @@ impl HttpConn for Http3Conn {
);
},
+ Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => {
+ info!(
+ "Received STOP_SENDING stream_id={} error_code={}",
+ stream_id,
+ error_code
+ );
+ },
+
+ Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => {
+ info!(
+ "Received RESET_STREAM stream_id={} error_code={} final_size={}",
+ stream_id,
+ error_code,
+ final_size
+ );
+ },
+
Err(quiche::h3::Error::Done) => {
break;
},
@@ -1405,6 +1422,23 @@ impl HttpConn for Http3Conn {
.send_goaway(conn, self.largest_processed_request)?;
},
+ Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => {
+ info!(
+ "Received STOP_SENDING stream_id={} error_code={}",
+ stream_id,
+ error_code
+ );
+ },
+
+ Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => {
+ info!(
+ "Received RESET_STREAM stream_id={} error_code={} final_size={}",
+ stream_id,
+ error_code,
+ final_size
+ );
+ },
+
Err(quiche::h3::Error::Done) => {
break;
},
diff --git a/tools/http3_test/src/runner.rs b/tools/http3_test/src/runner.rs
index 7befde1..6a5de8a 100644
--- a/tools/http3_test/src/runner.rs
+++ b/tools/http3_test/src/runner.rs
@@ -277,6 +277,20 @@ pub fn run(
Ok((_goaway_id, quiche::h3::Event::GoAway)) => (),
+ Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => {
+ info!(
+ "StopSending received for stream {} error_code {}",
+ stream_id, error_code
+ );
+ },
+
+ Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => {
+ info!(
+ "ResetStream received from stream {} error_code {} final_size {}",
+ stream_id, error_code, final_size
+ );
+ },
+
Err(quiche::h3::Error::Done) => {
break;
},
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment