This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | // update a stateful count of calls by call type | |
| JavaPairDStream<String, Long> reducedState = pairs.mapToPair(pair -> | |
| new Tuple2<>(pair._1(), 1L)) | |
| .updateStateByKey( | |
| (List<Long> calls, Optional<Long> currentCount) -> { | |
| Long sum = currentCount.or(0L) + ((long) calls.size()); | |
| return Optional.of(sum); | |
| }); | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | // create pairs with key of call type and value of a Java call object | |
| JavaPairDStream<String, ArrayList<RealTime911>> pairs = callData.mapToPair(rt911 -> | |
| new Tuple2<>(rt911.getCallType(), new ArrayList<>(Arrays.asList(rt911)))); | |
| // micro batch summary - a micro batch grouped by call type | |
| // create reduced pairs with key of call type and value of the list of call java call objects of that type | |
| JavaPairDStream<String, ArrayList<RealTime911>> reducedPairs = pairs.reduceByKey((a, b) -> { | |
| a.addAll(b); | |
| return a; | |
| }); | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | @Component | |
| public class SparkAnalysis implements Serializable { | |
| static final long serialVersionUID = 100L; | |
| // clean the raw input data | |
| public JavaDStream<RealTime911> cleanData(JavaDStream<RealTime911> callData) { | |
| return callData.map(x -> { | |
| String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", ""); | |
| x.setCallType(callType); | |
| return x; | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | public void reportCurrentCallTypeMicroBatch(JavaPairDStream<String, ArrayList<RealTime911>> reduced) { | |
| reduced.foreachRDD( rdd -> { | |
| if(rdd.count() < 1) { // dont show empty micro batch (e.g. no data received in the last time interval) | |
| return; | |
| } | |
| JavaPairRDD<String, ArrayList<RealTime911>> sorted = rdd.sortByKey(); | |
| sorted.foreach( record -> { | |
| buildLogRecord(record._1(), record._2()); | |
| }); | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | callData = callData.map(x -> { // clean data | |
| String callType = x.getCallType().replaceAll("\"", "").replaceAll("[-|,]", ""); | |
| x.setCallType(callType); | |
| return x; | |
| }).filter(pair -> { // filter data | |
| return !isFilteredOnFire || pair.getCallType().matches("(?i).*\\bFire\\b.*"); | |
| }); | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | @Component | |
| public class Map911Call implements Function<String, RealTime911> { | |
| private static Logger logger = LoggerFactory.getLogger(Map911Call.class); | |
| @Override | |
| public RealTime911 call(String line) throws Exception { | |
| RealTime911 call = new RealTime911(); | |
| // split on, but not inside, quoted field | |
| String[] columns = line.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)"); | |
| if (columns.length < 8) { | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | // Get the kafka topic data | |
| JavaDStream<String> lines = rawDataLines.map((Function<Tuple2<String, String>, String>) Tuple2::_2); |