Updating the Prediction Model
// The process() call is omitted for clarity; it collects airline data
// in a list and stores that list in the state store, keyed by airport code.
public void punctuate(long timestamp) {
KeyValueIterator<String, List<String>> allFlights = flights.all();
while (allFlights.hasNext()) {
KeyValue<String, List<String>> kv =;
List<String> flightList = kv.value;
String key = kv.key;
if(flightList.size() >= 100) {
try {
byte[] serializedRegression = ModelBuilder.train(flightList);
context().forward(key, serializedRegression);"updating model for {}", key);
flights.put(key, flightList);
} catch (Exception e) {
LOG.error("couldn't update online regression for {}",key, e);
