Last active
August 29, 2015 14:11
-
-
Save smaldini/119fdd7ddfe249babf97 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class SensorProcessor implements ReactorProcessor<SensorData, SensorSummary> { | |
@Override | |
public Stream<SensorSummary> process(Stream<SensorData> inputStream) { | |
return inputStream | |
.buffer(5, 20, TimeUnit.SECONDS) | |
//would be better to convert to stream of double 'values' and then have generic avg for type safety. | |
.map( buffer -> | |
new SensorSummary( | |
avg(buffer, "value"), | |
System.currentTimeMillis() | |
) | |
); | |
} | |
private double avg(List<?> objectList, String propertyName) { | |
ExpressionParser parser = new SpelExpressionParser(); | |
Expression exp = parser.parseExpression(propertyName); | |
double sum = 0; | |
double count = 0; | |
for(Object obj : objectList) { | |
count++; | |
sum += exp.getValue(obj, Double.class); | |
} | |
return sum/count; | |
} | |
//older stuff with specific domain objects used in calculation.... | |
public Stream<SensorSummary> processOld(Stream<SensorData> inputStream) { | |
return inputStream | |
.buffer(5, 20, TimeUnit.SECONDS) | |
.map(getAverage()); | |
} | |
private Function<List<SensorData>, SensorSummary> getAverage() { | |
return next -> { | |
long sum = 0; | |
long count = 0; | |
for (SensorData sensorData : next) { | |
sum += sensorData.getValue(); | |
count++; | |
} | |
return new SensorSummary(sum / count); | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment