Skip to content

Instantly share code, notes, and snippets.

@metadaddy
Created September 22, 2018 00:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save metadaddy/0447075d0bfe5345ffedbd8d6c870ca0 to your computer and use it in GitHub Desktop.
Save metadaddy/0447075d0bfe5345ffedbd8d6c870ca0 to your computer and use it in GitHub Desktop.
Creating a StreamSets Spark Transformer in Java - after first code expansion
package com.streamsets.spark;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.spark.api.SparkTransformer;
import com.streamsets.pipeline.spark.api.TransformResult;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import java.io.Serializable;
import java.util.List;
import com.streamsets.pipeline.api.Field;
import java.util.HashMap;
import java.util.Map;
/**
 * StreamSets Spark Transformer that classifies credit card numbers by card type.
 *
 * <p>For each incoming record, the string at {@value #VALUE_PATH} is matched against a table of
 * known number prefixes and the matching type name is written to {@value #RESULT_PATH}. Numbers
 * that match no known prefix are labelled {@value #DEFAULT_TYPE}. Records with no credit card
 * number (missing field or null value) are returned on the error stream instead of being
 * classified.
 */
public class CustomTransformer extends SparkTransformer implements Serializable {
  /** Field path holding the credit card number to classify. */
  private static final String VALUE_PATH = "/credit_card";
  /** Field path that receives the detected card type. */
  private static final String RESULT_PATH = "/credit_card_type";
  /**
   * Card type assigned when no prefix in {@link #CC_TYPES} matches. Deliberately kept out of the
   * table: an empty-prefix entry matches every number, so the outcome would depend on HashMap
   * iteration order.
   */
  private static final String DEFAULT_TYPE = "Other";
  /** Error message attached to records that have no credit card number. */
  private static final String MISSING_VALUE_ERROR = "Credit card number is missing";

  /**
   * Card type name to the number prefixes that identify it. No prefix in one list is a prefix of
   * another list's entry, so at most one type can match and iteration order does not matter.
   */
  private static final Map<String, String[]> CC_TYPES = new HashMap<>();

  static {
    CC_TYPES.put("Visa", new String[] {"4"});
    CC_TYPES.put("Mastercard", new String[] {"51", "52", "53", "54", "55"});
    CC_TYPES.put("AMEX", new String[] {"34", "37"});
    CC_TYPES.put("Diners Club", new String[] {"300", "301", "302", "303", "304", "305", "36", "38"});
    CC_TYPES.put("Discover", new String[] {"6011", "65"});
    CC_TYPES.put("JCB", new String[] {"2131", "1800", "35"});
  }

  private transient JavaSparkContext javaSparkContext;

  @Override
  public void init(JavaSparkContext javaSparkContext, List<String> params) {
    this.javaSparkContext = javaSparkContext;
  }

  /**
   * Splits the batch into classifiable records and records lacking a card number.
   *
   * @param records incoming records
   * @return records annotated with {@value #RESULT_PATH}, plus an error RDD pairing each record
   *     that has no credit card number with {@value #MISSING_VALUE_ERROR}
   */
  @Override
  public TransformResult transform(JavaRDD<Record> records) {
    // Records without a card number cannot be classified; report them through the error
    // stream rather than letting a NullPointerException fail the whole batch.
    JavaRDD<Tuple2<Record, String>> invalid = records
        .filter(new Function<Record, Boolean>() {
          @Override
          public Boolean call(Record record) throws Exception {
            return getCreditCard(record) == null;
          }
        })
        .map(new Function<Record, Tuple2<Record, String>>() {
          @Override
          public Tuple2<Record, String> call(Record record) throws Exception {
            return new Tuple2<>(record, MISSING_VALUE_ERROR);
          }
        });
    JavaPairRDD<Record, String> errors = JavaPairRDD.fromJavaRDD(invalid);

    // Classify the remaining records, writing the card type into each record in place.
    JavaRDD<Record> result = records
        .filter(new Function<Record, Boolean>() {
          @Override
          public Boolean call(Record record) throws Exception {
            return getCreditCard(record) != null;
          }
        })
        .map(new Function<Record, Record>() {
          @Override
          public Record call(Record record) throws Exception {
            record.set(RESULT_PATH, Field.create(cardType(getCreditCard(record))));
            return record;
          }
        });

    return new TransformResult(result, errors);
  }

  /** Returns the value at {@link #VALUE_PATH} as a string, or null if the field is absent. */
  private static String getCreditCard(Record record) {
    Field field = record.get(VALUE_PATH);
    return field == null ? null : field.getValueAsString();
  }

  /**
   * Returns the card type for {@code number}.
   *
   * @param number non-null credit card number
   * @return the type from {@link #CC_TYPES} whose prefix list contains a prefix of
   *     {@code number}, or {@link #DEFAULT_TYPE} when none matches
   */
  private static String cardType(String number) {
    for (Map.Entry<String, String[]> entry : CC_TYPES.entrySet()) {
      for (String prefix : entry.getValue()) {
        if (number.startsWith(prefix)) {
          return entry.getKey();
        }
      }
    }
    return DEFAULT_TYPE;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment