@oluies
Created November 1, 2017 13:44
// Entry point of the pipeline class. The enclosing class, its imports, and the static
// Logger LOG are not part of this gist and are assumed to exist.
public static void main(String[] args) throws IOException {
    PipelineOptionsFactory.register(PipelineCmdLineOptions.class);
    PipelineCmdLineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(PipelineCmdLineOptions.class);
    //Config config = ConfigFactory.parseFile(new File(args[0]));
    //LOG.info(config.root().render(ConfigRenderOptions.concise().setFormatted(true)));
    //options.setJobName("NBI Kafka to Elastic");
    /*ConnectionConfiguration connectionConfiguration = null;
    try {
        connectionConfiguration = getConnectionConfiguration(options, ElasticsearchIOReadOrWrite.WRITE);
    } catch (IOException e) {
        final String msg = "FATAL: Connection to elasticsearch " + options.getElasticsearchServer()
                + ":" + options.getElasticsearchHttpPort() + " failed";
        System.err.println(msg);
        LOG.error(msg, e);
        // return;
    }
    */
    // Hard-coded Elasticsearch connection: cluster address, index ("bigindex_write") and type ("bigindex").
    ConnectionConfiguration connectionConfiguration =
            ConnectionConfiguration.create(new String[] {"http://elasticsearch:9200"}, "bigindex_write", "bigindex");
    Pipeline p = Pipeline.create(options);
    // Read Avro-encoded LineageEvent records, keyed by String, from the "lineage" Kafka topic.
    PTransform<PBegin, PCollection<KV<String, LineageEvent>>> kafka = KafkaIO.<String, LineageEvent>read()
            .withBootstrapServers("kafka:9092")
            .withTopic("lineage")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializerAndCoder(LinageEventDeserializer.class, AvroCoder.of(LineageEvent.class))
            // .withTimestampFn(new LinageEventTimeStampFunction())
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
            // .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object) "http://registry:8081"))
            // .updateConsumerProperties(ImmutableMap.of("specific.avro.reader", (Object) "true"))
            // Kafka is an unbounded source. Uncomment the line below to cap the read at a fixed
            // number of records, e.g. for local testing or for sinks that require bounded input.
            //.withMaxNumRecords(10)
            .withoutMetadata();
    // Flush documents to Elasticsearch in batches of up to 1000.
    final long ELASTIC_BATCH_SIZE = 1000;
    p.apply(kafka)
            .apply(Values.<LineageEvent>create())
            // Log each event and emit its string representation for indexing.
            .apply("FormatLinageEvents", ParDo.of(new DoFn<LineageEvent, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    LineageEvent e = c.element();
                    final String s = e.toString();
                    LOG.info(s);
                    c.output(s);
                }
            }))
            .apply(ElasticsearchIO.write()
                    .withConnectionConfiguration(connectionConfiguration)
                    .withMaxBatchSize(ELASTIC_BATCH_SIZE));
    p.run().waitUntilFinish();
}
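
The PipelineCmdLineOptions interface is referenced above but not included in this gist. A minimal sketch of what it might look like, assuming the option names implied by the getters used in the commented-out connection block (getElasticsearchServer, getElasticsearchHttpPort); the @Default values are assumptions, not values from the original:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical options interface; adjust names and defaults to the real deployment.
public interface PipelineCmdLineOptions extends PipelineOptions {
    @Description("Elasticsearch host name")
    @Default.String("elasticsearch")
    String getElasticsearchServer();
    void setElasticsearchServer(String value);

    @Description("Elasticsearch HTTP port")
    @Default.String("9200")
    String getElasticsearchHttpPort();
    void setElasticsearchHttpPort(String value);
}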
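
LinageEventDeserializer is likewise only referenced here. One possible implementation, sketched under the assumption that LineageEvent is an Avro-generated SpecificRecord sent as plain Avro binary; the commented-out schema.registry.url property suggests the original may instead use Confluent's KafkaAvroDeserializer with a schema registry:

import java.io.IOException;
import java.util.Map;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch of a Kafka value deserializer for Avro-encoded LineageEvent records.
public class LinageEventDeserializer implements Deserializer<LineageEvent> {
    private final SpecificDatumReader<LineageEvent> reader =
            new SpecificDatumReader<>(LineageEvent.class);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for plain Avro decoding.
    }

    @Override
    public LineageEvent deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize LineageEvent from topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}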
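
The commented-out withTimestampFn line refers to a LinageEventTimeStampFunction that is not shown either. If event-time processing were enabled, it would be a SerializableFunction mapping each Kafka record to a timestamp; the getTimestamp accessor below is hypothetical and stands in for whatever time field the real LineageEvent schema carries:

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Hypothetical timestamp extractor; assumes LineageEvent exposes an epoch-millis field.
public class LinageEventTimeStampFunction
        implements SerializableFunction<KV<String, LineageEvent>, Instant> {
    @Override
    public Instant apply(KV<String, LineageEvent> record) {
        return new Instant(record.getValue().getTimestamp());
    }
}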