Created
June 25, 2014 12:09
-
-
Save davidmoten/e6d630ae2b2c8cf91a34 to your computer and use it in GitHub Desktop.
distance travelled calculator - using sort
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Returns distinct cells and the the total nautical miles travelled in that | |
* cell. Uses RxJava {@link Observable}s to maximize throughput and will | |
* scale to use all available processors and to handle a large number of | |
* files (number of open file handles should be limited by number of | |
* available processors). | |
* | |
* @param files | |
* @return | |
*/ | |
public Observable<CellAndDistance> calculateDistanceByCellFromFiles( | |
Observable<File> files) { | |
// use a map-reduce approach where the parallel method shares out | |
// ('maps') the fixes by craft to multiple threads (number determined by | |
// available processors) and is passed the 'reduce' | |
// calculateDistanceByCellFromFile() method to combine the results. | |
return files | |
// extract fixes from each file | |
.map(BinaryFixesObservable.TO_FIXES) | |
// use multi cores | |
.parallel( | |
calculateDistanceByCellFromFile(new OperatorSumCellDistances())) | |
// wait for the map to have been filled by the last reporter | |
.lastOrDefault(new HashMap<Cell, AtomicDouble>()) | |
// report the cell distances for the grid | |
.lift(emitMapEntries()) | |
// record total nm in metrics | |
.doOnNext(sumNauticalMiles()); | |
} | |
private Func1<Observable<Observable<Fix>>, Observable<Map<Cell, AtomicDouble>>> calculateDistanceByCellFromFile( | |
final OperatorSumCellDistances sumCellDistances) { | |
return new Func1<Observable<Observable<Fix>>, Observable<Map<Cell, AtomicDouble>>>() { | |
@Override | |
public Observable<Map<Cell, AtomicDouble>> call( | |
Observable<Observable<Fix>> fixesByCraft) { | |
return fixesByCraft | |
// for one craft aggregate distance (not a problem with | |
// SerializedObserver buffering because each file relatively | |
// small) | |
.flatMap(toCraftCellAndDistances) | |
// sum distances into global map | |
.lift(sumCellDistances); | |
} | |
}; | |
} | |
private final Func1<Observable<Fix>, Observable<CellAndDistance>> toCraftCellAndDistances = new Func1<Observable<Fix>, Observable<CellAndDistance>>() { | |
@Override | |
public Observable<CellAndDistance> call( | |
Observable<Fix> allFixesForASingleCraft) { | |
return allFixesForASingleCraft | |
// count fixes | |
.doOnNext(incrementFixesCount) | |
// filter on time between startTime and finishTime if exist | |
.filter(inTimeRange) | |
// restrict to fixes in filter bounds | |
.filter(inRegion) | |
// sort fixes by position time | |
.toSortedList(COMPARE_FIXES_BY_POSITION_TIME) | |
// convert list to Observable and flatten | |
.concatMap(TO_OBSERVABLE) | |
// keep only positions that pass effective speed | |
.lift(filterOnEffectiveSpeedOk()) | |
// update metrics with fixes passing effective speed check | |
.doOnNext(countFixesPassedEffectiveSpeedCheck) | |
// pair them up again | |
.buffer(2, 1) | |
// segments only | |
.filter(PAIRS_ONLY) | |
// remove segments with invalid time separation | |
.filter(timeDifferenceOk) | |
// calculate distances | |
.flatMap(toCellAndDistance) | |
// update counts of cells in each segment | |
.doOnNext(countSegmentCells); | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment