Skip to content

Instantly share code, notes, and snippets.

@monken
Created December 29, 2011 16:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save monken/1534750 to your computer and use it in GitHub Desktop.
Save monken/1534750 to your computer and use it in GitHub Desktop.
MyApp::Model
package MyApp::Model;
use Moose;
use JSON::XS;
use AnyEvent::RabbitMQ;
use MooseX::ClassAttribute;
# Class-level, read-only attribute (MooseX::ClassAttribute) whose value is
# produced by _build_queue.
# NOTE(review): the attribute is not declared lazy, so the builder -- which
# blocks on a network handshake -- presumably runs when this declaration is
# executed (i.e. when the module is loaded). Confirm against the
# MooseX::ClassAttribute documentation before relying on either behaviour.
class_has queue => ( is => 'ro', builder => '_build_queue' );
# Per-instance read/write slot; channel() stores the channel it opened here so
# later calls can reuse it instead of opening another one.
has _channel => ( is => 'rw' );
# Builder for the class-wide `queue` attribute.
#
# Connects to the RabbitMQ broker at localhost:5672 (user/pass guest/guest,
# vhost "/"), opens a channel on that connection and declares the durable
# queue "myapp.jobs". The call blocks on a condition variable until the
# declaration has succeeded.
#
# Returns: the value returned by AnyEvent::RabbitMQ's connect() call.
# Dies:    from $cv->recv when the connect, channel-open or queue-declare
#          step reports a failure, or when the connection/channel is closed
#          before the declaration completed. Previously none of these paths
#          signalled the condvar, so the blocking recv could not report them.
sub _build_queue {
    my $cv = AE::cv;

    # Produces a callback that turns a failure notification into an exception
    # raised by $cv->recv. Once the condvar has been signalled the callback
    # is a no-op, so a close that happens long after a successful setup does
    # not disturb anything.
    my $fail_with = sub {
        my ($what) = @_;
        return sub {
            return if $cv->ready;
            my $detail = join ' ', grep { defined } @_;
            $cv->croak("RabbitMQ $what: $detail");
        };
    };

    my $ar = AnyEvent::RabbitMQ->new->load_xml_spec->connect(
        host       => 'localhost',
        port       => 5672,
        user       => 'guest',
        pass       => 'guest',
        vhost      => '/',
        on_success => sub {
            my ($connection) = @_;
            $connection->open_channel(
                on_success => sub {
                    my ($channel) = @_;
                    $channel->declare_queue(
                        queue      => 'myapp.jobs',
                        durable    => 1,
                        on_success => $cv,    # a condvar is callable: same as ->send
                        on_failure => $fail_with->('queue declaration failed'),
                    );
                },
                on_failure => $fail_with->('channel open failed'),
                on_close   => $fail_with->('channel closed during setup'),
            );
        },
        on_failure      => $fail_with->('connect failed'),
        on_read_failure => $fail_with->('read failed'),
        on_close        => $fail_with->('connection closed during setup'),
    );

    # Block until the queue has been declared (or throw on failure).
    $cv->recv;
    return $ar;
}
# Hands a channel to the callback $cb.
#
# If a channel is already stored in _channel, $cb is invoked with it right
# away and its result is returned. Otherwise a new channel is requested from
# the shared `queue` connection; once that succeeds the channel is stored in
# _channel and passed to $cb.
#
# Params:  $cb - code ref, called with the channel as its argument.
# Returns: $cb's result on the cached path, otherwise whatever
#          open_channel() returns.
sub channel {
    my ( $self, $cb ) = @_;

    if ( $self->_channel ) {
        return $cb->( $self->_channel );
    }

    # Remember the freshly opened channel before handing it to the caller.
    my $remember_and_call = sub {
        $self->_channel(@_);
        $cb->(@_);
    };

    $self->queue->open_channel( on_success => $remember_and_call );
}
# Publishes $body, serialised as JSON, with the routing key "myapp.jobs".
#
# Params:  $body - data structure accepted by encode_json (hash or array ref).
# Returns: whatever channel() returns.
#
# The JSON encoding happens inside the callback, i.e. at the moment the
# channel is available, not when publish() is called.
sub publish {
    my ( $self, $body ) = @_;

    my $send_message = sub {
        my ($channel) = @_;
        $channel->publish(
            routing_key => 'myapp.jobs',
            body        => encode_json($body),
        );
    };

    $self->channel($send_message);
}
# Declares a new durable queue without specifying a queue name.
#
# Returns: an AnyEvent condition variable that is signalled with the
#          arguments of declare_queue's on_success callback. Calling
#          ->recv on it throws if the declaration is reported as failed
#          (previously no failure handler was registered, so a failure never
#          reached the returned condvar).
sub new_queue {
    my $self = shift;
    my $cv   = AE::cv;

    $self->channel(
        sub {
            my ($channel) = @_;
            $channel->declare_queue(
                durable    => 1,
                on_success => $cv,    # a condvar is callable: same as ->send
                on_failure => sub {
                    my $detail = join ' ', grep { defined } @_;
                    $cv->croak("RabbitMQ queue declaration failed: $detail");
                },
            );
        }
    );

    return $cv;
}
# Starts consuming from $queue with no_ack => 1.
#
# Params:  $queue - name of the queue to consume from.
# Returns: an AnyEvent condition variable that is used as the on_consume
#          callback. Calling ->recv on it throws if the consume request is
#          reported as failed (previously no failure handler was registered,
#          so a failure never reached the returned condvar).
#
# NOTE(review): the same condvar is signalled for every delivery; a condvar
# presumably only keeps one result for ->recv, so callers that need every
# message should attach their own handler with $cv->cb -- confirm against the
# AnyEvent documentation.
sub consume {
    my ( $self, $queue ) = @_;
    my $cv = AE::cv;

    $self->channel(
        sub {
            my ($channel) = @_;
            $channel->consume(
                queue      => $queue,
                on_consume => $cv,
                no_ack     => 1,
                on_failure => sub {
                    my $detail = join ' ', grep { defined } @_;
                    $cv->croak("RabbitMQ consume failed: $detail");
                },
            );
        }
    );

    return $cv;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment