Last active
January 14, 2016 20:11
-
-
Save jberger/9c8398a75e2706d90bb4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package RDMojo::Plugin::MQ;

# Mojolicious plugin that registers the "mq.*" helpers, which publish to and
# consume from a message broker through Crixa.
use Mojo::Base 'Mojolicious::Plugin';

use Crixa;
use Mojo::Util;
use Mojo::JSON;
use Carp ();
use Scalar::Util;

# Broker hostname handed to Crixa->connect by the mq.connection helper.
my $host = 'rabbitmq.service.consul';
# Plugin entry point.
#
# Registers the helpers mq.connection, mq.consume, mq.emit_exception,
# mq.extract_data and mq.publish on the application, then subscribes to the
# application log's "message" event so that log messages passing the level
# check below are forwarded through mq.emit_exception.
#
# Parameters:
#   $plugin - the plugin instance (unused)
#   $app    - the application object
#   $config - plugin configuration (currently unused)
#
# Returns whatever $app->log->on(...) returns.
sub register {
    my ($plugin, $app, $config) = @_;

    $app->helper('mq.connection'     => \&_connection);
    $app->helper('mq.consume'        => \&_consume);
    $app->helper('mq.emit_exception' => sub { _emit_level(exception => @_) });
    $app->helper('mq.extract_data'   => \&_extract);
    $app->helper('mq.publish'        => \&_publish);

    # The log subscription below closes over $app; keep that reference weak
    # so the callback does not hold a strong reference to the application.
    Scalar::Util::weaken($app);

    $app->log->on(message => sub {
        my ($log, $level, @lines) = @_;

        # MOJO_LOG_LEVEL is forced to 'error' for the duration of this
        # callback -- presumably so the is_$level check only passes for
        # 'error' and above regardless of the configured log level; confirm
        # against the Mojo::Log version in use.
        local $ENV{MOJO_LOG_LEVEL} = 'error';

        # is_$level is a *method*: it must be invoked on $log. Calling the
        # code reference with an empty argument list (as was done before)
        # runs the check against an undefined invocant. Levels without an
        # is_$level method are always forwarded.
        my $level_check = $log->can("is_$level") || sub { 1 };
        return unless $log->$level_check();

        $app->mq->emit_exception(@lines);
    });
}
# Helper mq.connection: return the process-wide Crixa connection.
#
# The connection and the PID that created it are kept in state variables, so
# they persist across calls. A new connection to $host is opened on first use
# and again whenever the current PID differs from the one that opened the
# cached connection (fork safety).
#
# Parameters:
#   $c - the object the helper was invoked on (unused)
#
# Returns the connection object returned by Crixa->connect.
sub _connection {
    my $c = shift;

    state $conn;
    state $owner_pid;

    # fork safety: never reuse a connection created by another process
    if (!$conn || $owner_pid ne $$) {
        $owner_pid = $$;
        $conn      = Crixa->connect(host => $host);
    }

    return $conn;
}
# Helper mq.consume: declare a queue on the exchange returned by _exchange
# and consume messages from it.
#
# Parameters:
#   $c    - the object the helper was invoked on
#   $name - queue name; also used as the default routing key
#   $cb   - callback (see below)
#   $opts - optional hashref:
#             routing_keys - a string or arrayref of routing keys
#                            (default: [$name])
#             queue        - hashref of extra arguments appended to the
#                            $exchange->queue(...) call
#             raw_cb       - when true, $cb is passed to $queue->consume
#                            unchanged
#
# Without raw_cb, every true message is run through mq.extract_data and $cb
# is invoked as $app->$cb($err, $data, $msg), where $err is the value of $@
# after the extraction attempt. The wrapper always returns 1 to
# $queue->consume.
sub _consume {
    my ($c, $name, $cb, $opts) = @_;
    $opts ||= {};

    # Accept a single routing key as a plain string.
    my $keys = $opts->{routing_keys} || [$name];
    $keys = [$keys] unless ref $keys;

    my $exchange = _exchange($c);
    my $queue    = $exchange->queue(
        name         => $name,
        routing_keys => $keys,
        ($opts->{queue} ? %{ $opts->{queue} } : ()),
    );

    return $queue->consume($cb) if $opts->{raw_cb};

    # The consumer closure only needs a weak reference to the application.
    my $app = $c->app;
    Scalar::Util::weaken($app);

    return $queue->consume(sub {
        my ($msg) = @_;
        return 1 unless $msg;

        local $@;
        my $data = eval { $app->mq->extract_data($msg) };
        my $err  = $@;

        #TODO handle errors from callback
        $app->$cb($err, $data, $msg);

        return 1;
    });
}
# Publish a log/exception report under the routing key "error.$level".
#
# Parameters:
#   $level - suffix for the routing key (e.g. 'exception')
#   $c     - the object the helper was invoked on; must provide mq->publish
#   @lines - message lines; the first one may be a Mojo::Exception object
#
# When the first line is a Mojo::Exception object, its frames and message are
# published as "trace" and "message". Otherwise all lines are joined with
# newlines into "message". Nothing is published when the first line is
# undefined or an empty string.
#
# Returns the result of mq->publish, or nothing when no report is sent.
sub _emit_level {
    my $level = shift;
    my ($c, @lines) = @_;

    # Skip empty reports, but do not drop a legitimate message such as "0".
    my $e = $lines[0];
    return unless defined $e && (ref $e || length $e);

    my %data;

    # blessed() is used instead of eval { $e->isa(...) } for two reasons:
    # it leaves the caller's $@ intact (this runs on error-reporting paths),
    # and it cannot mistake a plain string that happens to be a class name
    # for an exception object.
    if (Scalar::Util::blessed($e) && $e->isa('Mojo::Exception')) {
        $data{trace}   = $e->frames;
        $data{message} = $e->message;
    }
    else {
        $data{message} = join "\n", @lines;
    }

    return $c->mq->publish("error.$level", json => \%data);
}
# Open a new channel on the mq.connection connection and return the topic
# exchange named 'rd' from that channel.
#
# Parameters:
#   $c - the object the helper was invoked on; must provide mq->connection
#
# Note: every call asks the connection for a new channel.
sub _exchange {
    my ($c) = @_;

    my $channel = $c->mq->connection->new_channel;

    return $channel->exchange(
        name          => 'rd',
        exchange_type => 'topic',
    );
}
# Helper mq.extract_data: decode a message body according to its content
# type.
#
# Parameters:
#   $c   - the object the helper was invoked on (unused)
#   $msg - message object providing body() and content_type()
#
# Returns:
#   - for a content type containing "text/plain": the body decoded as UTF-8
#     via Mojo::Util::decode
#   - for a content type containing "application/json": the data structure
#     returned by Mojo::JSON::decode_json
#
# Dies with "Unknown message type ..." for any other content type, including
# a missing one.
sub _extract {
    my ($c, $msg) = @_;

    my $body = $msg->body;

    # A message may carry no content type at all; treat that as an unknown
    # type instead of matching against undef (which warns).
    my $type = $msg->content_type // '';

    # Media type names are case-insensitive, hence /i.
    if ($type =~ m{text/plain}i) {
        return Mojo::Util::decode('UTF-8', $body);
    }
    elsif ($type =~ m{application/json}i) {
        return Mojo::JSON::decode_json($body);
    }

    die "Unknown message type $type";
}
# Helper mq.publish: encode a payload and publish it to the exchange returned
# by _exchange.
#
# Parameters:
#   $c    - the object the helper was invoked on
#   $key  - routing key
#   $type - payload type: 'text' (UTF-8 encoded string, content type
#           text/plain;charset=UTF-8) or 'json' (data structure encoded with
#           Mojo::JSON::encode_json, content type
#           application/json;charset=UTF-8)
#   $data - the payload
#   $opts - accepted for interface compatibility; currently unused
#
# Croaks with "Unknown message type ..." for any other $type.
# Returns the result of the exchange's publish call.
sub _publish {
    my ($c, $key, $type, $data, $opts) = @_;

    # A missing type is reported as an unknown type, without warnings.
    $type //= '';

    # Validate the type and encode the body *before* touching the broker:
    # _exchange opens a new channel, which must not happen for a call that
    # is going to fail anyway.
    my ($body, %props);
    if ($type eq 'text') {
        $props{content_type} = 'text/plain;charset=UTF-8';
        $body = Mojo::Util::encode('UTF-8', $data);
    }
    elsif ($type eq 'json') {
        $props{content_type} = 'application/json;charset=UTF-8';
        $body = Mojo::JSON::encode_json($data);
    }
    else {
        Carp::croak("Unknown message type $type");
    }

    return _exchange($c)->publish(
        routing_key => $key,
        body        => $body,
        props       => \%props,
    );
}
# A module file must end with a true value so that require/use succeeds.
1;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment