Skip to content

Instantly share code, notes, and snippets.

@khaitranq
Last active January 15, 2019 20:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save khaitranq/785dbb8495cd382788f3ca8200231d84 to your computer and use it in GitHub Desktop.
Save khaitranq/785dbb8495cd382788f3ca8200231d84 to your computer and use it in GitHub Desktop.
Example of Beam API code auto-generated from the following Pig script: https://gist.github.com/likhtran/1d06c27832f15fa52a4a7e2fa7bec340
import com.linkedin.beam.io.LiKafkaIO;
import com.linkedin.beam.io.LiKafkaIOConfig;
import com.linkedin.beam.pipeline.SamzaPipelineOptionsFactory;
import com.linkedin.beam.transforms.Joins;
import com.linkedin.ump.samza.excution.SamzaExecUtil;
import java.util.Iterator;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.calcite.piglet.PigRelSqlUDFs;
import org.apache.pig.builtin.ENDSWITH;
import org.apache.pig.data.Tuple;
import org.joda.time.Duration;
/**
 * Beam pipeline (auto-generated from a Pig script) that reads Avro records from the Kafka topic
 * {@code inputStream}, inner-joins two filtered projections of that stream on (intCol, stringCol)
 * in one-minute fixed windows, computes MAX(doubleCol) and SUM(longCol) per (stringCol, intCol) in
 * fifteen-minute fixed windows, and writes the renamed result to the Kafka topic
 * {@code example_output}.
 */
public class ExampleApplication {
  private static final Schema PROJECT3_SCHEMA = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"doubleCol\",\"type\":[\"double\",\"null\"]}]}");
  private static final Schema PROJECT5_SCHEMA = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"longCol\",\"type\":[\"long\",\"null\"]}]}");
  private static final Schema JOIN6_SCHEMA = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"doubleCol\",\"type\":[\"double\",\"null\"]},{\"name\":\"intCol0\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringCol0\",\"type\":[\"string\",\"null\"]},{\"name\":\"longCol\",\"type\":[\"long\",\"null\"]}]}");
  private static final Schema PROJECT7_SCHEMA = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"doubleCol\",\"type\":[\"double\",\"null\"]},{\"name\":\"longCol\",\"type\":[\"long\",\"null\"]}]}");
  private static final Schema AGGREGATE8_SCHEMA = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"_f2\",\"type\":[\"double\",\"null\"]},{\"name\":\"_f3\",\"type\":[\"long\",\"null\"]}]}");
  private static final Schema PROJECT9_SCHEMA = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"intColGroup\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringColGroup\",\"type\":[\"string\",\"null\"]},{\"name\":\"doubleMax\",\"type\":[\"double\",\"null\"]},{\"name\":\"longSum\",\"type\":[\"long\",\"null\"]}]}");

  /**
   * Pig's builtin ENDSWITH UDF, used by the filter4 stage.
   *
   * <p>Initialized eagerly with the class instead of lazily inside {@code main}. The filter closure
   * reads this static field, and static fields are not serialized along with the closure. Any JVM
   * that loads this class without running {@code main} would otherwise see {@code null} here.
   */
  private static final ENDSWITH ENDSWITH_UDF = new ENDSWITH();

  /**
   * Returns the value to aggregate from a record handed to the aggregate8 combiner.
   *
   * <p>{@code Combine.perKey(SerializableFunction)} may apply the combining function to its own
   * earlier outputs (partial results) as well as to raw inputs. Partial results carry the
   * aggregate under {@code partialField}; raw inputs carry it under {@code inputField}.
   *
   * @param record a PROJECT7-shaped input record or an AGGREGATE8-shaped partial result
   * @param inputField field name holding the value in raw input records
   * @param partialField field name holding the value in partial-result records
   * @return the (possibly null) value to fold into the aggregate
   */
  private static Object aggregateInput(GenericRecord record, String inputField, String partialField) {
    return record.getSchema().getField(partialField) != null
        ? record.get(partialField)
        : record.get(inputField);
  }

  /**
   * Builds and runs the pipeline.
   *
   * @param args pipeline options, parsed by {@code SamzaPipelineOptionsFactory.create}
   */
  public static void main(String[] args) {
    final SamzaPipelineOptions pipelineOpts = SamzaPipelineOptionsFactory.create(args);
    final LiKafkaIOConfig kafkaConfig = LiKafkaIOConfig.Factory.create(pipelineOpts);
    pipelineOpts.setStateDurable(Boolean.TRUE);
    final Pipeline pipeline = Pipeline.create(pipelineOpts);

    // Source: String-keyed Avro records from topic "inputStream" on the AGGREGATE_TRACKING cluster.
    PCollection<KV<String, GenericRecord>> stream1 = pipeline.apply("Read inputStream", LiKafkaIO.readGenericRecord()
        .withTopic("inputStream")
        .withConsumerConfig(kafkaConfig.getConsumerConfig(LiKafkaIOConfig.ClusterName.AGGREGATE_TRACKING))
        .withoutMetadata());

    // filter2: keep rows with doubleCol > 1.0 and stringCol equal to "hello world"; null columns fail.
    PCollection<KV<String, GenericRecord>> filter2 =
        stream1.apply(Filter.by(new SerializableFunction<KV<String, GenericRecord>, Boolean>() {
          @Override
          public Boolean apply(KV<String, GenericRecord> stream1KV) {
            final GenericRecord stream1Record = stream1KV.getValue();
            final Double var0 = (Double) stream1Record.get("doubleCol");
            final String var1 = Objects.toString(stream1Record.get("stringCol"), null);
            return var0 != null && var0 > 1.0D && "hello world".equals(var1);
          }
        }));

    // project3: (intCol, stringCol, doubleCol) of the filter2 rows.
    PCollection<KV<String, GenericRecord>> project3 =
        filter2.apply(MapElements.via(new SimpleFunction<KV<String, GenericRecord>, KV<String, GenericRecord>>() {
          @Override
          public KV<String, GenericRecord> apply(KV<String, GenericRecord> filter2KV) {
            final GenericRecord filter2Record = filter2KV.getValue();
            GenericRecord project3Record = new GenericData.Record(PROJECT3_SCHEMA);
            project3Record.put("intCol", (Integer) filter2Record.get("intCol"));
            project3Record.put("stringCol", Objects.toString(filter2Record.get("stringCol"), null));
            project3Record.put("doubleCol", (Double) filter2Record.get("doubleCol"));
            return KV.of(filter2KV.getKey(), project3Record);
          }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(PROJECT3_SCHEMA)));

    // filter4: keep rows whose stringCol ends with "hello world" (Pig ENDSWITH).
    PCollection<KV<String, GenericRecord>> filter4 =
        stream1.apply(Filter.by(new SerializableFunction<KV<String, GenericRecord>, Boolean>() {
          @Override
          public Boolean apply(KV<String, GenericRecord> stream1KV) {
            final GenericRecord stream1Record = stream1KV.getValue();
            final String var0 = Objects.toString(stream1Record.get("stringCol"), null);
            final Tuple var1 = PigRelSqlUDFs.buildTuple(var0, "hello world");
            final Boolean var2 = ENDSWITH_UDF.exec(var1);
            // The UDF returns a boxed Boolean that may be null. Filter.by unboxes the predicate
            // result, so treat null as "no match" (drop the row) instead of throwing.
            return Boolean.TRUE.equals(var2);
          }
        }));

    // project5: (intCol, stringCol, longCol) of the filter4 rows.
    PCollection<KV<String, GenericRecord>> project5 =
        filter4.apply(MapElements.via(new SimpleFunction<KV<String, GenericRecord>, KV<String, GenericRecord>>() {
          @Override
          public KV<String, GenericRecord> apply(KV<String, GenericRecord> filter4KV) {
            final GenericRecord filter4Record = filter4KV.getValue();
            GenericRecord project5Record = new GenericData.Record(PROJECT5_SCHEMA);
            project5Record.put("intCol", (Integer) filter4Record.get("intCol"));
            project5Record.put("stringCol", Objects.toString(filter4Record.get("stringCol"), null));
            project5Record.put("longCol", (Long) filter4Record.get("longCol"));
            return KV.of(filter4KV.getKey(), project5Record);
          }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(PROJECT5_SCHEMA)));

    // join6 inputs: re-key both sides by (intCol, stringCol) and window into one-minute fixed windows.
    PCollection<KV<String, GenericRecord>> join6Left =
        project3.apply(Values.create()).apply(WithKeys.of(new SerializableFunction<GenericRecord, String>() {
          @Override
          public String apply(GenericRecord project3Record) {
            return SamzaExecUtil.buildStringKey(Objects.toString((Integer) project3Record.get("intCol"), null),
                Objects.toString(project3Record.get("stringCol"), null));
          }
        }).withKeyType(TypeDescriptors.strings())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))));
    PCollection<KV<String, GenericRecord>> join6Right =
        project5.apply(Values.create()).apply(WithKeys.of(new SerializableFunction<GenericRecord, String>() {
          @Override
          public String apply(GenericRecord project5Record) {
            return SamzaExecUtil.buildStringKey(Objects.toString((Integer) project5Record.get("intCol"), null),
                Objects.toString(project5Record.get("stringCol"), null));
          }
        }).withKeyType(TypeDescriptors.strings())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))));

    // join6: inner join, flattened into one record; right-side key columns become intCol0/stringCol0.
    PCollection<KV<String, GenericRecord>> join6 =
        join6Left.apply(Joins.InnerJoin.with(join6Right).withoutRepartition())
            .apply(MapElements.via(
                new SimpleFunction<KV<String, KV<GenericRecord, GenericRecord>>, KV<String, GenericRecord>>() {
                  @Override
                  public KV<String, GenericRecord> apply(KV<String, KV<GenericRecord, GenericRecord>> join6KVV) {
                    final KV<GenericRecord, GenericRecord> joined = join6KVV.getValue();
                    final GenericRecord project3Record = joined.getKey();
                    final GenericRecord project5Record = joined.getValue();
                    GenericRecord join6Record = new GenericData.Record(JOIN6_SCHEMA);
                    join6Record.put("intCol", project3Record == null ? null : project3Record.get("intCol"));
                    join6Record.put("stringCol", project3Record == null ? null
                        : (Object) Objects.toString(project3Record.get("stringCol"), null));
                    join6Record.put("doubleCol", project3Record == null ? null : project3Record.get("doubleCol"));
                    join6Record.put("intCol0", project5Record == null ? null : project5Record.get("intCol"));
                    join6Record.put("stringCol0", project5Record == null ? null
                        : (Object) Objects.toString(project5Record.get("stringCol"), null));
                    join6Record.put("longCol", project5Record == null ? null : project5Record.get("longCol"));
                    return KV.of(join6KVV.getKey(), join6Record);
                  }
                }))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(JOIN6_SCHEMA)))
            .apply(Window.into(new GlobalWindows()));

    // project7: (stringCol, intCol, doubleCol, longCol) of the join rows.
    PCollection<KV<String, GenericRecord>> project7 =
        join6.apply(MapElements.via(new SimpleFunction<KV<String, GenericRecord>, KV<String, GenericRecord>>() {
          @Override
          public KV<String, GenericRecord> apply(KV<String, GenericRecord> join6KV) {
            final GenericRecord join6Record = join6KV.getValue();
            GenericRecord project7Record = new GenericData.Record(PROJECT7_SCHEMA);
            project7Record.put("stringCol", Objects.toString(join6Record.get("stringCol"), null));
            project7Record.put("intCol", (Integer) join6Record.get("intCol"));
            project7Record.put("doubleCol", (Double) join6Record.get("doubleCol"));
            project7Record.put("longCol", (Long) join6Record.get("longCol"));
            return KV.of(join6KV.getKey(), project7Record);
          }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(PROJECT7_SCHEMA)));

    // aggregate8 input: key by (stringCol, intCol) and window into fifteen-minute fixed windows.
    PCollection<KV<String, GenericRecord>> aggregate8KV =
        project7.apply(Values.create()).apply(WithKeys.of(new SerializableFunction<GenericRecord, String>() {
          @Override
          public String apply(GenericRecord project7Record) {
            return SamzaExecUtil.buildStringKey(Objects.toString(project7Record.get("stringCol"), null),
                Objects.toString((Integer) project7Record.get("intCol"), null));
          }
        }).withKeyType(TypeDescriptors.strings())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(15L))));

    // aggregate8: per key and window, _f2 = MAX(doubleCol) (null if every value is null) and
    // _f3 = SUM(longCol) (0 if every value is null). The function may also receive its own partial
    // outputs, so values are read through aggregateInput().
    PCollection<KV<String, GenericRecord>> aggregate8 =
        aggregate8KV.apply(Combine.perKey(new SerializableFunction<Iterable<GenericRecord>, GenericRecord>() {
          @Override
          public GenericRecord apply(Iterable<GenericRecord> project7GroupedRecords) {
            GenericRecord aggregate8Record = new GenericData.Record(AGGREGATE8_SCHEMA);
            // Nullable accumulator: unboxing the first record's doubleCol directly threw an NPE
            // whenever that value was null.
            Double maxDouble = null;
            long sumLong = 0L;
            boolean first = true;
            for (GenericRecord inputRecord : project7GroupedRecords) {
              if (first) {
                // The grouping key is built from these two columns; copy them from the first record.
                aggregate8Record.put("stringCol", (Object) Objects.toString(inputRecord.get("stringCol"), null));
                aggregate8Record.put("intCol", inputRecord.get("intCol"));
                first = false;
              }
              final Double doubleValue = (Double) aggregateInput(inputRecord, "doubleCol", "_f2");
              if (doubleValue != null) {
                maxDouble = maxDouble != null && maxDouble > doubleValue ? maxDouble : doubleValue;
              }
              final Long longValue = (Long) aggregateInput(inputRecord, "longCol", "_f3");
              if (longValue != null) {
                sumLong += longValue;
              }
            }
            aggregate8Record.put("_f2", maxDouble);
            aggregate8Record.put("_f3", (Object) sumLong);
            return aggregate8Record;
          }
        }))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(AGGREGATE8_SCHEMA)))
            .apply(Window.into(new GlobalWindows()));

    // project9: rename the aggregate columns for the output topic.
    PCollection<KV<String, GenericRecord>> project9 =
        aggregate8.apply(MapElements.via(new SimpleFunction<KV<String, GenericRecord>, KV<String, GenericRecord>>() {
          @Override
          public KV<String, GenericRecord> apply(KV<String, GenericRecord> aggregate8KV) {
            final GenericRecord aggregate8Record = aggregate8KV.getValue();
            GenericRecord project9Record = new GenericData.Record(PROJECT9_SCHEMA);
            project9Record.put("intColGroup", (Integer) aggregate8Record.get("intCol"));
            project9Record.put("stringColGroup", Objects.toString(aggregate8Record.get("stringCol"), null));
            project9Record.put("doubleMax", (Double) aggregate8Record.get("_f2"));
            project9Record.put("longSum", (Long) aggregate8Record.get("_f3"));
            return KV.of(aggregate8KV.getKey(), project9Record);
          }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(PROJECT9_SCHEMA)));

    // Sink: topic "example_output" on the QUEUING cluster.
    project9.apply(LiKafkaIO.writeGenericRecord()
        .withTopic("example_output")
        .withProducerConfig(kafkaConfig.getProducerConfig(LiKafkaIOConfig.ClusterName.QUEUING)));
    pipeline.run();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment