Last active
April 11, 2019 17:27
-
-
Save CurtTilmes/0cbc7e821fc172aa9953e0d44794fe15 to your computer and use it in GitHub Desktop.
Playing with Redis Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use Redis::Async;

# Single client connection shared by the rest of the script.
# NOTE(review): timeout => 0 presumably disables the client-side timeout so
# that blocking reads are not cut short -- confirm against Redis::Async docs.
my $r = Redis::Async.new("localhost:6379", :timeout(0));
#| Build a Supply that emits one hash per message read from the given
#| Redis streams with a blocking XREAD issued on the file-level client $r.
#|
#| @streams     names of the streams to follow; every stream starts at the
#|              special ID '$' and afterwards resumes from the ID of the
#|              last message received on it
#| :$timeout    value passed as the XREAD 'block' argument (default 0)
#| :$add-stream when set, the emitted hash gets the stream name under '_stream'
#| :$add-id     when set, the emitted hash gets the message ID under '_id'
#|
#| When XREAD yields an undefined reply the sub warns "Timeout!" and leaves
#| its read loop, which completes the supply.
sub xread(*@streams, Int :$timeout = 0, Bool :$add-stream, Bool :$add-id)
{
    supply {
        # Per-stream position handed to the next XREAD call.
        # NOTE(review): a stream that has not delivered anything yet is
        # re-sent with '$' on every iteration; the Redis documentation warns
        # that entries added between two such calls can be missed -- confirm
        # whether that matters for this use.
        my %last-seen = @streams.map: -> $name { $name => '$' };

        loop {
            # .keys and .values are taken from the same unmodified hash, so
            # the stream names and their IDs line up position by position.
            my \reply = $r.XREAD('block', $timeout, 'streams',
                                 %last-seen.keys, %last-seen.values);

            # Undefined reply: report it and stop reading.
            without reply {
                warn "Timeout!";
                last;
            }

            # The reply is a list of (stream-name, messages) entries, and each
            # message is an (id, fields) entry where fields alternates
            # key, value, key, value, ...
            for reply -> @ ($stream, @messages) {
                for @messages -> @ ($id, @fields) {
                    # Remember where this stream got to for the next XREAD.
                    %last-seen{$stream} = $id;

                    my $message = %( @fields );
                    if $add-stream {
                        $message<_stream> = $stream;
                    }
                    if $add-id {
                        $message<_id> = $id;
                    }
                    emit $message;
                }
            }
        }
    }
}
# Follow both streams and print every message hash as it arrives; each
# hash also carries the originating stream name under '_stream'.
react {
    whenever xread('mystream1', 'mystream2', :add-stream) -> $message {
        say $message;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment