Last active
December 17, 2015 17:19
-
-
Save randallwhitman/5645688 to your computer and use it in GitHub Desktop.
Trip Discovery - Infer Trips by Stoppage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void reduce(Text key, Iterable<CarSortWritable> values, Context ctx) | |
throws IOException, InterruptedException { | |
String[] kys = key.toString().split(","); // carID & date | |
Text outKy = new Text(kys[0]); | |
// Expect at most tens of thousands of positions per car per day - expect up to thousands. | |
// (per year, up to 2-3 hundreds of thousands) | |
final int MAX_BUFFER_SIZE = 8000; // would fit a record every 11s all day | |
ArrayList<CarSortWritable> records = new ArrayList<CarSortWritable>(MAX_BUFFER_SIZE); | |
for (CarSortWritable entry : values) { | |
records.add(new CarSortWritable(entry)); | |
} | |
Collections.sort(records); | |
// Keep origin & last/previous time & position | |
CarSortWritable first = records.get(0); | |
String theDate = first.getDate(), origTime = first.getTime(), | |
origLon = first.getLon(), origLat = first.getLat(), origSpd = first.getSpeed(), | |
prevTime = null, prevLon = null, prevLat = null, prevSpd = null; | |
long nOrgTm = timeAsInteger(theDate, origTime), nPrvTm = -1; | |
try { // Check if lapse exceeding threshold. | |
// The check for time lapse, without checking position movement, | |
// utilizes the fact that these GPS units transmit data only | |
// when the car is on - or at least do not transmit data when | |
// the key is altogether out of the ignition. | |
for (CarSortWritable entry : records) { | |
String curTime = entry.getTime(), curLon = entry.getLon(), curLat = entry.getLat(), | |
curSpd = entry.getSpeed(); | |
long nCurTm = timeAsInteger(theDate, curTime); | |
if (nPrvTm > nOrgTm //ignore lone points | |
&& nCurTm > nPrvTm + threshold) { | |
int idxOrig = queryGrid(DmsUtil.parseDms(origLon), | |
DmsUtil.parseDms(origLat)); | |
int idxDest = queryGrid(DmsUtil.parseDms(prevLon), | |
DmsUtil.parseDms(prevLat)); | |
if (idxOrig >= 0 && idxDest > 0) { // discard outliers | |
double[] cellOrig = grid.get(idxOrig); | |
double[] cellDest = grid.get(idxDest); | |
ctx.write(outKy, | |
new TripCellWrit(theDate, origTime, origLon, origLat, origSpd, | |
cellOrig[0], cellOrig[1], cellOrig[2], cellOrig[3], | |
theDate, prevTime, prevLon, prevLat, prevSpd, | |
cellDest[0], cellDest[1], cellDest[2], cellDest[3]) | |
); | |
} | |
nOrgTm = nCurTm; | |
origTime = curTime; | |
origLon = curLon; | |
origLat = curLat; | |
origSpd = curSpd; | |
} | |
nPrvTm = nCurTm; | |
prevTime = curTime; | |
prevLon = curLon; | |
prevLat = curLat; | |
prevSpd = curSpd; | |
} | |
if (/*records.size() > 1 && */ nPrvTm > nOrgTm) { // no lone point | |
int idxOrig = queryGrid(DmsUtil.parseDms(origLon), | |
DmsUtil.parseDms(origLat)); | |
int idxDest = queryGrid(DmsUtil.parseDms(prevLon), | |
DmsUtil.parseDms(prevLat)); | |
if (idxOrig >= 0 && idxDest > 0) { // discard outliers | |
double[] cellOrig = grid.get(idxOrig); | |
double[] cellDest = grid.get(idxDest); | |
ctx.write(outKy, | |
new TripCellWrit(theDate, origTime, origLon, origLat, origSpd, | |
cellOrig[0], cellOrig[1], cellOrig[2], cellOrig[3], | |
theDate, prevTime, prevLon, prevLat, prevSpd, | |
cellDest[0], cellDest[1], cellDest[2], cellDest[3])); // current, after loop exit | |
} | |
} | |
} catch (Exception e) { | |
// could log something | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment