-
-
Save shrijitpillai/5e9e642f92dd23b3b7bd60e3ce8056bb to your computer and use it in GitHub Desktop.
Snippet to join data from HBase and Kafka using a FixedWindows strategy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final Pipeline pi = Pipeline.create(directRunnerOptions); | |
// read from kafka | |
final PCollection<KV<String, String>> stream = pi.apply(KafkaIO.<String, String>read() | |
.withBootstrapServers("kafka:29092").withTopic("words").withKeyDeserializer(StringDeserializer.class) | |
.withValueDeserializer(StringDeserializer.class) | |
.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest")).withoutMetadata()); | |
// apply fixed window and trigger | |
final PCollection<KV<String, String>> windowedStream = stream.apply(Window | |
.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(20))) | |
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(20))) | |
.accumulatingFiredPanes()); | |
// read from hbase, apply fixed window and trigger | |
final PCollection<Result> hbaseResult = pi.apply( | |
HBaseIO.read().withConfiguration(conf).withTableId(TableName.valueOf(TABLE_NAME).getNameAsString())) | |
.apply(Window | |
.<Result>into(FixedWindows.of(Duration.standardSeconds(20))) | |
.triggering( | |
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(20))) | |
.accumulatingFiredPanes()); | |
final PCollection<KV<String, String>> hbase = hbaseResult.apply(ParDo | |
.of(new DoFn<Result, KV<String, String>>() { | |
@ProcessElement | |
public void processElement(final ProcessContext c) { | |
final Cell cell = c.element().getColumnLatestCell(Bytes.toBytes("m"), Bytes.toBytes("word")); | |
c.output(KV.of(Bytes.toString(cell.getValue()), Bytes.toString(cell.getValue()))); | |
} | |
})); | |
final TupleTag<String> streamTag = new TupleTag<>(); | |
final TupleTag<String> hbaseTag = new TupleTag<>(); | |
final PCollection<KV<String, CoGbkResult>> results = KeyedPCollectionTuple.of(streamTag, windowedStream) | |
.and(hbaseTag, hbase).apply(CoGroupByKey.<String>create()); | |
// Print each joined key together with the values each input contributed, emit the key, then
// run the pipeline and block until it finishes.
results.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
  private static final long serialVersionUID = -6419857828265091278L;

  /* If the join works, each key is printed as
   *   Key: <common key>
   *   Stream
   *   <one Kafka value per line>
   *   HBase
   *   <one HBase value per line>
   *   <dashed separator line>
   */
  @ProcessElement
  public void processElement(final ProcessContext c) {
    final KV<String, CoGbkResult> e = c.element();
    final String key = e.getKey();
    final CoGbkResult joined = e.getValue();
    // Build the whole record first and write it with one print call. Separate calls from
    // parallel bundles could otherwise interleave on stdout.
    final StringBuilder out = new StringBuilder();
    out.append("Key: ").append(key).append('\n');
    out.append("Stream").append('\n');
    for (final String s : joined.getAll(streamTag)) {
      out.append(s).append('\n');
    }
    out.append("HBase").append('\n');
    for (final String h : joined.getAll(hbaseTag)) {
      out.append(h).append('\n');
    }
    out.append("------------------").append('\n');
    System.out.print(out);
    c.output(key);
  }
}));
pi.run().waitUntilFinish();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment