Skip to content

Instantly share code, notes, and snippets.

@e3oroush
Created July 13, 2019 17:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save e3oroush/9cbd87d7e9bab649f18a3ab5119ab410 to your computer and use it in GitHub Desktop.
Save e3oroush/9cbd87d7e9bab649f18a3ab5119ab410 to your computer and use it in GitHub Desktop.
Source of random data stream
package mypackage.sources;
import mypackage.events.Sensor;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Flink source that emits a randomly generated {@link Sensor} event at a fixed interval
 * until {@link #cancel()} is called.
 *
 * <p>Each event is built from an integer in {@code [0, 10)} (presumably the sensor id —
 * confirm against {@code Sensor}'s constructor), a double in {@code [50, 80)} (presumably
 * the temperature reading) and the current wall-clock time in milliseconds.
 */
public class TemperatureSensor extends RichSourceFunction<Sensor> {

    /** Pause between two consecutive emitted events, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 100L;

    /**
     * Loop flag for {@link #run}. It is {@code volatile} because {@link #cancel()} is invoked
     * from a different thread than the one executing {@link #run}; without it the write in
     * {@code cancel()} is not guaranteed to become visible to the emitting loop.
     */
    private volatile boolean running = true;

    /**
     * Generates and emits random sensor events until the source is cancelled.
     *
     * @param sourceContext context used to emit the generated events
     * @throws Exception if the emitting thread is interrupted while sleeping, or if emitting
     *     an event fails
     */
    @Override
    public void run(SourceContext<Sensor> sourceContext) throws Exception {
        // run() executes on a single thread, so the thread-local generator can be looked up once.
        final ThreadLocalRandom random = ThreadLocalRandom.current();
        while (this.running) {
            long timestamp = System.currentTimeMillis();
            Sensor sensor = new Sensor(random.nextInt(0, 10), random.nextDouble(50, 80), timestamp);
            // hand the generated sensor event to the downstream operators
            sourceContext.collect(sensor);
            // throttle the source: wait 100 ms before generating the next fictional reading
            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }

    /**
     * Requests the emitting loop in {@link #run} to stop; the loop exits once it next checks
     * the flag.
     */
    @Override
    public void cancel() {
        this.running = false;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment