Last active
June 2, 2018 14:46
-
-
Save garvitlnmiit/10db9d4b6eb41135332fba13d908e36c to your computer and use it in GitHub Desktop.
Flink Process Element
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Kafka stream consumer08 (version)
DataStream<PersonStream> stream = ...;

// keyBy() does not modify `stream`; it returns a new keyed stream. The
// process function must be attached to that returned stream, otherwise the
// keyed ValueState used inside ProcessAlerts is not available at runtime.
stream.keyBy("personId")
      .process(new ProcessAlerts());
/**
 * The data type stored in the state.
 *
 * <p>Kept as a plain class with public, non-final fields and the implicit
 * public no-argument constructor so that callers can create an empty
 * instance and mutate it in place.
 */
public class PersonDetail {
    /** Identifier of the person this record belongs to. */
    public int personId;

    /** Running counter, incremented by the code that updates this record. */
    public long count;

    /** Timestamp of the most recent update, as written by the updating code. */
    public long lastModified;
}
@Slf4j | |
public class ProcessAlerts extends ProcessFunction<PersonStream, Object> { | |
private ValueState<PersonDetail> state; | |
@Override | |
public void open(Configuration config) { | |
ValueStateDescriptor<PersonDetail> descriptor = new ValueStateDescriptor<>("personState", TypeInformation.of(new TypeHint<PersonDetail>() {})); | |
state = getRuntimeContext().getState(descriptor); | |
} | |
public void processElement(PersonStream ele, Context ctx, Collector<Object> out) { | |
try { | |
PersonDetail currentState = state.value(); | |
currentState.count++; | |
currentState.lastModified = System.currentTimeMillis(); | |
state.update(currentState); | |
} catch (e) { | |
log.error("Error occurred while processing payload {}, Exception : {}", ele, e); | |
log.error("{}", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment