Skip to content

Instantly share code, notes, and snippets.

@matthayes
Last active December 24, 2015 07:58
Show Gist options
  • Save matthayes/6766909 to your computer and use it in GitHub Desktop.
DataFu's Hourglass: Example 1
{
"type" : "record", "name" : "ExampleEvent",
"namespace" : "datafu.hourglass.test",
"fields" : [ {
"name" : "id",
"type" : "long",
"doc" : "ID"
} ]
}
job.setCombinerAccumulator(job.getReducerAccumulator());
job.setUseCombiner(true);
{"key": {"member_id": 1}, "value": {"count": 5}}
{"key": {"member_id": 2}, "value": {"count": 3}}
{"key": {"member_id": 3}, "value": {"count": 3}}
2013/03/17:
{"id": 1}, {"id": 1}, {"id": 2}, {"id": 2}, {"id": 2},
{"id": 3}, {"id": 3}
{"key": {"member_id": 1}, "value": {"count": 7}}
{"key": {"member_id": 2}, "value": {"count": 6}}
{"key": {"member_id": 3}, "value": {"count": 5}}
2013/03/15:
{"id": 1}, {"id": 1}, {"id": 1}, {"id": 2}, {"id": 3}, {"id": 3}
2013/03/16:
{"id": 1}, {"id": 1}, {"id": 2}, {"id": 2}, {"id": 3}
// Incremental job that collapses all daily input partitions into a single output.
PartitionCollapsingIncrementalJob job = new PartitionCollapsingIncrementalJob(Example.class);

final String namespace = "com.example";

// Key schema: a record with a single long "member_id" field.
final Schema keySchema = Schema.createRecord("Key", null, namespace, false);
keySchema.setFields(Arrays.asList(new Field("member_id", Schema.create(Type.LONG), null, null)));
// Keep the serialized form around; the mapper re-parses it on first use.
final String keySchemaString = keySchema.toString(true);

// Value schema: a record with a single int "count" field.
final Schema valueSchema = Schema.createRecord("Value", null, namespace, false);
valueSchema.setFields(Arrays.asList(new Field("count", Schema.create(Type.INT), null, null)));
final String valueSchemaString = valueSchema.toString(true);
{
"type" : "record", "name" : "Key", "namespace" : "com.example",
"fields" : [ {
"name" : "member_id",
"type" : "long"
} ]
}
{
"type" : "record", "name" : "Value", "namespace" : "com.example",
"fields" : [ {
"name" : "count",
"type" : "int"
} ]
}
// Wire the Avro schemas into the job: keys, intermediate (combiner/shuffle)
// values, and final output values all use the records built above.
job.setKeySchema(keySchema);
job.setIntermediateValueSchema(valueSchema);
job.setOutputValueSchema(valueSchema);
// Read event partitions under /data/event; write consolidated output to /output.
job.setInputPaths(Arrays.asList(new Path("/data/event")));
job.setOutputPath(new Path("/output"));
// Merge the previous run's output with newly arrived partitions instead of
// reprocessing all input from scratch.
job.setReusePreviousOutput(true);
// Mapper: for every input event, emit (member_id, count=1).
job.setMapper(new Mapper<GenericRecord,GenericRecord,GenericRecord>()
{
  // Cached parsed schemas; rebuilt lazily from the schema strings
  // (presumably because only the strings travel with the serialized
  // mapper — TODO confirm against the Hourglass serialization contract).
  private transient Schema keyRecordSchema;
  private transient Schema valueRecordSchema;
  @Override
  public void map(
    GenericRecord input,
    KeyValueCollector<GenericRecord, GenericRecord> collector)
    throws IOException, InterruptedException
  {
    if (keyRecordSchema == null)
    {
      keyRecordSchema = new Schema.Parser().parse(keySchemaString);
    }
    if (valueRecordSchema == null)
    {
      valueRecordSchema = new Schema.Parser().parse(valueSchemaString);
    }
    GenericRecord outKey = new GenericData.Record(keyRecordSchema);
    outKey.put("member_id", input.get("id"));
    GenericRecord outValue = new GenericData.Record(valueRecordSchema);
    outValue.put("count", 1);
    collector.collect(outKey, outValue);
  }
});
// Reducer (and combiner) accumulator: sums the "count" field across all
// values seen for a key and emits a single record with the total.
job.setReducerAccumulator(new Accumulator<GenericRecord,GenericRecord>()
{
  private transient int runningCount;
  // Parsed lazily from valueSchemaString on first use.
  private transient Schema valueRecordSchema;
  @Override
  public void accumulate(GenericRecord value) {
    runningCount += (Integer)value.get("count");
  }
  @Override
  public GenericRecord getFinal() {
    if (valueRecordSchema == null)
    {
      valueRecordSchema = new Schema.Parser().parse(valueSchemaString);
    }
    GenericRecord result = new GenericData.Record(valueRecordSchema);
    result.put("count", runningCount);
    return result;
  }
  @Override
  public void cleanup() {
    // Reset state so this accumulator instance can be reused for the next key.
    runningCount = 0;
  }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment