Skip to content

Instantly share code, notes, and snippets.

@michael-grunder
Created June 20, 2018 00:38
Show Gist options
  • Save michael-grunder/910b4a7328f55750d023fc928179494e to your computer and use it in GitHub Desktop.
Save michael-grunder/910b4a7328f55750d023fc928179494e to your computer and use it in GitHub Desktop.
Example producer and consumer for the new Redis 5.0 stream API
<?php
/*
 * Example consumer for the Redis 5.0 stream API, using phpredis.
 *
 * Reads new entries from one stream as a member of a consumer group, prints
 * each entry and acknowledges it, then sleeps before polling again.
 *
 * Command line options (all optional):
 *   --host      Redis host                       (default: localhost)
 *   --port      Redis port                       (default: 6379)
 *   --skey      Stream key to consume            (default: stream)
 *   --group     Consumer group name              (default: group)
 *   --consumer  Consumer name within the group   (default: Alice)
 *   --sleep     Microseconds between polls       (default: 100000)
 *
 * Exits with status -1 when Redis is unreachable or stream support is missing.
 */
$opt = getopt('', ['host:', 'port:', 'skey:', 'group:', 'consumer:', 'sleep:']);
$host = isset($opt['host']) ? $opt['host'] : 'localhost';
/* getopt() hands back strings, but connect() and usleep() take integers */
$port = (int) (isset($opt['port']) ? $opt['port'] : 6379);
$skey = isset($opt['skey']) ? $opt['skey'] : 'stream';
$group = isset($opt['group']) ? $opt['group'] : 'group';
$consumer = isset($opt['consumer']) ? $opt['consumer'] : 'Alice';
/* Clamp to zero: a negative delay is rejected by usleep() on PHP 8 */
$sleep = max(0, (int) (isset($opt['sleep']) ? $opt['sleep'] : 100000));

$obj_r = new Redis();

/* connect() may throw instead of returning false, so treat both as failure */
try {
    $connected = $obj_r->connect($host, $port);
} catch (Exception $ex) {
    $connected = false;
}

/* fwrite() rather than fprintf(): user input must not be parsed as a format string */
if (!$connected || !$obj_r->isConnected()) {
    fwrite(STDERR, "Error: Cannot connect to redis at $host:$port\n");
    exit(-1);
}

/* Make sure phpredis has stream support */
if (!method_exists($obj_r, 'xAdd')) {
    fwrite(STDERR, "Error: You have to run a phpredis version with stream support\n");
    exit(-1);
}

/* Finally make sure Redis has stream support */
if ($obj_r->xLen('not_a_key: ' . uniqid()) === false) {
    fwrite(STDERR, "Error: Must connect to a redis-server with stream support!\n");
    exit(-1);
}

/*
 * Create the consumer group starting at the beginning of the stream.
 * A BUSYGROUP error only means the group already exists, which is fine; any
 * other failure (for example the stream key not existing yet) is reported so
 * the user knows why nothing is being consumed.
 */
if ($obj_r->xGroup('CREATE', $skey, $group, '0') === false) {
    $err = (string) $obj_r->getLastError();
    if (strpos($err, 'BUSYGROUP') === false) {
        fwrite(STDERR, "Warning: Could not create group '$group' on '$skey': $err\n");
    }
}

echo "Consuming messages from '$skey' with (group: $group, consumer: $consumer)\n";
$prefix = "[$group::$consumer] ";

while (true) {
    try {
        /* '>' asks only for entries never delivered to this group before */
        $streams = $obj_r->xReadGroup($group, $consumer, [$skey => '>']);

        /* We're just looking at one stream */
        if ($streams && !empty($streams[$skey])) {
            $entries = $streams[$skey];

            foreach ($entries as $id => $payload) {
                $msg = "id: " . $payload['id'] . ", payload: '" . $payload['payload'] . "'";
                echo "$prefix: $id => [$msg]\n";
            }

            /* Acknowledge everything we just printed in a single call */
            $obj_r->xAck($skey, $group, array_keys($entries));
        }
    } catch (Exception $ex) {
        echo "Exception: " . $ex->getMessage() . "\n";
    }

    /* Sleep outside the try so a failing server cannot turn this into a busy loop */
    usleep($sleep);
}
<?php
/*
 * Example producer for the Redis 5.0 stream API, using phpredis.
 *
 * Appends contrived messages to a stream forever, printing a progress line
 * after every 1000 messages successfully added.
 *
 * Command line options (all optional):
 *   --host   Redis host                        (default: localhost)
 *   --port   Redis port                        (default: 6379)
 *   --skey   Stream key to write to            (default: stream)
 *   --sleep  Microseconds between messages     (default: 100000)
 *
 * Exits with status -1 when Redis is unreachable or stream support is missing.
 */
$opt = getopt('', ['host:', 'port:', 'skey:', 'sleep:']);
$host = isset($opt['host']) ? $opt['host'] : 'localhost';
/* getopt() hands back strings, but connect() and usleep() take integers */
$port = (int) (isset($opt['port']) ? $opt['port'] : 6379);
$skey = isset($opt['skey']) ? $opt['skey'] : 'stream';
/* Clamp to zero: a negative delay is rejected by usleep() on PHP 8 */
$sleep = max(0, (int) (isset($opt['sleep']) ? $opt['sleep'] : 100000));

$obj_r = new Redis();

/* connect() may throw instead of returning false, so treat both as failure */
try {
    $connected = $obj_r->connect($host, $port);
} catch (Exception $ex) {
    $connected = false;
}

/* fwrite() rather than fprintf(): user input must not be parsed as a format string */
if (!$connected || !$obj_r->isConnected()) {
    fwrite(STDERR, "Error: Cannot connect to redis at $host:$port\n");
    exit(-1);
}

/* Make sure phpredis has stream support */
if (!method_exists($obj_r, 'xAdd')) {
    fwrite(STDERR, "Error: You have to run a phpredis version with stream support\n");
    exit(-1);
}

/* Finally make sure Redis has stream support */
if ($obj_r->xLen('not_a_key: ' . uniqid()) === false) {
    fwrite(STDERR, "Error: Must connect to a redis-server with stream support!\n");
    exit(-1);
}

/* Let the user know where we'll be sending data */
echo "Producing contrived data to key: '$skey'\n";

/* Keep track of messages produced */
$n = 0;

/* Just loop forever */
while (true) {
    $message = [
        /* Sequential id taken from a shared Redis counter key */
        'id' => $obj_r->incr('__stream_next_id'),
        'payload' => 'payload:' . uniqid()
    ];

    /* '*' lets the server assign the entry ID; only count entries it accepted */
    if ($obj_r->xAdd($skey, '*', $message) === false) {
        fwrite(STDERR, "Error: Failed to add message to stream '$skey': " . $obj_r->getLastError() . "\n");
    } elseif (++$n % 1000 == 0) {
        echo "Added " . number_format($n) . " messages to stream '$skey' so far\n";
    }

    /* Don't totally spam CPU */
    usleep($sleep);
}
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment