@waldmark
waldmark / Spark stateful call count
Created October 10, 2016 16:30
// update a stateful count of calls by call type
JavaPairDStream<String, Long> reducedState = pairs
        .mapToPair(pair -> new Tuple2<>(pair._1(), 1L))
        .updateStateByKey((List<Long> calls, Optional<Long> currentCount) -> {
            // add this batch's occurrences for the key to the running total
            Long sum = currentCount.or(0L) + (long) calls.size();
            return Optional.of(sum);
        });
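Stateful operations such as updateStateByKey require a checkpoint directory on
the streaming context, or Spark fails at startup. A minimal sketch of that
wiring (the conf variable, batch interval, and checkpoint path are assumptions,
not taken from these gists):

// checkpointing is required before updateStateByKey can run
JavaStreamingContext streamingContext =
        new JavaStreamingContext(sparkConf, Durations.seconds(10));
streamingContext.checkpoint("/tmp/spark-checkpoint"); // hypothetical local path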
@waldmark
waldmark / Spark micro batch pairs
Created October 10, 2016 16:23
// create pairs with key of call type and value of a singleton list holding the call object
JavaPairDStream<String, ArrayList<RealTime911>> pairs = callData.mapToPair(rt911 ->
        new Tuple2<>(rt911.getCallType(), new ArrayList<>(Arrays.asList(rt911))));
// micro batch summary - a micro batch grouped by call type
// create reduced pairs with key of call type and value of the list of call objects of that type
JavaPairDStream<String, ArrayList<RealTime911>> reducedPairs = pairs.reduceByKey((a, b) -> {
    a.addAll(b);
    return a;
});
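Mutating the left-hand list inside reduceByKey works here because every list is
freshly built in the mapToPair step above, but a merge that returns a new list
sidesteps any aliasing questions. A sketch of that more defensive variant:

// same grouping, without mutating either input list
JavaPairDStream<String, ArrayList<RealTime911>> reducedPairs = pairs.reduceByKey((a, b) -> {
    ArrayList<RealTime911> merged = new ArrayList<>(a);
    merged.addAll(b);
    return merged;
});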
@waldmark
waldmark / Spark Analysis
Last active October 7, 2016 14:56
@Component
public class SparkAnalysis implements Serializable {
    static final long serialVersionUID = 100L;

    // clean the raw input data
    public JavaDStream<RealTime911> cleanData(JavaDStream<RealTime911> callData) {
        return callData.map(x -> {
            // strip quotes, then hyphens, pipes, and commas, from the call type
            String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", "");
            x.setCallType(callType);
            return x;
        });
    }
}
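Since SparkAnalysis is a Spring @Component, the streaming driver presumably has
it injected and runs cleanData before any pairing; a sketch of that usage (the
field name and wiring are assumptions):

// hypothetical wiring in the streaming driver
@Autowired
private SparkAnalysis sparkAnalysis;

JavaDStream<RealTime911> cleaned = sparkAnalysis.cleanData(callData);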
@waldmark
waldmark / Micro Batch output
Last active October 7, 2016 14:50
public void reportCurrentCallTypeMicroBatch(JavaPairDStream<String, ArrayList<RealTime911>> reduced) {
    reduced.foreachRDD(rdd -> {
        if (rdd.count() < 1) { // don't show an empty micro batch (e.g. no data received in the last time interval)
            return;
        }
        JavaPairRDD<String, ArrayList<RealTime911>> sorted = rdd.sortByKey();
        sorted.foreach(record -> {
            buildLogRecord(record._1(), record._2());
        });
    });
}
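buildLogRecord is not shown in these gists; a minimal hypothetical version,
assuming it just formats and logs one call type's entry for the micro batch
(note that code inside foreach runs on the executors, so the output lands in
the worker logs):

// hypothetical implementation of the helper used above
private void buildLogRecord(String callType, ArrayList<RealTime911> calls) {
    logger.info("call type {}: {} call(s) in this micro batch", callType, calls.size());
}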
@waldmark
waldmark / Spark clean and filter
Last active October 10, 2016 16:19
callData = callData.map(x -> { // clean data
    String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", "");
    x.setCallType(callType);
    return x;
}).filter(pair -> { // filter data: pass everything through unless the fire-only flag is set
    return !isFilteredOnFire || pair.getCallType().matches("(?i).*\\bFire\\b.*");
});
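The filter regex matches "Fire" as a whole word, case-insensitively, anywhere
in the call type. A standalone check of that behavior (the sample call types
are made up for illustration):

// quick demonstration of the whole-word, case-insensitive match
public class FireFilterDemo {
    public static void main(String[] args) {
        String regex = "(?i).*\\bFire\\b.*";
        System.out.println("Fire In Building".matches(regex));  // true
        System.out.println("BRUSH FIRE".matches(regex));        // true
        System.out.println("Firearm Violation".matches(regex)); // false: no word boundary after "Fire"
    }
}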
@waldmark
waldmark / Map 911 to Java
Created October 7, 2016 14:38
@Component
public class Map911Call implements Function<String, RealTime911> {
    private static Logger logger = LoggerFactory.getLogger(Map911Call.class);

    @Override
    public RealTime911 call(String line) throws Exception {
        RealTime911 call = new RealTime911();
        // split on commas that fall outside quoted fields
        String[] columns = line.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
        if (columns.length < 8) {
            // assumed handling of malformed lines; the rest of this gist is not shown
            logger.warn("skipping malformed line: {}", line);
            return call;
        }
        // population of the call object from the columns is not shown either
        return call;
    }
}
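The RealTime911 bean itself never appears in these gists; a minimal
hypothetical version with just the accessors the snippets above rely on (the
real class presumably carries the other CSV columns as well):

// hypothetical POJO - only the callType accessors are used in these gists
public class RealTime911 implements java.io.Serializable {
    private String callType;

    public String getCallType() { return callType; }
    public void setCallType(String callType) { this.callType = callType; }
}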
@waldmark
waldmark / Spark Streaming Context
Last active October 7, 2016 14:33
DStream raw data
// get the Kafka topic data: rawDataLines carries (key, value) pairs, keep only the message value
JavaDStream<String> lines = rawDataLines.map((Function<Tuple2<String, String>, String>) Tuple2::_2);
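rawDataLines is not constructed in this gist; a sketch of how such a pair
stream is typically created with the Spark 1.x Kafka direct connector (the
broker address, topic name, and streaming context are assumptions):

// hypothetical setup - the (key, value) pairs feed the map above
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker
JavaPairInputDStream<String, String> rawDataLines = KafkaUtils.createDirectStream(
        streamingContext,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams,
        Collections.singleton("seattle-911-calls")); // hypothetical topic name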