Skip to content

Instantly share code, notes, and snippets.

@jwalton922
Created February 8, 2018 17:36
Show Gist options
  • Save jwalton922/0abbf4379ed47a4574a3e89a7681ad70 to your computer and use it in GitHub Desktop.
Save jwalton922/0abbf4379ed47a4574a3e89a7681ad70 to your computer and use it in GitHub Desktop.
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.pearson.engagement;
import com.pearson.autobahn.AutobahnMessage;
import com.pearson.autobahn.SparkReader;
import com.pearson.graphdatagenerator.EngagementData;
import com.pearson.graphdatagenerator.Student;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
 * Spark jobs that count engagement events per course section per hour.
 *
 * <p>Both jobs read Autobahn engagement messages, bucket every event under a key of the form
 * {@code <courseSectionId>_<yyyy-MM-dd'T'HH>} and write one {@code key,count} text line per
 * bucket. They differ only in how students are filtered before counting.
 *
 * @author uwaltj6
 */
public class ActiveStudentJob {

    /**
     * Parse pattern for the {@code unloadDt} payload field. {@code HH} is the 24-hour clock and
     * {@code mm} is minutes. (The former {@code hh:MM} read hour 12 as midnight and stored the
     * minutes into the month field.)
     */
    private static final String UNLOAD_DT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";

    /** Hour-bucket format; 24-hour clock so that 09:00 and 21:00 fall into different buckets. */
    private static final String HOUR_BIN_PATTERN = "yyyy-MM-dd'T'HH";

    /** Number of part files each job writes. */
    private static final int OUTPUT_PARTITIONS = 5;

    /**
     * Counts engagement events per course section per hour, intended to be restricted to
     * enrolled students once the registrar lookup is implemented.
     *
     * @param engagementInputDir location of the engagement Autobahn messages
     * @param outputDir          directory the {@code key,count} lines are written to; Spark refuses
     *                           to write into an existing directory
     */
    public void run(String engagementInputDir, String outputDir) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        try {
            JavaPairRDD<String, EngagementData> engagementByCs =
                    SparkReader.readAutobahnMessages(engagementInputDir, sc)
                            .mapToPair(message -> new Tuple2<>(
                                    payloadField(message, "courseSectionId"),
                                    toEngagementData(message)));

            JavaPairRDD<String, String> enrolledStudents = engagementByCs.keys().distinct()
                    .flatMapToPair(cs -> {
                        // TODO: GET enrolled students for `cs` from the registrar. Until then the
                        // list is empty (a null placeholder here threw NullPointerException).
                        List<String> enrolledUsers = new ArrayList<>();
                        List<Tuple2<String, String>> retList = new ArrayList<>();
                        for (String user : enrolledUsers) {
                            retList.add(new Tuple2<>(user, user));
                        }
                        return retList.iterator();
                    });

            // NOTE(review): this does not filter anything yet. leftOuterJoin keeps every left-side
            // row, and the left side is keyed by course section while the right side is keyed by
            // user id. Revisit the join (type and keys) once the registrar lookup exists.
            JavaPairRDD<String, EngagementData> filteredEngagementByBin = engagementByCs
                    .leftOuterJoin(enrolledStudents)
                    .mapToPair(tuple -> {
                        EngagementData engagement = tuple._2()._1();
                        return new Tuple2<>(hourBinKey(tuple._1(), engagement), engagement);
                    });

            writeCounts(countPerKey(filteredEngagementByBin), outputDir);
        } finally {
            // Only one SparkContext may be active per JVM; release it so another job can start.
            sc.stop();
        }
    }

    /**
     * Counts engagement events per course section per hour, excluding every student that appears
     * in the UserCourseSection messages. (Original note: "No UserCourseSection messages from
     * ETEXT".)
     *
     * @param engagementInputDir  location of the engagement Autobahn messages
     * @param userCourseSectonDir location of the UserCourseSection Autobahn messages
     * @param outputDir           directory the {@code key,count} lines are written to; Spark
     *                            refuses to write into an existing directory
     */
    public void runWithAllABMessages(String engagementInputDir, String userCourseSectonDir, String outputDir) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        try {
            JavaPairRDD<String, EngagementData> engagementByStudent =
                    SparkReader.readAutobahnMessages(engagementInputDir, sc)
                            .mapToPair(message -> new Tuple2<>(
                                    payloadField(message, "personId"),
                                    toEngagementData(message)));

            // NOTE(review): every student with any UserCourseSection message is treated as dropped;
            // enrollmentStatusCode is not consulted. Removal is by student id, so that student's
            // events in all course sections are excluded. Confirm this is intended.
            JavaPairRDD<String, Student> droppedStudents =
                    SparkReader.readAutobahnMessages(userCourseSectonDir, sc).mapToPair(message -> {
                        String studentId = payloadField(message, "personId");
                        String courseSection = payloadField(message, "courseSectionId");
                        Student student = new Student();
                        student.setCourseSections(new HashSet<>(Arrays.asList(courseSection)));
                        student.setId(studentId);
                        return new Tuple2<>(studentId, student);
                    });

            JavaPairRDD<String, EngagementData> filteredEngagementByBin = engagementByStudent
                    .subtractByKey(droppedStudents)
                    .mapToPair(tuple -> {
                        EngagementData engagement = tuple._2();
                        return new Tuple2<>(hourBinKey(engagement.getCourseSection(), engagement), engagement);
                    });

            writeCounts(countPerKey(filteredEngagementByBin), outputDir);
        } finally {
            // Only one SparkContext may be active per JVM; release it so another job can start.
            sc.stop();
        }
    }

    /** Reads payload field {@code key} of {@code message}, cast to String. */
    private static String payloadField(AutobahnMessage message, String key) {
        return (String) message.getPayload().get(key);
    }

    /**
     * Builds the {@link EngagementData} (course section, student, timestamp) for one message.
     *
     * @throws ParseException if {@code unloadDt} does not match {@link #UNLOAD_DT_PATTERN}
     */
    private static EngagementData toEngagementData(AutobahnMessage message) throws ParseException {
        EngagementData engagementData = new EngagementData();
        engagementData.setCourseSection(payloadField(message, "courseSectionId"));
        engagementData.setStudentId(payloadField(message, "personId"));
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per record.
        DateFormat dateFormat = new SimpleDateFormat(UNLOAD_DT_PATTERN);
        Date time = dateFormat.parse(payloadField(message, "unloadDt"));
        engagementData.setTimestamp(time.getTime());
        return engagementData;
    }

    /** Returns the {@code <courseSection>_<yyyy-MM-dd'T'HH>} bucket key for an engagement. */
    private static String hourBinKey(String courseSection, EngagementData engagement) {
        // NOTE(review): formatted in the executor JVM's default time zone; pin a zone if
        // executors may be configured differently.
        DateFormat dateFormat = new SimpleDateFormat(HOUR_BIN_PATTERN);
        return courseSection + "_" + dateFormat.format(new Date(engagement.getTimestamp()));
    }

    /** Counts the records under each key. */
    private static JavaPairRDD<String, Integer> countPerKey(JavaPairRDD<String, EngagementData> engagementByKey) {
        // `count + 1`, not `count++`: post-increment returns the old value, so counts stayed 0.
        return engagementByKey.aggregateByKey(0, (count, engagement) -> count + 1, Integer::sum);
    }

    /** Writes each {@code (key, count)} pair as a {@code key,count} line under {@code outputDir}. */
    private static void writeCounts(JavaPairRDD<String, Integer> counts, String outputDir) {
        counts.map(tuple -> tuple._1() + "," + tuple._2())
                .repartition(OUTPUT_PARTITIONS)
                .saveAsTextFile(outputDir);
    }

    /**
     * Runs both jobs.
     *
     * @param args {@code <engagementInputDir> <userCourseSectionInputDir> <outputDir>}
     * @throws IllegalArgumentException if fewer than three arguments are given
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: ActiveStudentJob <engagementInputDir> <userCourseSectionInputDir> <outputDir>");
        }
        String engagementInput = args[0];
        String userCourseSectionInput = args[1];
        String outputDir = args[2];
        ActiveStudentJob job = new ActiveStudentJob();
        // Each job gets its own sub-directory: writing both to the same outputDir made the second
        // saveAsTextFile fail because the directory already existed.
        job.run(engagementInput, outputDir + "/run");
        job.runWithAllABMessages(engagementInput, userCourseSectionInput, outputDir + "/runWithAllABMessages");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment