Skip to content

Instantly share code, notes, and snippets.

@ottomata
Last active December 27, 2015 04:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ottomata/7269312 to your computer and use it in GitHub Desktop.
Save ottomata/7269312 to your computer and use it in GitHub Desktop.
package com.criteo.kafka;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricPredicate;
import com.yammer.metrics.core.VirtualMachineMetrics;
import com.yammer.metrics.reporting.GangliaReporter;
import com.yammer.metrics.reporting.GangliaMessageBuilder;
import kafka.metrics.KafkaMetricsConfig;
import kafka.metrics.KafkaMetricsReporter;
import kafka.metrics.KafkaMetricsReporterMBean;
import kafka.utils.VerifiableProperties;
/**
 * Kafka metrics reporter that publishes the default Yammer metrics registry to Ganglia
 * through a {@link GangliaReporter}.
 *
 * <p>Configuration is read in {@link #init(VerifiableProperties)} from these properties:
 * <ul>
 *   <li>{@code kafka.ganglia.metrics.reporter.enabled} (default {@code false})</li>
 *   <li>{@code kafka.ganglia.metrics.host} (default {@code localhost})</li>
 *   <li>{@code kafka.ganglia.metrics.port} (default {@code 8649})</li>
 *   <li>{@code kafka.ganglia.metrics.multicast.enabled} (default {@code false})</li>
 *   <li>{@code kafka.ganglia.metrics.multicast.ttl} (default {@code 3})</li>
 *   <li>{@code kafka.ganglia.metrics.group} (default {@code kafka})</li>
 *   <li>{@code kafka.ganglia.metrics.exclude.regex} (no default; when absent all metrics
 *       are reported)</li>
 * </ul>
 *
 * <p>The lifecycle methods are {@code synchronized} on this instance.
 */
public class KafkaGangliaMetricsReporter implements KafkaMetricsReporter,
        KafkaGangliaMetricsReporterMBean {

    static final Logger LOG = Logger.getLogger(KafkaGangliaMetricsReporter.class);

    static final String GANGLIA_DEFAULT_HOST = "localhost";
    static final int GANGLIA_DEFAULT_PORT = 8649;
    static final boolean GANGLIA_USE_MULTICAST = false;
    static final int GANGLIA_MULTICAST_TTL = 3;
    static final String GANGLIA_DEFAULT_PREFIX = "kafka";

    /** True once {@link #init(VerifiableProperties)} has built a reporter and the reporter is enabled. */
    boolean initialized = false;
    /** True while the underlying reporter is polling. */
    boolean running = false;
    /** Current reporter; {@code null} before init or after a failed rebuild. */
    GangliaReporter reporter = null;

    String gangliaHost = GANGLIA_DEFAULT_HOST;
    int gangliaPort = GANGLIA_DEFAULT_PORT;
    // Fixed: previously referenced the undeclared GANGLIA_MULTICAST_ENABLED.
    boolean gangliaMulticastEnabled = GANGLIA_USE_MULTICAST;
    int gangliaMulticastTTL = GANGLIA_MULTICAST_TTL;
    String gangliaGroupPrefix = GANGLIA_DEFAULT_PREFIX;
    MetricPredicate predicate = MetricPredicate.ALL;
    GangliaMessageBuilder gangliaMessageBuilder = null;

    /** @return the JMX object name under which this reporter is registered */
    @Override
    public String getMBeanName() {
        return "kafka:type=com.criteo.kafka.KafkaGangliaMetricsReporter";
    }

    /**
     * Starts polling if the reporter has been initialized and is not already running.
     *
     * @param pollingPeriodSecs interval between reports, in seconds
     */
    @Override
    public synchronized void startReporter(long pollingPeriodSecs) {
        if (!initialized || running) {
            return;
        }
        if (reporter == null) {
            // A previous stopReporter() could not build a replacement; retry here instead of
            // dereferencing null.
            try {
                reporter = newReporter();
            } catch (IOException e) {
                LOG.error("Unable to initialize GangliaReporter", e);
                return;
            }
        }
        reporter.start(pollingPeriodSecs, TimeUnit.SECONDS);
        running = true;
        LOG.info(String.format("Started Kafka Ganglia metrics reporter with polling period %d seconds", pollingPeriodSecs));
    }

    /**
     * Stops polling if currently running and prepares a fresh reporter for the next
     * {@link #startReporter(long)} call.
     */
    @Override
    public synchronized void stopReporter() {
        if (!initialized || !running) {
            return;
        }
        reporter.shutdown();
        running = false;
        LOG.info("Stopped Kafka Ganglia metrics reporter");
        try {
            // Replace the shut-down instance (presumably because it cannot be restarted - confirm
            // against the metrics library version in use).
            reporter = newReporter();
        } catch (IOException e) {
            // Do not keep a reference to the shut-down reporter; startReporter() will retry.
            reporter = null;
            LOG.error("Unable to initialize GangliaReporter", e);
        }
    }

    /**
     * Reads the Ganglia settings from {@code props}, builds the reporter and, when
     * {@code kafka.ganglia.metrics.reporter.enabled} is true, starts it with the polling
     * interval from {@link KafkaMetricsConfig}. Does nothing once initialization has succeeded.
     *
     * <p>If the reporter cannot be constructed the error is logged and the instance is left
     * uninitialized, so it is never started.
     *
     * @param props Kafka configuration properties
     */
    @Override
    public synchronized void init(VerifiableProperties props) {
        if (initialized) {
            return;
        }
        KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);

        gangliaHost = props.getString("kafka.ganglia.metrics.host", GANGLIA_DEFAULT_HOST);
        gangliaPort = props.getInt("kafka.ganglia.metrics.port", GANGLIA_DEFAULT_PORT);
        gangliaMulticastEnabled = props.getBoolean("kafka.ganglia.metrics.multicast.enabled", GANGLIA_USE_MULTICAST);
        gangliaMulticastTTL = props.getInt("kafka.ganglia.metrics.multicast.ttl", GANGLIA_MULTICAST_TTL);
        gangliaGroupPrefix = props.getString("kafka.ganglia.metrics.group", GANGLIA_DEFAULT_PREFIX);

        String predicateRegex = props.getString("kafka.ganglia.metrics.exclude.regex", null);
        if (predicateRegex != null) {
            predicate = new RegexMetricPredicate(predicateRegex);
        }

        try {
            // Use the multicast-aware builder when Ganglia is configured for multicast.
            // Fixed: the TTL argument previously referenced the undeclared name multicastTTL.
            if (gangliaMulticastEnabled) {
                gangliaMessageBuilder = new MulticastGangliaMessageBuilder(gangliaHost, gangliaPort, gangliaMulticastTTL);
            } else {
                gangliaMessageBuilder = new GangliaMessageBuilder(gangliaHost, gangliaPort);
            }
            reporter = newReporter();
        } catch (IOException e) {
            LOG.error("Unable to initialize GangliaReporter", e);
            // Without a reporter there is nothing to start; stay uninitialized.
            return;
        }

        if (props.getBoolean("kafka.ganglia.metrics.reporter.enabled", false)) {
            initialized = true;
            startReporter(metricsConfig.pollingIntervalSecs());
        }
    }

    /**
     * Builds a reporter over the default metrics registry using the currently configured
     * group prefix, predicate and message builder.
     */
    private GangliaReporter newReporter() throws IOException {
        return new GangliaReporter(
                Metrics.defaultRegistry(),
                gangliaGroupPrefix,
                predicate,
                false, // NOTE(review): compressPackageNames in metrics-ganglia 2.x - confirm
                gangliaMessageBuilder,
                VirtualMachineMetrics.getInstance());
    }
}
package com.criteo.kafka;
import com.yammer.metrics.reporting.GangliaMessageBuilder;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketException;
/**
 * {@link GangliaMessageBuilder} variant that sends through a {@link MulticastSocket}
 * configured with a time-to-live and joined to the given multicast group.
 *
 * <p>NOTE(review): this relies on the superclass exposing assignable
 * {@code inetSocketAddress} and {@code datagramSocket} fields and an accessible no-arg
 * constructor; neither is visible here - confirm against the GangliaMessageBuilder in use.
 */
class MulticastGangliaMessageBuilder extends GangliaMessageBuilder {

    /**
     * @param mcastaddr multicast group address (host name or IP literal)
     * @param port      destination port
     * @param ttl       time-to-live applied to outgoing multicast packets
     * @throws IOException if the address cannot be resolved or the socket cannot be
     *                     created, configured or joined to the group
     */
    MulticastGangliaMessageBuilder(String mcastaddr, int port, int ttl) throws IOException {
        InetSocketAddress groupAddress = new InetSocketAddress(mcastaddr, port);
        if (groupAddress.isUnresolved()) {
            // getAddress() would be null below; fail with a descriptive checked error instead.
            throw new IOException("Unable to resolve multicast address " + mcastaddr);
        }

        // Configure through a MulticastSocket-typed local: setTimeToLive/joinGroup are not
        // available on a plain DatagramSocket reference.
        MulticastSocket socket = new MulticastSocket();
        try {
            socket.setTimeToLive(ttl);
            // MulticastSocket has no joinGroup(InetSocketAddress) overload; join by group address.
            socket.joinGroup(groupAddress.getAddress());
        } catch (IOException e) {
            socket.close(); // do not leak the socket when configuration fails
            throw e;
        }

        this.inetSocketAddress = groupAddress;
        this.datagramSocket = socket;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment