Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active August 29, 2015 14:11
Show Gist options
  • Save smaldini/119fdd7ddfe249babf97 to your computer and use it in GitHub Desktop.
Save smaldini/119fdd7ddfe249babf97 to your computer and use it in GitHub Desktop.
/**
 * Reactor processor that turns a stream of {@link SensorData} readings into a stream of
 * {@link SensorSummary} averages.
 *
 * <p>Readings are grouped with {@code buffer(BUFFER_SIZE, BUFFER_TIMESPAN_SECONDS, SECONDS)} and
 * every emitted buffer is reduced to one summary. Two variants are provided: {@link #process},
 * which reads the averaged property through a SpEL expression, and {@link #processOld}, which
 * reads it through {@code SensorData.getValue()} directly.
 */
public class SensorProcessor implements ReactorProcessor<SensorData, SensorSummary> {

    /**
     * Shared SpEL parser, created once instead of once per buffer.
     * Spring documents {@code SpelExpressionParser} instances as reusable and thread-safe.
     */
    private static final ExpressionParser PARSER = new SpelExpressionParser();

    /**
     * First argument handed to {@code Stream.buffer}.
     * NOTE(review): presumably the maximum number of readings per buffer - confirm against the
     * Reactor version in use.
     */
    private static final int BUFFER_SIZE = 5;

    /** Second argument handed to {@code Stream.buffer}; expressed in {@link TimeUnit#SECONDS}. */
    private static final int BUFFER_TIMESPAN_SECONDS = 20;

    /**
     * Buffers the incoming readings and maps every buffer to a summary holding the average of
     * the {@code "value"} property and the current wall-clock time in milliseconds.
     *
     * @param inputStream stream of sensor readings
     * @return stream with one {@link SensorSummary} per emitted buffer
     */
    @Override
    public Stream<SensorSummary> process(Stream<SensorData> inputStream) {
        return inputStream
                .buffer(BUFFER_SIZE, BUFFER_TIMESPAN_SECONDS, TimeUnit.SECONDS)
                // It would be better to convert to a stream of double 'values' and then use a
                // generic average, for type safety.
                .map(buffer ->
                        new SensorSummary(
                                avg(buffer, "value"),
                                System.currentTimeMillis()
                        )
                );
    }

    /**
     * Computes the arithmetic mean of a property over a list of objects.
     *
     * <p>{@code propertyName} is parsed as a SpEL expression and evaluated against every element
     * with {@code Double} as the requested result type.
     *
     * @param objectList   elements to evaluate the expression against
     * @param propertyName SpEL expression selecting the value to average
     * @return the mean of the evaluated values; {@code NaN} when {@code objectList} is empty
     *         (floating-point {@code 0.0 / 0})
     * @throws NullPointerException if the expression evaluates to {@code null} for an element
     */
    private double avg(List<?> objectList, String propertyName) {
        Expression expression = PARSER.parseExpression(propertyName);
        double sum = 0;
        int index = 0;
        for (Object element : objectList) {
            Double value = expression.getValue(element, Double.class);
            if (value == null) {
                // Same exception type the implicit unboxing would raise, but with context.
                throw new NullPointerException(
                        "SpEL expression '" + propertyName
                                + "' evaluated to null for element at index " + index);
            }
            sum += value;
            index++;
        }
        return sum / objectList.size();
    }

    // Older variant that uses the specific domain objects in the calculation.

    /**
     * Buffers the incoming readings exactly like {@link #process} and maps every buffer to a
     * summary through {@link #getAverage()}.
     *
     * @param inputStream stream of sensor readings
     * @return stream with one {@link SensorSummary} per emitted buffer
     */
    public Stream<SensorSummary> processOld(Stream<SensorData> inputStream) {
        return inputStream
                .buffer(BUFFER_SIZE, BUFFER_TIMESPAN_SECONDS, TimeUnit.SECONDS)
                .map(getAverage());
    }

    /**
     * Builds the function that reduces one buffer of readings to a {@link SensorSummary}.
     *
     * <p>The sum and the count are both {@code long}, so the division is an integer division
     * and the average is truncated toward zero.
     *
     * @return function mapping a buffer to its summary; the function throws
     *         {@link ArithmeticException} when the buffer is empty
     */
    private Function<List<SensorData>, SensorSummary> getAverage() {
        return next -> {
            long sum = 0;
            long count = 0;
            for (SensorData sensorData : next) {
                // NOTE(review): if getValue() returns a floating-point type, this compound
                // assignment silently narrows each reading to long - confirm the declared type.
                sum += sensorData.getValue();
                count++;
            }
            if (count == 0) {
                // Same exception type as the bare "/ by zero", with an explanatory message.
                throw new ArithmeticException(
                        "Cannot average an empty buffer of sensor readings");
            }
            return new SensorSummary(sum / count);
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment