/beanstalk.pm6 Secret
Created
June 28, 2016 14:19
Star
You must be signed in to star a gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Queue::Beanstalk { | |
subset Name of Str where m/^<[A..Za..z0..9+/;.$_()]><[A..Za..z0..9+/;.$_()-]>*$/; | |
subset Priority of Int where 0 <= * < 2 ** 32; | |
subset Delay of Int where * >= 0; | |
subset TTR of Int where * > 0; | |
has Str $.host where *.chars > 0 = '127.0.0.1'; | |
has Int $.port where 0 < * < 65536 = 11300; | |
has IO::Socket::INET $!socket; | |
has Bool $!connected = False; | |
has Name @!watchlist = <default>; | |
has Priority $.default-priority = 10_000; | |
has Delay $.default-delay = 0; | |
has TTR $.default-ttr = 120; | |
my class Job { | |
has Int $.id; | |
has Blob $.body; | |
has Queue::Beanstalk $.beanstalk; | |
method stats { | |
return state %stats = $.beanstalk.stats-job($.id); | |
} | |
method delete { | |
return $.beanstalk.delete($.id); | |
} | |
method release { | |
return $.beanstalk.release($.id); | |
} | |
method bury { | |
return $.beanstalk.bury($.id); | |
} | |
} | |
method !connection returns IO::Socket::INET:D { | |
if !$!connected { | |
$!socket = IO::Socket::INET.new(:host($.host), :port($.port)); | |
$!connected = True; | |
} | |
return $!socket; | |
} | |
method !disconnect { | |
if $!connected { | |
$!socket.close; | |
$!socket = IO::Socket::INET:U; | |
$!connected = False; | |
} | |
} | |
method !send-command(*@words where *.elems > 0) { | |
self!connection.print(@words.join(' ') ~ "\r\n"); | |
} | |
method !read-response { | |
return self!connection.get.split(' '); | |
} | |
method watch(Name $tube-name) { | |
self!send-command('watch', $tube-name); | |
my ($code, $count) = self!read-response; | |
fail $code if $code ne 'WATCHING'; | |
@!watchlist = (@!watchlist (+) $tube-name).keys; | |
} | |
method ignore(Name $tube-name) { | |
self!send-command('ignore', $tube-name); | |
my ($code, $count) = self!read-response; | |
fail $code if $code ne 'WATCHING'; | |
@!watchlist = (@!watchlist (-) $tube-name).keys; | |
} | |
method watch-only(*@new-watchlist where *.elems > 0) { | |
self.watch($_) for (@new-watchlist (-) @!watchlist).keys; | |
self.ignore($_) for (@!watchlist (-) @new-watchlist).keys; | |
} | |
method use(Name $tube-name) { | |
self!send-command('use', $tube-name); | |
my ($code) = self!read-response; | |
fail $code if $code ne 'USING'; | |
} | |
method put( | |
Blob $data, | |
Priority :$priority = $.default-priority, | |
Delay :$delay = $.default-delay, | |
TTR :$ttr = $.default-ttr | |
) { | |
self!connection.print("put $priority $delay $ttr {$data.elems}\r\n$data\r\n"); | |
my ($response, $id) = self!read-response; | |
if $response eq 'INSERTED' { | |
return val($id); | |
} | |
else { | |
fail $response; | |
} | |
} | |
method reserve-with-timeout(Int $timeout where * >= 0) { | |
self!connection.print("reserve-with-timeout $timeout\r\n"); | |
self!read-job; | |
} | |
method reserve { | |
self!connection.print("reserve\r\n"); | |
self!read-job; | |
} | |
method !read-job returns Job { | |
my ($response, $id, $bytes) = self!read-response; | |
given $response { | |
when 'RESERVED' { | |
my $body = self!connection.read($bytes); | |
my $job = Job.new(id => val($id).Int, body => $body, beanstalk => self); | |
self!connection.read(2); # Discard trailing \r\n | |
return $job; | |
} | |
when 'TIMED_OUT' { | |
return Job:U; | |
} | |
default { | |
fail "Unexpected: $response"; | |
} | |
} | |
} | |
method delete(Int:D $id) { | |
self!send-command('delete', $id); | |
self!read-response; # TODO check result | |
} | |
method release( | |
Int:D $id, | |
Priority :$priority = $.default-priority, | |
Delay :$delay = $.default-delay | |
) { | |
self!send-command('release', $id, $priority, $delay); | |
my ($code) = self!read-response; | |
fail $code if $code ne 'RELEASED'; | |
} | |
method bury(Int:D $id, Priority :$priority = 10_000) { | |
self!send-command('bury', $id, $priority); | |
my ($code) = self!read-response; # TODO check | |
} | |
method touch(Int:D $id) { | |
self!send-command('touch', $id); | |
self!read-response; | |
} | |
method kick(Int:D $bound where * > 0) { | |
self!send-command('kick', $bound); | |
self!read-response; | |
} | |
method kick-job(Int:D $id) { | |
self!send-command('kick-job', $id); | |
self!read-response; | |
} | |
method !dirty-yaml(Buf $yaml) returns Hash { | |
return $yaml.decode.lines.grep({ m/\:/ })>>.split(': ', 2).flat.Hash; | |
} | |
method !read-stats returns Hash { | |
my ($code, $bytes) = self!read-response; | |
if $code eq 'OK' { | |
return self!dirty-yaml(self!connection.read($bytes + 2)); | |
} | |
else { | |
fail $code; | |
} | |
} | |
method stats-job(Int:D $id) returns Hash { | |
self!send-command('stats-job', $id); | |
return self!read-stats; | |
} | |
method stats-tube(Name $tube-name) { | |
self!send-command('stats-tube', $tube-name); | |
return self!read-stats; | |
} | |
method stats { | |
self!send-command('stats'); | |
return self!read-stats; | |
} | |
method pause-tube(Name $tube-name, Delay $delay) { | |
self!send-command('pause-tube', $tube-name, $delay); | |
self!read-response; | |
} | |
method quit { | |
self!disconnect; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment