Skip to content

Instantly share code, notes, and snippets.

@dragoonis dragoonis/KafkaConsumer.php Secret
Last active Nov 21, 2016

Embed
What would you like to do?
<?php
namespace MapleSyrupGroup\Kafka;
use RdKafka\Consumer;
use RdKafka\TopicConf;
use RdKafka\ConsumerTopic;
use RdKafka\Conf as ConsumerConf;
use Closure;
/**
 * Thin wrapper around the low-level RdKafka consumer.
 *
 * Builds a Consumer for a list of brokers, lazily creates and caches one
 * ConsumerTopic handle per topic name, and runs a blocking consume loop that
 * hands every successfully received message to a caller-supplied callback.
 */
class KafkaConsumer
{
    /**
     * @var Consumer
     */
    private $consumer;

    /**
     * @var ConsumerTopic[] Topic handles already created, keyed by topic name
     */
    private $topics = [];

    /**
     * KafkaConsumer constructor.
     *
     * @param array $brokers Broker addresses; joined with "," into metadata.broker.list
     */
    public function __construct(array $brokers)
    {
        $this->initialize($brokers);
    }

    /**
     * Start consuming a topic partition and loop forever, dispatching messages.
     *
     * The loop never returns normally: it only ends when the callback throws
     * or when a message carries an error code other than "no error",
     * "partition EOF" or "timed out".
     *
     * @param string   $topic     Topic name
     * @param int      $partition Partition to start and to read from
     * @param \Closure $callback  Invoked with each message whose err is RD_KAFKA_RESP_ERR_NO_ERROR
     * @param int      $timeout   Timeout passed to each ConsumerTopic::consume() call
     *
     * @throws \Exception When a message carries an unexpected error code
     */
    public function consume($topic, $partition, Closure $callback, $timeout = 1200000)
    {
        $consumerTopic = $this->getTopic($topic);

        // Start the same partition we are about to read from. The previous code
        // passed the never-assigned $this->partition (null) here.
        $consumerTopic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

        while (true) {
            $message = $consumerTopic->consume($partition, $timeout);

            // consume() may hand back null instead of a Message when nothing
            // arrived within the timeout. Without this guard the loose switch
            // below would match null against RD_KAFKA_RESP_ERR_NO_ERROR and
            // call the callback with null.
            if (null === $message) {
                echo "Timed out\n";
                continue;
            }

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $callback($message);
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    // NOTE(review): this is the only place an offset is stored and
                    // auto.commit.enable is 'false'; confirm that storing the EOF
                    // offset (rather than each processed message's offset) is intended.
                    $consumerTopic->offsetStore($partition, $message->offset);
                    break;

                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    /**
     * Build the underlying Consumer for the given brokers and keep it on the instance.
     *
     * @param array $brokers
     */
    private function initialize(array $brokers)
    {
        $conf = new ConsumerConf();
        $conf->set('metadata.broker.list', implode(',', $brokers));

        $consumer = new Consumer($conf);
        $consumer->setLogLevel(LOG_DEBUG);

        $this->setConsumer($consumer);
    }

    /**
     * @return Consumer
     */
    private function getConsumer()
    {
        return $this->consumer;
    }

    /**
     * @param Consumer $consumer
     */
    private function setConsumer(Consumer $consumer)
    {
        $this->consumer = $consumer;
    }

    /**
     * Create a ConsumerTopic for the given name and remember it in the cache.
     *
     * Returns the cached handle instead when one already exists for that name.
     *
     * @param string $topic
     * @return ConsumerTopic
     */
    private function createTopic($topic)
    {
        if ($this->hasTopic($topic)) {
            return $this->topics[$topic];
        }

        $topicConf = new TopicConf();
        // Configuration values are passed as strings ('1000', '60000') rather
        // than relying on int/float-to-string coercion.
        $topicConf->set('auto.commit.interval.ms', '1000');
        $topicConf->set('offset.store.sync.interval.ms', '60000');
        $topicConf->set('auto.commit.enable', 'false');
        // Offsets are configured to be kept in a file under the system temp directory.
        $topicConf->set('offset.store.method', 'file');
        $topicConf->set('offset.store.path', sys_get_temp_dir());

        $this->topics[$topic] = $this->getConsumer()->newTopic($topic, $topicConf);

        return $this->topics[$topic];
    }

    /**
     * Fetch the cached ConsumerTopic for a name, creating it on first use.
     *
     * @param string $topic
     * @return ConsumerTopic
     */
    private function getTopic($topic)
    {
        if ($this->hasTopic($topic)) {
            return $this->topics[$topic];
        }

        return $this->createTopic($topic);
    }

    /**
     * Tell whether a ConsumerTopic has already been created for the given name.
     *
     * @param string $topic
     * @return bool
     */
    private function hasTopic($topic)
    {
        return isset($this->topics[$topic]);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.