Skip to content

Instantly share code, notes, and snippets.

@maxiwu
Created August 14, 2018 03:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxiwu/9458e10c6c1d58918242a30ad1290380 to your computer and use it in GitHub Desktop.
Save maxiwu/9458e10c6c1d58918242a30ad1290380 to your computer and use it in GitHub Desktop.
public class DummySource implements SourceFunction<String> {
private static final long serialVersionUID = 3978123556403297086L;
// private static Queue<String> queue = new LinkedBlockingQueue<String>();
private Queue<String> queue;
private boolean cancel = false;
public void setQueue(Queue<String> q){
queue = q;
}
@Override
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
throws Exception {
System.out.println("run");
synchronized (queue) {
while (!cancel) {
if (queue.peek() != null) {
String e = queue.poll();
if (e.equals("exit")) {
cancel();
}
System.out.println("collect "+e);
ctx.collectWithTimestamp(e, System.currentTimeMillis());
}
}
}
}
@Override
public void cancel() {
System.out.println("canceled");
cancel = true;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment