@fieldju
Last active September 21, 2018 19:26
protected List<Double> getTimeSeriesDataFromChannelMessages(List<ChannelMessage> channelMessages) {
    // TODO Confirm whether a single error message invalidates every data message in the response.
    channelMessages.parallelStream()
            .filter(channelMessage -> channelMessage.getType().equals(ERROR_MESSAGE))
            .findAny()
            .ifPresent(error -> {
                ChannelMessage.ErrorMessage errorMessage = (ChannelMessage.ErrorMessage) error;
                // TODO The errors are untyped; find out how to extract structured details from them.
                List<Object> errors = errorMessage.getErrors();
                // Append the raw errors so the exception carries at least some context.
                throw new RuntimeException(
                        "An error occurred when executing the signal flow program: " + errors);
            });
    return channelMessages.parallelStream()
            .filter(channelMessage -> channelMessage.getType().equals(DATA_MESSAGE))
            .map(message -> {
                ChannelMessage.DataMessage dataMessage = (ChannelMessage.DataMessage) message;
                Map<String, Number> data = dataMessage.getData();
                if (data.size() > 1) {
                    throw new IllegalStateException("There was more than one value for a given timestamp; an " +
                            "aggregation method should have been applied to the signal flow program");
                }
                // An empty data message has no value for its timestamp, so record it as NaN.
                return data.isEmpty() ? Double.NaN : data.values().iterator().next().doubleValue();
            })
            .collect(Collectors.toList());
}
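
The method above depends on `ChannelMessage` and on the `ERROR_MESSAGE` / `DATA_MESSAGE` constants, none of which are defined in this snippet. To try the same logic in isolation, here is a minimal sketch that swaps them for hypothetical stand-ins (`Message`, `Type`). These stand-ins are assumptions made for illustration and are not the real client library types. It fails fast when any error message is present, then maps each data message to a single value, or to NaN when the message is empty.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TimeSeriesSketch {

    // Hypothetical stand-in for the message type constants used in the gist.
    enum Type { DATA, ERROR }

    // Hypothetical stand-in for ChannelMessage and its Data/Error subclasses.
    static final class Message {
        final Type type;
        final Map<String, ? extends Number> data;
        final List<Object> errors;

        private Message(Type type, Map<String, ? extends Number> data, List<Object> errors) {
            this.type = type;
            this.data = data;
            this.errors = errors;
        }

        static Message data(Map<String, ? extends Number> data) {
            return new Message(Type.DATA, data, Collections.emptyList());
        }

        static Message error(Object... errors) {
            return new Message(Type.ERROR, Collections.emptyMap(), Arrays.asList(errors));
        }
    }

    static List<Double> extract(List<Message> messages) {
        // Fail fast if any error message is present, reporting every error it carries.
        List<Message> errorMessages = messages.stream()
                .filter(m -> m.type == Type.ERROR)
                .collect(Collectors.toList());
        if (!errorMessages.isEmpty()) {
            List<Object> errors = errorMessages.stream()
                    .flatMap(m -> m.errors.stream())
                    .collect(Collectors.toList());
            throw new IllegalStateException("Program reported errors: " + errors);
        }
        // Otherwise reduce each data message to one value, preserving message order.
        return messages.stream()
                .filter(m -> m.type == Type.DATA)
                .map(TimeSeriesSketch::toValue)
                .collect(Collectors.toList());
    }

    private static Double toValue(Message m) {
        if (m.data.size() > 1) {
            throw new IllegalStateException("More than one value for a given timestamp");
        }
        // No value for this timestamp is represented as NaN.
        return m.data.isEmpty() ? Double.NaN : m.data.values().iterator().next().doubleValue();
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
                Message.data(Collections.singletonMap("tsid", 1.5)),
                Message.data(Collections.emptyMap()));
        System.out.println(extract(messages)); // prints [1.5, NaN]
    }
}
```

Unlike the original, the sketch gathers the errors from every error message before throwing, and it uses sequential streams. Whether that is the right behavior for real SignalFlow responses is left open, as the TODOs above note.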