@ankurcha
Created December 19, 2016 05:42
import com.brightcove.rna.model.Row;
import io.grpc.Context;
import org.jetbrains.annotations.NotNull;
import org.mapdb.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
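/**
 * Reduces a stream of Row protobufs by key, spilling overflow to disk: recent
 * entries live in an off-heap MapDB hash map, and entries that expire from it
 * are moved to a temp-file-backed map via MapDB's expireOverflow binding.
 * MemoryMonitor, RowUtil, and RequestCancelledException are assumed to come
 * from the surrounding package.
 */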
public class DiskOverflowReducer implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(DiskOverflowReducer.class);
    private static final long ALLOCATED_SIZE_IN_BYTES = 1024 * 1024 * 10; // 10 MB

    // Shared on-disk store: a memory-mapped temp file, deleted after close,
    // pre-allocated to 30 MB and grown in 10 MB increments.
    private static final DB dbDisk =
        DBMaker.tempFileDB()
            .concurrencyScale(8)
            .closeOnJvmShutdown()
            .fileDeleteAfterClose()
            .fileLockDisable()
            .closeOnJvmShutdownWeakReference()
            .fileMmapEnable()
            .fileMmapPreclearDisable()
            .cleanerHackEnable()
            .allocateStartSize(ALLOCATED_SIZE_IN_BYTES * 3)
            .allocateIncrement(ALLOCATED_SIZE_IN_BYTES)
            .fileChannelEnable()
            .make();

    // Shared in-memory store backed by off-heap (direct) ByteBuffers.
    private static final DB dbMemory = DBMaker.memoryDirectDB().concurrencyScale(8).make();
    // Rows are stored in protobuf's length-delimited wire format. DataOutput2
    // extends OutputStream, so the message can be written to it directly;
    // DataInput2.DataInputToStream adapts the input side the same way.
    private static final Serializer<Row> ROW_SERIALIZER =
        new Serializer<Row>() {
            @Override
            public void serialize(@NotNull DataOutput2 out, @NotNull Row value) throws IOException {
                value.writeDelimitedTo(out);
            }

            @Override
            public Row deserialize(@NotNull DataInput2 input, int available) throws IOException {
                return Row.parseDelimitedFrom(new DataInput2.DataInputToStream(input));
            }
        };
    static {
        dbDisk.defaultSerializerRegisterClass(Row.class);
        // Register a shutdown hook so both stores are closed on JVM exit.
        Runtime.getRuntime()
            .addShutdownHook(
                new Thread(
                    () -> {
                        dbMemory.close();
                        dbDisk.close();
                    }));
    }
    private final HTreeMap<byte[], Row> diskMap;
    private final HTreeMap<byte[], Row> aggregationMap;
    private final ScheduledExecutorService expireExecutor;
    private final String name;

    public DiskOverflowReducer() {
        MemoryMonitor.INSTANCE.waitForResources("DiskOverflowReducer");
        this.name = UUID.randomUUID().toString();
        log.info("Creating new disk overflow reducer: {}", name);
        this.diskMap =
            dbDisk
                .hashMap(name + "-onDisk")
                .removeCollapsesIndexTreeDisable()
                .keySerializer(Serializer.BYTE_ARRAY_DELTA2)
                .valueSerializer(ROW_SERIALIZER)
                .create();
        // Kept as a field so close() can shut it down.
        this.expireExecutor = Executors.newScheduledThreadPool(2);
        // Two-tier binding: entries expire from memory 15 minutes after
        // creation (or once the map exceeds its max size) and overflow to
        // diskMap, from which they are loaded back on access. Note that
        // expireMaxSize() is an entry count, not a byte count, so reusing
        // ALLOCATED_SIZE_IN_BYTES caps the map at ~10.5M entries.
        this.aggregationMap =
            dbMemory
                .hashMap(name + "-inMemory")
                .keySerializer(Serializer.BYTE_ARRAY_DELTA2)
                .valueSerializer(ROW_SERIALIZER)
                .expireMaxSize(ALLOCATED_SIZE_IN_BYTES)
                .expireOverflow(diskMap)
                .expireAfterCreate(15, TimeUnit.MINUTES)
                .expireExecutor(expireExecutor)
                .create();
    }
    public DiskOverflowReducer mergeAll(Stream<Row> input) {
        input.forEach(
            row -> {
                // Abort promptly if the gRPC request has been cancelled.
                if (Context.current().isCancelled()) {
                    throw new RequestCancelledException();
                }
                if (!aggregationMap.isClosed()) {
                    aggregationMap.merge(row.getKey().toByteArray(), row, RowUtil::merge);
                }
            });
        return this;
    }
    public Stream<Row> values() {
        // Rows already evicted to diskMap are not visible through
        // aggregationMap's value collection, so both tiers are read; the
        // expireOverflow binding should keep each row in only one tier.
        return Stream.concat(aggregationMap.getValues().stream(), diskMap.getValues().stream());
    }
    @Override
    public void close() throws Exception {
        log.info("Closing reducer: aggregationMap={}, diskMap={}", aggregationMap.getSize(), diskMap.getSize());
        aggregationMap.clear();
        diskMap.clear();
        // Stop the expiration threads; they would otherwise outlive the reducer.
        expireExecutor.shutdown();
    }
}
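
A minimal usage sketch, assuming Row is the protobuf message used above; fetchRows() and emit() are hypothetical stand-ins for the caller's row source and sink:

try (DiskOverflowReducer reducer = new DiskOverflowReducer()) {
    reducer.mergeAll(fetchRows())        // hypothetical Stream<Row> source
           .values()
           .forEach(row -> emit(row));   // hypothetical downstream sink
}

Since DiskOverflowReducer implements AutoCloseable, try-with-resources clears both maps and shuts down the expiration executor even if mergeAll throws, for example on request cancellation.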