Skip to content

Instantly share code, notes, and snippets.

@php-cpm
Last active July 25, 2017 12:25
Show Gist options
  • Save php-cpm/8fe4a0c45c41db74ec19526007242d70 to your computer and use it in GitHub Desktop.
Save php-cpm/8fe4a0c45c41db74ec19526007242d70 to your computer and use it in GitHub Desktop.
PHP读取kafka的消费offset和生产的offset并计算队列积压情况
<?php
// composer require "nmred/kafka-php"
require_once __DIR__.'/vendor/autoload.php';
$server = 'zookeeperHost:2181';
$topic = 'topic_name';
$group = 'consumer_group';
$zookeeper = new \Kafka\ZooKeeper($server);
$listBroker = $zookeeper->listBrokers();
$hostList = [];
/*获取原信息*/
foreach ($listBroker as $broker) {
$hostList[] = $broker['host'] . ':' . $broker['port'] ;
}
$metaData = new \Kafka\MetaDataFromKafka($hostList);
$client = new \Kafka\Client($metaData);
$topicDetail = $client->getTopicDetail($topic);
/*获取offset的详情*/
$partitions = $topicDetail['partitions'];
foreach ($partitions as $key => $partition) {
$offset = new \Kafka\Offset($client, $group, $topic, $key);
$ConsumerOffset = $offset->getOffset();
print_r($ConsumerOffset."\r\n");
$produceLastOffset = $offset->getProduceOffset(\Kafka\Offset::LAST_OFFSET);
print_r($produceLastOffset."\r\n");
// $offset->setOffset($produceEarlyOffset);
echo "now lan is ".( $produceLastOffset - $ConsumerOffset)."\r\n";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment