Last active
December 24, 2015 07:58
-
-
Save matthayes/6766909 to your computer and use it in GitHub Desktop.
DataFu's Hourglass: Example 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "type" : "record", "name" : "ExampleEvent",
  "namespace" : "datafu.hourglass.test",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "doc" : "ID"
  } ]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Reuse the reducer's accumulator as the combiner. This works here because the
// accumulator only sums the "count" field (an associative, commutative operation) and
// the job uses the same Value schema for both intermediate and output values.
job.setCombinerAccumulator(job.getReducerAccumulator());
job.setUseCombiner(true);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
job.run(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"key": {"member_id": 1}, "value": {"count": 5}}
{"key": {"member_id": 2}, "value": {"count": 3}}
{"key": {"member_id": 3}, "value": {"count": 3}}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2013/03/17:
{"id": 1}, {"id": 1}, {"id": 2}, {"id": 2}, {"id": 2},
{"id": 3}, {"id": 3}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"key": {"member_id": 1}, "value": {"count": 7}}
{"key": {"member_id": 2}, "value": {"count": 6}}
{"key": {"member_id": 3}, "value": {"count": 5}}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2013/03/15:
{"id": 1}, {"id": 1}, {"id": 1}, {"id": 2}, {"id": 3}, {"id": 3}
2013/03/16:
{"id": 1}, {"id": 1}, {"id": 2}, {"id": 2}, {"id": 3}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
PartitionCollapsingIncrementalJob job = | |
new PartitionCollapsingIncrementalJob(Example.class); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build the Avro schemas for the job's records: a "Key" record holding the member ID
// (long) and a "Value" record holding a count (int), both in namespace com.example.
// Their pretty-printed JSON forms are kept in final String locals so the anonymous
// mapper and accumulator classes can capture them and re-parse the schemas lazily.
final String namespace = "com.example";

// createRecord(name, doc, namespace, isError)
final Schema keySchema =
    Schema.createRecord("Key", null, namespace, false);
keySchema.setFields(Arrays.asList(
    new Field("member_id", Schema.create(Type.LONG), null, null)));
final String keySchemaString = keySchema.toString(true);

final Schema valueSchema =
    Schema.createRecord("Value", null, namespace, false);
valueSchema.setFields(Arrays.asList(
    new Field("count", Schema.create(Type.INT), null, null)));
final String valueSchemaString = valueSchema.toString(true);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "type" : "record", "name" : "Key", "namespace" : "com.example",
  "fields" : [ {
    "name" : "member_id",
    "type" : "long"
  } ]
}
{
  "type" : "record", "name" : "Value", "namespace" : "com.example",
  "fields" : [ {
    "name" : "count",
    "type" : "int"
  } ]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Keys are Key records. The same Value schema is used for both the intermediate values
// (presumably the mapper/combiner output) and the final output values.
job.setKeySchema(keySchema);
job.setIntermediateValueSchema(valueSchema);
job.setOutputValueSchema(valueSchema);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Read events from /data/event and write results to /output.
// NOTE(review): setReusePreviousOutput(true) presumably lets a later run combine the
// previous output with only newly arrived input rather than recomputing everything
// (the example outputs are cumulative after 2013/03/17 is added) — confirm in the
// Hourglass documentation.
job.setInputPaths(Arrays.asList(new Path("/data/event")));
job.setOutputPath(new Path("/output"));
job.setReusePreviousOutput(true);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Map each input event to (Key{member_id = event.id}, Value{count = 1}).
// The Avro schemas are parsed lazily from their captured JSON strings and cached in
// transient fields. NOTE(review): presumably because the mapper instance is serialized
// and shipped to tasks while Schema objects are not serializable — confirm.
job.setMapper(new Mapper<GenericRecord,GenericRecord,GenericRecord>()
{
  private transient Schema kSchema;
  private transient Schema vSchema;

  @Override
  public void map(
      GenericRecord input,
      KeyValueCollector<GenericRecord, GenericRecord> collector)
      throws IOException, InterruptedException
  {
    if (kSchema == null)
    {
      kSchema = new Schema.Parser().parse(keySchemaString);
    }
    if (vSchema == null)
    {
      vSchema = new Schema.Parser().parse(valueSchemaString);
    }

    // The event's "id" becomes the key's "member_id".
    GenericRecord key = new GenericData.Record(kSchema);
    key.put("member_id", input.get("id"));

    // Each event contributes a count of one; the reducer accumulator sums these.
    GenericRecord value = new GenericData.Record(vSchema);
    value.put("count", 1);

    collector.collect(key, value);
  }
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sum the "count" field of every accumulated value and emit the total as a Value record.
// cleanup() resets the running total (presumably invoked between keys — confirm).
job.setReducerAccumulator(new Accumulator<GenericRecord,GenericRecord>()
{
  private transient int count;
  private transient Schema vSchema;

  @Override
  public void accumulate(GenericRecord value)
  {
    this.count += (Integer)value.get("count");
  }

  @Override
  public GenericRecord getFinal()
  {
    // Parse the Value schema from its captured JSON string on first use, then cache it.
    if (vSchema == null)
    {
      vSchema = new Schema.Parser().parse(valueSchemaString);
    }
    GenericRecord output = new GenericData.Record(vSchema);
    output.put("count", count);
    return output;
  }

  @Override
  public void cleanup()
  {
    this.count = 0;
  }
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment