Skip to content

Instantly share code, notes, and snippets.

@jberger
Last active November 7, 2016 13:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jberger/6235eb997b26ab767847ce69d29c9d81 to your computer and use it in GitHub Desktop.
Save jberger/6235eb997b26ab767847ce69d29c9d81 to your computer and use it in GitHub Desktop.
An unfinished Mojo RabbitMQ plugin
package RDMojo::Plugin::MQ;
use Mojo::Base 'Mojolicious::Plugin';
use Crixa;
use Mojo::Util;
use Mojo::JSON;
use Carp ();
use Scalar::Util;
my $host = 'rabbitmq.service.consul';
sub register {
my ($plugin, $app, $config) = @_;
$app->helper('mq.connection' => \&_connection);
$app->helper('mq.consume' => \&_consume);
$app->helper('mq.emit_exception' => sub { _emit_level( exception => @_ ) });
$app->helper('mq.extract_data' => \&_extract);
$app->helper('mq.publish' => \&_publish);
Scalar::Util::weaken $app;
$app->log->on(message => sub {
my ($log, $level, @lines) = @_;
local $ENV{MOJO_LOG_LEVEL} = 'error';
return unless ($log->can("is_$level") || sub { 1 })->();
$app->mq->emit_exception(@lines);
});
}
sub _connection {
my ($c) = @_;
state ($pid, $conn);
unless ($conn && $pid eq $$) { # fork safety
$pid = $$;
$conn = Crixa->connect(host => $host);
}
return $conn;
}
sub _consume {
my ($c, $name, $cb, $opts) = @_;
$opts ||= {};
my $keys = $opts->{routing_keys} || [$name];
unless (ref $keys) {
$keys = [$keys];
}
my $ex = _exchange($c);
my $queue = $ex->queue(
name => $name,
routing_keys => $keys,
$opts->{queue} ? %{ $opts->{queue} } : (),
);
if ($opts->{raw_cb}) {
$queue->consume($cb);
} else {
my $app = $c->app;
Scalar::Util::weaken $app;
$queue->consume(sub{
my $msg = shift;
if ($msg) {
local $@;
my $data = eval { $app->mq->extract_data($msg) };
my $err = $@;
#TODO handle errors from callback
$app->$cb($err, $data, $msg);
}
return 1;
});
}
#$app->mq->consume(sub {
#my ($app, $err, $txt, $msg) = @_;
#$app->external->send_sms({
#sms_to => '8477028856',
#message => $txt,
#});
#});
}
sub _emit_level {
my $level = shift;
my ($c, @lines) = @_;
my $e = $lines[0] || return;
my %data;
if (eval { $e->isa('Mojo::Exception') }) {
$data{trace} = $e->frames;
$data{message} = $e->message;
} else {
$data{message} = join "\n", @lines;
}
$c->mq->publish("error.$level", json => \%data);
}
sub _exchange {
my $c = shift;
my $conn = $c->mq->connection;
return $conn->new_channel->exchange(
name => 'rd',
exchange_type => 'topic',
);
}
sub _extract {
my ($c, $msg) = @_;
my $body = $msg->body;
my $type = $msg->content_type;
if ($type =~ m{text/plain}) {
return Mojo::Util::decode 'UTF-8', $body;
} elsif ($type =~ m{application/json}) {
return Mojo::JSON::decode_json $body;
}
die "Unknown message type $type";
}
sub _publish {
my ($c, $key, $type, $data, $opts) = @_;
$opts ||= {};
my $ex = _exchange($c);
my $body;
my %props;
if ($type eq 'text') {
$props{content_type} = 'text/plain;charset=UTF-8';
$body = Mojo::Util::encode 'UTF-8', $data;
} elsif ($type eq 'json') {
$props{content_type} = 'application/json;charset=UTF-8';
$body = Mojo::JSON::encode_json $data;
} else {
Carp::croak "Unknown message type $type";
}
$ex->publish(
routing_key => $key,
body => $body,
props => \%props,
);
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment