@khaitranq
khaitranq / DedupFn.java
Created May 27, 2020 18:30
Beam message DedupFn
public class DedupFn<K, V> extends DoFn<KV<K, V>, KV<K, V>> {
  private static final String STATE_ID = "messageDedup";
  private final Duration dedupWindow;

  @StateId(STATE_ID)
  private final StateSpec<ValueState<Long>> lastAccessedTime = StateSpecs.value(VarLongCoder.of());

  public DedupFn(Duration dedupWindow) {
    this.dedupWindow = dedupWindow;
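The preview above stops before the processing logic, so the gist's actual behavior is not shown here. As a rough illustration of what a per-key "last accessed time" state combined with a dedup window could be used for, here is a minimal plain-Java sketch with no Beam dependency. The class name `DedupSketch`, the method `shouldEmit`, and the choice to refresh the timestamp on every sighting are all assumptions made for illustration, not the author's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-key, time-windowed deduplication (hypothetical sketch). */
final class DedupSketch<K> {
    private final long dedupWindowMillis;

    // Stands in for the per-key ValueState<Long> that holds the last-accessed time.
    private final Map<K, Long> lastAccessedTime = new HashMap<>();

    DedupSketch(long dedupWindowMillis) {
        this.dedupWindowMillis = dedupWindowMillis;
    }

    /** Returns true if the element should be emitted, false if it repeats a key seen within the window. */
    boolean shouldEmit(K key, long nowMillis) {
        Long last = lastAccessedTime.get(key);
        boolean duplicate = last != null && nowMillis - last < dedupWindowMillis;
        // Assumption: every sighting, duplicate or not, refreshes the stored timestamp.
        lastAccessedTime.put(key, nowMillis);
        return !duplicate;
    }
}
```

In a real Beam `DoFn`, the map lookup would presumably be replaced by reading and writing the declared `ValueState<Long>` inside a `@ProcessElement` method, and stale entries would need some form of expiry (for example a timer), which this sketch leaves out.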
@khaitranq
khaitranq / example.plan
Created January 28, 2019 19:36
Calcite logical plan converted from the example Pig script at https://gist.github.com/likhtran/1d06c27832f15fa52a4a7e2fa7bec340
LogicalProject(intColGroup=[$1], stringColGroup=[$0], doubleMax=[$2], longSum=[$3])
  LogicalAggregate(group=[{0, 1}], agg#0=[MAX($2)], agg#1=[SUM($3)])
    LogicalProject(stringCol=[$1], intCol=[$0], doubleCol=[$2], longCol=[$5])
      LogicalJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])
        LogicalFilter(condition=[AND(>($2, 1.0E0), =($1, 'hello world'))])
          LogicalProject(intCol=[$7], stringCol=[$8], doubleCol=[$5])
            LogicalTableScan(table=[[dummy]])
        LogicalFilter(condition=[ENDSWITH(PIG_TUPLE($1, 'hello world'))])
          LogicalProject(intCol=[$7], stringCol=[$8], longCol=[$6])
            LogicalTableScan(table=[[dummy]])
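Read bottom-up, the plan scans `dummy` twice, filters each side, inner-joins the two sides on (intCol, stringCol), and then groups by (stringCol, intCol) to compute MAX(doubleCol) and SUM(longCol). The plain-Java sketch below models those semantics on an in-memory list. The names `PlanSketch`, `Row`, `Agg`, and `run` are hypothetical, and the nested-loop join is chosen only for readability; it says nothing about how Calcite, Pig, or Beam would execute the plan.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PlanSketch {
    /** Hypothetical row type carrying only the four columns the plan reads. */
    static final class Row {
        final int intCol;
        final String stringCol;
        final double doubleCol;
        final long longCol;

        Row(int intCol, String stringCol, double doubleCol, long longCol) {
            this.intCol = intCol;
            this.stringCol = stringCol;
            this.doubleCol = doubleCol;
            this.longCol = longCol;
        }
    }

    /** Running aggregates for one (stringCol, intCol) group. */
    static final class Agg {
        double doubleMax = Double.NEGATIVE_INFINITY;
        long longSum = 0L;
    }

    /** Returns one Agg per group, keyed by "stringCol|intCol". */
    static Map<String, Agg> run(List<Row> dummy) {
        Map<String, Agg> groups = new LinkedHashMap<>();
        for (Row left : dummy) {
            // Left branch: doubleCol > 1.0 AND stringCol == 'hello world'
            if (!(left.doubleCol > 1.0 && left.stringCol.equals("hello world"))) continue;
            for (Row right : dummy) {
                // Right branch: ENDSWITH(stringCol, 'hello world')
                if (!right.stringCol.endsWith("hello world")) continue;
                // Inner join on (intCol, stringCol)
                if (left.intCol != right.intCol || !left.stringCol.equals(right.stringCol)) continue;
                // Aggregate: group by (stringCol, intCol), MAX(doubleCol), SUM(longCol)
                Agg agg = groups.computeIfAbsent(left.stringCol + "|" + left.intCol, k -> new Agg());
                agg.doubleMax = Math.max(agg.doubleMax, left.doubleCol);
                agg.longSum += right.longCol;
            }
        }
        return groups;
    }
}
```

Note that doubleCol comes from the left side of the join and longCol from the right side, matching the column indexes `$2` and `$5` in the projection above the join.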
@khaitranq
khaitranq / example.pig
Last active January 15, 2019 20:57
An example Pig script used to generate the following Samza Beam API code: https://gist.github.com/likhtran/785dbb8495cd382788f3ca8200231d84
inputData = load 'dummy' using AvroStorage();
project1 = foreach inputData generate intCol, stringCol, doubleCol;
filter1 = filter project1 by doubleCol > 1.0 and stringCol == 'hello world';
project2 = foreach inputData generate intCol, stringCol, longCol;
filter2 = filter project2 by ENDSWITH(stringCol, 'hello world');
joined = foreach(JOIN filter1 by (intCol, stringCol), filter2 by (intCol, stringCol)) generate
    filter1::stringCol as stringCol,
    filter1::intCol as intCol,
    filter1::doubleCol as doubleCol,
    filter2::longCol as longCol;
@khaitranq
khaitranq / ExampleApplication.java
Last active January 15, 2019 20:57
Example of Beam API code auto-generated from the following Pig script: https://gist.github.com/likhtran/1d06c27832f15fa52a4a7e2fa7bec340
import com.linkedin.beam.io.LiKafkaIO;
import com.linkedin.beam.io.LiKafkaIOConfig;
import com.linkedin.beam.pipeline.SamzaPipelineOptionsFactory;
import com.linkedin.beam.transforms.Joins;
import com.linkedin.ump.samza.excution.SamzaExecUtil;
import java.util.Iterator;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;