Skip to content

Instantly share code, notes, and snippets.

@mrflip
Created June 29, 2013 20:06
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrflip/5892480 to your computer and use it in GitHub Desktop.
Save mrflip/5892480 to your computer and use it in GitHub Desktop.
Trident Kafka State
package com.infochimps.storm.trident;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.serializer.Encoder;
import kafka.producer.ProducerConfig;
import backtype.storm.task.IMetricsContext;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * Trident {@link State} that publishes the first field of every tuple in a
 * batch, as a string, to a single Kafka topic.
 *
 * <p>The Kafka producer is created on the first non-empty batch and reused for
 * all later batches. {@link #beginCommit(Long)} and {@link #commit(Long)} are
 * no-ops, so this state does nothing with transaction ids.
 *
 * <p>NOTE(review): no synchronization is done here; this assumes a state
 * instance is only driven from one thread at a time -- confirm against the
 * Storm version in use.
 */
public class KafkaState implements State {
    /** Topic every record is written to. */
    protected String kafkaTopic;
    /** Value passed to the producer as its {@code zk.connect} property. */
    protected String zookeeperHosts;
    /** Lazily created by the first write; {@code null} until then and after {@link #close()}. */
    protected Producer<String, String> producer;

    /**
     * Builds {@link KafkaState} instances for a fixed topic and ZooKeeper
     * connect string.
     */
    public static class Factory implements StateFactory {
        protected String kafkaTopic;
        protected String zookeeperHosts;

        /**
         * @param kafkaTopic     topic the created states write to
         * @param zookeeperHosts value for the producer's {@code zk.connect} property
         */
        public Factory(String kafkaTopic, String zookeeperHosts) {
            this.kafkaTopic = kafkaTopic;
            this.zookeeperHosts = zookeeperHosts;
        }

        // Both makeState overloads are kept and neither carries @Override:
        // presumably so the class compiles against StateFactory versions with
        // and without the IMetricsContext argument -- verify before adding it.

        /** Delegates to the four-argument overload with a {@code null} metrics context. */
        public State makeState(Map conf,
                               int partitionIndex,
                               int numPartitions) {
            return makeState(conf, null, partitionIndex, numPartitions);
        }

        /**
         * Creates a new state for the configured topic. The configuration,
         * metrics context and partition arguments are not used.
         */
        public State makeState(Map conf,
                               IMetricsContext context,
                               int partitionIndex,
                               int numPartitions) {
            return new KafkaState(kafkaTopic, zookeeperHosts);
        }
    }

    /** State updater that forwards each batch of tuples to {@link KafkaState#setBulk(List)}. */
    public static class Updater extends BaseStateUpdater<KafkaState> {
        @Override
        public void updateState(KafkaState state, List<TridentTuple> tuples, TridentCollector collector) {
            state.setBulk(tuples);
        }
    }

    /**
     * @param kafkaTopic     topic to write records to
     * @param zookeeperHosts value for the producer's {@code zk.connect} property
     */
    public KafkaState(String kafkaTopic, String zookeeperHosts) {
        this.kafkaTopic = kafkaTopic;
        this.zookeeperHosts = zookeeperHosts;
    }

    /** No-op. */
    @Override
    public void beginCommit(Long txid) {
    }

    /** No-op. */
    @Override
    public void commit(Long txid) {
    }

    /**
     * Sends field 0 of every tuple to the configured topic in one producer call.
     *
     * <p>A {@code null} or empty batch is ignored: nothing is sent and no
     * producer is created.
     *
     * @param tuples batch of tuples; field 0 of each is read with {@code getString(0)}
     */
    public void setBulk(List<TridentTuple> tuples) {
        if (tuples == null || tuples.isEmpty()) {
            return;
        }
        List<ProducerData<String, String>> batchData =
                new ArrayList<ProducerData<String, String>>(tuples.size());
        for (TridentTuple tuple : tuples) {
            batchData.add(new ProducerData<String, String>(kafkaTopic, tuple.getString(0)));
        }
        System.out.println("Writing " + tuples.size() + " records to Kafka topic \"" + kafkaTopic + "\"");
        // Reuse one producer across batches; building a new one per batch left
        // every previous producer open and unreferenced.
        getProducer().send(batchData);
    }

    /**
     * Closes the producer if one was created. A later {@link #setBulk(List)}
     * call creates a fresh one.
     */
    public void close() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }

    /** Returns the shared producer, creating it on first use. */
    private Producer<String, String> getProducer() {
        if (producer == null) {
            Properties props = new Properties();
            props.put("zk.connect", zookeeperHosts);
            props.put("serializer.class", "com.infochimps.storm.trident.KafkaState$UTF8Encoder");
            System.out.println("zk.connect: " + props.get("zk.connect"));
            producer = new Producer<String, String>(new ProducerConfig(props));
        }
        return producer;
    }

    //----------------------------------------------------------------------------

    /**
     * Encodes a string as a Kafka {@link Message} holding its UTF-8 bytes.
     * Referenced by class name in the producer's {@code serializer.class}
     * property, so the class name and nesting must not change.
     */
    public static class UTF8Encoder implements Encoder<String> {
        /** Looked up once rather than on every message. */
        private static final Charset UTF8 = Charset.forName("UTF-8");

        @Override
        public Message toMessage(String event) {
            return new Message(event.getBytes(UTF8));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment