@CurtTilmes
Last active April 11, 2019 17:27
Playing with Redis Streams
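use Redis::Async;

my $r = Redis::Async.new("localhost:6379", :0timeout);

# Returns a Supply that emits one Hash per message read from the given streams.
# :$timeout is passed to XREAD BLOCK (milliseconds; 0 blocks indefinitely).
# :$add-stream / :$add-id add the stream name / message ID to each emitted Hash.
sub xread(*@streams, Int :$timeout = 0, Bool :$add-stream, Bool :$add-id)
{
    supply {
        # Start every stream at '$', i.e. only messages added after the first read.
        my %streams = @streams.map: * => '$';

        loop {
            with $r.XREAD('block', $timeout, 'streams',
                          %streams.keys, %streams.values)
            {
                # The reply holds one (stream name, messages) entry per stream.
                for $_ -> @ ($stream, @messages)
                {
                    # Each message is an (ID, field/value list) pair.
                    for @messages -> @ ($id, @fields)
                    {
                        # Remember the last ID seen so the next XREAD resumes after it.
                        %streams{$stream} = $id;

                        my $obj = %( @fields );
                        $obj<_stream> = $stream if $add-stream;
                        $obj<_id> = $id if $add-id;
                        emit $obj;
                    }
                }
            }
            else
            {
                # No reply before the BLOCK timeout expired: stop reading.
                warn "Timeout!";
                last
            }
        }
    }
}

# Print every message that arrives on either stream, tagged with its stream name.
react
{
    whenever xread(<mystream1 mystream2>, :add-stream)
    {
        .say
    }
}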