-
-
Save dragoonis/b7dfeef0b223d4879a48b53a35b0f6c4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace MapleSyrupGroup\Kafka; | |
use RdKafka\Consumer; | |
use RdKafka\TopicConf; | |
use RdKafka\ConsumerTopic; | |
use RdKafka\Conf as ConsumerConf; | |
use Closure; | |
class KafkaConsumer | |
{ | |
/** | |
* @var ConsumerTopic | |
*/ | |
private $topic; | |
/** | |
* @var string | |
*/ | |
private $partition; | |
/** | |
* @var Consumer | |
*/ | |
private $consumer; | |
/** | |
* @var array Of ConsumerTopic instances | |
*/ | |
private $topics = []; | |
/** | |
* KafkaConsumer constructor. | |
* @param array $brokers | |
*/ | |
public function __construct(array $brokers) | |
{ | |
$this->initialize($brokers); | |
} | |
/** | |
* @param \Closure $callback | |
* @param $topic | |
* @param $partition | |
* @param int $timeout | |
* @throws \Exception | |
*/ | |
public function consume($topic, $partition, Closure $callback, $timeout = 1200000) | |
{ | |
$topic = $this->getTopic($topic); | |
$topic->consumeStart($this->partition, RD_KAFKA_OFFSET_STORED); | |
while (true) { | |
$message = $topic->consume($partition, $timeout); | |
switch ($message->err) { | |
case RD_KAFKA_RESP_ERR_NO_ERROR: | |
$callback($message); | |
break; | |
case RD_KAFKA_RESP_ERR__PARTITION_EOF: | |
echo "No more messages; will wait for more\n"; | |
$topic->offsetStore($partition, $message->offset); | |
break; | |
case RD_KAFKA_RESP_ERR__TIMED_OUT: | |
echo "Timed out\n"; | |
break; | |
default: | |
throw new \Exception($message->errstr(), $message->err); | |
} | |
} | |
} | |
/** | |
* @param array $brokers | |
*/ | |
private function initialize(array $brokers) | |
{ | |
$conf = new ConsumerConf(); | |
$conf->set('metadata.broker.list', implode(',', $brokers)); | |
$consumer = new Consumer($conf); | |
$consumer->setLogLevel(LOG_DEBUG); | |
$this->setConsumer($consumer); | |
} | |
/** | |
* @return Consumer | |
*/ | |
private function getConsumer() | |
{ | |
return $this->consumer; | |
} | |
private function setConsumer(Consumer $consumer) | |
{ | |
$this->consumer = $consumer; | |
} | |
/** | |
* @param string $topic | |
* @return ConsumerTopic | |
*/ | |
private function createTopic($topic) | |
{ | |
if ($this->hasTopic($topic)) { | |
return $this->getTopic($topic); | |
} | |
$topicConf = new TopicConf(); | |
//$topicConf->set("enable.auto.offset.store", "false"); | |
//$topicConf->set('auto.offset.reset', 'smallest'); | |
$topicConf->set("auto.commit.interval.ms", 1000); | |
$topicConf->set("offset.store.sync.interval.ms", 60e3); | |
$topicConf->set('auto.commit.enable', 'false'); | |
$topicConf->set('offset.store.method', 'file'); | |
$topicConf->set('offset.store.path', sys_get_temp_dir()); | |
return $this->topics[$topic] = $this->getConsumer()->newTopic($topic, $topicConf); | |
} | |
/** | |
* @param $topic | |
* @return ConsumerTopic | |
*/ | |
private function getTopic($topic) | |
{ | |
if ($this->hasTopic($topic)) { | |
return $this->topics[$topic]; | |
} | |
return $this->topics[$topic] = $this->createTopic($topic); | |
} | |
/** | |
* @param string $topic | |
* @return bool | |
*/ | |
private function hasTopic($topic) | |
{ | |
return isset($this->topics[$topic]); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment