Last active
January 15, 2019 20:57
Example of Beam API code auto-generated from the following Pig script: https://gist.github.com/likhtran/1d06c27832f15fa52a4a7e2fa7bec340
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.linkedin.beam.io.LiKafkaIO;
import com.linkedin.beam.io.LiKafkaIOConfig;
import com.linkedin.beam.pipeline.SamzaPipelineOptionsFactory;
import com.linkedin.beam.transforms.Joins;
import com.linkedin.ump.samza.excution.SamzaExecUtil;
import java.util.Iterator;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.calcite.piglet.PigRelSqlUDFs;
import org.apache.pig.builtin.ENDSWITH;
import org.apache.pig.data.Tuple;
import org.joda.time.Duration;
public class ExampleApplication { | |
private static final Schema PROJECT3_SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"doubleCol\",\"type\":[\"double\",\"null\"]}]}"); | |
private static final Schema PROJECT5_SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"longCol\",\"type\":[\"long\",\"null\"]}]}"); | |
private static final Schema JOIN6_SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"doubleCol\",\"type\":[\"double\",\"null\"]},{\"name\":\"intCol0\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringCol0\",\"type\":[\"string\",\"null\"]},{\"name\":\"longCol\",\"type\":[\"long\",\"null\"]}]}"); | |
private static final Schema PROJECT7_SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"doubleCol\",\"type\":[\"double\",\"null\"]},{\"name\":\"longCol\",\"type\":[\"long\",\"null\"]}]}"); | |
private static final Schema AGGREGATE8_SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"stringCol\",\"type\":[\"string\",\"null\"]},{\"name\":\"intCol\",\"type\":[\"int\",\"null\"]},{\"name\":\"_f2\",\"type\":[\"double\",\"null\"]},{\"name\":\"_f3\",\"type\":[\"long\",\"null\"]}]}"); | |
private static final Schema PROJECT9_SCHEMA = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"record0\",\"namespace\":\"rel_avro\",\"fields\":[{\"name\":\"intColGroup\",\"type\":[\"int\",\"null\"]},{\"name\":\"stringColGroup\",\"type\":[\"string\",\"null\"]},{\"name\":\"doubleMax\",\"type\":[\"double\",\"null\"]},{\"name\":\"longSum\",\"type\":[\"long\",\"null\"]}]}"); | |
private static boolean staticVarInit = false; | |
private static ENDSWITH org_apache_pig_builtin_ENDSWITH; | |
public static void main(String[] args) { | |
if (!staticVarInit) { | |
org_apache_pig_builtin_ENDSWITH = new ENDSWITH(); | |
staticVarInit = true; | |
} | |
final SamzaPipelineOptions pipelineOpts = SamzaPipelineOptionsFactory.create(args); | |
final LiKafkaIOConfig kafkaConfig = LiKafkaIOConfig.Factory.create(pipelineOpts); | |
pipelineOpts.setStateDurable(Boolean.valueOf(true)); | |
final Pipeline pipeline = Pipeline.create(pipelineOpts); | |
PCollection<KV<String, GenericRecord>> stream1 = pipeline.apply("Read inputStream", LiKafkaIO.readGenericRecord() | |
.withTopic("inputStream") | |
.withConsumerConfig(kafkaConfig.getConsumerConfig(LiKafkaIOConfig.ClusterName.AGGREGATE_TRACKING)) | |
.withoutMetadata()); | |
PCollection<KV<String, GenericRecord>> filter2 = | |
stream1.apply(Filter.by(new SerializableFunction<KV<String, GenericRecord>, Boolean>() { | |
public Boolean apply(KV<String, GenericRecord> stream1KV) { | |
final GenericRecord stream1Record = stream1KV.getValue(); | |
final Double var0 = (Double) stream1Record.get("doubleCol"); | |
final String var1 = Objects.toString(stream1Record.get("stringCol"), null); | |
return var0 != null && var0 > 1.0D && (var1 != null && var1.equals("hello world")); | |
} | |
})); | |
PCollection<KV<String, GenericRecord>> project3 = | |
filter2.apply(MapElements.via(new SimpleFunction<KV<String, GenericRecord>, KV<String, GenericRecord>>() { | |
public KV<String, GenericRecord> apply(KV<String, GenericRecord> filter2KV) { | |
final GenericRecord filter2Record = filter2KV.getValue(); | |
GenericRecord project3Record = new GenericData.Record(PROJECT3_SCHEMA); | |
final Integer var0 = (Integer) filter2Record.get("intCol"); | |
project3Record.put("intCol", var0); | |
final String var1 = Objects.toString(filter2Record.get("stringCol"), null); | |
project3Record.put("stringCol", var1); | |
final Double var2 = (Double) filter2Record.get("doubleCol"); | |
project3Record.put("doubleCol", var2); | |
return KV.of(filter2KV.getKey(), project3Record); | |
} | |
})).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(PROJECT3_SCHEMA))); | |
PCollection<KV<String, GenericRecord>> filter4 = | |
stream1.apply(Filter.by(new SerializableFunction<KV<String, GenericRecord>, Boolean>() { | |
public Boolean apply(KV<String, GenericRecord> stream1KV) { | |
final GenericRecord stream1Record = stream1KV.getValue(); | |
final String var0 = Objects.toString(stream1Record.get("stringCol"), null); | |
final Tuple var1 = PigRelSqlUDFs.buildTuple(var0, "hello world"); | |
final Boolean var2 = org_apache_pig_builtin_ENDSWITH.exec(var1); | |
return var2; | |
} | |
})); | |
PCollection<KV<String, GenericRecord>> project5 = | |
filter4.apply(MapElements.via(new SimpleFunction<KV<String, GenericRecord>, KV<String, GenericRecord>>() { | |
public KV<String, GenericRecord> apply(KV<String, GenericRecord> filter4KV) { | |
final GenericRecord filter4Record = filter4KV.getValue(); | |
GenericRecord project5Record = new GenericData.Record(PROJECT5_SCHEMA); | |
final Integer var0 = (Integer) filter4Record.get("intCol"); | |
project5Record.put("intCol", var0); | |
final String var1 = Objects.toString(filter4Record.get("stringCol"), null); | |
project5Record.put("stringCol", var1); | |
final Long var2 = (Long) filter4Record.get("longCol"); | |
project5Record.put("longCol", var2); | |
return KV.of(filter4KV.getKey(), project5Record); | |
} | |
})).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(PROJECT5_SCHEMA))); | |
PCollection<KV<String, GenericRecord>> join6Left = | |
project3.apply(Values.create()).apply(WithKeys.of(new SerializableFunction<GenericRecord, String>() { | |
public String apply(GenericRecord project3Record) { | |
return SamzaExecUtil.buildStringKey(Objects.toString((Integer) project3Record.get("intCol"), null), | |
Objects.toString(project3Record.get("stringCol"), null)); | |
} | |
}).withKeyType(TypeDescriptors.strings())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L)))); | |
PCollection<KV<String, GenericRecord>> join6Right = | |
project5.apply(Values.create()).apply(WithKeys.of(new SerializableFunction<GenericRecord, String>() { | |
public String apply(GenericRecord project5Record) { | |
return SamzaExecUtil.buildStringKey(Objects.toString((Integer) project5Record.get("intCol"), null), | |
Objects.toString(project5Record.get("stringCol"), null)); | |
} | |
}).withKeyType(TypeDescriptors.strings())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L)))); | |
PCollection<KV<String, GenericRecord>> join6 = | |
join6Left.apply(Joins.InnerJoin.with(join6Right).withoutRepartition()) | |
.apply(MapElements.via( | |
new SimpleFunction<KV<String, KV<GenericRecord, GenericRecord>>, KV<String, GenericRecord>>() { | |
public KV<String, GenericRecord> apply(KV<String, KV<GenericRecord, GenericRecord>> join6KVV) { | |
GenericRecord project3Record = ((KV<GenericRecord, GenericRecord>) join6KVV.getValue()).getKey(); | |
GenericRecord project5Record = ((KV<GenericRecord, GenericRecord>) join6KVV.getValue()).getValue(); | |
GenericRecord join6Record = new GenericData.Record(JOIN6_SCHEMA); | |
join6Record.put("intCol", project3Record == null ? null : project3Record.get("intCol")); | |
join6Record.put("stringCol", project3Record == null ? null | |
: (Object) Objects.toString(project3Record.get("stringCol"), null)); | |
join6Record.put("doubleCol", project3Record == null ? null : project3Record.get("doubleCol")); | |
join6Record.put("intCol0", project5Record == null ? null : project5Record.get("intCol")); | |
join6Record.put("stringCol0", project5Record == null ? null | |
: (Object) Objects.toString(project5Record.get("stringCol"), null)); | |
join6Record.put("longCol", project5Record == null ? null : project5Record.get("longCol")); | |
return KV.of(join6KVV.getKey(), join6Record); | |
} | |
})) | |
.setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(JOIN6_SCHEMA))) | |
.apply(Window.into(new GlobalWindows())); | |
PCollection<KV<String, GenericRecord>> project7 = | |
join6.apply(MapElements.via(new SimpleFunction<KV<String, GenericRecord>, KV<String, GenericRecord>>() { | |
public KV<String, GenericRecord> apply(KV<String, GenericRecord> join6KV) { | |
final GenericRecord join6Record = join6KV.getValue(); | |
GenericRecord project7Record = new GenericData.Record(PROJECT7_SCHEMA); | |
final String var0 = Objects.toString(join6Record.get("stringCol"), null); | |
project7Record.put("stringCol", var0); | |
final Integer var1 = (Integer) join6Record.get("intCol"); | |
project7Record.put("intCol", var1); | |
final Double var2 = (Double) join6Record.get("doubleCol"); | |
project7Record.put("doubleCol", var2); | |
final Long var3 = (Long) join6Record.get("longCol"); | |
project7Record.put("longCol", var3); | |
return KV.of(join6KV.getKey(), project7Record); | |
} | |
})).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(PROJECT7_SCHEMA))); | |
PCollection<KV<String, GenericRecord>> aggregate8KV = | |
project7.apply(Values.create()).apply(WithKeys.of(new SerializableFunction<GenericRecord, String>() { | |
public String apply(GenericRecord project7Record) { | |
return SamzaExecUtil.buildStringKey(Objects.toString(project7Record.get("stringCol"), null), | |
Objects.toString((Integer) project7Record.get("intCol"), null)); | |
} | |
}).withKeyType(TypeDescriptors.strings())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(15L)))); | |
PCollection<KV<String, GenericRecord>> aggregate8 = | |
aggregate8KV.apply(Combine.perKey(new SerializableFunction<Iterable<GenericRecord>, GenericRecord>() { | |
public GenericRecord apply(Iterable<GenericRecord> project7GroupedRecords) { | |
GenericRecord aggregate8Record = new GenericData.Record(AGGREGATE8_SCHEMA); | |
Iterator<GenericRecord> iterator = project7GroupedRecords.iterator(); | |
GenericRecord inputRecord = iterator.next(); | |
aggregate8Record.put("stringCol", (Object) Objects.toString(inputRecord.get("stringCol"), null)); | |
aggregate8Record.put("intCol", inputRecord.get("intCol")); | |
double aggregate8_f2 = (Double) inputRecord.get("doubleCol"); | |
long aggregate8_f3 = 0; | |
boolean doLoop = true; | |
while (doLoop) { | |
Double project7doubleCol = (Double) inputRecord.get("doubleCol"); | |
Long project7longCol = (Long) inputRecord.get("longCol"); | |
if (project7doubleCol != null) { | |
aggregate8_f2 = aggregate8_f2 > project7doubleCol ? aggregate8_f2 : project7doubleCol; | |
} | |
if (project7longCol != null) { | |
aggregate8_f3 = aggregate8_f3 + project7longCol; | |
} | |
if (iterator.hasNext()) { | |
inputRecord = iterator.next(); | |
} else { | |
doLoop = false; | |
} | |
} | |
aggregate8Record.put("_f2", (Object) aggregate8_f2); | |
aggregate8Record.put("_f3", (Object) aggregate8_f3); | |
return aggregate8Record; | |
} | |
})) | |
.setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(AGGREGATE8_SCHEMA))) | |
.apply(Window.into(new GlobalWindows())); | |
PCollection<KV<String, GenericRecord>> project9 = | |
aggregate8.apply(MapElements.via(new SimpleFunction<KV<String, GenericRecord>, KV<String, GenericRecord>>() { | |
public KV<String, GenericRecord> apply(KV<String, GenericRecord> aggregate8KV) { | |
final GenericRecord aggregate8Record = aggregate8KV.getValue(); | |
GenericRecord project9Record = new GenericData.Record(PROJECT9_SCHEMA); | |
final Integer var0 = (Integer) aggregate8Record.get("intCol"); | |
project9Record.put("intColGroup", var0); | |
final String var1 = Objects.toString(aggregate8Record.get("stringCol"), null); | |
project9Record.put("stringColGroup", var1); | |
final Double var2 = (Double) aggregate8Record.get("_f2"); | |
project9Record.put("doubleMax", var2); | |
final Long var3 = (Long) aggregate8Record.get("_f3"); | |
project9Record.put("longSum", var3); | |
return KV.of(aggregate8KV.getKey(), project9Record); | |
} | |
})).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(PROJECT9_SCHEMA))); | |
project9.apply(LiKafkaIO.writeGenericRecord() | |
.withTopic("example_output") | |
.withProducerConfig(kafkaConfig.getProducerConfig(LiKafkaIOConfig.ClusterName.QUEUING))); | |
pipeline.run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment