Skip to content

Instantly share code, notes, and snippets.

@mustafo
Last active January 3, 2019 05:21
Show Gist options
  • Save mustafo/5533b3fbc5a6d7f4a7c75439e68ad583 to your computer and use it in GitHub Desktop.
Save mustafo/5533b3fbc5a6d7f4a7c75439e68ad583 to your computer and use it in GitHub Desktop.
Kafka Laravel commands
<?php
namespace App\Console\Commands;
use App\Core\Audit\Models\Activity;
use Illuminate\Console\Command;
/**
 * Artisan command that subscribes to a Kafka topic and, for every message
 * received, prints the payload and stores it on the Activity record with id 1.
 *
 * Connection settings are read with env(); every setting except the password
 * has a default:
 *   KAFKA_BROKERS        bootstrap servers
 *   KAFKA_GROUP_ID       consumer group id
 *   KAFKA_TOPIC          topic to subscribe to
 *   KAFKA_SASL_USERNAME  SASL user name
 *   KAFKA_SASL_PASSWORD  SASL password (required, no default)
 *
 * NOTE: per the Laravel documentation, values from the .env file are not
 * loaded once the configuration has been cached; in that case the variables
 * must be present in the real process environment.
 */
class KafkaConsume extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'kafka:test';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Consume messages from the Kafka topic and store each payload on the Activity record';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Runs an endless consume loop; the loop only ends when an exception is
     * thrown or the process is terminated from outside.
     *
     * @return mixed
     *
     * @throws \RuntimeException when the SASL password is not configured or
     *                           the Activity record with id 1 does not exist
     * @throws \Exception        when the consumer reports an error other than
     *                           end-of-partition or timeout
     */
    public function handle()
    {
        $consumer = new \RdKafka\KafkaConsumer($this->buildConf());
        $consumer->subscribe([env('KAFKA_TOPIC', '8x35zqny-alif.pro')]);

        echo "Waiting for partition assignment... (may take some time when\n";
        echo "quickly re-joining the group after leaving it.)\n";

        while (true) {
            // Wait up to 120 seconds for the next message or event.
            $message = $consumer->consume(120 * 1000);

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo "$message->payload\n";
                    $this->storePayload($message->payload);
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;

                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    /**
     * Build the consumer configuration from the environment.
     *
     * @return \RdKafka\Conf
     *
     * @throws \RuntimeException when KAFKA_SASL_PASSWORD is missing or empty
     */
    private function buildConf()
    {
        // The secret is never kept in source code; fail early with a clear
        // message instead of attempting an unauthenticated connection.
        $password = env('KAFKA_SASL_PASSWORD');
        if (!is_string($password) || $password === '') {
            throw new \RuntimeException('KAFKA_SASL_PASSWORD is not set; cannot authenticate against Kafka.');
        }

        $conf = new \RdKafka\Conf();
        $conf->set('bootstrap.servers', env('KAFKA_BROKERS', 'ark-01.srvs.cloudkafka.com:9094'));
        $conf->set('group.id', env('KAFKA_GROUP_ID', '8x35zqny-consumer'));
        $conf->set('security.protocol', 'SASL_SSL');
        $conf->set('sasl.mechanisms', 'SCRAM-SHA-256');
        $conf->set('sasl.username', env('KAFKA_SASL_USERNAME', '8x35zqny'));
        $conf->set('sasl.password', $password);

        $topicConf = new \RdKafka\TopicConf();
        // Set where to start consuming messages when there is no initial offset in
        // offset store or the desired offset is out of range.
        // 'smallest': start from the beginning
        $topicConf->set('auto.offset.reset', 'smallest');

        // Set the configuration to use for subscribed/assigned topics
        $conf->setDefaultTopicConf($topicConf);

        return $conf;
    }

    /**
     * Store a message payload on the Activity record with id 1.
     *
     * @param  mixed  $payload  payload of the consumed message
     * @return void
     *
     * @throws \RuntimeException when the Activity record with id 1 does not exist
     */
    private function storePayload($payload)
    {
        $activity = Activity::find(1);

        // Without this guard a missing record ends the consumer with an
        // opaque "property/method on null" style error.
        if ($activity === null) {
            throw new \RuntimeException('Activity record with id 1 was not found; cannot store the Kafka payload.');
        }

        $activity->payload = $payload;
        $activity->save();
    }
}
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
/**
 * Artisan command that publishes the fixed test message "Hello from PHP" to a
 * Kafka topic and waits until the producer's outbound queue is empty.
 *
 * Connection settings are read with env(); every setting except the password
 * has a default:
 *   KAFKA_BROKERS        bootstrap servers
 *   KAFKA_TOPIC          topic to publish to
 *   KAFKA_SASL_USERNAME  SASL user name
 *   KAFKA_SASL_PASSWORD  SASL password (required, no default)
 *
 * NOTE: per the Laravel documentation, values from the .env file are not
 * loaded once the configuration has been cached; in that case the variables
 * must be present in the real process environment.
 */
class KafkaProduce extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'kafka:produce';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Publish a test message to the Kafka topic';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     *
     * @throws \RuntimeException when KAFKA_SASL_PASSWORD is missing or empty
     */
    public function handle()
    {
        // The secret is never kept in source code; fail early with a clear
        // message instead of attempting an unauthenticated connection.
        $password = env('KAFKA_SASL_PASSWORD');
        if (!is_string($password) || $password === '') {
            throw new \RuntimeException('KAFKA_SASL_PASSWORD is not set; cannot authenticate against Kafka.');
        }

        // No 'group.id' here: it is a consumer-only setting and has no effect
        // on a producer.
        $conf = new \RdKafka\Conf();
        $conf->set(
            'bootstrap.servers',
            env(
                'KAFKA_BROKERS',
                'ark-01.srvs.cloudkafka.com:9094,ark-02.srvs.cloudkafka.com:9094,ark-03.srvs.cloudkafka.com:9094'
            )
        );
        $conf->set('security.protocol', 'SASL_SSL');
        $conf->set('sasl.mechanisms', 'SCRAM-SHA-256');
        $conf->set('sasl.username', env('KAFKA_SASL_USERNAME', '8x35zqny'));
        $conf->set('sasl.password', $password);

        $producer = new \RdKafka\Producer($conf);
        $producer->setLogLevel(LOG_DEBUG);

        $topic = $producer->newTopic(env('KAFKA_TOPIC', '8x35zqny-alif.pro'));

        // RD_KAFKA_PARTITION_UA: do not pin the message to a specific partition.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Hello from PHP");
        $producer->poll(0);

        // Keep polling until the outbound queue is empty so the process does
        // not exit while the message is still queued.
        while ($producer->getOutQLen() > 0) {
            $producer->poll(50);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment