Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

/codec.diff Secret

Created January 12, 2016 19:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/b34128b09de1a21d2f3d to your computer and use it in GitHub Desktop.
Save anonymous/b34128b09de1a21d2f3d to your computer and use it in GitHub Desktop.
diff --git a/lib/Mojo/Codec.pm b/lib/Mojo/Codec.pm
new file mode 100644
index 0000000..642399b
--- /dev/null
+++ b/lib/Mojo/Codec.pm
@@ -0,0 +1,9 @@
+package Mojo::Codec;
+use Mojo::Base -base;
+
+use Carp 'croak';
+# Abstract interface, subclasses have to overload both methods
+sub read  { croak('Method "read" not implemented by subclass') }
+sub write { croak('Method "write" not implemented by subclass') }
+
+1;
diff --git a/lib/Mojo/Codec/HTTP.pm b/lib/Mojo/Codec/HTTP.pm
new file mode 100644
index 0000000..e0fd018
--- /dev/null
+++ b/lib/Mojo/Codec/HTTP.pm
@@ -0,0 +1,109 @@
+package Mojo::Codec::HTTP;
+use Mojo::Base 'Mojo::Codec';
+
+use Scalar::Util 'weaken';
+
+sub is_server { return undef }    # overloaded by Mojo::Codec::HTTP::Server
+# Clear per-transaction state and return the invocant to allow chaining
+sub reset { delete @{$_[0]}{qw(http_state offset tx write writing)}; $_[0] }
+
+sub start {
+  my ($self, $tx) = @_;
+  # Weakened so the "resume" callback registered on $tx cannot keep us alive
+  weaken $self;
+  ($self->{tx} = $tx)->on(resume => sub { $self->{writing}++ });
+  return $self;
+}
+
+sub tx { $_[0]{tx} }    # transaction attached with "start", if any
+
+sub write {
+  my $self = shift;
+
+  # Servers only write after a "resume" event has been emitted, clients start
+  # writing right away
+  my $server = $self->is_server;
+  $self->{writing} //= 1 unless $server;
+  return '' unless $self->{writing};
+
+  # Nothing written yet, begin with the start-line of the right message
+  $self->{offset} ||= 0;
+  $self->{write}  ||= 0;
+  my $msg = $server ? $self->{tx}->res : $self->{tx}->req;
+  if (!$self->{http_state}) {
+    @$self{qw(http_state write)} = ('start_line', $msg->start_line_size);
+  }
+
+  # Every stage may advance "http_state", so the following stage can
+  # contribute to the same chunk
+  my $out = '';
+  $out .= $self->_start_line($msg) if $self->{http_state} eq 'start_line';
+  $out .= $self->_headers($msg, $server) if $self->{http_state} eq 'headers';
+  $out .= $self->_body($msg, $server) if $self->{http_state} eq 'body';
+
+  return $out;
+}
+
+sub _body {
+  my ($self, $msg, $finish) = @_;
+
+  # Fetch next body chunk, the counter stays at 1 for dynamic content
+  my $chunk = $msg->get_body_chunk($self->{offset});
+  my $len   = length($chunk // '');
+  $self->{write} = $msg->content->is_dynamic ? 1 : $self->{write} - $len;
+  $self->{offset} += $len;
+
+  # Delayed, stop writing for now
+  $self->{writing} = 0 unless defined $chunk;
+
+  # Finished, complete the transaction if $finish is true or stop writing
+  my $done = $self->{write} <= 0 || defined $chunk && $chunk eq '';
+  $finish ? $self->{tx}->completed : ($self->{writing} = 0) if $done;
+
+  return defined $chunk ? $chunk : '';
+}
+
+sub _headers {
+  my ($self, $msg, $head) = @_;
+
+  # Fetch next header chunk and account for it
+  my $chunk = $msg->get_header_chunk($self->{offset});
+  my $len   = length($chunk // '');
+  $self->{write}  -= $len;
+  $self->{offset} += $len;
+
+  # More header data left
+  return $chunk if $self->{write} > 0;
+
+  # Headers are complete, whatever follows starts at offset 0 again
+  $self->{offset} = 0;
+  my $tx = $self->{tx};
+  # Body
+  if (!$head || !$tx->is_empty) {
+    $self->{http_state} = 'body';
+    $self->{write} = $msg->content->is_dynamic ? 1 : $msg->body_size;
+  }
+
+  # Response without body
+  else { $tx->completed }
+
+  return $chunk;
+}
+
+sub _start_line {
+  my ($self, $msg) = @_;
+
+  # Fetch next start-line chunk and account for it
+  my $chunk = $msg->get_start_line_chunk($self->{offset});
+  my $len   = length($chunk // '');
+  $self->{write}  -= $len;
+  $self->{offset} += $len;
+
+  # Switch to headers once nothing of the start-line is left
+  return $chunk if $self->{write} > 0;
+  @$self{qw(http_state write offset)} = ('headers', $msg->header_size, 0);
+
+  return $chunk;
+}
+
+1;
diff --git a/lib/Mojo/Codec/HTTP/Client.pm b/lib/Mojo/Codec/HTTP/Client.pm
new file mode 100644
index 0000000..afdeebd
--- /dev/null
+++ b/lib/Mojo/Codec/HTTP/Client.pm
@@ -0,0 +1,4 @@
+package Mojo::Codec::HTTP::Client;
+use Mojo::Base 'Mojo::Codec::HTTP';
+
+1;
diff --git a/lib/Mojo/Codec/HTTP/Server.pm b/lib/Mojo/Codec/HTTP/Server.pm
new file mode 100644
index 0000000..5ecc58e
--- /dev/null
+++ b/lib/Mojo/Codec/HTTP/Server.pm
@@ -0,0 +1,24 @@
+package Mojo::Codec::HTTP::Server;
+use Mojo::Base 'Mojo::Codec::HTTP';
+
+sub is_server { return 1 }
+
+sub read {
+  my ($self, $chunk) = @_;
+
+  # Feed the chunk to the request unless it already has an error
+  my $tx      = $self->{tx};
+  my $request = $tx->req;
+  $request->error or $request->parse($chunk);
+
+  # Signal a finished request only once, "reset" clears the flag again
+  $tx->received if $request->is_finished && !$self->{handled}++;
+}
+
+sub reset {
+  my ($self) = @_;
+  delete $self->{handled};    # lets "read" signal the next request again
+  return $self->SUPER::reset;
+}
+
+1;
diff --git a/lib/Mojo/Codec/WebSocket.pm b/lib/Mojo/Codec/WebSocket.pm
new file mode 100644
index 0000000..3854a97
--- /dev/null
+++ b/lib/Mojo/Codec/WebSocket.pm
@@ -0,0 +1,41 @@
+package Mojo::Codec::WebSocket;
+use Mojo::Base 'Mojo::Codec';
+
+use Mojo::WebSocket qw(build_frame parse_frame);
+use Scalar::Util 'weaken';
+
+has 'masked';
+
+sub read {
+  my ($self, $chunk) = @_;
+  # Pass parsed frames to the transaction, anything else finishes with 1009
+  my $tx = $self->{tx};
+  $self->{read} .= $chunk // '';
+  my $limit = $tx->max_websocket_size;
+  while (my $frame = parse_frame(\$self->{read}, $limit)) {
+    last if !ref($frame) && $tx->finish(1009);
+    $tx->frame($frame);
+  }
+
+  $tx->resume;
+}
+
+sub start {
+  my ($self, $tx) = @_;
+  $self->{tx} = $tx;    # strong reference, so the callback gets a weak one
+  weaken $self;         # frames below follow "masked" instead of a fixed 0
+  $tx->on(send => sub { $self->{write} .= build_frame $self->masked, @{pop()} });
+  return $self;
+}
+
+sub tx { $_[0]{tx} }    # transaction attached with "start", if any
+
+sub write {
+  my ($self) = @_;
+  my $tx = $self->{tx};
+  $tx->emit('drain') unless length($self->{write} //= '');    # TODO
+  $tx->completed if $self->{write} eq '' && $self->{closing};
+  return delete $self->{write};    # hand over and clear the buffered frames
+}
+
+1;
diff --git a/lib/Mojo/Server/Daemon.pm b/lib/Mojo/Server/Daemon.pm
index 8b55653..e1f044d 100644
--- a/lib/Mojo/Server/Daemon.pm
+++ b/lib/Mojo/Server/Daemon.pm
@@ -2,6 +2,8 @@ package Mojo::Server::Daemon;
use Mojo::Base 'Mojo::Server';
use Carp 'croak';
+use Mojo::Codec::HTTP::Server;
+use Mojo::Codec::WebSocket;
use Mojo::IOLoop;
use Mojo::Transaction::WebSocket;
use Mojo::URL;
@@ -111,7 +113,7 @@ sub _close {
my ($self, $id) = @_;
# Finish gracefully
- if (my $tx = $self->{connections}{$id}{tx}) { $tx->server_close }
+ if (my $tx = $self->{connections}{$id}{codec}->tx) { $tx->server_close }
delete $self->{connections}{$id};
}
@@ -121,18 +123,19 @@ sub _finish {
# Always remove connection for WebSockets
my $c = $self->{connections}{$id};
- return unless my $tx = $c->{tx};
+ return unless my $tx = $c->{codec}->tx;
return $self->_remove($id) if $tx->is_websocket;
# Finish transaction
- delete($c->{tx})->server_close;
+ $c->{codec}->reset;
+ $tx->server_close;
# Upgrade connection to WebSocket
if (my $ws = delete $c->{next}) {
# Successful upgrade
if ($ws->handshake->res->code == 101) {
- $c->{tx} = $ws;
+ $c->{codec} = Mojo::Codec::WebSocket->new->start($ws);
weaken $self;
$ws->on(resume => sub { $self->_write($id) });
$ws->server_open;
@@ -147,8 +150,7 @@ sub _finish {
# Build new transaction for leftovers
return if (my $leftovers = $tx->req->content->leftovers) eq '';
- $tx = $c->{tx} = $self->_build_tx($id, $c);
- $tx->server_read($leftovers);
+ $c->{codec}->start($self->_build_tx($id, $c))->read($leftovers);
}
sub _listen {
@@ -180,7 +182,8 @@ sub _listen {
$options => sub {
my ($loop, $stream, $id) = @_;
- $self->{connections}{$id} = {tls => $tls};
+ my $c = $self->{connections}{$id}
+ = {codec => Mojo::Codec::HTTP::Server->new, tls => $tls};
warn "-- Accept $id (@{[$stream->handle->peerhost]})\n" if DEBUG;
$stream->timeout($self->inactivity_timeout);
@@ -191,7 +194,7 @@ sub _listen {
$stream->on(
timeout => sub {
$self->app->log->debug('Inactivity timeout')
- if $self->{connections}{$id}{tx};
+ if $self->{connections}{$id}{codec}->tx;
}
);
}
@@ -208,10 +211,12 @@ sub _read {
my ($self, $id, $chunk) = @_;
# Make sure we have a transaction
- my $c = $self->{connections}{$id};
- my $tx = $c->{tx} ||= $self->_build_tx($id, $c);
+ my $c = $self->{connections}{$id};
+ my $codec = $c->{codec};
+ $codec->start($self->_build_tx($id, $c)) unless $codec->tx;
+ my $tx = $codec->tx;
warn term_escape "-- Server <<< Client (@{[_url($tx)]})\n$chunk\n" if DEBUG;
- $tx->server_read($chunk);
+ $codec->read($chunk);
}
sub _remove {
@@ -226,9 +231,10 @@ sub _write {
my ($self, $id) = @_;
# Protect from resume event recursion
- my $c = $self->{connections}{$id};
- return if !(my $tx = $c->{tx}) || $c->{writing}++;
- my $chunk = $tx->server_write;
+ my $c = $self->{connections}{$id};
+ my $codec = $c->{codec};
+ return if !(my $tx = $codec->tx) || $c->{writing}++;
+ my $chunk = $codec->write;
delete $c->{writing};
warn term_escape "-- Server >>> Client (@{[_url($tx)]})\n$chunk\n" if DEBUG;
my $stream = $self->ioloop->stream($id)->write($chunk);
diff --git a/lib/Mojo/Transaction/HTTP.pm b/lib/Mojo/Transaction/HTTP.pm
index 5643fb1..d34fd8a 100644
--- a/lib/Mojo/Transaction/HTTP.pm
+++ b/lib/Mojo/Transaction/HTTP.pm
@@ -58,6 +58,8 @@ sub keep_alive {
return 1;
}
+sub received { $_[0]->emit('request') }
+
sub redirects {
my $previous = shift;
my @redirects;
@@ -290,6 +292,12 @@ Check transaction for C<HEAD> request and C<1xx>, C<204> or C<304> response.
Check if connection can be kept alive.
+=head2 received
+
+ $tx->received;
+
+Emit L</"request"> event.
+
=head2 redirects
my $redirects = $tx->redirects;
diff --git a/lib/Mojo/Transaction/WebSocket.pm b/lib/Mojo/Transaction/WebSocket.pm
index b1d9b0b..1ff4c59 100644
--- a/lib/Mojo/Transaction/WebSocket.pm
+++ b/lib/Mojo/Transaction/WebSocket.pm
@@ -64,6 +64,8 @@ sub finish {
return $self;
}
+sub frame { $_[0]->emit(frame => $_[1]) }
+
sub is_established { !!shift->{open} }
sub is_websocket {1}
@@ -94,10 +96,11 @@ sub res { shift->handshake->res }
sub resume { $_[0]->handshake->resume and return $_[0] }
sub send {
- my ($self, $msg, $cb) = @_;
+ my ($self, $frame, $cb) = @_;
$self->once(drain => $cb) if $cb;
- $msg = $self->build_message($msg) unless ref $msg eq 'ARRAY';
- $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg);
+ $frame = $self->build_message($frame) unless ref $frame eq 'ARRAY';
+ $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$frame);
+ $self->emit(send => $frame);
return $self->SUPER::resume;
}
@@ -319,6 +322,15 @@ least one subscriber.
say "Message: $msg";
});
+=head2 send
+
+ $ws->on(send => sub {
+ my ($ws, $frame) = @_;
+ ...
+ });
+
+Emitted when a WebSocket frame has been sent.
+
=head2 text
$ws->on(text => sub {
@@ -409,6 +421,12 @@ Connection identifier.
Close WebSocket connection gracefully.
+=head2 frame
+
+ $ws->frame($frame);
+
+Emit L</"frame"> event.
+
=head2 is_established
my $bool = $ws->is_established;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment