Created September 22, 2018 00:52
Save metadaddy/c74fe6e48ca23b7b79191a734b316c4b to your computer and use it in GitHub Desktop.
Creating a StreamSets Spark Transformer in Java - after third code expansion
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.streamsets.spark;

import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.spark.api.SparkTransformer;
import com.streamsets.pipeline.spark.api.TransformResult;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class CustomTransformer extends SparkTransformer implements Serializable { | |
private static final String VALUE_PATH = "/credit_card"; | |
private static final String RESULT_PATH = "/credit_card_type"; | |
private HashMap<String, String[]> ccTypes = new HashMap<>(); | |
@Override | |
public void init(JavaSparkContext javaSparkContext, List<String> params) { | |
// Params are in form "MasterCard=51,52,53,54,55" | |
for (String param : params) { | |
// Parse the credit card type and list of prefixes | |
String key = param.substring(0, param.indexOf('=')); | |
String prefixes[] = param.substring(param.indexOf('=') + 1, param.length()).split(","); | |
ccTypes.put(key, prefixes); | |
} | |
} | |
@Override | |
public TransformResult transform(JavaRDD<Record> records) { | |
// Validate incoming records | |
JavaPairRDD<Record, String> errors = records.mapPartitionsToPair( | |
new PairFlatMapFunction<Iterator<Record>, Record, String>() { | |
public Iterator<Tuple2<Record, String>> call(Iterator<Record> recordIterator) throws Exception { | |
List<Tuple2<Record, String>> errors = new LinkedList<>(); | |
// Iterate through incoming records | |
while (recordIterator.hasNext()) { | |
Record record = recordIterator.next(); | |
// Validate each record | |
if (!validateRecord(record)) { | |
// We have a problem - flag the record as an error | |
errors.add(new Tuple2<>(record, "Credit card number is missing")); | |
} | |
} | |
return errors.iterator(); | |
} | |
}); | |
// Filter out invalid records before applying the map | |
JavaRDD<Record> result = records.filter(new Function<Record, Boolean>() { | |
// Only operate on valid records | |
public Boolean call(Record record) throws Exception { | |
return validateRecord(record); | |
} | |
}).map(new Function<Record, Record>() { | |
public Record call(Record record) throws Exception { | |
// Get the credit card number from the record | |
String creditCard = record.get(VALUE_PATH).getValueAsString(); | |
// Look through the map of credit card types | |
for (Map.Entry<String, String[]> entry : ccTypes.entrySet()) { | |
// Find the first matching prefix | |
for (String prefix : entry.getValue()) { | |
if (creditCard.startsWith(prefix)) { | |
// Set the credit card type | |
record.set(RESULT_PATH, Field.create(entry.getKey())); | |
return record; | |
} | |
} | |
} | |
return record; | |
} | |
}); | |
return new TransformResult(result, errors); | |
} | |
private static Boolean validateRecord(Record record) { | |
// We need a field to operate on! | |
Field field = record.get(VALUE_PATH); | |
if (field == null) { | |
return false; | |
} | |
// The field must contain a value! | |
String val = field.getValueAsString(); | |
return val != null && val.length() > 0; | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.