Created
February 8, 2018 17:36
-
-
Save jwalton922/0abbf4379ed47a4574a3e89a7681ad70 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.pearson.engagement; | |
import com.pearson.autobahn.AutobahnMessage; | |
import com.pearson.autobahn.SparkReader; | |
import com.pearson.graphdatagenerator.EngagementData; | |
import com.pearson.graphdatagenerator.Student; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Date; | |
import java.util.HashSet; | |
import java.util.List; | |
import org.apache.spark.SparkConf; | |
import org.apache.spark.api.java.JavaPairRDD; | |
import org.apache.spark.api.java.JavaRDD; | |
import org.apache.spark.api.java.JavaSparkContext; | |
import scala.Tuple2; | |
/** | |
* | |
* @author uwaltj6 | |
*/ | |
public class ActiveStudentJob { | |
public void run(String engagementInputDir, String outputDir){ | |
SparkConf conf = new SparkConf(); | |
JavaSparkContext sc = new JavaSparkContext(conf); | |
JavaRDD<AutobahnMessage> enagementMessages = SparkReader.readAutobahnMessages(engagementInputDir, sc); | |
JavaPairRDD<String, EngagementData> engagementByCs = enagementMessages.mapToPair(message -> { | |
String courseSection = (String) message.getPayload().get("courseSectionId"); | |
String student = (String) message.getPayload().get("personId"); | |
String unloadDt = (String) message.getPayload().get("unloadDt"); | |
EngagementData engagementData = new EngagementData(); | |
engagementData.setCourseSection(courseSection); | |
engagementData.setStudentId(student); | |
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:MM:ss.SSSZ"); | |
Date time = dateFormat.parse(unloadDt); | |
engagementData.setTimestamp(time.getTime()); | |
return new Tuple2<>(courseSection, engagementData); | |
}); | |
JavaPairRDD<String,String> enrolledStudents = engagementByCs.keys().distinct().flatMapToPair(cs -> { | |
//GET enrolled students from registrar | |
List<Tuple2<String,String>> retList = new ArrayList<>(); | |
List<String> enrolledUsers = null; | |
for(String user : enrolledUsers){ | |
retList.add(new Tuple2<>(user,user)); | |
} | |
return retList.iterator(); | |
}); | |
JavaPairRDD<String, EngagementData> filteredEngagementByBin = engagementByCs.leftOuterJoin(enrolledStudents).mapToPair(tuple -> { | |
String courseSection = tuple._1(); | |
EngagementData engagement = tuple._2()._1(); | |
Date date = new Date(); | |
date.setTime(engagement.getTimestamp()); | |
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh"); | |
String hourBin = dateFormat.format(date); | |
return new Tuple2<>(courseSection+"_"+hourBin, engagement); | |
}); | |
JavaPairRDD<String,Integer> countByBin = filteredEngagementByBin.aggregateByKey(0, (count,engagement) -> { | |
return count++; | |
}, (a,b)-> { | |
return a+b; | |
}); | |
JavaRDD<String> output = countByBin.map(tuple -> { | |
return tuple._1()+","+tuple._2(); | |
}); | |
output.repartition(5).saveAsTextFile(outputDir); | |
} | |
/** | |
* No UserCourseSection messages from ETEXT | |
* @param engagementInputDir | |
* @param registrarData | |
* @param outputDir | |
*/ | |
public void runWithAllABMessages(String engagementInputDir, String userCourseSectonDir, String outputDir) { | |
SparkConf conf = new SparkConf(); | |
JavaSparkContext sc = new JavaSparkContext(conf); | |
JavaRDD<AutobahnMessage> enagementMessages = SparkReader.readAutobahnMessages(engagementInputDir, sc); | |
JavaPairRDD<String, EngagementData> engagementByStudent = enagementMessages.mapToPair(message -> { | |
String courseSection = (String) message.getPayload().get("courseSectionId"); | |
String student = (String) message.getPayload().get("personId"); | |
String unloadDt = (String) message.getPayload().get("unloadDt"); | |
EngagementData engagementData = new EngagementData(); | |
engagementData.setCourseSection(courseSection); | |
engagementData.setStudentId(student); | |
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:MM:ss.SSSZ"); | |
Date time = dateFormat.parse(unloadDt); | |
engagementData.setTimestamp(time.getTime()); | |
return new Tuple2<>(student, engagementData); | |
}); | |
//no UserCourseSection messages from etext | |
JavaPairRDD<String, Student> droppedStudents = SparkReader.readAutobahnMessages(userCourseSectonDir, sc).mapToPair(message -> { | |
String studentId = (String) message.getPayload().get("personId"); | |
String courseSection = (String) message.getPayload().get("courseSectionId"); | |
String enrollmentStatusCode = (String) message.getPayload().get("enrollmentStatusCode"); | |
Student student = new Student(); | |
student.setCourseSections(new HashSet<>(Arrays.asList(courseSection))); | |
student.setId(studentId); | |
return new Tuple2<>(studentId, student); | |
}); | |
JavaPairRDD<String, EngagementData> filteredEngagementByBin = engagementByStudent.subtractByKey(droppedStudents).mapToPair(tuple -> { | |
String courseSection = tuple._2().getCourseSection(); | |
EngagementData engagement = tuple._2(); | |
Date date = new Date(); | |
date.setTime(engagement.getTimestamp()); | |
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh"); | |
String hourBin = dateFormat.format(date); | |
return new Tuple2<>(courseSection+"_"+hourBin, engagement); | |
}); | |
JavaPairRDD<String,Integer> countByBin = filteredEngagementByBin.aggregateByKey(0, (count,engagement) -> { | |
return count++; | |
}, (a,b)-> { | |
return a+b; | |
}); | |
JavaRDD<String> output = countByBin.map(tuple -> { | |
return tuple._1()+","+tuple._2(); | |
}); | |
output.repartition(5).saveAsTextFile(outputDir); | |
} | |
public static void main(String[] args) { | |
String engagementInput = args[0]; | |
String userCourseSectionInput = args[1]; | |
String outputDir = args[2]; | |
ActiveStudentJob job = new ActiveStudentJob(); | |
job.run(engagementInput, outputDir); | |
job.runWithAllABMessages(engagementInput, userCourseSectionInput, outputDir); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment