Skip to content

Instantly share code, notes, and snippets.

@davidmoten
Created June 25, 2014 12:09
Show Gist options
  • Save davidmoten/e6d630ae2b2c8cf91a34 to your computer and use it in GitHub Desktop.
Save davidmoten/e6d630ae2b2c8cf91a34 to your computer and use it in GitHub Desktop.
distance travelled calculator - using sort
/**
 * Computes, for every distinct cell, the total nautical miles travelled in
 * that cell across all of the supplied files.
 * <p>
 * The work is expressed with RxJava {@link Observable}s so that files can be
 * processed concurrently. The intent is to scale across the available
 * processors and to cope with a large number of files (the number of open
 * file handles should be limited by the number of available processors).
 *
 * @param files
 *            the files from which fixes are extracted
 * @return the cell/distance entries emitted from the final map of summed
 *         cell distances
 */
public Observable<CellAndDistance> calculateDistanceByCellFromFiles(
        Observable<File> files) {
    // Map-reduce: parallel() shares the per-file fix streams out to
    // multiple threads (the 'map'), and each share is reduced by the
    // function built by calculateDistanceByCellFromFile().

    // one inner observable of fixes per input file
    final Observable<Observable<Fix>> fixesPerFile = files
            .map(BinaryFixesObservable.TO_FIXES);

    // use multiple cores, then wait for the last reporter so that the map
    // has been completely filled; an empty map is emitted if nothing
    // reported at all
    final Observable<Map<Cell, AtomicDouble>> cellDistances = fixesPerFile
            .parallel(
                    calculateDistanceByCellFromFile(new OperatorSumCellDistances()))
            .lastOrDefault(new HashMap<Cell, AtomicDouble>());

    // report the cell distances for the grid and record the total nautical
    // miles in the metrics as each entry passes through
    return cellDistances
            .lift(emitMapEntries())
            .doOnNext(sumNauticalMiles());
}
/**
 * Builds the 'reduce' function handed to {@code parallel()}: it turns a
 * stream of per-craft fix streams into a stream of cell-to-distance maps.
 *
 * @param sumCellDistances
 *            the operator that sums the individual cell distances into a
 *            map; the same instance is lifted onto every stream this
 *            function is applied to
 * @return a function from a stream of fix streams to a stream of
 *         cell-distance maps
 */
private Func1<Observable<Observable<Fix>>, Observable<Map<Cell, AtomicDouble>>> calculateDistanceByCellFromFile(
        final OperatorSumCellDistances sumCellDistances) {
    return new Func1<Observable<Observable<Fix>>, Observable<Map<Cell, AtomicDouble>>>() {
        @Override
        public Observable<Map<Cell, AtomicDouble>> call(
                Observable<Observable<Fix>> fixesByCraft) {
            // Aggregate the distance for each craft. The buffering done by
            // SerializedObserver is not a concern here because each file
            // is relatively small.
            final Observable<CellAndDistance> craftCellDistances = fixesByCraft
                    .flatMap(toCraftCellAndDistances);

            // fold the individual distances into the global map
            return craftCellDistances.lift(sumCellDistances);
        }
    };
}
/**
 * Converts all the fixes of a single craft into the distances travelled per
 * cell. Fixes are counted, filtered, sorted by position time, checked for
 * effective speed and then paired into consecutive segments from which the
 * distances are calculated. Metrics are updated along the way.
 */
private final Func1<Observable<Fix>, Observable<CellAndDistance>> toCraftCellAndDistances = new Func1<Observable<Fix>, Observable<CellAndDistance>>() {
    @Override
    public Observable<CellAndDistance> call(Observable<Fix> craftFixes) {
        // Stage 1: count every incoming fix, keep only those within the
        // time range (start/finish time, where they exist) and within the
        // filter bounds, then order what remains by position time. The
        // sorted list is turned back into a flat stream of fixes.
        final Observable<Fix> timeOrderedFixes = craftFixes
                .doOnNext(incrementFixesCount)
                .filter(inTimeRange)
                .filter(inRegion)
                .toSortedList(COMPARE_FIXES_BY_POSITION_TIME)
                .concatMap(TO_OBSERVABLE);

        // Stage 2: drop positions that fail the effective speed check and
        // record the survivors in the metrics. buffer(2, 1) then re-pairs
        // consecutive fixes as overlapping windows; only genuine pairs
        // (segments) with a valid time separation are kept. Each segment is
        // converted to cell distances and the per-segment cell counts are
        // updated.
        return timeOrderedFixes
                .lift(filterOnEffectiveSpeedOk())
                .doOnNext(countFixesPassedEffectiveSpeedCheck)
                .buffer(2, 1)
                .filter(PAIRS_ONLY)
                .filter(timeDifferenceOk)
                .flatMap(toCellAndDistance)
                .doOnNext(countSegmentCells);
    }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment