Last active
February 8, 2016 18:40
-
-
Save ndimiduk/18820fcd78412c6b4fc3 to your computer and use it in GitHub Desktop.
Flink: Use an OutputFormat in place of a SinkFunction
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Use an {@link OutputFormat} in place of a {@link SinkFunction}. */ | |
public class OutputFormatAdapter<T> extends RichSinkFunction<T> implements InputTypeConfigurable { | |
private final OutputFormat<T> delegate; | |
private final RichOutputFormat<T> richDelegate; | |
public OutputFormatAdapter(OutputFormat<T> delegate) { | |
this.delegate = delegate; | |
this.richDelegate = (this.delegate instanceof RichOutputFormat) | |
? (RichOutputFormat<T>) delegate | |
: null; | |
} | |
@Override public void setRuntimeContext(RuntimeContext t) { | |
if (richDelegate != null) { | |
richDelegate.setRuntimeContext(t); | |
} else { | |
super.setRuntimeContext(t); | |
} | |
} | |
@Override public RuntimeContext getRuntimeContext() { | |
if (richDelegate != null) { | |
return richDelegate.getRuntimeContext(); | |
} else { | |
return super.getRuntimeContext(); | |
} | |
} | |
@Override public void invoke(T t) throws Exception { | |
delegate.writeRecord(t); | |
} | |
@Override public void open(Configuration conf) throws IOException { | |
RuntimeContext context = getRuntimeContext(); | |
delegate.configure(conf); | |
int indexInSubtaskGroup = context.getIndexOfThisSubtask(); | |
int currentNumberOfSubtasks = context.getNumberOfParallelSubtasks(); | |
delegate.open(indexInSubtaskGroup, currentNumberOfSubtasks); | |
} | |
@Override public void close() throws IOException { | |
delegate.close(); | |
} | |
@Override | |
public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) { | |
if (delegate instanceof InputTypeConfigurable) { | |
((InputTypeConfigurable) delegate).setInputType(typeInformation, executionConfig); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment