Skip to content

Instantly share code, notes, and snippets.

@chiral
Created October 8, 2011 19:09
Show Gist options
  • Save chiral/1272719 to your computer and use it in GitHub Desktop.
Save chiral/1272719 to your computer and use it in GitHub Desktop.
Producer-Consumer impl on ZooKeeper
package zk_queue;
use Net::ZooKeeper qw(:node_flags :acls);
sub die1 {
die("failed to create node: ".$_[0]."\n");
}
sub die2 {
die("failed to delete node: ".$_[0]."\n");
}
sub die3 {
die("timeout\n");
}
sub new {
my $class = shift;
my $zk = shift;
my $name = shift;
unless ($zk->exists($name)) {
$zk->create($name,0,
'acl'=>ZOO_OPEN_ACL_UNSAFE) or die1($name);
}
my $self = {zk=>$zk,root=>$name};
return bless $self,$class;
}
sub produce {
my $self = shift;
my $val = shift;
my $zk = $self->{zk};
my $node = $self->{root}.'/element';
$zk->create($node,$val,
'flags'=>ZOO_SEQUENCE,
'acl'=>ZOO_OPEN_ACL_UNSAFE) or die1($node);
}
sub consume {
my $self = shift;
my $zk = $self->{zk};
my $root = $self->{root};
my $watch = $zk->watch('timeout'=>10000);
while (1) {
my @cs = $zk->get_children($root,'watch'=>$watch);
unless (@cs) {
next if ($watch->wait());
die3();
}
my $min=substr($cs[0],7);
foreach my $path (@cs) {
my $v=substr($path,7);
$min=$v if ($min>$v);
}
my $node = $root.'/element'.$min;
my $data = $zk->get($node);
return $data if ($zk->delete($node));
}
}
sub test($$$) {
my $zk = Net::ZooKeeper->new($_[0]);
my $q = zk_queue->new($zk,'/app1');
my $max = $_[1];
if ($_[2] eq 'p') {
foreach (1..$max) {
$q->produce(10+$_);
}
} else {
foreach (1..$max) {
my $r = $q->consume();
print "Item: $r\n";
}
}
}
1;
__END__
=pod
=head2 シェルからのテスト方法
zookeeperサーバが192.168.164.2:2181で動いてるとして:
=item * 10個キューに入れる
# perl -e 'use zk_queue; zk_queue::test("192.168.164.2:2181",10,'p');'
=item * 7個キューから取り出す
# perl -e 'use zk_queue; zk_queue::test("192.168.164.2:2181",7,'c');'
=cut
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment