Skip to content

Instantly share code, notes, and snippets.

@aromanenko-dev
Created February 26, 2020 10:13
Show Gist options
  • Save aromanenko-dev/065a48d6abccf85f9a7ad32f291dee2c to your computer and use it in GitHub Desktop.
Save aromanenko-dev/065a48d6abccf85f9a7ad32f291dee2c to your computer and use it in GitHub Desktop.
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
 * Example Beam pipeline that reads Avro {@link GenericRecord} values from a Kafka topic (decoded
 * with Confluent's {@link KafkaAvroDeserializer} against a schema registry), attaches a Beam
 * schema derived from the Avro schema, and runs a Beam SQL projection over the records.
 *
 * <p>Pipeline options such as {@code --runner=...} may be passed on the command line. The Kafka
 * bootstrap servers, topic and schema-registry URL are the fixed constants below.
 */
public class KafkaAvroGenericRecordSql {
  private static final String BOOTSTRAP_SERVERS = "localhost:9092";
  private static final String TOPIC = "transactions";
  private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

  /** Avro schema of the topic values: a {@code Payment} record with string {@code id} and double {@code amount}. */
  private static final String SCHEMA_STRING =
      "{\n" + " \"type\": \"record\",\n" + " \"name\": \"Payment\",\n"
          + " \"namespace\": \"io.confluent.examples.clients.basicavro\",\n" + " \"fields\": [\n"
          + " {\n" + " \"name\": \"id\",\n" + " \"type\": \"string\"\n" + " },\n"
          + " {\n" + " \"name\": \"amount\",\n" + " \"type\": \"double\"\n" + " }\n"
          + " ]\n" + "}";

  private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);

  /**
   * Builds and runs the pipeline.
   *
   * @param args Beam pipeline options in {@code --name=value} form; may be empty
   */
  public static void main(String[] args) {
    // Beam schema equivalent of the Avro schema; SqlTransform needs a schema-aware PCollection.
    org.apache.beam.sdk.schemas.Schema beamSchema =
        AvroUtils.getSchema(GenericRecord.class, SCHEMA);

    // Parse args so the runner and other options can be chosen on the command line. With no
    // args this is equivalent to PipelineOptionsFactory.create().
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply(kafkaRead())
        .apply(ParDo.of(new UnwrapValueFn()))
        .setSchema(
            beamSchema,
            AvroUtils.getToRowFunction(GenericRecord.class, SCHEMA),
            AvroUtils.getFromRowFunction(GenericRecord.class))
        .apply(SqlTransform.query("SELECT id FROM PCOLLECTION"))
        .apply("Output", MapElements.via(new PrintRowFn()));

    // No max-records or max-read-time bound is set on the Kafka read, so the source is
    // unbounded and this call blocks until the pipeline is stopped externally.
    p.run().waitUntilFinish();
  }

  /**
   * Builds the Kafka source. Keys are decoded as Strings. Values are decoded as Avro
   * {@link GenericRecord}s via the schema registry and encoded inside the pipeline with
   * {@link AvroCoder}.
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  private static KafkaIO.Read<String, GenericRecord> kafkaRead() {
    // KafkaAvroDeserializer is declared as a Deserializer<Object>, so it cannot be passed as a
    // Class<? extends Deserializer<GenericRecord>> without a raw cast. The resulting warnings are
    // suppressed only for this method.
    return KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(BOOTSTRAP_SERVERS)
        .withTopic(TOPIC)
        .withConsumerConfigUpdates(ImmutableMap.of("schema.registry.url", SCHEMA_REGISTRY_URL))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(SCHEMA));
  }

  /** Emits the value of each Kafka record, printing it and its runtime class for debugging. */
  static class UnwrapValueFn extends DoFn<KafkaRecord<String, GenericRecord>, GenericRecord> {
    @ProcessElement
    public void processElement(ProcessContext ctx) {
      GenericRecord value = ctx.element().getKV().getValue();
      System.out.println(value);
      System.out.println(value.getClass().getName());
      ctx.output(value);
    }
  }

  /** Prints the field values of each SQL result row and passes the row through unchanged. */
  static class PrintRowFn extends SimpleFunction<Row, Row> {
    @Override
    public Row apply(Row input) {
      System.out.println("PCOLLECTION: " + input.getValues());
      return input;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment