Skip to content

Instantly share code, notes, and snippets.

@ndimiduk
Last active February 8, 2016 18:40
Show Gist options
  • Save ndimiduk/18820fcd78412c6b4fc3 to your computer and use it in GitHub Desktop.
Save ndimiduk/18820fcd78412c6b4fc3 to your computer and use it in GitHub Desktop.
Flink: Use an OutputFormat in place of a SinkFunction
/**
 * Use an {@link OutputFormat} in place of a {@link SinkFunction}.
 *
 * <p>Each sink lifecycle call is forwarded to the wrapped format: {@link #open(Configuration)}
 * configures and opens it, {@link #invoke(Object)} writes one record, and {@link #close()}
 * closes it. When the wrapped format is a {@link RichOutputFormat} it also receives the
 * {@link RuntimeContext} handed to this sink.
 *
 * @param <T> type of the records written by the wrapped format
 */
public class OutputFormatAdapter<T> extends RichSinkFunction<T> implements InputTypeConfigurable {

  /** The wrapped format; never {@code null}. */
  private final OutputFormat<T> delegate;

  /** {@link #delegate} viewed as a {@link RichOutputFormat}, or {@code null} if it is not one. */
  private final RichOutputFormat<T> richDelegate;

  /**
   * Creates a sink that writes every record through {@code delegate}.
   *
   * @param delegate the output format to wrap
   * @throws NullPointerException if {@code delegate} is {@code null}
   */
  public OutputFormatAdapter(OutputFormat<T> delegate) {
    // Fail here rather than with an anonymous NPE on the first open()/invoke() call.
    if (delegate == null) {
      throw new NullPointerException("delegate must not be null");
    }
    this.delegate = delegate;
    this.richDelegate = (delegate instanceof RichOutputFormat)
        ? (RichOutputFormat<T>) delegate
        : null;
  }

  /**
   * Stores the runtime context on this sink and, if the wrapped format is rich, on the format
   * as well.
   *
   * @param t the runtime context supplied for this sink
   */
  @Override
  public void setRuntimeContext(RuntimeContext t) {
    // Always initialise the superclass so that every inherited context accessor works,
    // not only the ones that could be redirected to the delegate.
    super.setRuntimeContext(t);
    if (richDelegate != null) {
      richDelegate.setRuntimeContext(t);
    }
  }

  /**
   * Writes a single record through the wrapped format.
   *
   * @param t the record to write
   * @throws Exception if the wrapped format fails to write the record
   */
  @Override
  public void invoke(T t) throws Exception {
    delegate.writeRecord(t);
  }

  /**
   * Configures the wrapped format and opens it for this parallel subtask.
   *
   * @param conf configuration passed on to the wrapped format
   * @throws IOException if the wrapped format cannot be opened
   */
  @Override
  public void open(Configuration conf) throws IOException {
    RuntimeContext context = getRuntimeContext();
    delegate.configure(conf);
    int indexInSubtaskGroup = context.getIndexOfThisSubtask();
    int currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
    delegate.open(indexInSubtaskGroup, currentNumberOfSubtasks);
  }

  /**
   * Closes the wrapped format.
   *
   * @throws IOException if the wrapped format fails to close
   */
  @Override
  public void close() throws IOException {
    delegate.close();
  }

  /**
   * Passes the input type on to the wrapped format when that format is itself
   * {@link InputTypeConfigurable}; otherwise does nothing.
   *
   * @param typeInformation type information of the records this sink will receive
   * @param executionConfig execution configuration of the job
   */
  @Override
  public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
    if (delegate instanceof InputTypeConfigurable) {
      ((InputTypeConfigurable) delegate).setInputType(typeInformation, executionConfig);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment