Last active
November 7, 2016 13:00
-
-
Save jberger/6235eb997b26ab767847ce69d29c9d81 to your computer and use it in GitHub Desktop.
An unfinished Mojo RabbitMQ plugin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package RDMojo::Plugin::MQ; | |
use Mojo::Base 'Mojolicious::Plugin'; | |
use Crixa; | |
use Mojo::Util; | |
use Mojo::JSON; | |
use Carp (); | |
use Scalar::Util; | |
# Hostname of the RabbitMQ broker; _connection hands it to Crixa->connect.
my $host = 'rabbitmq.service.consul';
# Plugin entry point.
#
# Installs the mq.* helpers on the application and subscribes to the
# application log's "message" event so that qualifying log messages are
# forwarded through the mq.emit_exception helper.
#
#   $plugin - this plugin instance (not used here)
#   $app    - application object the helpers are registered on
#   $config - plugin configuration (accepted but currently unused)
#
# Returns whatever $app->log->on(...) returns.
sub register {
  my ($plugin, $app, $config) = @_;

  $app->helper('mq.connection'     => \&_connection);
  $app->helper('mq.consume'        => \&_consume);
  $app->helper('mq.emit_exception' => sub { _emit_level(exception => @_) });
  $app->helper('mq.extract_data'   => \&_extract);
  $app->helper('mq.publish'        => \&_publish);

  # The log handler below closes over $app, so only keep a weak reference.
  Scalar::Util::weaken($app);

  $app->log->on(message => sub {
    my ($log, $level, @lines) = @_;

    # NOTE(review): presumably the logger's is_<level> predicates consult this
    # variable, so only messages at "error" or above pass -- confirm against
    # the Mojo::Log version in use.
    local $ENV{MOJO_LOG_LEVEL} = 'error';

    # The predicate is a method: it must be invoked on $log. Calling the
    # coderef with no arguments would run it with an undefined invocant.
    # Levels without a predicate are always forwarded.
    my $is_active = $log->can("is_$level") || sub { 1 };
    return unless $log->$is_active();

    $app->mq->emit_exception(@lines);
  });
}
# Helper mq.connection: return the cached Crixa connection, creating one when
# none exists yet or when the cached one was opened under a different PID.
#
#   $c - helper invocant (not used)
#
# Returns the connection object produced by Crixa->connect.
sub _connection {
  my ($c) = @_;

  # Both persist across calls for the lifetime of the process.
  state $pid;
  state $conn;

  # Fork safety: reuse the cached connection only in the process that made it.
  return $conn if $conn && $pid eq $$;

  $pid  = $$;
  $conn = Crixa->connect(host => $host);
  return $conn;
}
# Helper mq.consume: declare a queue on the exchange and consume from it.
#
#   $c    - helper invocant; must provide ->app (and ->mq, via _exchange)
#   $name - queue name; also the default routing key
#   $cb   - callback
#   $opts - optional hashref:
#             routing_keys - a single key or an arrayref of keys
#                            (defaults to [$name])
#             queue        - hashref of extra arguments for $exchange->queue;
#                            listed after name/routing_keys
#             raw_cb       - true: pass $cb to $queue->consume untouched
#
# Unless raw_cb is set, $cb is invoked as $app->$cb($err, $data, $msg), where
# $data is the result of the mq.extract_data helper and $err is the error it
# raised, if any. The wrapper always returns 1 and does nothing else when it
# is called without a message.
#
# Sketch left by the original author (note that it omits the queue name):
#
#   $app->mq->consume(sub {
#     my ($app, $err, $txt, $msg) = @_;
#     $app->external->send_sms({
#       sms_to  => '5555550100',
#       message => $txt,
#     });
#   });
#
# Returns whatever $queue->consume returns.
sub _consume {
  my ($c, $name, $cb, $opts) = @_;
  $opts ||= {};

  # Normalise the routing keys to an arrayref.
  my $keys = $opts->{routing_keys} || [$name];
  $keys = [$keys] unless ref $keys;

  my $queue = _exchange($c)->queue(
    name         => $name,
    routing_keys => $keys,
    %{ $opts->{queue} || {} },
  );

  my $handler = $cb;
  unless ($opts->{raw_cb}) {
    my $app = $c->app;
    Scalar::Util::weaken($app);

    $handler = sub {
      my ($msg) = @_;
      return 1 unless $msg;

      local $@;
      my $data = eval { $app->mq->extract_data($msg) };
      my $err  = $@;

      #TODO handle errors from callback
      $app->$cb($err, $data, $msg);
      return 1;
    };
  }

  return $queue->consume($handler);
}
# Publish an error report to the "error.$level" routing key as JSON.
#
#   $level - suffix for the routing key (e.g. "exception")
#   $c     - helper invocant; must provide ->mq->publish
#   @lines - message lines; the first may be a Mojo::Exception object
#
# For a Mojo::Exception the payload carries its frames ("trace") and its
# message; otherwise the payload message is all lines joined with newlines.
#
# Returns nothing when there is no usable first line, else whatever the
# mq.publish helper returns.
sub _emit_level {
  my $level = shift;
  my ($c, @lines) = @_;

  # Skip only when there is nothing to report. A plain "0" is a legitimate
  # message, and objects are never skipped.
  my $e = $lines[0];
  return unless defined $e;
  return if !ref $e && !length $e;

  my %data;

  # blessed() rather than eval { $e->isa(...) }: a bare eval would reset the
  # caller's $@ (this runs from a log handler, often while an error is being
  # handled) and would treat a string naming a class as an invocant.
  if (Scalar::Util::blessed($e) && $e->isa('Mojo::Exception')) {
    $data{trace}   = $e->frames;
    $data{message} = $e->message;
  } else {
    $data{message} = join "\n", @lines;
  }

  $c->mq->publish("error.$level", json => \%data);
}
# Open a new channel on the shared connection and declare the "rd" topic
# exchange on it.
#
#   $c - helper invocant; must provide ->mq->connection
#
# Returns whatever the channel's exchange() method returns.
sub _exchange {
  my ($c) = @_;

  my $channel = $c->mq->connection->new_channel;
  my @spec    = (name => 'rd', exchange_type => 'topic');

  return $channel->exchange(@spec);
}
# Helper mq.extract_data: decode a message body according to its content type.
#
#   $c   - helper invocant (not used)
#   $msg - message object providing ->body and ->content_type
#
# Returns the result of Mojo::Util::decode('UTF-8', ...) for text/plain
# bodies, or of Mojo::JSON::decode_json for application/json bodies.
#
# Dies with "Unknown message type ..." for any other (or missing) type.
sub _extract {
  my ($c, $msg) = @_;

  my $body = $msg->body;

  # A message without a content type is treated as an unknown type instead of
  # raising "uninitialized value" warnings in the matches below.
  my $type = $msg->content_type // '';

  # MIME type names are case-insensitive; parameters such as ";charset=..."
  # may follow, so the match is deliberately not anchored at the end.
  return Mojo::Util::decode('UTF-8', $body) if $type =~ m{text/plain}i;
  return Mojo::JSON::decode_json($body)     if $type =~ m{application/json}i;

  die "Unknown message type $type";
}
# Helper mq.publish: encode a payload and publish it on the exchange.
#
#   $c    - helper invocant (handed to _exchange)
#   $key  - routing key
#   $type - 'text' (payload is UTF-8 encoded) or 'json' (payload is JSON
#           encoded)
#   $data - payload
#   $opts - optional hashref; accepted but currently unused
#
# Croaks with "Unknown message type ..." for any other $type. The exchange is
# obtained before the type is checked.
#
# Returns whatever the exchange's publish() method returns.
sub _publish {
  my ($c, $key, $type, $data, $opts) = @_;
  $opts ||= {};

  my $exchange = _exchange($c);

  my $is_text = $type eq 'text';
  Carp::croak("Unknown message type $type") unless $is_text || $type eq 'json';

  my $content_type
    = $is_text
    ? 'text/plain;charset=UTF-8'
    : 'application/json;charset=UTF-8';

  my $body
    = $is_text
    ? Mojo::Util::encode('UTF-8', $data)
    : Mojo::JSON::encode_json($data);

  return $exchange->publish(
    routing_key => $key,
    body        => $body,
    props       => { content_type => $content_type },
  );
}
1; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment