Skip to content

Instantly share code, notes, and snippets.

@randallwhitman
Last active December 17, 2015 17:19
Show Gist options
  • Save randallwhitman/5645688 to your computer and use it in GitHub Desktop.
Save randallwhitman/5645688 to your computer and use it in GitHub Desktop.
Trip Discovery - Infer Trips by Stoppage
public void reduce(Text key, Iterable<CarSortWritable> values, Context ctx)
throws IOException, InterruptedException {
String[] kys = key.toString().split(","); // carID & date
Text outKy = new Text(kys[0]);
// Expect at most tens of thousands of positions per car per day - expect up to thousands.
// (per year, up to 2-3 hundreds of thousands)
final int MAX_BUFFER_SIZE = 8000; // would fit a record every 11s all day
ArrayList<CarSortWritable> records = new ArrayList<CarSortWritable>(MAX_BUFFER_SIZE);
for (CarSortWritable entry : values) {
records.add(new CarSortWritable(entry));
}
Collections.sort(records);
// Keep origin & last/previous time & position
CarSortWritable first = records.get(0);
String theDate = first.getDate(), origTime = first.getTime(),
origLon = first.getLon(), origLat = first.getLat(), origSpd = first.getSpeed(),
prevTime = null, prevLon = null, prevLat = null, prevSpd = null;
long nOrgTm = timeAsInteger(theDate, origTime), nPrvTm = -1;
try { // Check if lapse exceeding threshold.
// The check for time lapse, without checking position movement,
// utilizes the fact that these GPS units transmit data only
// when the car is on - or at least do not transmit data when
// the key is altogether out of the ignition.
for (CarSortWritable entry : records) {
String curTime = entry.getTime(), curLon = entry.getLon(), curLat = entry.getLat(),
curSpd = entry.getSpeed();
long nCurTm = timeAsInteger(theDate, curTime);
if (nPrvTm > nOrgTm //ignore lone points
&& nCurTm > nPrvTm + threshold) {
int idxOrig = queryGrid(DmsUtil.parseDms(origLon),
DmsUtil.parseDms(origLat));
int idxDest = queryGrid(DmsUtil.parseDms(prevLon),
DmsUtil.parseDms(prevLat));
if (idxOrig >= 0 && idxDest > 0) { // discard outliers
double[] cellOrig = grid.get(idxOrig);
double[] cellDest = grid.get(idxDest);
ctx.write(outKy,
new TripCellWrit(theDate, origTime, origLon, origLat, origSpd,
cellOrig[0], cellOrig[1], cellOrig[2], cellOrig[3],
theDate, prevTime, prevLon, prevLat, prevSpd,
cellDest[0], cellDest[1], cellDest[2], cellDest[3])
);
}
nOrgTm = nCurTm;
origTime = curTime;
origLon = curLon;
origLat = curLat;
origSpd = curSpd;
}
nPrvTm = nCurTm;
prevTime = curTime;
prevLon = curLon;
prevLat = curLat;
prevSpd = curSpd;
}
if (/*records.size() > 1 && */ nPrvTm > nOrgTm) { // no lone point
int idxOrig = queryGrid(DmsUtil.parseDms(origLon),
DmsUtil.parseDms(origLat));
int idxDest = queryGrid(DmsUtil.parseDms(prevLon),
DmsUtil.parseDms(prevLat));
if (idxOrig >= 0 && idxDest > 0) { // discard outliers
double[] cellOrig = grid.get(idxOrig);
double[] cellDest = grid.get(idxDest);
ctx.write(outKy,
new TripCellWrit(theDate, origTime, origLon, origLat, origSpd,
cellOrig[0], cellOrig[1], cellOrig[2], cellOrig[3],
theDate, prevTime, prevLon, prevLat, prevSpd,
cellDest[0], cellDest[1], cellDest[2], cellDest[3])); // current, after loop exit
}
}
} catch (Exception e) {
// could log something
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment