Last active
December 27, 2015 04:49
-
-
Save ottomata/7269312 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.criteo.kafka; | |
import java.io.IOException; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.log4j.Logger; | |
import com.yammer.metrics.Metrics; | |
import com.yammer.metrics.core.Metric; | |
import com.yammer.metrics.core.MetricName; | |
import com.yammer.metrics.core.MetricPredicate; | |
import com.yammer.metrics.core.VirtualMachineMetrics; | |
import com.yammer.metrics.reporting.GangliaReporter; | |
import com.yammer.metrics.reporting.GangliaMessageBuilder; | |
import kafka.metrics.KafkaMetricsConfig; | |
import kafka.metrics.KafkaMetricsReporter; | |
import kafka.metrics.KafkaMetricsReporterMBean; | |
import kafka.utils.VerifiableProperties; | |
public class KafkaGangliaMetricsReporter implements KafkaMetricsReporter, | |
KafkaGangliaMetricsReporterMBean { | |
static Logger LOG = Logger.getLogger(KafkaGangliaMetricsReporter.class); | |
static String GANGLIA_DEFAULT_HOST = "localhost"; | |
static int GANGLIA_DEFAULT_PORT = 8649; | |
static boolean GANGLIA_USE_MULTICAST = false; | |
static int GANGLIA_MULTICAST_TTL = 3; | |
static String GANGLIA_DEFAULT_PREFIX = "kafka"; | |
boolean initialized = false; | |
boolean running = false; | |
GangliaReporter reporter = null; | |
String gangliaHost = GANGLIA_DEFAULT_HOST; | |
int gangliaPort = GANGLIA_DEFAULT_PORT; | |
boolean gangliaMulticastEnabled = GANGLIA_MULTICAST_ENABLED; | |
int gangliaMulticastTTL = GANGLIA_MULTICAST_TTL; | |
String gangliaGroupPrefix = GANGLIA_DEFAULT_PREFIX; | |
MetricPredicate predicate = MetricPredicate.ALL; | |
GangliaMessageBuilder gangliaMessageBuilder = null; | |
@Override | |
public String getMBeanName() { | |
return "kafka:type=com.criteo.kafka.KafkaGangliaMetricsReporter"; | |
} | |
@Override | |
public synchronized void startReporter(long pollingPeriodSecs) { | |
if (initialized && !running) { | |
reporter.start(pollingPeriodSecs, TimeUnit.SECONDS); | |
running = true; | |
LOG.info(String.format("Started Kafka Ganglia metrics reporter with polling period %d seconds", pollingPeriodSecs)); | |
} | |
} | |
@Override | |
public synchronized void stopReporter() { | |
if (initialized && running) { | |
reporter.shutdown(); | |
running = false; | |
LOG.info("Stopped Kafka Ganglia metrics reporter"); | |
try { | |
reporter = new GangliaReporter( | |
Metrics.defaultRegistry(), | |
gangliaGroupPrefix, | |
predicate, | |
false, | |
gangliaMessageBuilder, | |
VirtualMachineMetrics.getInstance() | |
); | |
} catch (IOException e) { | |
LOG.error("Unable to initialize GangliaReporter", e); | |
} | |
} | |
} | |
@Override | |
public synchronized void init(VerifiableProperties props) { | |
if (!initialized) { | |
KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props); | |
gangliaHost = props.getString("kafka.ganglia.metrics.host", GANGLIA_DEFAULT_HOST); | |
gangliaPort = props.getInt("kafka.ganglia.metrics.port", GANGLIA_DEFAULT_PORT); | |
gangliaMulticastEnabled = props.getBoolean("kafka.ganglia.metrics.multicast.enabled", GANGLIA_MULTICAST_ENABLED); | |
gangliaMulticastTTL = props.getInt("kafka.ganglia.metrics.multicast.ttl", GANGLIA_MULTICAST_TTL); | |
gangliaGroupPrefix = props.getString("kafka.ganglia.metrics.group", GANGLIA_DEFAULT_PREFIX); | |
String predicateRegex = props.getString("kafka.ganglia.metrics.exclude.regex", null); | |
if (predicateRegex != null) { | |
predicate = new RegexMetricPredicate(predicateRegex); | |
} | |
try { | |
// use MulticastGangliaMessageBuilder | |
// if ganglia is configured via multicast. | |
if (gangliaMulticastEnabled) { | |
gangliaMessageBuilder = new MulticastGangliaMessageBuilder(gangliaHost, gangliaPort, multicastTTL); | |
} | |
else { | |
gangliaMessageBuilder = new GangliaMessageBuilder(gangliaHost, gangliaPort); | |
} | |
// instantiate a new GangliaReporter | |
// with the proper GangliaMessageBuilder | |
reporter = new GangliaReporter( | |
Metrics.defaultRegistry(), | |
gangliaGroupPrefix, | |
predicate, | |
false, | |
gangliaMessageBuilder, | |
VirtualMachineMetrics.getInstance() | |
); | |
} catch (IOException e) { | |
LOG.error("Unable to initialize GangliaReporter", e); | |
} | |
if (props.getBoolean("kafka.ganglia.metrics.reporter.enabled", false)) { | |
initialized = true; | |
startReporter(metricsConfig.pollingIntervalSecs()); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.criteo.kafka; | |
import com.yammer.metrics.reporting.GangliaMessageBuilder; | |
import java.net.DatagramSocket; | |
import java.net.InetSocketAddress; | |
import java.net.SocketException; | |
class MulticastGangliaMessageBuilder extends GangliaMessageBuilder { | |
MulticastGangliaMessageBuilder(String mcastaddr, int port, int ttl) throws SocketException { | |
this.inetSocketAddress = new InetSocketAddress(mcastaddr, port); | |
this.datagramSocket = new MulticastSocket(); | |
this.datagramSocket.setTimeToLive(ttl); | |
this.datagramSocket.joinGroup(this.inetSocketAddress); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment