Skip to content

Instantly share code, notes, and snippets.

@e3oroush
Last active July 13, 2019 17:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save e3oroush/6807e4de573ddce67a335b8b3f157e36 to your computer and use it in GitHub Desktop.
Save e3oroush/6807e4de573ddce67a335b8b3f157e36 to your computer and use it in GitHub Desktop.
Data stream Processing
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set time characteristic to be event timestamp
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// assign fictious stream of data as our data source and extract timestamp field
DataStream<Sensor> inputStream = env.addSource(new TemperatureSensor())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Sensor>() {
@Override
public long extractAscendingTimestamp(Sensor sensor) {
return sensor.getTimestamp();
}
});
DataStream<Double> temps = inputStream.map(new MapFunction<Sensor, Double>() {
@Override
public Double map(Sensor sensor) throws Exception {
return sensor.getTemperature();
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment