@ssi-anik
Forked from srigi/ConsumerCommand.php
Created September 14, 2019 06:35
PHP rdkafka
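A low-level Kafka consumer command and a producer service for a Symfony 4 application using the php-rdkafka extension, together with the composer.json, docker-compose.yml and Dockerfile needed to run them against a two-broker cluster.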
composer.json
{
    "name": "blabla",
    "type": "project",
    "config": {
        "preferred-install": {
            "*": "dist"
        },
        "sort-packages": true
    },
    "require": {
        "php": "^7.1.3",
        "ext-rdkafka": "*",
        "symfony/console": "^4.0",
        "symfony/framework-bundle": "^4.0"
    },
    "require-dev": {
        "kwn/php-rdkafka-stubs": "^1.0"
    }
}
ConsumerCommand.php
<?php
declare(strict_types = 1);

namespace App\Kafka;

use RdKafka;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class ConsumeCommand extends ContainerAwareCommand
{
    private const CONSUME_MAX_WAIT_TIME = 15 * 1000;
    private const CONF_COMPRESSION_CODEC = 'gzip';
    private const CONF_GROUP_ID = 'basic-lowLevel-consumer';
    private const TOPIC_CONF_AUTO_COMMIT_INTERVAL = '5000';
    private const TOPIC_CONF_OFFSET_STORE_METHOD = 'file'; // file|broker
    private const TOPIC_CONF_OFFSET_RESET = 'smallest'; // smallest|largest

    /** @var string */
    private $brokersList;

    /** @var int */
    private $logLevel;

    public function __construct(string $brokersList, int $logLevel)
    {
        parent::__construct();
        $this->brokersList = $brokersList;
        $this->logLevel = $logLevel;
    }

    protected function configure()
    {
        $this
            ->setName('app:kafka:consume')
            ->setDescription('Low-level consumer of Kafka topic')
            ->addArgument('topic', InputArgument::REQUIRED, 'Name of the topic to consume')
            ->addOption('offset', 'o', InputOption::VALUE_OPTIONAL)
            ->addOption('partition', 'p', InputOption::VALUE_OPTIONAL);
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $topicName = $input->getArgument('topic');
        // cast after the null-coalesce; casting first would make the fallback unreachable
        $offset = (int) ($input->getOption('offset') ?? RD_KAFKA_OFFSET_BEGINNING);
        $partition = (int) ($input->getOption('partition') ?? RD_KAFKA_PARTITION_UA);
        $output->writeln("Starting consuming topic '{$topicName}' from offset {$offset}, partition {$partition}");

        $conf = $this->getConf();
        $rk = new RdKafka\Consumer($conf);
        $rk->setLogLevel($this->logLevel);
        $rk->addBrokers($this->brokersList);

        $topicConf = $this->getTopicConf();
        $topic = $rk->newTopic($topicName, $topicConf);
        $topic->consumeStart($partition, $offset);

        while (TRUE) {
            $message = $topic->consume($partition, self::CONSUME_MAX_WAIT_TIME);
            $messageErr = (is_object($message)) ? $message->err : RD_KAFKA_RESP_ERR_UNKNOWN;

            switch ($messageErr) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    print_r($message);
                    // schedule offset store only after successfully consuming the message;
                    // EOF/timeout pseudo-messages carry no offset worth storing
                    $topic->offsetStore($message->partition, $message->offset);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                case RD_KAFKA_RESP_ERR_UNKNOWN:
                    continue 2; // continue while
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    private function getConf(): RdKafka\Conf
    {
        $conf = new RdKafka\Conf();
        $conf->set('compression.codec', self::CONF_COMPRESSION_CODEC);
        $conf->set('group.id', self::CONF_GROUP_ID); // required when storing offsets

        return $conf;
    }

    private function getTopicConf(): RdKafka\TopicConf
    {
        $topicConf = new RdKafka\TopicConf();
        $topicConf->set('auto.commit.enable', 'false'); // don't commit offset automatically
        $topicConf->set('auto.commit.interval.ms', self::TOPIC_CONF_AUTO_COMMIT_INTERVAL);
        $topicConf->set('offset.store.method', self::TOPIC_CONF_OFFSET_STORE_METHOD);
        if (self::TOPIC_CONF_OFFSET_STORE_METHOD === 'file') {
            $topicConf->set('offset.store.path', sys_get_temp_dir());
        }
        // where to start consuming messages when there is no initial offset in offset store
        // or the desired offset is out of range
        $topicConf->set('auto.offset.reset', self::TOPIC_CONF_OFFSET_RESET);

        return $topicConf;
    }
}
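The command can also be exercised without booting the full framework kernel. A minimal sketch using the bare symfony/console Application; the script name, broker addresses (matching the compose services below) and syslog-style LOG_DEBUG level are placeholder assumptions, not part of the original gist:

<?php
// run-consumer.php — hypothetical standalone runner for the command above
require __DIR__ . '/vendor/autoload.php';

use App\Kafka\ConsumeCommand;
use Symfony\Component\Console\Application;

$app = new Application();
// broker list and log level are placeholder values
$app->add(new ConsumeCommand('kafka1:9092,kafka2:9092', LOG_DEBUG));
$app->run();

// invoke with e.g.: php run-consumer.php app:kafka:consume my-topic -p 0 -o 0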
docker-compose.yml
version: '3'

services:
  php:
    build:
      context: .
      args:
        APP_DEBUG: $APP_DEBUG
        APP_ENV: $APP_ENV
    image: srigi/php

  kafka1:
    image: wurstmeister/kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_BROKER_ID: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      # assumption: the wurstmeister/kafka image needs an explicit ZooKeeper connect string
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
    volumes:
      - kafka1:/kafka

  kafka2:
    image: wurstmeister/kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_BROKER_ID: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
    volumes:
      - kafka2:/kafka

  zookeeper:
    image: zookeeper:3.4
    volumes:
      - zookeeper-data:/data
      - zookeeper-datalog:/datalog

volumes:
  kafka1:
  kafka2:
  zookeeper-data:
  zookeeper-datalog:
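With KAFKA_DEFAULT_REPLICATION_FACTOR set to 2, topics created on this two-broker cluster get one replica per broker. A sketch of creating a topic from inside one of the broker containers with the kafka-topics.sh tool bundled in the wurstmeister image; the topic name and partition count are placeholders:

docker-compose exec kafka1 kafka-topics.sh --create \
    --zookeeper zookeeper:2181 \
    --topic my-topic \
    --partitions 3 \
    --replication-factor 2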
Dockerfile
FROM php:7.1-fpm
ENV DEBIAN_FRONTEND=noninteractive

# install system libs & PHP extensions
RUN apt-get update \
 && apt-get install -y --no-install-recommends \
        librdkafka-dev \
        unzip \
 && pecl install \
        rdkafka \
 && docker-php-ext-enable \
        rdkafka

# install composer
RUN php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');" \
 && php -r "if (hash_file('SHA384', 'composer-setup.php') === '544e09ee996cdf60ece3804abc52599c22b1f40f4323403c44d44fdfdd586475ca9813a858088ffbc1f233e9b180f061') { echo 'Installer verified'; } else { echo 'Installer corrupt'; unlink('composer-setup.php'); } echo PHP_EOL;" \
 && php composer-setup.php --filename=composer --install-dir=/usr/local/bin \
 && php -r "unlink('composer-setup.php');"

# Prepare app workdirs, switch to unprivileged user
WORKDIR /app
RUN mkdir -p \
        var/cache \
        var/logs \
        var/sessions \
 && chown -R www-data:www-data \
        /app \
 && chown www-data:www-data \
        /var/www
USER www-data
RUN composer global require hirak/prestissimo

# Install app dependencies
ARG APP_DEBUG=0
ENV APP_DEBUG=$APP_DEBUG
ARG APP_ENV=prod
ENV APP_ENV=$APP_ENV
COPY ./composer.json ./composer.lock ./
RUN composer install --no-autoloader --no-interaction --no-scripts --no-suggest \
 && composer clearcache

# Copy app sources & initialize app
COPY ./app ./app/
COPY ./public ./public/
RUN composer dump-autoload --optimize \
 && composer clearcache
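A hypothetical build-and-run sequence for the stack; APP_ENV and APP_DEBUG feed the build args declared in docker-compose.yml, and the second command assumes the Symfony console entry point is available inside the php container:

APP_ENV=prod APP_DEBUG=0 docker-compose up -d --build
docker-compose exec php bin/console app:kafka:consume my-topic --partition=0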
Producer.php
<?php
declare(strict_types = 1);

namespace App\Kafka;

use JMS\Serializer\SerializationContext;
use JMS\Serializer\SerializerInterface;
use Psr\Log\LoggerInterface;
use RdKafka;

class Producer
{
    protected const AFTER_PRODUCE_RECOVER_TIMEOUT = 50;
    protected const MSG_FLAGS = 0;

    /** @var string */
    private $brokersList;

    /** @var int */
    private $logLevel;

    /** @var LoggerInterface */
    private $logger;

    /** @var RdKafka\Producer */
    private $producer;

    /** @var SerializerInterface */
    private $serializer;

    /**
     * @param string $brokersList Comma-separated list of brokers in the format: <broker1>,<broker2>,...
     * @param int $logLevel Specifies the maximum logging level produced by internal Kafka logging and debugging
     * @param LoggerInterface $logger
     * @param SerializerInterface $serializer
     */
    public function __construct(string $brokersList, int $logLevel, LoggerInterface $logger, SerializerInterface $serializer)
    {
        $this->brokersList = $brokersList;
        $this->logLevel = $logLevel;
        $this->logger = $logger;
        $this->serializer = $serializer;
    }

    protected function connect(): RdKafka\Producer
    {
        if ($this->producer === NULL) {
            $conf = new RdKafka\Conf();
            $conf->set('compression.codec', 'gzip');

            $this->producer = new RdKafka\Producer($conf);
            $this->producer->setLogLevel($this->logLevel);
            $this->producer->addBrokers($this->brokersList);
        }

        return $this->producer;
    }

    public function createTopic(string $name, array $conf = []): RdKafka\ProducerTopic
    {
        $producer = $this->connect();
        if (empty($conf)) {
            return $producer->newTopic($name);
        }

        $topicConf = new RdKafka\TopicConf();
        foreach ($conf as $key => $value) {
            $topicConf->set($key, $value);
        }

        return $producer->newTopic($name, $topicConf);
    }

    public function getMetadata(?RdKafka\Topic $topic = NULL, bool $local = FALSE): RdKafka\Metadata
    {
        $producer = $this->connect();

        return $producer->getMetadata(!$local, $topic, 15 * 1000);
    }

    public function produce(
        RdKafka\ProducerTopic $topic,
        array $payload,
        int $partition = RD_KAFKA_PARTITION_UA,
        ?string $messageKey = NULL
    ): void {
        $context = (new SerializationContext())->setSerializeNull(TRUE);
        $payloadStr = $this->serializer->serialize($payload, 'json', $context);

        $this->logger->info('producing Kafka message', [
            'topic' => $topic->getName(),
        ]);

        $producer = $this->connect();
        $topic->produce($partition, self::MSG_FLAGS, $payloadStr, $messageKey);

        // poll until the local delivery queue is drained
        $producer->poll(self::AFTER_PRODUCE_RECOVER_TIMEOUT);
        while ($producer->getOutQLen() > 0) {
            $producer->poll(self::AFTER_PRODUCE_RECOVER_TIMEOUT);
        }

        $this->logger->info('Kafka message produced successfully', [
            'topic' => $topic->getName(),
        ]);
    }
}
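A usage sketch of the Producer, assuming it has been wired as a service and injected; the topic name, topic config and payload below are illustrative only:

<?php
// illustrative only — $producer is an injected App\Kafka\Producer instance
$topic = $producer->createTopic('user-events', [
    'request.required.acks' => '-1', // wait for all in-sync replicas to acknowledge
]);

// partition is left unassigned (RD_KAFKA_PARTITION_UA) so librdkafka picks one;
// the message key controls partition affinity when set
$producer->produce($topic, ['id' => 123, 'happenedAt' => date(DATE_ATOM)], RD_KAFKA_PARTITION_UA, 'user-123');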