Last active: December 24, 2015 07:59
-
-
Save matthayes/6767030 to your computer and use it in GitHub Desktop.
DataFu's Hourglass: Example 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Mapper<GenericRecord,GenericRecord,GenericRecord> mapper = | |
new Mapper<GenericRecord,GenericRecord,GenericRecord>() { | |
private transient Schema kSchema; | |
private transient Schema vSchema; | |
@Override | |
public void map( | |
GenericRecord input, | |
KeyValueCollector<GenericRecord, GenericRecord> collector) | |
throws IOException, InterruptedException | |
{ | |
if (kSchema == null) | |
kSchema = new Schema.Parser().parse(keySchemaString); | |
if (vSchema == null) | |
vSchema = new Schema.Parser().parse(valueSchemaString); | |
GenericRecord key = new GenericData.Record(kSchema); | |
key.put("name", "member_count"); | |
GenericRecord value = new GenericData.Record(vSchema); | |
value.put("data",input.get("id")); // member id | |
value.put("count", 1L); // just a single member | |
collector.collect(key,value); | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Accumulator<GenericRecord,GenericRecord> accumulator = | |
new Accumulator<GenericRecord,GenericRecord>() { | |
private transient HyperLogLogPlus estimator; | |
private transient Schema vSchema; | |
@Override | |
public void accumulate(GenericRecord value) | |
{ | |
if (estimator == null) estimator = new HyperLogLogPlus(20); | |
Object data = value.get("data"); | |
if (data instanceof Long) | |
{ | |
estimator.offer(data); | |
} | |
else if (data instanceof ByteBuffer) | |
{ | |
ByteBuffer bytes = (ByteBuffer)data; | |
HyperLogLogPlus newEstimator; | |
try | |
{ | |
newEstimator = | |
HyperLogLogPlus.Builder.build(bytes.array()); | |
estimator = | |
(HyperLogLogPlus)estimator.merge(newEstimator); | |
} | |
catch (IOException e) | |
{ | |
throw new RuntimeException(e); | |
} | |
catch (CardinalityMergeException e) | |
{ | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
@Override | |
public GenericRecord getFinal() | |
{ | |
if (vSchema == null) | |
vSchema = new Schema.Parser().parse(valueSchemaString); | |
GenericRecord output = new GenericData.Record(vSchema); | |
try | |
{ | |
ByteBuffer bytes = | |
ByteBuffer.wrap(estimator.getBytes()); | |
output.put("data", bytes); | |
output.put("count", estimator.cardinality()); | |
} | |
catch (IOException e) | |
{ | |
throw new RuntimeException(e); | |
} | |
return output; | |
} | |
@Override | |
public void cleanup() | |
{ | |
estimator = null; | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Set up the incremental job for this example: a
// PartitionCollapsingIncrementalJob created with Example.class, then set to
// a 30-day window via setNumDays(30).
// NOTE(review): the "// ..." line stands for setup the snippet omits --
// presumably input/output paths, schemas, and wiring of the mapper and
// accumulator defined above; confirm against the full example.
PartitionCollapsingIncrementalJob job2 = | |
new PartitionCollapsingIncrementalJob(Example.class); | |
// ... | |
job2.setNumDays(30); // 30 day sliding window |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment