Skip to content

Instantly share code, notes, and snippets.

@splitice
Created January 19, 2014 12:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save splitice/8504162 to your computer and use it in GitHub Desktop.
Save splitice/8504162 to your computer and use it in GitHub Desktop.
unix fifo pipe -> redis
<?php
$fifo = '/rsyslog/pipe';
$max_length = 1024*8;
$redis_list_name = 'log_entries';
$redis_host = '127.0.0.1';
$redis_port = 6379;
$multi_bulk = 25;
$redis = new Redis();
$redis->connect($redis_host, $redis_port);
$fh = fopen($fifo, "rt");
$count = 0;
$last_time = time();
$buffer = array($redis_list_name);
do{
$data = fgets($fh, $max_length);
if($data === '')
continue;
//a character at the start of message, ensures no overflows or issues.
if($data{0} != '~')
continue;
$usleep_time = 0;
$count++;
if(($count % 1000) == 0){
$now = time();
if($last_time < ($now - 10)){
echo "Done $count in ",$now - $last_time," seconds\r\n";
$count = 0;
$last_time = $now;
}
}
$buffer[] = $data;
if(($count % $multi_bulk) == 0){
do {
try {
$status = call_user_func_array(array($redis, 'lPush'), $buffer);
}catch(\RedisException $rex){
//Delay on error
if($usleep_time && $status === false){
echo "Sleeping for ", $usleep_time/10,"s";
usleep($usleep_time);
$redis->connect($redis_host, $redis_port);
}
$usleep_time += 1000;
$status = false;
}
}while($status === false);
$buffer = array($redis_list_name);
}
}while($data !== false);
fclose($fh);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment