Skip to content

Instantly share code, notes, and snippets.

@eshioji
Created June 17, 2014 21:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eshioji/d22cc62c37003af05a3c to your computer and use it in GitHub Desktop.
Save eshioji/d22cc62c37003af05a3c to your computer and use it in GitHub Desktop.
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractService;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.google.common.base.Preconditions.checkState;
/**
*
*/
public class KafkaPoster extends AbstractService {
private static final Logger log = LoggerFactory.getLogger(KafkaPoster.class);
private final MetricRegistry metricRegistry;
private final Queue<Producer<String,String>> producers = new ConcurrentLinkedQueue<Producer<String, String>>();
private final ThreadLocal<Producer<String,String>> producer;
@Inject
public KafkaPoster(MetricRegistry metricRegistry, @Named("kafka.broker.server") String kafkaServer, @Named("kafka.broker.port") int kafkaPort) {
this.metricRegistry = metricRegistry;
String broker = String.format("%s:%s", kafkaServer, kafkaPort);
Properties props = new Properties();
props.put("metadata.broker.list", broker);
props.put("producer.type", "async");
props.put("batch.num.messages", "300");
props.put("compression.codec", "snappy");
props.put("serializer.class", "kafka.serializer.StringEncoder");
final ProducerConfig config = new ProducerConfig(props);
producer = new ThreadLocal<Producer<String, String>>(){
@Override
protected Producer<String, String> initialValue() {
Producer<String, String> p = new Producer<String, String>(config);
producers.add(p);
return p;
}
};
}
@Override
protected void doStart() {
new Thread() {
@Override
public void run() {
try {
notifyStarted();
} catch (Throwable e) {
notifyFailed(e);
throw Throwables.propagate(e);
}
}
}.start();
}
public void post(String topic, String message) {
checkState(isRunning());
Timer.Context t = metricRegistry.timer("kafka-post." + topic).time();
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, message);
producer.get().send(data);
t.stop();
}
@Override
protected void doStop() {
new Thread() {
@Override
public void run() {
try {
for (Producer<String, String> producer : producers) {
producer.close();
}
notifyStopped();
} catch (Throwable e) {
notifyFailed(e);
throw Throwables.propagate(e);
}
}
}.start();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment