Created
October 8, 2011 19:09
-
-
Save chiral/1272719 to your computer and use it in GitHub Desktop.
Producer-Consumer impl on ZooKeeper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zk_queue; | |
use Net::ZooKeeper qw(:node_flags :acls); | |
sub die1 { | |
die("failed to create node: ".$_[0]."\n"); | |
} | |
sub die2 { | |
die("failed to delete node: ".$_[0]."\n"); | |
} | |
sub die3 { | |
die("timeout\n"); | |
} | |
sub new { | |
my $class = shift; | |
my $zk = shift; | |
my $name = shift; | |
unless ($zk->exists($name)) { | |
$zk->create($name,0, | |
'acl'=>ZOO_OPEN_ACL_UNSAFE) or die1($name); | |
} | |
my $self = {zk=>$zk,root=>$name}; | |
return bless $self,$class; | |
} | |
sub produce { | |
my $self = shift; | |
my $val = shift; | |
my $zk = $self->{zk}; | |
my $node = $self->{root}.'/element'; | |
$zk->create($node,$val, | |
'flags'=>ZOO_SEQUENCE, | |
'acl'=>ZOO_OPEN_ACL_UNSAFE) or die1($node); | |
} | |
sub consume { | |
my $self = shift; | |
my $zk = $self->{zk}; | |
my $root = $self->{root}; | |
my $watch = $zk->watch('timeout'=>10000); | |
while (1) { | |
my @cs = $zk->get_children($root,'watch'=>$watch); | |
unless (@cs) { | |
next if ($watch->wait()); | |
die3(); | |
} | |
my $min=substr($cs[0],7); | |
foreach my $path (@cs) { | |
my $v=substr($path,7); | |
$min=$v if ($min>$v); | |
} | |
my $node = $root.'/element'.$min; | |
my $data = $zk->get($node); | |
return $data if ($zk->delete($node)); | |
} | |
} | |
sub test($$$) { | |
my $zk = Net::ZooKeeper->new($_[0]); | |
my $q = zk_queue->new($zk,'/app1'); | |
my $max = $_[1]; | |
if ($_[2] eq 'p') { | |
foreach (1..$max) { | |
$q->produce(10+$_); | |
} | |
} else { | |
foreach (1..$max) { | |
my $r = $q->consume(); | |
print "Item: $r\n"; | |
} | |
} | |
} | |
1; | |
__END__ | |
=pod | |
=head2 シェルからのテスト方法 | |
zookeeperサーバが192.168.164.2:2181で動いてるとして: | |
=item * 10個キューに入れる | |
# perl -e 'use zk_queue; zk_queue::test("192.168.164.2:2181",10,'p');' | |
=item * 7個キューから取り出す | |
# perl -e 'use zk_queue; zk_queue::test("192.168.164.2:2181",7,'c');' | |
=cut |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment