Skip to content

Instantly share code, notes, and snippets.

@jacace
Created March 8, 2021 19:56
Show Gist options
  • Save jacace/6965669d626aefb1eb2bb5db4d14bd1c to your computer and use it in GitHub Desktop.
Save jacace/6965669d626aefb1eb2bb5db4d14bd1c to your computer and use it in GitHub Desktop.
Joins in the Java Spark Streaming API
/**
 * Demonstrates an inner join of two Kafka-backed DStreams on the record key.
 *
 * <p>Each {@code productStream} value is parsed from JSON into an {@code Item}. Each
 * {@code salesStream} value is parsed from JSON into a {@code DailySales}. The two streams are
 * then joined per micro-batch on the Kafka record key. For every batch, the number of joined
 * records is printed, followed by one line per joined pair.
 *
 * @param productStream Kafka records whose values are JSON documents mappable to {@code Item}
 * @param salesStream Kafka records whose values are JSON documents mappable to {@code DailySales}
 */
private static void joinDemo(JavaDStream<ConsumerRecord<String, String>> productStream,
    JavaDStream<ConsumerRecord<String, String>> salesStream) {
  // The mapper is captured by the lambdas below, so it is serialized and shipped to executors.
  ObjectMapper jacksonParser = new ObjectMapper();

  // Concrete value types let the compiler check the join instead of relying on casts later.
  JavaPairDStream<String, Item> products = productStream.mapToPair(
      record -> new Tuple2<>(record.key(), jacksonParser.readValue(record.value(), Item.class)));
  JavaPairDStream<String, DailySales> sales = salesStream.mapToPair(
      record -> new Tuple2<>(record.key(),
          jacksonParser.readValue(record.value(), DailySales.class)));

  // Each batch RDD is consumed by two actions (count + foreach). Caching prevents the join,
  // the JSON parsing and the upstream read from running twice per batch. Spark Streaming
  // unpersists old batch RDDs on its own.
  JavaPairDStream<String, Tuple2<Item, DailySales>> joined = products.join(sales).cache();

  joined.foreachRDD(rdd -> {
    // foreachRDD's function body runs on the driver, so this line is printed by the driver.
    System.out.println("Num of Records in RDD: " + rdd.count());
    // rdd.foreach runs on the executors. In cluster mode, these lines go to executor stdout,
    // not to the driver console.
    rdd.foreach(data -> {
      Item item = data._2()._1();
      DailySales dailySales = data._2()._2();
      // String concatenation renders a null Kafka key as "null" instead of throwing an NPE.
      System.out.println("Key: " + data._1() + ". Obj in Value 1: " + item.name
          + ". Obj in Value 2: " + dailySales.soldUnits);
    });
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment