miyagawa (owner)

Revisions

gist: 216469 Download_button fork
public
Public Clone URL: git://gist.github.com/216469.git
Embed All Files: show embed
anyevent-redis.pl #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/perl
use strict;
use warnings;
 
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
 
# Lookup table of the commands whose last argument is sent as a
# length-prefixed payload on its own line instead of inline.
my %bulk_command;
$bulk_command{$_} = 1
    for qw( set setnx rpush lpush lset lrem sadd srem sismember echo );
 
# Tracks outstanding commands: every $cmd call does begin(), every finished
# reply does end(), so the recv() at the bottom returns once all replies are
# in. Connection failures are reported through croak() on this same condvar
# so that recv() dies instead of waiting forever.
my $txn_cv = AE::cv;

# The return value is kept in $r for the lifetime of the script
# (AnyEvent::Socket documents it as a guard that cancels the connection
# attempt when destroyed).
my $r = tcp_connect "127.0.0.1", 6379, sub {
    my $fh = shift;
    if ( !$fh ) {
        # Dying inside an event callback is handled differently by each
        # event loop backend; routing the failure through the condvar makes
        # the top-level recv() die reliably.
        $txn_cv->croak("Can't connect Redis server: $!");
        return;
    }

    my $hd = AnyEvent::Handle->new(
        fh       => $fh,
        on_error => sub {
            my ( $hdl, undef, $message ) = @_;
            $hdl->destroy;
            # Pending replies can never arrive on a destroyed handle.
            $txn_cv->croak("Redis connection error: $message");
        },
        on_eof => sub {
            my ($hdl) = @_;
            $hdl->destroy;
            $txn_cv->croak("Redis server closed the connection");
        },
    );

    # $cmd->($command, @args)
    #
    # Sends one Redis command and returns a condvar for its reply.
    #   - Commands listed in %bulk_command send their last argument as a
    #     length-prefixed payload; everything else is sent inline.
    #   - The condvar receives ($value) for status, integer and bulk replies,
    #     (\@values) for multi-bulk replies, (undef) for nil replies and
    #     (undef, $message) for error replies or unknown reply types.
    # NOTE(review): length() counts characters; values are assumed to be
    # byte strings — confirm before sending decoded text.
    my $cmd = sub {
        my ( $command, @args ) = @_;
        $txn_cv->begin;

        my $cv = AE::cv;

        # Deliver the reply to the caller and mark this command as done.
        my $finish = sub {
            $cv->send(@_);
            $txn_cv->end;
        };

        my $send;
        if ( $bulk_command{ lc($command) } ) {
            my $value = pop @args;
            $value = '' if !defined $value;
            $send = join( ' ', uc($command), @args, length($value) )
                  . "\r\n$value\r\n";
        } else {
            $send = join( ' ', uc($command), @args ) . "\r\n";
        }

        warn $send;

        $hd->push_write($send);
        $hd->push_read(line => sub {
            my ( $h, $result ) = @_;
            warn "got line <$result> for command [$send]";

            # First character is the reply type, the rest is its argument.
            my $type = substr $result, 0, 1;
            $result =~ s/^.//;

            if ( $type eq '-' ) {
                $finish->( undef, $result );
            } elsif ( $type eq '+' || $type eq ':' ) {
                $finish->($result);
            } elsif ( $type eq '$' ) {
                if ( $result < 0 ) {
                    # Nil bulk reply ("$-1"): no payload follows the header.
                    $finish->(undef);
                } else {
                    # Payload is followed by CRLF, hence the extra 2 bytes.
                    $h->unshift_read(chunk => $result + 2, sub {
                        my ( undef, $chunk ) = @_;
                        $chunk =~ s/\r\n\z//;
                        warn "chunk <$chunk>";
                        $finish->($chunk);
                    });
                }
            } elsif ( $type eq '*' ) {
                my $size = $result;
                warn "size is $size";

                if ( $size < 0 ) {
                    # Nil multi-bulk reply ("*-1").
                    $finish->(undef);
                } elsif ( $size == 0 ) {
                    # Empty list: there are no element lines to wait for.
                    $finish->( [] );
                } else {
                    my @lines;
                    my $read_next;

                    # Store one element, then either finish or read the next.
                    my $collect = sub {
                        push @lines, $_[0];
                        if ( @lines >= $size ) {
                            undef $read_next;    # break the closure cycle
                            $finish->( \@lines );
                        } else {
                            warn "RECURSE";
                            $read_next->();
                        }
                    };

                    # Reads one element header and, for bulk elements, its
                    # payload. Nested multi-bulk elements are not supported.
                    $read_next = sub {
                        $h->unshift_read(line => sub {
                            my ( undef, $line ) = @_;
                            warn "line: <$line>";
                            my $elem_type = substr $line, 0, 1;
                            $line =~ s/^.//;

                            if ( $elem_type ne '$' ) {
                                # Integer/status element: value is inline.
                                $collect->($line);
                            } elsif ( $line < 0 ) {
                                # Nil element ("$-1"): no payload follows.
                                $collect->(undef);
                            } else {
                                $h->unshift_read(chunk => $line + 2, sub {
                                    my ( undef, $chunk ) = @_;
                                    $chunk =~ s/\r\n\z//;
                                    warn "chunk <$chunk>";
                                    $collect->($chunk);
                                });
                            }
                        });
                    };
                    $read_next->();
                }
            } else {
                $finish->( undef, "Unknown type $type" );
            }
        });

        return $cv;
    };

    # Demo traffic; WWW (from XXX) dumps each reply as it arrives.
    use XXX;
    $cmd->('set', 'foo', 'bar')->cb(sub { WWW "SET", $_[0]->recv });
    $cmd->('get', 'foo')->cb(sub { WWW "GET", $_[0]->recv });
    $cmd->('lpush', 'fox', 'baz');
    $cmd->('lpush', 'fox', 'bad');
    # NOTE(review): the label says SMEMBERS but the command sent is GET on a
    # key that was just LPUSHed; presumably this exercises the error-reply
    # path — confirm intent.
    $cmd->('get', 'fox')->cb(sub { WWW "SMEMBERS", $_[0]->recv });
};

# Runs the event loop until every command has been answered; dies if the
# connection failed or was lost.
$txn_cv->recv;