@kaniska · Created April 27, 2016 21:42
Flink-Kafka-Avro
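This gist bundles four Java files that connect Apache Beam (on the Flink runner) to Kafka with Avro-encoded messages: a consuming pipeline (BeamKafkaFlinkAvroConsumerTest) with a generic AvroDeserializationSchema, a producing pipeline (BeamKafkaFlinkAvroProducerTest) with a matching AvroSerializationSchema, the Avro-generated Test record, and a hand-written User stub.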
// ===== BeamKafkaFlinkAvroConsumerTest.java =====
package com.xyz.topology.netflow.beam;
import java.util.Properties;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.joda.time.Duration;
import com.xyz.schemas.Test;
public class BeamKafkaFlinkAvroConsumerTest {
private static final String TOPIC = "topic3";
private static BeamKafkaOptions options;
private static Properties props = new Properties();
public static void setup(String[] args) {
PipelineOptionsFactory.register(BeamKafkaOptions.class);
options = PipelineOptionsFactory.fromArgs(args).as(BeamKafkaOptions.class);
options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
// options.setZookeeper(EMBEDDED_ZOOKEEPER.getConnection());
// options.setBroker(EMBEDDED_KAFKA_CLUSTER.getBrokerList());
options.setKafkaTopic(TOPIC);
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
options.setNumberOfExecutionRetries(5);
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkPipelineRunner.class);
System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " "
+ options.getGroup());
props.setProperty("zookeeper.connect", options.getZookeeper());
props.setProperty("bootstrap.servers", options.getBroker());
props.setProperty("group.id", options.getGroup());
}
public static UnboundedSource<Test, CheckpointMark> consumeMessages() {
AvroDeserializationSchema<Test> schema = new AvroDeserializationSchema<>(Test.class);
//TypeInformation<Test> info = TypeExtractor.getForClass(Test.class);
//TypeInformationSerializationSchema<Test> schema = new TypeInformationSerializationSchema<Test>(info,
// new ExecutionConfig());
FlinkKafkaConsumer08<Test> kafkaConsumer = new FlinkKafkaConsumer08<>(TOPIC, schema, props);
return UnboundedFlinkSource.of(kafkaConsumer);
}
public static void main(String args[]) {
setup(args);
Pipeline pipeline = Pipeline.create(options);
PCollection<Test> users = pipeline.apply(
Read.named("StreamingWordCount").from(consumeMessages()))
.apply(Window.<Test> into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
// users.apply(ConsoleIO.Write.create());
PCollection<Long> counts = users.apply(Count.globally());
// .apply(ConsoleIO.Write.create());
// .apply(TextIO.Write.to("outputKafka.txt"));
// Note: this prints the PCollection reference, not the computed counts; attach a sink to observe values.
System.out.println("***************** " + counts);
PipelineResult result = pipeline.run();
System.out.println("***************** " + result.toString());
}
}
/** Avro deserializer: uses a SpecificDatumReader for generated records, a ReflectDatumReader otherwise. */
class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
private final Class<T> avroType;
private transient DatumReader<T> reader;
private transient BinaryDecoder decoder;
public AvroDeserializationSchema(Class<T> avroType) {
this.avroType = avroType;
}
@Override
public T deserialize(byte[] message) {
ensureInitialized();
try {
decoder = DecoderFactory.get().binaryDecoder(message, decoder);
return reader.read(null, decoder);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean isEndOfStream(T nextElement) {
return false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avroType);
}
private void ensureInitialized() {
if (reader == null) {
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
reader = new SpecificDatumReader<T>(avroType);
} else {
reader = new ReflectDatumReader<T>(avroType);
}
}
}
}
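The BeamKafkaOptions interface used above is not included in the gist. Assuming Beam derives flag names from the getters referenced in setup() (getKafkaTopic, getZookeeper, getBroker, getGroup, getWindowSize), a consumer launch might look roughly like the sketch below; the flag names and values are assumptions, not part of the original code.
package com.xyz.topology.netflow.beam;
// Hypothetical launcher sketch: flag names are assumed from the BeamKafkaOptions getters
// referenced above (BeamKafkaOptions itself is not part of this gist), so adjust as needed.
public class ConsumerLauncherSketch {
public static void main(String[] args) {
BeamKafkaFlinkAvroConsumerTest.main(new String[] {
"--broker=localhost:9092",
"--zookeeper=localhost:2181",
"--kafkaTopic=topic3",
"--group=beam-demo",
"--windowSize=10"
});
}
}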
// ===== BeamKafkaFlinkAvroProducerTest.java =====
package com.xyz.topology.netflow.beam;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import com.xyz.schemas.Test;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* @author kaniska
*
*/
public class BeamKafkaFlinkAvroProducerTest {
private static final String TOPIC = "topic3";
private static BeamKafkaOptions options;
private static Properties props = new Properties();
public static void setup(String[] args) {
PipelineOptionsFactory.register(BeamKafkaOptions.class);
options = PipelineOptionsFactory.fromArgs(args).as(BeamKafkaOptions.class);
options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
// options.setZookeeper(EMBEDDED_ZOOKEEPER.getConnection());
// options.setBroker(EMBEDDED_KAFKA_CLUSTER.getBrokerList());
options.setKafkaTopic(TOPIC);
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
options.setNumberOfExecutionRetries(5);
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkPipelineRunner.class);
System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " "
+ options.getGroup());
props.setProperty("zookeeper.connect", options.getZookeeper());
props.setProperty("bootstrap.servers", options.getBroker());
props.setProperty("group.id", options.getGroup());
props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
}
private static void produceData1() {
// Unused example sink for the hand-written User record; note that User.getSchema() returns null,
// so Avro serialization of User would fail at runtime.
FlinkKafkaProducer08<User> kafkaSink =
new FlinkKafkaProducer08<>(TOPIC, new AvroSerializationSchema<User>(User.class), props);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(Create.of(
new User("Joe", 3, "red"),
new User("Mary", 4, "blue"),
new User("Mark", 1, "green"),
new User("Julia", 5, "purple")))
//.withCoder(AvroCoder.of(User.class)))
.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
pipeline.run();
}
private static void produceAvroData2() {
// TypeInformation<Test> info = TypeExtractor.getForClass(Test.class);
// TypeInformationSerializationSchema<Test> schema = new TypeInformationSerializationSchema<Test>(info, new ExecutionConfig());
AvroSerializationSchema<Test> schema = new AvroSerializationSchema<Test>(Test.class);
FlinkKafkaProducer08<Test> kafkaSink = new FlinkKafkaProducer08<>(TOPIC, schema, props);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(Create.of(new Test("Joe", 6))
.withCoder(AvroCoder.of(Test.class)))
.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
pipeline.run();
}
private static void produceSimpleData() throws IOException{
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, byte[]> kafkaProducer = new Producer<String, byte[]>(config);
Test test = new Test("Don", 32);
// serializing in avro format
//DatumWriter<NetFlowPkt> datumWriter = new SpecificDatumWriter<NetFlowPkt>(NetFlowPkt
// .class);
DatumWriter<Test> datumWriter = new SpecificDatumWriter<Test>(Test.class);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
datumWriter.write(test, encoder);
encoder.flush();
byte[] serializedBytes = out.toByteArray();
out.close();
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("topic3",
serializedBytes);
kafkaProducer.send(message);
kafkaProducer.close();
}
public static void main(String args[]){
setup(args);
try {
//produceSimpleData();
produceAvroData2();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
public void testDeSerialization() {
try {
TypeInformation<User> info = TypeExtractor.getForClass(User.class);
TypeInformationSerializationSchema<User> schema =
new TypeInformationSerializationSchema<User>(info, new ExecutionConfig());
User[] types = {
new User(72, new Date(763784523L), new Date(88234L)),
new User(-1, new Date(11111111111111L)),
new User(42),
new User(17, new Date(222763784523L))
};
for (User val : types) {
byte[] serialized = schema.serialize(val);
User deser = schema.deserialize(serialized);
assertEquals(val, deser);
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
**/
/** Avro serializer counterpart: SpecificDatumWriter for generated records, ReflectDatumWriter otherwise. */
private static class AvroSerializationSchema<T> implements SerializationSchema<T> {
private final Class<T> avroType;
private transient DatumWriter<T> writer;
private transient BinaryEncoder encoder;
public AvroSerializationSchema(Class<T> avroType) {
this.avroType = avroType;
}
@Override
public byte[] serialize(T element) {
ensureInitialized();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// Reuse the cached encoder; binaryEncoder() re-targets it at the new output stream.
encoder = EncoderFactory.get().binaryEncoder(out, encoder);
byte[] serializedBytes = null;
try {
writer.write(element, encoder);
encoder.flush();
serializedBytes = out.toByteArray();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
return serializedBytes;
}
private void ensureInitialized() {
if (writer == null) {
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
writer = new SpecificDatumWriter<T>(avroType);
//if(obj instanceof GenericRecord) {
// writer = new GenericDatumWriter(((GenericRecord)obj).getSchema());
//}else {
// writer = new SpecificDatumWriter<T>(avroType);
//}
} else {
writer = new ReflectDatumWriter<T>(avroType);
}
}
}
}
}
// ===== Test.java (Avro-generated) =====
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package com.xyz.schemas;
import java.io.Serializable;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Test extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord , Serializable{
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.xyz.schemas\",\"fields\":[{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public java.lang.CharSequence uname;
@Deprecated public int id;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Test() {}
/**
* All-args constructor.
*/
public Test(java.lang.CharSequence uname, java.lang.Integer id) {
this.uname = uname;
this.id = id;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return uname;
case 1: return id;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: uname = (java.lang.CharSequence)value$; break;
case 1: id = (java.lang.Integer)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'uname' field.
*/
public java.lang.CharSequence getUname() {
return uname;
}
/**
* Sets the value of the 'uname' field.
* @param value the value to set.
*/
public void setUname(java.lang.CharSequence value) {
this.uname = value;
}
/**
* Gets the value of the 'id' field.
*/
public java.lang.Integer getId() {
return id;
}
/**
* Sets the value of the 'id' field.
* @param value the value to set.
*/
public void setId(java.lang.Integer value) {
this.id = value;
}
/** Creates a new Test RecordBuilder */
public static com.xyz.schemas.Test.Builder newBuilder() {
return new com.xyz.schemas.Test.Builder();
}
/** Creates a new Test RecordBuilder by copying an existing Builder */
public static com.xyz.schemas.Test.Builder newBuilder(com.xyz.schemas.Test.Builder other) {
return new com.xyz.schemas.Test.Builder(other);
}
/** Creates a new Test RecordBuilder by copying an existing Test instance */
public static com.xyz.schemas.Test.Builder newBuilder(com.xyz.schemas.Test other) {
return new com.xyz.schemas.Test.Builder(other);
}
/**
* RecordBuilder for Test instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Test>
implements org.apache.avro.data.RecordBuilder<Test> {
private java.lang.CharSequence uname;
private int id;
/** Creates a new Builder */
private Builder() {
super(com.xyz.schemas.Test.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
private Builder(com.xyz.schemas.Test.Builder other) {
super(other);
if (isValidValue(fields()[0], other.uname)) {
this.uname = data().deepCopy(fields()[0].schema(), other.uname);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.id)) {
this.id = data().deepCopy(fields()[1].schema(), other.id);
fieldSetFlags()[1] = true;
}
}
/** Creates a Builder by copying an existing Test instance */
private Builder(com.xyz.schemas.Test other) {
super(com.xyz.schemas.Test.SCHEMA$);
if (isValidValue(fields()[0], other.uname)) {
this.uname = data().deepCopy(fields()[0].schema(), other.uname);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.id)) {
this.id = data().deepCopy(fields()[1].schema(), other.id);
fieldSetFlags()[1] = true;
}
}
/** Gets the value of the 'uname' field */
public java.lang.CharSequence getUname() {
return uname;
}
/** Sets the value of the 'uname' field */
public com.xyz.schemas.Test.Builder setUname(java.lang.CharSequence value) {
validate(fields()[0], value);
this.uname = value;
fieldSetFlags()[0] = true;
return this;
}
/** Checks whether the 'uname' field has been set */
public boolean hasUname() {
return fieldSetFlags()[0];
}
/** Clears the value of the 'uname' field */
public com.xyz.schemas.Test.Builder clearUname() {
uname = null;
fieldSetFlags()[0] = false;
return this;
}
/** Gets the value of the 'id' field */
public java.lang.Integer getId() {
return id;
}
/** Sets the value of the 'id' field */
public com.xyz.schemas.Test.Builder setId(int value) {
validate(fields()[1], value);
this.id = value;
fieldSetFlags()[1] = true;
return this;
}
/** Checks whether the 'id' field has been set */
public boolean hasId() {
return fieldSetFlags()[1];
}
/** Clears the value of the 'id' field */
public com.xyz.schemas.Test.Builder clearId() {
fieldSetFlags()[1] = false;
return this;
}
@Override
public Test build() {
try {
Test record = new Test();
record.uname = fieldSetFlags()[0] ? this.uname : (java.lang.CharSequence) defaultValue(fields()[0]);
record.id = fieldSetFlags()[1] ? this.id : (java.lang.Integer) defaultValue(fields()[1]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
}
// ===== User.java =====
package com.xyz.topology.netflow.beam;
import java.io.Serializable;
import org.apache.avro.Schema;
// Hand-written stub: getSchema(), get() and put() are not implemented, so this record cannot
// actually be (de)serialized with Avro; it is only referenced by the unused produceData1() above.
public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private String name;
private int favoriteNumber;
private String favoriteColor;
public User() {
}
public User(String name, int favoriteNumber, String favoriteColor) {
this.name = name;
this.favoriteNumber = favoriteNumber;
this.favoriteColor = favoriteColor;
}
public String getName() {
return name;
}
public String getFavoriteColor() {
return favoriteColor;
}
public int getFavoriteNumber() {
return favoriteNumber;
}
@Override
public Schema getSchema() {
// TODO Auto-generated method stub
return null;
}
@Override
public Object get(int field) {
// TODO Auto-generated method stub
return null;
}
@Override
public void put(int field, Object value) {
// TODO Auto-generated method stub
}
}
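To sanity-check the Avro round trip used by the two pipelines, here is a minimal sketch (not part of the original gist; it assumes AvroSerializationSchema is pulled out of the producer test, or otherwise made package-visible, so it can be instantiated here) that serializes a Test record and reads it back with AvroDeserializationSchema:
package com.xyz.topology.netflow.beam;
import com.xyz.schemas.Test;
// Hypothetical round-trip check; assumes AvroSerializationSchema is package-visible.
public class AvroRoundTripCheck {
public static void main(String[] args) {
Test original = Test.newBuilder().setUname("Don").setId(32).build();
byte[] bytes = new AvroSerializationSchema<Test>(Test.class).serialize(original);
Test copy = new AvroDeserializationSchema<Test>(Test.class).deserialize(bytes);
// Expect the same field values back (uname is read as an Avro Utf8 CharSequence).
System.out.println(copy.getUname() + " " + copy.getId());
}
}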