Skip to content

Instantly share code, notes, and snippets.

@matthayes
Last active December 24, 2015 07:59
Show Gist options
  • Save matthayes/6767030 to your computer and use it in GitHub Desktop.
Save matthayes/6767030 to your computer and use it in GitHub Desktop.
DataFu's Hourglass: Example 2
/*
 * Mapper that turns each input record into a single key/value pair:
 *   key   = { name: "member_count" }
 *   value = { data: <the input record's "id" field>, count: 1 }
 * Every record is emitted under the same key name, so downstream
 * aggregation sees all member ids grouped together.
 */
Mapper<GenericRecord,GenericRecord,GenericRecord> mapper =
    new Mapper<GenericRecord,GenericRecord,GenericRecord>() {
  // Parsed lazily from the enclosing scope's schema strings; marked
  // transient so the parsed schemas are never serialized with the mapper.
  private transient Schema parsedKeySchema;
  private transient Schema parsedValueSchema;

  /**
   * Emits one (key, value) pair for the given input record.
   *
   * @param input     record to read the "id" field from
   * @param collector sink receiving the generated key/value pair
   */
  @Override
  public void map(
      GenericRecord input,
      KeyValueCollector<GenericRecord, GenericRecord> collector)
      throws IOException, InterruptedException
  {
    ensureSchemas();

    GenericRecord outKey = new GenericData.Record(parsedKeySchema);
    outKey.put("name", "member_count");

    GenericRecord outValue = new GenericData.Record(parsedValueSchema);
    outValue.put("data", input.get("id")); // member id
    outValue.put("count", 1L);             // just a single member

    collector.collect(outKey, outValue);
  }

  /** Parses the key schema, then the value schema, on first use only. */
  private void ensureSchemas()
  {
    if (parsedKeySchema == null)
    {
      parsedKeySchema = new Schema.Parser().parse(keySchemaString);
    }
    if (parsedValueSchema == null)
    {
      parsedValueSchema = new Schema.Parser().parse(valueSchemaString);
    }
  }
};
/*
 * Accumulator that estimates the number of distinct members with a
 * HyperLogLog++ sketch. Each incoming value's "data" field is either
 *   - a Long:       a single member id, offered to the sketch, or
 *   - a ByteBuffer: a previously serialized sketch, merged into the
 *                   current one.
 * The final output carries both the serialized sketch ("data") and its
 * cardinality estimate ("count").
 */
Accumulator<GenericRecord,GenericRecord> accumulator =
    new Accumulator<GenericRecord,GenericRecord>() {
  // Both fields are transient and rebuilt lazily on first use, so they are
  // never serialized along with the accumulator instance.
  private transient HyperLogLogPlus estimator;
  private transient Schema vSchema;

  /**
   * Folds one value into the running sketch.
   *
   * <p>A "data" field that is neither a Long nor a ByteBuffer (including
   * null) is ignored, as in the original example.</p>
   *
   * @param value record whose "data" field is a member id or a serialized sketch
   * @throws RuntimeException wrapping the IOException or
   *         CardinalityMergeException raised while decoding or merging a sketch
   */
  @Override
  public void accumulate(GenericRecord value)
  {
    if (estimator == null)
    {
      estimator = new HyperLogLogPlus(20);
    }
    Object data = value.get("data");
    if (data instanceof Long)
    {
      estimator.offer(data);
    }
    else if (data instanceof ByteBuffer)
    {
      // Copy exactly the readable bytes. ByteBuffer.array() would return the
      // whole backing array, ignoring position/limit/arrayOffset, and it
      // throws for read-only or direct buffers. Reading through a duplicate
      // leaves the caller's buffer position untouched.
      ByteBuffer view = ((ByteBuffer) data).duplicate();
      byte[] serialized = new byte[view.remaining()];
      view.get(serialized);
      try
      {
        HyperLogLogPlus newEstimator =
            HyperLogLogPlus.Builder.build(serialized);
        estimator =
            (HyperLogLogPlus) estimator.merge(newEstimator);
      }
      catch (IOException e)
      {
        throw new RuntimeException(e);
      }
      catch (CardinalityMergeException e)
      {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Builds the output record for everything accumulated so far.
   *
   * <p>If nothing has been accumulated since construction or the last
   * {@link #cleanup()}, an empty sketch is reported instead of failing
   * with a NullPointerException.</p>
   *
   * @return record with "data" set to the serialized sketch and "count"
   *         set to the sketch's cardinality estimate
   * @throws RuntimeException wrapping an IOException raised while
   *         serializing the sketch
   */
  @Override
  public GenericRecord getFinal()
  {
    if (vSchema == null)
    {
      vSchema = new Schema.Parser().parse(valueSchemaString);
    }
    // Use a local fallback so that calling getFinal() never changes the
    // accumulator's own state.
    HyperLogLogPlus current = estimator;
    if (current == null)
    {
      current = new HyperLogLogPlus(20);
    }
    GenericRecord output = new GenericData.Record(vSchema);
    try
    {
      ByteBuffer bytes = ByteBuffer.wrap(current.getBytes());
      output.put("data", bytes);
      output.put("count", current.cardinality());
    }
    catch (IOException e)
    {
      throw new RuntimeException(e);
    }
    return output;
  }

  /** Discards the current sketch so the next accumulate() starts fresh. */
  @Override
  public void cleanup()
  {
    estimator = null;
  }
};
// Create the incremental job, passing Example.class to its constructor.
PartitionCollapsingIncrementalJob job2 =
new PartitionCollapsingIncrementalJob(Example.class);
// ... (remaining job configuration is elided in this example; presumably
// this is where the mapper and accumulator defined above are attached —
// confirm against the full example)
job2.setNumDays(30); // 30 day sliding window
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment