Skip to content

Instantly share code, notes, and snippets.

@metadaddy
Created September 22, 2018 00:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save metadaddy/c74fe6e48ca23b7b79191a734b316c4b to your computer and use it in GitHub Desktop.
Save metadaddy/c74fe6e48ca23b7b79191a734b316c4b to your computer and use it in GitHub Desktop.
Creating a StreamSets Spark Transformer in Java - after third code expansion
package com.streamsets.spark;
import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.spark.api.SparkTransformer;
import com.streamsets.pipeline.spark.api.TransformResult;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
/**
 * Spark transformer that tags each record with a credit card type.
 *
 * <p>The card number is read from the {@code /credit_card} field. When it starts with one of
 * the configured prefixes, the name of the matching type is written to the
 * {@code /credit_card_type} field. Records whose {@code /credit_card} field is absent, or whose
 * string value is null or empty, are not transformed; they are returned as errors instead.
 */
public class CustomTransformer extends SparkTransformer implements Serializable {
  private static final long serialVersionUID = 1L;

  /** Path of the field holding the credit card number. */
  private static final String VALUE_PATH = "/credit_card";

  /** Path of the field that receives the detected credit card type. */
  private static final String RESULT_PATH = "/credit_card_type";

  /**
   * Credit card type name mapped to the number prefixes that identify it.
   *
   * <p>Insertion-ordered so that, when several types share a prefix, the type listed first in
   * the configuration wins deterministically. Declared with the concrete type so the field is
   * visibly serializable.
   */
  private final LinkedHashMap<String, String[]> ccTypes = new LinkedHashMap<>();

  /**
   * Parses the transformer configuration.
   *
   * @param javaSparkContext the Spark context (not used by this transformer)
   * @param params one entry per credit card type, in the form {@code "MasterCard=51,52,53,54,55"};
   *     whitespace around the type name and around each prefix is ignored, and empty prefixes
   *     are dropped. A {@code null} list is treated as "no types configured".
   * @throws IllegalArgumentException if an entry is {@code null} or contains no {@code '='}
   */
  @Override
  public void init(JavaSparkContext javaSparkContext, List<String> params) {
    if (params == null) {
      return;
    }
    for (String param : params) {
      int separator = (param == null) ? -1 : param.indexOf('=');
      if (separator < 0) {
        // Fail with a message that names the offending entry instead of an index error.
        throw new IllegalArgumentException(
            "Invalid credit card type parameter '" + param
                + "'; expected the form 'Type=prefix1,prefix2,...'");
      }

      String key = param.substring(0, separator).trim();

      List<String> prefixes = new ArrayList<>();
      for (String rawPrefix : param.substring(separator + 1).split(",")) {
        String prefix = rawPrefix.trim();
        // An empty prefix would match every card number (startsWith("") is always true).
        if (!prefix.isEmpty()) {
          prefixes.add(prefix);
        }
      }
      ccTypes.put(key, prefixes.toArray(new String[0]));
    }
  }

  /**
   * Splits the incoming records into valid and invalid ones and tags the valid ones.
   *
   * @param records the incoming batch of records
   * @return the valid records (with {@code /credit_card_type} set when a prefix matched) together
   *     with the invalid records, each paired with an error message
   */
  @Override
  public TransformResult transform(JavaRDD<Record> records) {
    // Collect every record that fails validation, paired with the reason.
    // NOTE: the anonymous classes below reference the enclosing transformer instance.
    JavaPairRDD<Record, String> errors = records.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<Record>, Record, String>() {
          @Override
          public Iterator<Tuple2<Record, String>> call(Iterator<Record> recordIterator)
              throws Exception {
            List<Tuple2<Record, String>> invalid = new ArrayList<>();
            while (recordIterator.hasNext()) {
              Record record = recordIterator.next();
              if (!validateRecord(record)) {
                invalid.add(new Tuple2<>(record, "Credit card number is missing"));
              }
            }
            return invalid.iterator();
          }
        });

    // Drop invalid records first so the map step can rely on a non-empty card number.
    JavaRDD<Record> result = records.filter(new Function<Record, Boolean>() {
      @Override
      public Boolean call(Record record) throws Exception {
        return validateRecord(record);
      }
    }).map(new Function<Record, Record>() {
      @Override
      public Record call(Record record) throws Exception {
        String creditCard = record.get(VALUE_PATH).getValueAsString();
        // Types are visited in configuration order; the first matching prefix decides.
        for (Map.Entry<String, String[]> entry : ccTypes.entrySet()) {
          for (String prefix : entry.getValue()) {
            if (creditCard.startsWith(prefix)) {
              record.set(RESULT_PATH, Field.create(entry.getKey()));
              return record;
            }
          }
        }
        // No prefix matched: pass the record through without a type.
        return record;
      }
    });

    return new TransformResult(result, errors);
  }

  /**
   * Checks that a record carries a usable credit card number.
   *
   * @param record the record to inspect
   * @return {@code true} if the {@code /credit_card} field exists and its string value is
   *     neither {@code null} nor empty
   */
  private static boolean validateRecord(Record record) {
    Field field = record.get(VALUE_PATH);
    if (field == null) {
      return false;
    }
    String val = field.getValueAsString();
    return val != null && !val.isEmpty();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment