Skip to content

Instantly share code, notes, and snippets.

@lennartkoopmann
Created November 27, 2013 14:50
Show Gist options
  • Save lennartkoopmann/7676894 to your computer and use it in GitHub Desktop.
Save lennartkoopmann/7676894 to your computer and use it in GitHub Desktop.
public class KafkaProducer implements RadioTransport {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
public final static String KAFKA_TOPIC = "graylog2-radio-messages";
private final Producer<byte[], byte[]> producer;
private final MessagePack msgPack;
public KafkaProducer(Radio radio) {
msgPack = new MessagePack();
Properties props = new Properties();
props.put("metadata.broker.list", radio.getConfiguration().getKafkaBrokers());
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("request.required.acks", String.valueOf(radio.getConfiguration().getKafkaRequiredAcks()));
props.put("client.id", "graylog2-radio-" + radio.getNodeId());
props.put("producer.type", radio.getConfiguration().getKafkaProducerType());
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<byte[], byte[]>(config);
}
@Override
public void send(Message msg) {
KeyedMessage<byte[], byte[]> data;
try {
data = new KeyedMessage<byte[], byte[]>(KAFKA_TOPIC, msg.getId().getBytes(), serialize(msg));
} catch(IOException e) {
LOG.error("Could not serialize message.");
return;
}
producer.send(data);
}
public byte[] serialize(Message msg) throws IOException {
Map<String, Integer> ints = Maps.newHashMap();
Map<String, String> strings = Maps.newHashMap();
for(Map.Entry<String, Object> field : msg.getFields().entrySet()) {
if (field.getValue() instanceof String) {
strings.put(field.getKey(), (String) field.getValue());
} else if (field.getValue() instanceof Integer) {
ints.put(field.getKey(), (Integer) field.getValue());
}
}
RadioMessage radioMessage = new RadioMessage();
radioMessage.strings = strings;
radioMessage.ints = ints;
radioMessage.timestamp = ((DateTime) msg.getField("timestamp")).getMillis();
return msgPack.write(radioMessage);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment