Skip to content

Instantly share code, notes, and snippets.

@johntbush
Created August 13, 2019 02:25
Show Gist options
  • Save johntbush/b30cda4e97ec65ea911110ccc77a3907 to your computer and use it in GitHub Desktop.
Save johntbush/b30cda4e97ec65ea911110ccc77a3907 to your computer and use it in GitHub Desktop.
beam java sample
package example.java;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
/**
 * Apache Beam pipeline that totals customer {@value #CUSTOMER_ID}'s orders, adjusted by the
 * refunds issued against those orders.
 *
 * <p>Orders and refunds are read as one JSON object per line and projected to {@link LineItem}s
 * keyed by order id. Refunds are pre-summed per order. The two collections are joined with
 * {@link CoGroupByKey}, and every matched order's net amount is summed into a single value that
 * is printed to stdout.
 */
public class OrderSum {
  /** Input file of JSON-encoded {@code Order} records, one object per line. */
  private static final String ORDERS_PATH = "src/test/resources/orders.json";

  /** Input file of JSON-encoded {@code Refund} records, one object per line. */
  private static final String REFUNDS_PATH = "src/test/resources/refunds.json";

  /** Only orders placed by this customer contribute to the total. */
  private static final String CUSTOMER_ID = "a";

  /**
   * Shared JSON mapper. Lambdas reference this static field instead of capturing it, so it is
   * never serialized with the transform. Jackson documents ObjectMapper as thread-safe once
   * configured.
   */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /**
   * Builds and runs the pipeline, blocking until it finishes.
   *
   * @param args Beam pipeline options (e.g. {@code --runner=DirectRunner}); with no arguments
   *     the default options are used
   */
  public static void main(String[] args) {
    // Parse command-line options so flags such as --runner actually take effect.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    PCollection<Order> orders =
        p.apply("ReadOrders", TextIO.read().from(ORDERS_PATH))
            .apply("ParseOrders", parseJson(Order.class));

    PCollection<Refund> refunds =
        p.apply("ReadRefunds", TextIO.read().from(REFUNDS_PATH))
            .apply("ParseRefunds", parseJson(Refund.class));

    // Customer CUSTOMER_ID's orders, keyed by order id.
    PCollection<KV<String, LineItem>> filteredOrders =
        orders
            .apply(
                "OrderToLineItem",
                MapElements.into(TypeDescriptor.of(LineItem.class))
                    .via(
                        (SerializableFunction<Order, LineItem>)
                            order ->
                                new LineItem(
                                    order.getOrder_id(),
                                    order.getCustomer_id(),
                                    order.getOrder_amt())))
            .apply(
                "FilterCustomer",
                Filter.by(
                    (SerializableFunction<LineItem, Boolean>)
                        // Constant-first equals: a null customer id is filtered out, not an NPE.
                        item -> CUSTOMER_ID.equals(item.getCustomerId())))
            .apply(
                "KeyOrdersById",
                WithKeys.of(LineItem::getOrderId).withKeyType(TypeDescriptor.of(String.class)));

    // All refunds (any customer), keyed by the order they refer to and summed per order.
    PCollection<KV<String, LineItem>> mappedRefunds =
        refunds
            .apply(
                "RefundToLineItem",
                MapElements.into(TypeDescriptor.of(LineItem.class))
                    .via(
                        (SerializableFunction<Refund, LineItem>)
                            refund ->
                                new LineItem(
                                    refund.getOriginal_order_id(),
                                    refund.getCustomer_id(),
                                    refund.getRefund_amt())))
            .apply(
                "KeyRefundsByOrderId",
                WithKeys.of(LineItem::getOrderId).withKeyType(TypeDescriptor.of(String.class)))
            .apply(
                "SumRefundsPerOrder",
                Combine.perKey(
                    (SerializableFunction<Iterable<LineItem>, LineItem>)
                        items -> {
                          // Combine.perKey may re-apply this to partial results; summing
                          // amounts while keeping the first item's ids is safe under regrouping.
                          LineItem first = Iterables.get(items, 0);
                          return new LineItem(
                              first.getOrderId(), first.getCustomerId(), sumAmounts(items));
                        }));

    final TupleTag<LineItem> ordersTag = new TupleTag<>();
    final TupleTag<LineItem> refundsTag = new TupleTag<>();
    PCollection<KV<String, CoGbkResult>> groupedResult =
        KeyedPCollectionTuple.of(ordersTag, filteredOrders)
            .and(refundsTag, mappedRefunds)
            .apply("JoinOnOrderId", CoGroupByKey.create());

    PCollection<Long> finalResultCollection =
        groupedResult
            .apply(
                "NetPerOrder",
                ParDo.of(
                    new DoFn<KV<String, CoGbkResult>, Long>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        CoGbkResult joined = c.element().getValue();
                        Iterable<LineItem> matchedOrders = joined.getAll(ordersTag);
                        // CoGroupByKey is a full outer join, and refunds are not filtered by
                        // customer. A key with refunds but no CUSTOMER_ID order must be
                        // skipped, or unrelated refunds would leak into the total.
                        if (!matchedOrders.iterator().hasNext()) {
                          return;
                        }
                        // NOTE(review): refund amounts are added as-is; this presumably
                        // assumes refunds are stored as negative values -- confirm.
                        c.output(
                            sumAmounts(matchedOrders) + sumAmounts(joined.getAll(refundsTag)));
                      }
                    }))
            .apply("Total", Combine.globally(Sum.ofLongs()));

    finalResultCollection.apply("LogResult", logRecords("result"));
    p.run().waitUntilFinish();
  }

  /**
   * Returns a transform that parses each input line as a JSON object of the given type.
   *
   * @param type class to deserialize each line into; also used as the output type descriptor
   * @return a {@link MapElements} from raw JSON lines to {@code T}
   * @throws UncheckedIOException at pipeline run time if a line cannot be parsed
   */
  private static <T> MapElements<String, T> parseJson(Class<T> type) {
    return MapElements.into(TypeDescriptor.of(type))
        .via(
            (SerializableFunction<String, T>)
                json -> {
                  try {
                    return MAPPER.readValue(json, type);
                  } catch (IOException e) {
                    throw new UncheckedIOException(
                        "Failed to parse " + type.getSimpleName() + " from: " + json, e);
                  }
                });
  }

  /**
   * Sums the amounts of the given line items sequentially.
   *
   * @param items line items to total; may be empty
   * @return the sum of {@code getAmount()} over {@code items}, or 0 when empty
   */
  private static long sumAmounts(Iterable<LineItem> items) {
    return StreamSupport.stream(items.spliterator(), false)
        .mapToLong(LineItem::getAmount)
        .sum();
  }

  /**
   * Returns a transform that prints each element to stdout as {@code prefix + ":" + value}.
   *
   * @param prefix label printed before the separating colon (do not include the colon)
   * @return a {@link MapElements} whose output elements are all {@code null}
   */
  private static MapElements<Long, Void> logRecords(String prefix) {
    return MapElements.via(
        new SimpleFunction<Long, Void>() {
          @Override
          public @Nullable Void apply(Long input) {
            System.out.println(prefix + ":" + input);
            return null;
          }
        });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment