Created
June 29, 2013 20:06
-
-
Save mrflip/5892480 to your computer and use it in GitHub Desktop.
Trident Kafka State
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.infochimps.storm.trident; | |
import kafka.javaapi.producer.Producer; | |
import kafka.javaapi.producer.ProducerData; | |
import kafka.message.Message; | |
import kafka.serializer.Encoder; | |
import kafka.producer.ProducerConfig; | |
import backtype.storm.task.IMetricsContext; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.state.BaseStateUpdater; | |
import storm.trident.state.State; | |
import storm.trident.state.StateFactory; | |
import storm.trident.tuple.TridentTuple; | |
import java.nio.charset.Charset; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
public class KafkaState implements State { | |
protected String kafkaTopic; | |
protected String zookeeperHosts; | |
protected Producer<String, String> producer; | |
public static class Factory implements StateFactory { | |
protected String kafkaTopic; | |
protected String zookeeperHosts; | |
public Factory(String kafkaTopic, String zookeeperHosts) { | |
this.kafkaTopic = kafkaTopic; | |
this.zookeeperHosts = zookeeperHosts; | |
} | |
public State makeState(Map conf, | |
int partitionIndex, | |
int numPartitions) { | |
return makeState(conf, null, partitionIndex, numPartitions); | |
} | |
public State makeState(Map conf, | |
IMetricsContext context, | |
int partitionIndex, | |
int numPartitions) { | |
return new KafkaState(kafkaTopic, zookeeperHosts); | |
} | |
} | |
public static class Updater extends BaseStateUpdater<KafkaState> { | |
@Override | |
public void updateState(KafkaState state, List<TridentTuple> tuples, TridentCollector collector) { | |
state.setBulk(tuples); | |
} | |
} | |
public KafkaState(String kafkaTopic, String zookeeperHosts) { | |
this.kafkaTopic = kafkaTopic; | |
this.zookeeperHosts = zookeeperHosts; | |
} | |
@Override | |
public void beginCommit(Long txid) { | |
} | |
@Override | |
public void commit(Long txid) { | |
} | |
public void setBulk(List<TridentTuple> tuples) { | |
Properties props = new Properties(); | |
props.put("zk.connect", zookeeperHosts); | |
props.put("serializer.class", "com.infochimps.storm.trident.KafkaState$UTF8Encoder"); | |
System.out.println("zk.connect: " + props.get("zk.connect")); | |
ProducerConfig config = new ProducerConfig(props); | |
producer = new Producer<String, String>(config); | |
ProducerData<String, String> producerData; | |
List<ProducerData<String, String>> batchData = new ArrayList<ProducerData<String, String>>(); | |
for (TridentTuple tuple : tuples) { | |
producerData = new ProducerData<String, String>(kafkaTopic, tuple.getString(0)); | |
batchData.add(producerData); | |
} | |
System.out.println("Writing " + tuples.size() + " records to Kafka topic \"" + kafkaTopic + "\""); | |
producer.send(batchData); | |
} | |
//---------------------------------------------------------------------------- | |
public static class UTF8Encoder implements Encoder<String> { | |
@Override public Message toMessage(String event) { | |
return new Message(event.getBytes(Charset.forName("UTF-8"))); | |
} | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment