Skip to content

Instantly share code, notes, and snippets.

@imperatorx
Created November 30, 2022 09:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save imperatorx/8c22af7ba741b90e82fdd6279786ce31 to your computer and use it in GitHub Desktop.
Save imperatorx/8c22af7ba741b90e82fdd6279786ce31 to your computer and use it in GitHub Desktop.
import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.directory.DirectoryLayer;
import com.apple.foundationdb.directory.DirectorySubspace;
import com.apple.foundationdb.tuple.Tuple;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.LongStream;
/**
 * Load generator that keeps a roughly constant number of time-stamped records in a
 * FoundationDB directory: writer threads continuously insert new records while the main
 * thread periodically range-clears everything older than the retention window.
 *
 * <p>Key layout: {@code (directory prefix).(long category id).(long timestamp in millis,
 * truncated to whole seconds).(long random)}. Each value is 52 zero bytes; the content does
 * not matter.
 *
 * <p>Usage: {@code RedwoodReproducer [targetSeconds] [targetCount] [threads] [categoryCount]}.
 * All arguments are optional positive integers.
 */
public class RedwoodReproducer {

    /** Pause between two consecutive clean-up passes, in seconds. */
    private static final int DELETE_INTERVAL_SECONDS = 20;
    /** Default retention window (and ramp-up time), in seconds. */
    private static final int DEFAULT_TARGET_SECONDS = 3600;
    /** Default number of records to hold once the steady state is reached. */
    private static final int DEFAULT_TARGET_COUNT = 50_000_000;
    /** Default number of writer threads. */
    private static final int DEFAULT_THREADS = 10;
    /** Default number of distinct category ids the records are spread over. */
    private static final int DEFAULT_CATEGORY_COUNT = 1000;
    /** Size of every stored value, in bytes. */
    private static final int VALUE_SIZE_BYTES = 52;
    /** Length of one writer cycle (one batch plus the sleep that follows), in milliseconds. */
    private static final long WRITER_CYCLE_MILLIS = 1001;

    /**
     * Parses the arguments, resets the test directory, starts the writer threads and then
     * loops forever clearing expired records.
     *
     * @param args optional {@code targetSeconds, targetCount, threads, categoryCount}
     * @throws IllegalArgumentException if an argument is not a positive integer
     */
    public static void main(String[] args) {
        int targetSeconds = positiveIntArg(args, 0, DEFAULT_TARGET_SECONDS, "targetSeconds");
        int targetCount = positiveIntArg(args, 1, DEFAULT_TARGET_COUNT, "targetCount");
        int threads = positiveIntArg(args, 2, DEFAULT_THREADS, "threads");
        int categoryCount = positiveIntArg(args, 3, DEFAULT_CATEGORY_COUNT, "categoryCount");

        System.out.println("Trying to reach a constant size of " + targetCount + " records in " + targetSeconds + " seconds.");

        // Writing targetCount records over targetSeconds and deleting everything older than
        // targetSeconds yields a steady-state size of about targetCount records.
        double perSecond = targetCount / (double) targetSeconds;
        // The cast truncates, so the real aggregate rate can be slightly below the target.
        int perThreadPerSecond = (int) (perSecond / threads);

        Database database = FDB.selectAPIVersion(710)
                .open();

        // Start from an empty directory on every run.
        DirectorySubspace directory = database.run(tr -> {
            var dir = DirectoryLayer.getDefault()
                    .createOrOpen(tr, List.of("redwood-key-test"))
                    .join();
            tr.clear(dir.range());
            return dir;
        });

        long[] categoryIds = LongStream.generate(() -> ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE))
                .limit(categoryCount)
                .toArray();

        generate(database, directory, threads, perThreadPerSecond, categoryIds);

        long retentionMillis = targetSeconds * 1000L;
        while (true) {
            database.run(tr -> {
                long cutoff = System.currentTimeMillis() - retentionMillis;
                // One range clear per category: from timestamp 0 up to the cut-off timestamp.
                for (long id : categoryIds) {
                    byte[] begin = directory.pack(Tuple.from(id, 0L));
                    byte[] end = directory.pack(Tuple.from(id, cutoff));
                    System.out.println("CLEAR FROM " + Arrays.toString(begin));
                    System.out.println("CLEAR TO " + Arrays.toString(end));
                    tr.clear(new Range(begin, end));
                }
                return null;
            });
            sleepMillis(DELETE_INTERVAL_SECONDS * 1000L);
        }
    }

    /**
     * Starts {@code threads} writer threads, each running the single-threaded
     * {@code generate} loop. The threads are non-daemon and are never stopped.
     */
    private static void generate(Database database, DirectorySubspace directory, int threads, int perThreadPerSecond, long[] categoryIds) {
        for (int i = 0; i < threads; i++) {
            new Thread(() -> generate(database, directory, perThreadPerSecond, categoryIds), "generator-" + i).start();
        }
    }

    /**
     * Writer loop: once per cycle, inserts {@code perSecond} records in a single transaction,
     * each under a randomly chosen category id, then sleeps for the remainder of the cycle.
     */
    private static void generate(Database database, DirectorySubspace directory, int perSecond, long[] categoryIds) {
        while (true) {
            long startedAt = System.currentTimeMillis();
            database.run(tr -> {
                // Key : (directoryprefix).(long - item identifier).(long - timestamp truncated to seconds).(long - random)
                // Value: 52 bytes, content does not matter
                ThreadLocalRandom random = ThreadLocalRandom.current();
                for (int i = 0; i < perSecond; i++) {
                    long id = categoryIds[random.nextInt(categoryIds.length)];
                    long now = (System.currentTimeMillis() / 1000L) * 1000L;
                    byte[] key = directory.pack(Tuple.from(id, now, random.nextLong()));
                    tr.set(key, new byte[VALUE_SIZE_BYTES]);
                }
                return null;
            });
            long elapsed = System.currentTimeMillis() - startedAt;
            // If the batch took longer than a full cycle, start the next one immediately.
            sleepMillis(Math.max(0L, WRITER_CYCLE_MILLIS - elapsed));
        }
    }

    /**
     * Reads the integer argument at {@code index}, or returns {@code defaultValue} when the
     * argument was not supplied.
     *
     * @throws IllegalArgumentException if the argument is not an integer or is not positive
     */
    private static int positiveIntArg(String[] args, int index, int defaultValue, String name) {
        if (args.length <= index) {
            return defaultValue;
        }
        int value;
        try {
            value = Integer.parseInt(args[index]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(name + " must be an integer, got: " + args[index], e);
        }
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be positive, got: " + value);
        }
        return value;
    }

    /**
     * Sleeps for {@code millis} milliseconds. If the thread is interrupted, the interrupt flag
     * is restored before the failure is rethrown unchecked, so callers further up can still
     * observe the interruption.
     */
    private static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment