Created September 22, 2018 00:43
-
-
Save metadaddy/0447075d0bfe5345ffedbd8d6c870ca0 to your computer and use it in GitHub Desktop.
Creating a StreamSets Spark Transformer in Java — after the first code expansion
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.streamsets.spark; | |
import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.spark.api.SparkTransformer;
import com.streamsets.pipeline.spark.api.TransformResult;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class CustomTransformer extends SparkTransformer implements Serializable { | |
private static final String VALUE_PATH = "/credit_card"; | |
private static final String RESULT_PATH = "/credit_card_type"; | |
private static HashMap<String, String[]> ccTypes; | |
static { | |
// Create a map of card type to list of prefixes | |
ccTypes = new HashMap<>(); | |
ccTypes.put("Visa", new String[]{"4"}); | |
ccTypes.put("Mastercard", new String[]{"51","52","53","54","55"}); | |
ccTypes.put("AMEX", new String[]{"34","37"}); | |
ccTypes.put("Diners Club", new String[]{"300","301","302","303","304","305","36","38"}); | |
ccTypes.put("Discover", new String[]{"6011","65"}); | |
ccTypes.put("JCB", new String[]{"2131","1800","35"}); | |
ccTypes.put("Other", new String[]{""}); | |
} | |
private transient JavaSparkContext javaSparkContext; | |
@Override | |
public void init(JavaSparkContext javaSparkContext, List<String> params) { | |
this.javaSparkContext = javaSparkContext; | |
} | |
@Override | |
public TransformResult transform(JavaRDD<Record> records) { | |
// Create an empty errors JavaPairRDD | |
JavaRDD<Tuple2<Record,String>> emptyRDD = javaSparkContext.emptyRDD(); | |
JavaPairRDD<Record, String> errors = JavaPairRDD.fromJavaRDD(emptyRDD); | |
// Apply a map to the incoming records | |
JavaRDD<Record> result = records.map(new Function<Record, Record>() { | |
public Record call(Record record) throws Exception { | |
// Get the credit card number from the record | |
String creditCard = record.get(VALUE_PATH).getValueAsString(); | |
// Look through the map of credit card types | |
for (Map.Entry<String, String[]> entry : ccTypes.entrySet()) { | |
// Find the first matching prefix | |
for (String prefix : entry.getValue()) { | |
if (creditCard.startsWith(prefix)) { | |
// Set the credit card type | |
record.set(RESULT_PATH, Field.create(entry.getKey())); | |
return record; | |
} | |
} | |
} | |
return record; | |
} | |
}); | |
return new TransformResult(result, errors); | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.