-
-
Save anonymous/b34128b09de1a21d2f3d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/Mojo/Codec.pm b/lib/Mojo/Codec.pm | |
new file mode 100644 | |
index 0000000..642399b | |
--- /dev/null | |
+++ b/lib/Mojo/Codec.pm | |
@@ -0,0 +1,9 @@ | |
+package Mojo::Codec; | |
+use Mojo::Base -base; | |
+ | |
+use Carp 'croak'; | |
+ | |
+sub read { croak 'Method "read" not implemented by subclass' } | |
+sub write { croak 'Method "write" not implemented by subclass' } | |
+ | |
+1; | |
diff --git a/lib/Mojo/Codec/HTTP.pm b/lib/Mojo/Codec/HTTP.pm | |
new file mode 100644 | |
index 0000000..e0fd018 | |
--- /dev/null | |
+++ b/lib/Mojo/Codec/HTTP.pm | |
@@ -0,0 +1,109 @@ | |
+package Mojo::Codec::HTTP; | |
+use Mojo::Base 'Mojo::Codec'; | |
+ | |
+use Scalar::Util 'weaken'; | |
+ | |
+sub is_server {undef} | |
+ | |
+sub reset { delete @{$_[0]}{qw(http_state offset tx write writing)} } | |
+ | |
+sub start { | |
+ my ($self, $tx) = @_; | |
+ $self->{tx} = $tx; | |
+ weaken $self; | |
+ $tx->on(resume => sub { $self->{writing}++ }); | |
+ return $self; | |
+} | |
+ | |
+sub tx { shift->{tx} } | |
+ | |
+sub write { | |
+ my $self = shift; | |
+ | |
+ # Client starts writing right away | |
+ my $server = $self->is_server; | |
+ return '' unless $server ? $self->{writing} : ($self->{writing} //= 1); | |
+ | |
+ # Nothing written yet | |
+ my $tx = $self->{tx}; | |
+ $self->{$_} ||= 0 for qw(offset write); | |
+ my $msg = $server ? $tx->res : $tx->req; | |
+ @$self{qw(http_state write)} = ('start_line', $msg->start_line_size) | |
+ unless $self->{http_state}; | |
+ | |
+ # Start-line | |
+ my $chunk = ''; | |
+ $chunk .= $self->_start_line($msg) if $self->{http_state} eq 'start_line'; | |
+ | |
+ # Headers | |
+ $chunk .= $self->_headers($msg, $server) if $self->{http_state} eq 'headers'; | |
+ | |
+ # Body | |
+ $chunk .= $self->_body($msg, $server) if $self->{http_state} eq 'body'; | |
+ | |
+ return $chunk; | |
+} | |
+ | |
+sub _body { | |
+ my ($self, $msg, $finish) = @_; | |
+ | |
+ # Prepare body chunk | |
+ my $buffer = $msg->get_body_chunk($self->{offset}); | |
+ my $written = defined $buffer ? length $buffer : 0; | |
+ $self->{write} = $msg->content->is_dynamic ? 1 : ($self->{write} - $written); | |
+ $self->{offset} += $written; | |
+ | |
+ # Delayed | |
+ $self->{writing} = 0 unless defined $buffer; | |
+ | |
+ # Finished | |
+ $finish ? $self->{tx}->completed : ($self->{writing} = 0) | |
+ if $self->{write} <= 0 || defined $buffer && $buffer eq ''; | |
+ | |
+ return $buffer // ''; | |
+} | |
+ | |
+sub _headers { | |
+ my ($self, $msg, $head) = @_; | |
+ | |
+ # Prepare header chunk | |
+ my $buffer = $msg->get_header_chunk($self->{offset}); | |
+ my $written = defined $buffer ? length $buffer : 0; | |
+ $self->{write} -= $written; | |
+ $self->{offset} += $written; | |
+ | |
+ # Switch to body | |
+ if ($self->{write} <= 0) { | |
+ $self->{offset} = 0; | |
+ | |
+ # Response without body | |
+ my $tx = $self->{tx}; | |
+ if ($head && $tx->is_empty) { $tx->completed } | |
+ | |
+ # Body | |
+ else { | |
+ $self->{http_state} = 'body'; | |
+ $self->{write} = $msg->content->is_dynamic ? 1 : $msg->body_size; | |
+ } | |
+ } | |
+ | |
+ return $buffer; | |
+} | |
+ | |
+sub _start_line { | |
+ my ($self, $msg) = @_; | |
+ | |
+ # Prepare start-line chunk | |
+ my $buffer = $msg->get_start_line_chunk($self->{offset}); | |
+ my $written = defined $buffer ? length $buffer : 0; | |
+ $self->{write} -= $written; | |
+ $self->{offset} += $written; | |
+ | |
+ # Switch to headers | |
+ @$self{qw(http_state write offset)} = ('headers', $msg->header_size, 0) | |
+ if $self->{write} <= 0; | |
+ | |
+ return $buffer; | |
+} | |
+ | |
+1; | |
diff --git a/lib/Mojo/Codec/HTTP/Client.pm b/lib/Mojo/Codec/HTTP/Client.pm | |
new file mode 100644 | |
index 0000000..afdeebd | |
--- /dev/null | |
+++ b/lib/Mojo/Codec/HTTP/Client.pm | |
@@ -0,0 +1,4 @@ | |
+package Mojo::Codec::HTTP::Client; | |
+use Mojo::Base 'Mojo::Codec::HTTP'; | |
+ | |
+1; | |
diff --git a/lib/Mojo/Codec/HTTP/Server.pm b/lib/Mojo/Codec/HTTP/Server.pm | |
new file mode 100644 | |
index 0000000..5ecc58e | |
--- /dev/null | |
+++ b/lib/Mojo/Codec/HTTP/Server.pm | |
@@ -0,0 +1,24 @@ | |
+package Mojo::Codec::HTTP::Server; | |
+use Mojo::Base 'Mojo::Codec::HTTP'; | |
+ | |
+sub is_server {1} | |
+ | |
+sub read { | |
+ my ($self, $chunk) = @_; | |
+ | |
+ # Parse request | |
+ my $tx = $self->{tx}; | |
+ my $req = $tx->req; | |
+ $req->parse($chunk) unless $req->error; | |
+ | |
+ # Generate response | |
+ $tx->received if $req->is_finished && !$self->{handled}++; | |
+} | |
+ | |
+sub reset { | |
+ my $self = shift; | |
+ delete $self->{handled}; | |
+ return $self->SUPER::reset; | |
+} | |
+ | |
+1; | |
diff --git a/lib/Mojo/Codec/WebSocket.pm b/lib/Mojo/Codec/WebSocket.pm | |
new file mode 100644 | |
index 0000000..3854a97 | |
--- /dev/null | |
+++ b/lib/Mojo/Codec/WebSocket.pm | |
@@ -0,0 +1,41 @@ | |
+package Mojo::Codec::WebSocket; | |
+use Mojo::Base 'Mojo::Codec'; | |
+ | |
+use Mojo::WebSocket qw(build_frame parse_frame); | |
+use Scalar::Util 'weaken'; | |
+ | |
+has 'masked'; | |
+ | |
+sub read { | |
+ my ($self, $chunk) = @_; | |
+ | |
+ my $tx = $self->{tx}; | |
+ $self->{read} .= $chunk // ''; | |
+ my $max = $tx->max_websocket_size; | |
+ while (my $frame = parse_frame \$self->{read}, $max) { | |
+ $tx->finish(1009) and last unless ref $frame; | |
+ $tx->frame($frame); | |
+ } | |
+ | |
+ $tx->resume; | |
+} | |
+ | |
+sub start { | |
+ my ($self, $tx) = @_; | |
+ $self->{tx} = $tx; | |
+ weaken $self; | |
+ $tx->on(send => sub { $self->{write} .= build_frame 0, @{pop()} }); | |
+ return $self; | |
+} | |
+ | |
+sub tx { shift->{tx} } | |
+ | |
+sub write { | |
+ my $self = shift; | |
+ my $tx = $self->{tx}; | |
+ $tx->emit('drain') if ($self->{write} //= '') eq ''; # TODO | |
+ $tx->completed if $self->{write} eq '' && $self->{closing}; | |
+ return delete $self->{write}; | |
+} | |
+ | |
+1; | |
diff --git a/lib/Mojo/Server/Daemon.pm b/lib/Mojo/Server/Daemon.pm | |
index 8b55653..e1f044d 100644 | |
--- a/lib/Mojo/Server/Daemon.pm | |
+++ b/lib/Mojo/Server/Daemon.pm | |
@@ -2,6 +2,8 @@ package Mojo::Server::Daemon; | |
use Mojo::Base 'Mojo::Server'; | |
use Carp 'croak'; | |
+use Mojo::Codec::HTTP::Server; | |
+use Mojo::Codec::WebSocket; | |
use Mojo::IOLoop; | |
use Mojo::Transaction::WebSocket; | |
use Mojo::URL; | |
@@ -111,7 +113,7 @@ sub _close { | |
my ($self, $id) = @_; | |
# Finish gracefully | |
- if (my $tx = $self->{connections}{$id}{tx}) { $tx->server_close } | |
+ if (my $tx = $self->{connections}{$id}{codec}->tx) { $tx->server_close } | |
delete $self->{connections}{$id}; | |
} | |
@@ -121,18 +123,19 @@ sub _finish { | |
# Always remove connection for WebSockets | |
my $c = $self->{connections}{$id}; | |
- return unless my $tx = $c->{tx}; | |
+ return unless my $tx = $c->{codec}->tx; | |
return $self->_remove($id) if $tx->is_websocket; | |
# Finish transaction | |
- delete($c->{tx})->server_close; | |
+ $c->{codec}->reset; | |
+ $tx->server_close; | |
# Upgrade connection to WebSocket | |
if (my $ws = delete $c->{next}) { | |
# Successful upgrade | |
if ($ws->handshake->res->code == 101) { | |
- $c->{tx} = $ws; | |
+ $c->{codec} = Mojo::Codec::WebSocket->new->start($ws); | |
weaken $self; | |
$ws->on(resume => sub { $self->_write($id) }); | |
$ws->server_open; | |
@@ -147,8 +150,7 @@ sub _finish { | |
# Build new transaction for leftovers | |
return if (my $leftovers = $tx->req->content->leftovers) eq ''; | |
- $tx = $c->{tx} = $self->_build_tx($id, $c); | |
- $tx->server_read($leftovers); | |
+ $c->{codec}->start($self->_build_tx($id, $c))->read($leftovers); | |
} | |
sub _listen { | |
@@ -180,7 +182,8 @@ sub _listen { | |
$options => sub { | |
my ($loop, $stream, $id) = @_; | |
- $self->{connections}{$id} = {tls => $tls}; | |
+ my $c = $self->{connections}{$id} | |
+ = {codec => Mojo::Codec::HTTP::Server->new, tls => $tls}; | |
warn "-- Accept $id (@{[$stream->handle->peerhost]})\n" if DEBUG; | |
$stream->timeout($self->inactivity_timeout); | |
@@ -191,7 +194,7 @@ sub _listen { | |
$stream->on( | |
timeout => sub { | |
$self->app->log->debug('Inactivity timeout') | |
- if $self->{connections}{$id}{tx}; | |
+ if $self->{connections}{$id}{codec}->tx; | |
} | |
); | |
} | |
@@ -208,10 +211,12 @@ sub _read { | |
my ($self, $id, $chunk) = @_; | |
# Make sure we have a transaction | |
- my $c = $self->{connections}{$id}; | |
- my $tx = $c->{tx} ||= $self->_build_tx($id, $c); | |
+ my $c = $self->{connections}{$id}; | |
+ my $codec = $c->{codec}; | |
+ $codec->start($self->_build_tx($id, $c)) unless $codec->tx; | |
+ my $tx = $codec->tx; | |
warn term_escape "-- Server <<< Client (@{[_url($tx)]})\n$chunk\n" if DEBUG; | |
- $tx->server_read($chunk); | |
+ $codec->read($chunk); | |
} | |
sub _remove { | |
@@ -226,9 +231,10 @@ sub _write { | |
my ($self, $id) = @_; | |
# Protect from resume event recursion | |
- my $c = $self->{connections}{$id}; | |
- return if !(my $tx = $c->{tx}) || $c->{writing}++; | |
- my $chunk = $tx->server_write; | |
+ my $c = $self->{connections}{$id}; | |
+ my $codec = $c->{codec}; | |
+ return if !(my $tx = $codec->tx) || $c->{writing}++; | |
+ my $chunk = $codec->write; | |
delete $c->{writing}; | |
warn term_escape "-- Server >>> Client (@{[_url($tx)]})\n$chunk\n" if DEBUG; | |
my $stream = $self->ioloop->stream($id)->write($chunk); | |
diff --git a/lib/Mojo/Transaction/HTTP.pm b/lib/Mojo/Transaction/HTTP.pm | |
index 5643fb1..d34fd8a 100644 | |
--- a/lib/Mojo/Transaction/HTTP.pm | |
+++ b/lib/Mojo/Transaction/HTTP.pm | |
@@ -58,6 +58,8 @@ sub keep_alive { | |
return 1; | |
} | |
+sub received { shift->emit('request') } | |
+ | |
sub redirects { | |
my $previous = shift; | |
my @redirects; | |
@@ -290,6 +292,12 @@ Check transaction for C<HEAD> request and C<1xx>, C<204> or C<304> response. | |
Check if connection can be kept alive. | |
+=head2 received | |
+ | |
+ $tx->received; | |
+ | |
+Emit L</"request"> event. | |
+ | |
=head2 redirects | |
my $redirects = $tx->redirects; | |
diff --git a/lib/Mojo/Transaction/WebSocket.pm b/lib/Mojo/Transaction/WebSocket.pm | |
index b1d9b0b..1ff4c59 100644 | |
--- a/lib/Mojo/Transaction/WebSocket.pm | |
+++ b/lib/Mojo/Transaction/WebSocket.pm | |
@@ -64,6 +64,8 @@ sub finish { | |
return $self; | |
} | |
+sub frame { shift->emit(frame => shift) } | |
+ | |
sub is_established { !!shift->{open} } | |
sub is_websocket {1} | |
@@ -94,10 +96,11 @@ sub res { shift->handshake->res } | |
sub resume { $_[0]->handshake->resume and return $_[0] } | |
sub send { | |
- my ($self, $msg, $cb) = @_; | |
+ my ($self, $frame, $cb) = @_; | |
$self->once(drain => $cb) if $cb; | |
- $msg = $self->build_message($msg) unless ref $msg eq 'ARRAY'; | |
- $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg); | |
+ $frame = $self->build_message($frame) unless ref $frame eq 'ARRAY'; | |
+ $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$frame); | |
+ $self->emit(send => $frame); | |
return $self->SUPER::resume; | |
} | |
@@ -319,6 +322,15 @@ least one subscriber. | |
say "Message: $msg"; | |
}); | |
+=head2 send | |
+ | |
+ $ws->on(send => sub { | |
+ my ($ws, $frame) = @_; | |
+ ... | |
+ }); | |
+ | |
+Emitted when a WebSocket frame has been queued for sending. | |
+ | |
=head2 text | |
$ws->on(text => sub { | |
@@ -409,6 +421,12 @@ Connection identifier. | |
Close WebSocket connection gracefully. | |
+=head2 frame | |
+ | |
+ $ws->frame($frame); | |
+ | |
+Emit L</"frame"> event. | |
+ | |
=head2 is_established | |
my $bool = $ws->is_established; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment