Skip to content

Instantly share code, notes, and snippets.

@jkramer
Created June 28, 2016 14:19
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save jkramer/98e9ac48cb04a98410b7b914da847344 to your computer and use it in GitHub Desktop.
class Queue::Beanstalk {
subset Name of Str where m/^<[A..Za..z0..9+/;.$_()]><[A..Za..z0..9+/;.$_()-]>*$/;
subset Priority of Int where 0 <= * < 2 ** 32;
subset Delay of Int where * >= 0;
subset TTR of Int where * > 0;
has Str $.host where *.chars > 0 = '127.0.0.1';
has Int $.port where 0 < * < 65536 = 11300;
has IO::Socket::INET $!socket;
has Bool $!connected = False;
has Name @!watchlist = <default>;
has Priority $.default-priority = 10_000;
has Delay $.default-delay = 0;
has TTR $.default-ttr = 120;
my class Job {
has Int $.id;
has Blob $.body;
has Queue::Beanstalk $.beanstalk;
method stats {
return state %stats = $.beanstalk.stats-job($.id);
}
method delete {
return $.beanstalk.delete($.id);
}
method release {
return $.beanstalk.release($.id);
}
method bury {
return $.beanstalk.bury($.id);
}
}
method !connection returns IO::Socket::INET:D {
if !$!connected {
$!socket = IO::Socket::INET.new(:host($.host), :port($.port));
$!connected = True;
}
return $!socket;
}
method !disconnect {
if $!connected {
$!socket.close;
$!socket = IO::Socket::INET:U;
$!connected = False;
}
}
method !send-command(*@words where *.elems > 0) {
self!connection.print(@words.join(' ') ~ "\r\n");
}
method !read-response {
return self!connection.get.split(' ');
}
method watch(Name $tube-name) {
self!send-command('watch', $tube-name);
my ($code, $count) = self!read-response;
fail $code if $code ne 'WATCHING';
@!watchlist = (@!watchlist (+) $tube-name).keys;
}
method ignore(Name $tube-name) {
self!send-command('ignore', $tube-name);
my ($code, $count) = self!read-response;
fail $code if $code ne 'WATCHING';
@!watchlist = (@!watchlist (-) $tube-name).keys;
}
method watch-only(*@new-watchlist where *.elems > 0) {
self.watch($_) for (@new-watchlist (-) @!watchlist).keys;
self.ignore($_) for (@!watchlist (-) @new-watchlist).keys;
}
method use(Name $tube-name) {
self!send-command('use', $tube-name);
my ($code) = self!read-response;
fail $code if $code ne 'USING';
}
method put(
Blob $data,
Priority :$priority = $.default-priority,
Delay :$delay = $.default-delay,
TTR :$ttr = $.default-ttr
) {
self!connection.print("put $priority $delay $ttr {$data.elems}\r\n$data\r\n");
my ($response, $id) = self!read-response;
if $response eq 'INSERTED' {
return val($id);
}
else {
fail $response;
}
}
method reserve-with-timeout(Int $timeout where * >= 0) {
self!connection.print("reserve-with-timeout $timeout\r\n");
self!read-job;
}
method reserve {
self!connection.print("reserve\r\n");
self!read-job;
}
method !read-job returns Job {
my ($response, $id, $bytes) = self!read-response;
given $response {
when 'RESERVED' {
my $body = self!connection.read($bytes);
my $job = Job.new(id => val($id).Int, body => $body, beanstalk => self);
self!connection.read(2); # Discard trailing \r\n
return $job;
}
when 'TIMED_OUT' {
return Job:U;
}
default {
fail "Unexpected: $response";
}
}
}
method delete(Int:D $id) {
self!send-command('delete', $id);
self!read-response; # TODO check result
}
method release(
Int:D $id,
Priority :$priority = $.default-priority,
Delay :$delay = $.default-delay
) {
self!send-command('release', $id, $priority, $delay);
my ($code) = self!read-response;
fail $code if $code ne 'RELEASED';
}
method bury(Int:D $id, Priority :$priority = 10_000) {
self!send-command('bury', $id, $priority);
my ($code) = self!read-response; # TODO check
}
method touch(Int:D $id) {
self!send-command('touch', $id);
self!read-response;
}
method kick(Int:D $bound where * > 0) {
self!send-command('kick', $bound);
self!read-response;
}
method kick-job(Int:D $id) {
self!send-command('kick-job', $id);
self!read-response;
}
method !dirty-yaml(Buf $yaml) returns Hash {
return $yaml.decode.lines.grep({ m/\:/ })>>.split(': ', 2).flat.Hash;
}
method !read-stats returns Hash {
my ($code, $bytes) = self!read-response;
if $code eq 'OK' {
return self!dirty-yaml(self!connection.read($bytes + 2));
}
else {
fail $code;
}
}
method stats-job(Int:D $id) returns Hash {
self!send-command('stats-job', $id);
return self!read-stats;
}
method stats-tube(Name $tube-name) {
self!send-command('stats-tube', $tube-name);
return self!read-stats;
}
method stats {
self!send-command('stats');
return self!read-stats;
}
method pause-tube(Name $tube-name, Delay $delay) {
self!send-command('pause-tube', $tube-name, $delay);
self!read-response;
}
method quit {
self!disconnect;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment