Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Creating a StreamSets Spark Transformer in Java - after second code expansion
package com.streamsets.spark;
import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.spark.api.SparkTransformer;
import com.streamsets.pipeline.spark.api.TransformResult;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
/**
 * Spark transformer that tags each record with a credit card type.
 *
 * <p>The card number is read from {@code /credit_card}, compared against a table of known
 * number prefixes, and the name of the first matching type is written to
 * {@code /credit_card_type}. Records that have no non-empty {@code /credit_card} value are not
 * transformed; they are paired with an error message and returned as the error RDD of the
 * {@link TransformResult}.
 */
public class CustomTransformer extends SparkTransformer implements Serializable {
  private static final String VALUE_PATH = "/credit_card";
  private static final String RESULT_PATH = "/credit_card_type";

  /**
   * Card type name mapped to the number prefixes that identify it.
   *
   * <p>Iteration order matters: the lookup stops at the first entry with a matching prefix, and
   * the catch-all "Other" entry uses the empty prefix, which every string starts with. The map
   * is therefore insertion-ordered and "Other" is inserted last.
   */
  private static final Map<String, String[]> CC_TYPES;

  static {
    // Create a map of card type to list of prefixes (insertion order is preserved)
    Map<String, String[]> types = new LinkedHashMap<>();
    types.put("Visa", new String[]{"4"});
    types.put("Mastercard", new String[]{"51", "52", "53", "54", "55"});
    types.put("AMEX", new String[]{"34", "37"});
    types.put("Diners Club", new String[]{"300", "301", "302", "303", "304", "305", "36", "38"});
    types.put("Discover", new String[]{"6011", "65"});
    types.put("JCB", new String[]{"2131", "1800", "35"});
    // Must stay last: the empty prefix matches every card number.
    types.put("Other", new String[]{""});
    CC_TYPES = types;
  }

  // transient: excluded when this object is serialized.
  private transient JavaSparkContext javaSparkContext;

  /**
   * Stores the supplied Spark context.
   *
   * @param javaSparkContext the Spark context to keep a reference to
   * @param params transformer parameters; ignored by this implementation
   */
  @Override
  public void init(JavaSparkContext javaSparkContext, List<String> params) {
    this.javaSparkContext = javaSparkContext;
  }

  /**
   * Splits the incoming records into invalid ones (returned as errors) and valid ones, and sets
   * the credit card type on each valid record.
   *
   * @param records the records to process
   * @return a result holding the transformed valid records and the (record, message) error pairs
   */
  @Override
  public TransformResult transform(JavaRDD<Record> records) {
    // Collect every record that fails validation, paired with an error message
    JavaPairRDD<Record, String> errors = records.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<Record>, Record, String>() {
          @Override
          public Iterator<Tuple2<Record, String>> call(Iterator<Record> recordIterator)
              throws Exception {
            List<Tuple2<Record, String>> rejected = new LinkedList<>();
            while (recordIterator.hasNext()) {
              Record record = recordIterator.next();
              if (!validateRecord(record)) {
                rejected.add(new Tuple2<>(record, "Credit card number is missing"));
              }
            }
            return rejected.iterator();
          }
        });

    // Filter out invalid records before applying the map, so the map can rely on the
    // credit card field being present and non-empty
    JavaRDD<Record> result = records
        .filter(new Function<Record, Boolean>() {
          @Override
          public Boolean call(Record record) throws Exception {
            return validateRecord(record);
          }
        })
        .map(new Function<Record, Record>() {
          @Override
          public Record call(Record record) throws Exception {
            String creditCard = record.get(VALUE_PATH).getValueAsString();
            String cardType = findCardType(creditCard);
            if (cardType != null) {
              record.set(RESULT_PATH, Field.create(cardType));
            }
            return record;
          }
        });

    return new TransformResult(result, errors);
  }

  /**
   * Returns the name of the first card type, in table order, that has a prefix matching the
   * given number.
   *
   * @param creditCard the card number to classify; must not be null
   * @return the matching type name, or {@code null} if no prefix matches
   */
  private static String findCardType(String creditCard) {
    for (Map.Entry<String, String[]> entry : CC_TYPES.entrySet()) {
      for (String prefix : entry.getValue()) {
        if (creditCard.startsWith(prefix)) {
          return entry.getKey();
        }
      }
    }
    return null;
  }

  /**
   * Checks that the record has a {@code /credit_card} field holding a non-empty string value.
   *
   * @param record the record to check
   * @return {@code true} if the field exists and its string value is non-null and non-empty
   */
  private static boolean validateRecord(Record record) {
    // We need a field to operate on!
    Field field = record.get(VALUE_PATH);
    if (field == null) {
      return false;
    }
    // The field must contain a value!
    String val = field.getValueAsString();
    return val != null && !val.isEmpty();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment