#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
my %bulk_command = map { $_ => 1 }
qw( set setnx rpush lpush lset lrem sadd srem sismember echo );
my $txn_cv = AE::cv;
my $r = tcp_connect "127.0.0.1", 6379, sub {
my $fh = shift
or die "Can't connect Redis server: $!";
my $hd = AnyEvent::Handle->new(
fh => $fh,
on_error => sub { $_[0]->destory },
on_eof => sub { $_[0]->destroy },
);
my $cmd = sub {
$txn_cv->begin;
my $command = shift;
my $cv_send = sub {
my $cv = shift;
$cv->send(@_);
$txn_cv->end;
};
my $send;
if ( defined $bulk_command{$command} ) {
my $value = pop;
$value = '' if ! defined $value;
$send = uc($command)
. ' '
. join(' ', @_)
. ' '
. length( $value )
. "\r\n$value\r\n";
} else {
$send = uc($command)
. ' '
. join(' ', @_)
. "\r\n";
}
warn $send;
my $cv = AE::cv;
$hd->push_write($send);
$hd->push_read(line => sub {
my($hd, $result) = @_;
warn "got line <$result> for command [$send]";
my $type = substr $result, 0, 1;
$result =~ s/^.//;
if ( $type eq '-' ) {
$cv_send->($cv, undef, $result);
} elsif ( $type eq '+' ) {
$cv_send->($cv, $result);
} elsif ( $type eq '$' ) {
$hd->unshift_read(chunk => $result + 2, sub {
my($hd, $chunk) = @_;
$chunk =~ s/\r\n$//;
warn "chunk <$chunk>";
$cv_send->($cv, $chunk);
});
} elsif ( $type eq '*' ) {
my $size = $result;
warn "size is $size";
my @lines;
my $multi_cb; $multi_cb = sub {
my $hd = shift;
$hd->unshift_read(line => sub {
my $size = $size; # nested closure!
my($hd, $line) = @_;
warn "line: <$line>";
$line =~ s/^.//;
$hd->unshift_read(chunk => $line + 2, sub {
my($hd, $chunk) = @_;
$chunk =~ s/\r\n$//;
warn "chunk <$chunk>";
push @lines, $chunk;
if (@lines >= $size) {
undef $multi_cb;
$cv_send->($cv, \@lines);
} else {
warn "RECURSE";
$multi_cb->($hd); # recursive
}
});
});
};
$multi_cb->($hd);
} elsif ( $type eq ':' ) {
$cv_send->($cv, $result);
} else {
$cv_send->($cv, undef, "Unknown type $type");
}
});
return $cv;
};
use XXX;
$cmd->('set', 'foo', 'bar')->cb(sub { WWW "SET", $_[0]->recv });
$cmd->('get', 'foo')->cb(sub { WWW "GET", $_[0]->recv });
$cmd->('lpush', 'fox', 'baz');
$cmd->('lpush', 'fox', 'bad');
$cmd->('get', 'fox')->cb(sub { WWW "SMEMBERS", $_[0]->recv });
};
$txn_cv->recv;