Skip to content

Instantly share code, notes, and snippets.

@garvitlnmiit
Last active June 2, 2018 14:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save garvitlnmiit/10db9d4b6eb41135332fba13d908e36c to your computer and use it in GitHub Desktop.
Save garvitlnmiit/10db9d4b6eb41135332fba13d908e36c to your computer and use it in GitHub Desktop.
Flink Process Element
// Source stream of PersonStream events (Kafka consumer, version 08 per the original note).
DataStream<PersonStream> stream = ...;
// keyBy() returns a NEW keyed stream and leaves `stream` itself un-keyed, so process()
// must be chained onto that result. Calling stream.process(...) separately would run
// ProcessAlerts without a key, and its keyed ValueState could not be used.
stream.keyBy("personId").process(new ProcessAlerts());
/**
 * Mutable value object held in the keyed state of {@code ProcessAlerts}.
 *
 * <p>Fields are deliberately public and non-final: the processing function reads and
 * updates them in place before writing the object back to state.
 */
public class PersonDetail {

    /** Identifier of the person this record belongs to (presumably the stream key; verify against caller). */
    public int personId;

    /** Running counter; incremented once for each processed element. */
    public long count;

    /** Wall-clock time in epoch milliseconds of the most recent update. */
    public long lastModified;

    /** Creates an empty record with every field at its zero default. */
    public PersonDetail() {
    }
}
@Slf4j
public class ProcessAlerts extends ProcessFunction<PersonStream, Object> {
private ValueState<PersonDetail> state;
@Override
public void open(Configuration config) {
ValueStateDescriptor<PersonDetail> descriptor = new ValueStateDescriptor<>("personState", TypeInformation.of(new TypeHint<PersonDetail>() {}));
state = getRuntimeContext().getState(descriptor);
}
public void processElement(PersonStream ele, Context ctx, Collector<Object> out) {
try {
PersonDetail currentState = state.value();
currentState.count++;
currentState.lastModified = System.currentTimeMillis();
state.update(currentState);
} catch (e) {
log.error("Error occurred while processing payload {}, Exception : {}", ele, e);
log.error("{}", e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment