Skip to content

Instantly share code, notes, and snippets.

@jlewi
Created January 19, 2015 19:20
Show Gist options
Save jlewi/f1cd323dc88bd58601ef to your computer and use it in GitHub Desktop.
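A Dataflow DoFn that wraps an Avro AvroMapper: it runs the mapper on each input element and emits the mapper's Pair outputs as KVs.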
/**
 * A Dataflow {@code DoFn} that runs an Avro {@code AvroMapper} on every input element and emits
 * each {@code Pair} the mapper collects as a {@code KV}.
 *
 * <p>Only the mapper class and a byte-serialized copy of the {@code JobConf} are non-transient
 * state. Everything needed to actually run the mapper (the mapper instance, the deserialized
 * {@code JobConf}, the collector and the reporter) is {@code transient}. Java serialization does
 * not carry transient fields, so they are {@code null} on a deserialized copy of this DoFn. For
 * that reason all of them are (re)built in {@link #startBundle} rather than in the constructor.
 * A fresh mapper instance is created and configured for every bundle.
 *
 * <p>NOTE(review): {@code mapper.close()} is never invoked. If a wrapped mapper relies on
 * {@code close()} to flush or emit buffered output, that output is lost. Consider adding a
 * {@code finishBundle} hook; TODO confirm whether any mapper in use needs it.
 *
 * @param <MAPPER> the concrete {@code AvroMapper} type being wrapped
 * @param <I> the input element type
 * @param <OUT_KEY> the key type of the pairs emitted by the mapper
 * @param <OUT_VALUE> the value type of the pairs emitted by the mapper
 */
public static class AvroMapperDoFn<
        MAPPER extends AvroMapper<I, Pair<OUT_KEY, OUT_VALUE>>, I, OUT_KEY, OUT_VALUE>
    extends DoFn<I, KV<OUT_KEY, OUT_VALUE>> {
  /** The concrete mapper class; instantiated reflectively in {@link #startBundle}. */
  private Class<?> avroMapperClass;
  /** The job configuration as bytes, because the {@code jobConf} field below is transient. */
  private byte[] jobConfBytes;
  transient MAPPER mapper;
  transient DataflowAvroCollector collector;
  transient Reporter reporter;
  transient JobConf jobConf;

  /**
   * An {@code AvroCollector} that forwards every collected {@code Pair} to the current Dataflow
   * context as a {@code KV} of the pair's key and value.
   */
  protected class DataflowAvroCollector extends AvroCollector<Pair<OUT_KEY, OUT_VALUE>> {
    /**
     * The context of the element currently being processed.
     *
     * <p>This field is assigned by {@code processElement} before each call to the mapper.
     */
    public DoFn<I, KV<OUT_KEY, OUT_VALUE>>.ProcessContext c;

    @Override
    public void collect(Pair<OUT_KEY, OUT_VALUE> p) {
      c.output(KV.of(p.key(), p.value()));
    }
  }

  /**
   * Creates a DoFn that wraps instances of {@code avroMapperClass}.
   *
   * @param avroMapperClass the mapper class to instantiate on the worker. It must be a subclass
   *     of {@code AvroMapper} and be instantiable through its no-argument constructor.
   * @param conf the job configuration passed to the mapper's {@code configure} method. It is
   *     stored in serialized form.
   * @throws NullPointerException if {@code avroMapperClass} is null
   * @throws IllegalArgumentException if {@code avroMapperClass} is not an {@code AvroMapper}
   */
  public AvroMapperDoFn(Class<?> avroMapperClass, JobConf conf) {
    if (avroMapperClass == null) {
      throw new NullPointerException("avroMapperClass");
    }
    // Fail at pipeline-construction time rather than with a ClassCastException on the worker.
    if (!AvroMapper.class.isAssignableFrom(avroMapperClass)) {
      throw new IllegalArgumentException(
          avroMapperClass.getName() + " is not a subclass of " + AvroMapper.class.getName());
    }
    this.avroMapperClass = avroMapperClass;
    this.jobConf = conf;
    this.jobConfBytes = serializeJobConf(conf);
  }

  /**
   * Rebuilds all transient state for a new bundle.
   *
   * <p>This creates a fresh mapper instance, deserializes the {@code JobConf}, configures the
   * mapper with it, and creates the collector and reporter passed to {@code map}.
   *
   * @throws RuntimeException wrapping the reflective failure if the mapper class cannot be
   *     instantiated
   */
  @Override
  @SuppressWarnings("unchecked") // avroMapperClass was verified to be an AvroMapper in the ctor.
  public void startBundle(Context c) {
    try {
      mapper = (MAPPER) avroMapperClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(
          "Could not instantiate AvroMapper class " + avroMapperClass.getName(), e);
    }
    jobConf = deserializeJobConf(jobConfBytes);
    mapper.configure(jobConf);
    // Created here, not in the constructor: these fields are transient and would be null on a
    // deserialized copy of this DoFn.
    collector = new DataflowAvroCollector();
    // Reporter is an interface; NULL is Hadoop's no-op implementation.
    reporter = Reporter.NULL;
  }

  /**
   * Passes one input element to the mapper. Pairs the mapper collects are emitted through
   * {@code c}.
   */
  @Override
  public void processElement(DoFn<I, KV<OUT_KEY, OUT_VALUE>>.ProcessContext c) throws Exception {
    // The collector emits through whichever context it holds, so point it at this element's.
    collector.c = c;
    mapper.map(c.element(), collector, reporter);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment