Skip to content

Instantly share code, notes, and snippets.

@shrijitpillai
Created December 9, 2018 21:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shrijitpillai/5e9e642f92dd23b3b7bd60e3ce8056bb to your computer and use it in GitHub Desktop.
Save shrijitpillai/5e9e642f92dd23b3b7bd60e3ce8056bb to your computer and use it in GitHub Desktop.
Snippet to join data from HBase and Kafka using a FixedWindows strategy
// Build the pipeline and read (key, value) string records from the Kafka topic "words".
final Pipeline pi = Pipeline.create(directRunnerOptions);
final KafkaIO.Read<String, String> kafkaRead = KafkaIO.<String, String>read()
    .withBootstrapServers("kafka:29092")
    .withTopic("words")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    // Pass the consumer setting auto.offset.reset=earliest through to the Kafka client.
    .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"));
// withoutMetadata() yields plain KV<String, String> elements instead of full Kafka records.
final PCollection<KV<String, String>> stream = pi.apply(kafkaRead.withoutMetadata());
// Assign the Kafka records to 20-second fixed windows. The trigger emits a pane 20 seconds
// of processing time after the first element of that pane is seen, and fired panes accumulate.
// NOTE(review): the trigger is not wrapped in a repeating trigger, so it presumably fires
// only once per window - confirm that is the intended behaviour.
final PCollection<KV<String, String>> windowedStream = stream.apply(Window
    .<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(20)))
    .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(20)))
    // A custom trigger on a non-global window needs an explicit allowed lateness; Beam
    // rejects the transform otherwise. Zero keeps the default "no late data" behaviour.
    .withAllowedLateness(Duration.ZERO)
    .accumulatingFiredPanes());
// Read every row of the HBase table and place the results into 20-second fixed windows,
// using the same window size and processing-time trigger as the Kafka side so that the two
// collections can be grouped together.
// NOTE(review): elements read from HBase may all carry the same default timestamp and so
// land in a different window than the Kafka records - verify the element timestamps if
// the join produces no matches.
final PCollection<Result> hbaseResult = pi.apply(
    HBaseIO.read().withConfiguration(conf).withTableId(TableName.valueOf(TABLE_NAME).getNameAsString()))
    .apply(Window
        .<Result>into(FixedWindows.of(Duration.standardSeconds(20)))
        .triggering(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(20)))
        // A custom trigger on a non-global window needs an explicit allowed lateness; Beam
        // rejects the transform otherwise. Zero keeps the default "no late data" behaviour.
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());
// Convert each HBase row into a (word, word) pair taken from column "m:word", so that the
// word can serve as the join key against the Kafka stream.
final PCollection<KV<String, String>> hbase = hbaseResult.apply(ParDo
    .of(new DoFn<Result, KV<String, String>>() {
      private static final long serialVersionUID = 1L;

      /**
       * Emits the latest value of column "m:word" as both key and value.
       * Rows that do not contain that column are skipped.
       */
      @ProcessElement
      public void processElement(final ProcessContext c) {
        final Cell cell = c.element().getColumnLatestCell(Bytes.toBytes("m"), Bytes.toBytes("word"));
        if (cell == null) {
          // The row has no "m:word" column; there is nothing to join on.
          return;
        }
        // Decode straight from the cell's backing array instead of the deprecated
        // Cell.getValue(), which is no longer available in newer HBase versions.
        final String word =
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
        c.output(KV.of(word, word));
      }
    }));
// Tags used to tell the two sides of the join apart inside each CoGbkResult.
final TupleTag<String> streamTag = new TupleTag<String>();
final TupleTag<String> hbaseTag = new TupleTag<String>();
// Register both keyed collections under their tags, then group them by key.
final KeyedPCollectionTuple<String> joinInputs = KeyedPCollectionTuple
    .of(streamTag, windowedStream)
    .and(hbaseTag, hbase);
final PCollection<KV<String, CoGbkResult>> results =
    joinInputs.apply(CoGroupByKey.<String>create());
results.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
  private static final long serialVersionUID = -6419857828265091278L;

  /*
   * Dumps each grouped key to standard output and forwards the key downstream.
   * For every key the printed record looks like:
   *   Key: <common key>
   *   Stream
   *   <values from the Kafka side, printed back to back>
   *   HBase
   *   <values from the HBase side, printed back to back>
   *   ------------------
   */
  @ProcessElement
  public void processElement(final ProcessContext c) {
    final KV<String, CoGbkResult> grouped = c.element();
    final String joinKey = grouped.getKey();
    final CoGbkResult sides = grouped.getValue();

    System.out.println("Key: " + joinKey);
    printSection("Stream", sides.getAll(streamTag));
    printSection("HBase", sides.getAll(hbaseTag));
    System.out.println("------------------");

    c.output(joinKey);
  }

  // Prints a heading line, then all values without separators, then a line break.
  private void printSection(final String heading, final Iterable<String> values) {
    System.out.println(heading);
    for (final String value : values) {
      System.out.print(value);
    }
    System.out.println();
  }
}));
// Start the pipeline and block the calling thread until the run finishes.
pi.run().waitUntilFinish();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment