Skip to content

Instantly share code, notes, and snippets.

/congestion.diff Secret

Created January 31, 2016 18:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/bb6c0506ba3490d78da7 to your computer and use it in GitHub Desktop.
Save anonymous/bb6c0506ba3490d78da7 to your computer and use it in GitHub Desktop.
diff --git a/Changes b/Changes
index 7f58173..4ddf36f 100644
--- a/Changes
+++ b/Changes
@@ -1,8 +1,10 @@
-6.43 2016-01-31
+6.43 2016-02-01
- Removed client_close and server_close methods from Mojo::Transaction.
+ - Added high_watermark and low_watermark attributes to Mojo::IOLoop::Stream.
- Added closed method to Mojo::Transaction.
- Added parse_message method to Mojo::Transaction::WebSocket.
+ - Added congestion event to Mojo::IOLoop::Stream.
- Improved a few examples to avoid timing attacks.
- Fixed timing bug in Mojo::Server::Daemon.
diff --git a/lib/Mojo/IOLoop/Stream.pm b/lib/Mojo/IOLoop/Stream.pm
index 04b25fa..c0b4dac 100644
--- a/lib/Mojo/IOLoop/Stream.pm
+++ b/lib/Mojo/IOLoop/Stream.pm
@@ -6,7 +6,9 @@ use Mojo::IOLoop;
use Mojo::Util;
use Scalar::Util 'weaken';
-has reactor => sub { Mojo::IOLoop->singleton->reactor };
+has high_watermark => 16777216;
+has low_watermark => 0;
+has reactor => sub { Mojo::IOLoop->singleton->reactor };
sub DESTROY { Mojo::Util::_global_destruction() or shift->close }
@@ -44,6 +46,7 @@ sub start {
return $reactor->watch($self->{handle}, 1, $self->is_writing)
if delete $self->{paused};
+ $self->{started} ? return $self : $self->{started}++;
weaken $self;
my $cb = sub { pop() ? $self->_write : $self->_read };
$reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
@@ -80,10 +83,10 @@ sub write {
my ($self, $chunk, $cb) = @_;
$self->{buffer} .= $chunk;
- if ($cb) { $self->once(drain => $cb) }
- elsif ($self->{buffer} eq '') { return $self }
+ $self->once(drain => $cb) if $cb;
+ $self->emit('congestion') if length $self->{buffer} >= $self->high_watermark;
$self->reactor->watch($self->{handle}, !$self->{paused}, 1)
- if $self->{handle};
+ if $self->{handle} && ($cb || $self->{buffer} ne '');
return $self;
}
@@ -114,7 +117,7 @@ sub _write {
$self->emit(write => substr($self->{buffer}, 0, $written, ''))->_again;
}
- $self->emit('drain') if $self->{buffer} eq '';
+ $self->emit('drain') if length $self->{buffer} <= $self->low_watermark;
return if $self->is_writing;
return $self->close if $self->{graceful};
$self->reactor->watch($handle, !$self->{paused}, 0) if $self->{handle};
@@ -172,6 +175,15 @@ emit the following new ones.
Emitted if the stream gets closed.
+=head2 congestion
+
+ $stream->on(congestion => sub {
+ my $stream = shift;
+ ...
+ });
+
+Emitted if the data waiting to be written reaches L</"high_watermark">.
+
=head2 drain
$stream->on(drain => sub {
@@ -179,7 +191,7 @@ Emitted if the stream gets closed.
...
});
-Emitted once all data has been written.
+Emitted if the data waiting to be written reaches L</"low_watermark">.
=head2 error
@@ -222,6 +234,22 @@ Emitted if new data has been written to the stream.
L<Mojo::IOLoop::Stream> implements the following attributes.
+=head2 high_watermark
+
+ my $size = $stream->high_watermark;
+ $stream = $stream->high_watermark(1024);
+
+Maximum size in bytes of data waiting to be written before the L</"congestion">
+event will be emitted, defaults to C<16777216> (16MiB).
+
+=head2 low_watermark
+
+ my $size = $stream->low_watermark;
+ $stream = $stream->low_watermark(1024);
+
+Size in bytes the data waiting to be written has to fall to before the
+L</"drain"> event will be emitted, defaults to C<0>.
+
=head2 reactor
my $reactor = $stream->reactor;
diff --git a/t/mojo/ioloop.t b/t/mojo/ioloop.t
index ce13a0a..8eef67d 100644
--- a/t/mojo/ioloop.t
+++ b/t/mojo/ioloop.t
@@ -256,6 +256,36 @@ ok length($server) > length($server_after), 'stream has been resumed';
is $client, $client_after, 'stream was writable while paused';
is $client, 'works!', 'full message has been written';
+# Congestion
+my $congestion;
+$client = '';
+$id = Mojo::IOLoop->server(
+ {address => '127.0.0.1'} => sub {
+ my ($loop, $stream) = @_;
+ $stream->write('a');
+ $stream->high_watermark(3);
+ $stream->on(congestion => sub { $congestion++ });
+ $stream->write('b');
+ $stream->write('c');
+ $stream->write(
+ 'd' => sub {
+ shift->write('e' => sub { shift->close });
+ }
+ );
+ }
+);
+$port = Mojo::IOLoop->acceptor($id)->port;
+Mojo::IOLoop->client(
+ {port => $port} => sub {
+ my ($loop, $err, $stream) = @_;
+ $stream->on(read => sub { $client .= pop });
+ $stream->on(close => sub { Mojo::IOLoop->stop });
+ }
+);
+Mojo::IOLoop->start;
+is $client, 'abcde', 'full message has been written';
+is $congestion, 2, 'congestion event has been emitted twice';
+
# Graceful shutdown
$err = '';
$loop = Mojo::IOLoop->new;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment