Created
August 13, 2019 02:25
-
-
Save johntbush/b30cda4e97ec65ea911110ccc77a3907 to your computer and use it in GitHub Desktop.
beam java sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.java; | |
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
public class OrderSum { | |
public static void main(String[] args) { | |
PipelineOptions options = PipelineOptionsFactory.create(); | |
Pipeline p = Pipeline.create(options); | |
ObjectMapper objectMapper = new ObjectMapper(); | |
PCollection<Order> orders = | |
p.apply(TextIO.read().from("src/test/resources/orders.json")) | |
.apply(ParDo.of(new DoFn<String, Order>() { | |
@ProcessElement | |
public void processElement(ProcessContext c) { | |
try { | |
c.output(objectMapper.readValue(c.element(), Order.class)); | |
} catch (IOException e) { | |
throw new RuntimeException(e.getMessage(), e); | |
} | |
} | |
})); | |
PCollection<Refund> refunds = | |
p.apply(TextIO.read().from("src/test/resources/refunds.json")) | |
.apply(ParDo.of(new DoFn<String, Refund>() { | |
@ProcessElement | |
public void processElement(ProcessContext c) { | |
try { | |
c.output(objectMapper.readValue(c.element(), Refund.class)); | |
} catch (IOException e) { | |
throw new RuntimeException(e.getMessage(), e); | |
} | |
} | |
})); | |
PCollection<KV<String, LineItem>> filteredOrders = | |
orders | |
.apply( | |
MapElements | |
.into(TypeDescriptor.of(LineItem.class)) | |
.via((SerializableFunction<Order, LineItem>) order -> new LineItem(order.getOrder_id(), | |
order.getCustomer_id(), order.getOrder_amt()))) | |
.apply(Filter.by((SerializableFunction<LineItem, Boolean>) c -> c.getCustomerId().equals("a"))) | |
.apply(WithKeys.of(LineItem::getOrderId).withKeyType(TypeDescriptor.of(String.class))); | |
PCollection<KV<String, LineItem>> mappedRefunds = | |
refunds | |
.apply(MapElements.into(TypeDescriptor.of(LineItem.class)) | |
.via((SerializableFunction<Refund, LineItem>) refund -> new LineItem(refund.getOriginal_order_id(), | |
refund.getCustomer_id(), refund.getRefund_amt()))) | |
.apply(WithKeys.of(LineItem::getOrderId).withKeyType(TypeDescriptor.of(String.class))) | |
.apply(Combine.perKey((SerializableFunction<Iterable<LineItem>, LineItem>) input -> { | |
LineItem first = Iterables.get(input, 0); | |
String orderId = first.getOrderId(); | |
String customerId = first.getCustomerId(); | |
Long total = | |
StreamSupport | |
.stream(input.spliterator(), false) | |
.map((line1) -> line1.getAmount()) | |
.reduce((line1, line2) -> line1 + line2) | |
.orElse(0L); | |
return new LineItem(orderId, customerId, total); | |
})); | |
final TupleTag<LineItem> t1 = new TupleTag<>(); | |
final TupleTag<LineItem> t2 = new TupleTag<>(); | |
PCollection<KV<String, CoGbkResult>> groupedResult = | |
KeyedPCollectionTuple.of(t1, filteredOrders) | |
.and(t2, mappedRefunds) | |
.apply(CoGroupByKey.create()); | |
PCollection<Long> finalResultCollection = | |
groupedResult.apply(ParDo.of( | |
new DoFn<KV<String, CoGbkResult>, Long>() { | |
@ProcessElement | |
public void processElement(ProcessContext c) { | |
KV<String, CoGbkResult> e = c.element(); | |
Optional<Long> ordersSum = | |
StreamSupport | |
.stream(e.getValue().getAll(t1).spliterator(), true) | |
.map((line1) -> line1.getAmount()) | |
.reduce((line1, line2) -> line1 + line2); | |
Optional<Long> refundsSum = | |
StreamSupport | |
.stream(e.getValue().getAll(t2).spliterator(), true) | |
.map((line1) -> line1.getAmount()) | |
.reduce((line1, line2) -> line1 + line2); | |
Long total = refundsSum.orElse(0L) + ordersSum.orElse(0L); | |
c.output(total); | |
} | |
})).apply(Combine.globally(Sum.ofLongs())); | |
finalResultCollection.apply(logRecords("result:")); | |
p.run().waitUntilFinish(); | |
} | |
private static MapElements<Long, Void> logRecords(String prefix) { | |
return MapElements.via( | |
new SimpleFunction<Long, Void>() { | |
public @Nullable | |
Void apply(Long input) { | |
System.out.println(prefix + ":" + input.toString()); | |
return null; | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment