Skip to content

Instantly share code, notes, and snippets.

@aikoven
Created March 18, 2019 10:26
Show Gist options
  • Save aikoven/b6788f3ce0d146302d1a9d9201576bb7 to your computer and use it in GitHub Desktop.
Save aikoven/b6788f3ce0d146302d1a9d9201576bb7 to your computer and use it in GitHub Desktop.
import * as fdb from 'foundationdb';
require('segfault-handler').registerHandler();
fdb.setAPIVersion(600);
main().catch(err => console.error(err));
async function main() {
const db = (await fdb.open()).withKeyEncoding(fdb.encoders.tuple);
const streamsSpace = db
.at(['streams'])
.withValueEncoding(fdb.encoders.string);
const countersSpace = db
.at(['counters'])
.withValueEncoding(fdb.encoders.int32BE);
function append(stream: string, eventData: string): Promise<void> {
return db.doTransaction(async tn => {
const count = await tn.scopedTo(countersSpace).get([stream]);
const eventNumber = count == null ? 0 : count + 1;
tn.scopedTo(countersSpace).set([stream], eventNumber);
tn.scopedTo(streamsSpace).set([stream, eventNumber], eventData);
});
}
async function* subscribe(
stream: string,
fromEventNumber: number,
): AsyncIterableIterator<{
events: Array<{eventNumber: number; eventData: string}>;
isLive: boolean;
}> {
const eventsSpace = streamsSpace.at([stream]);
let start = fdb.keySelector.firstGreaterOrEqual<fdb.TupleItem[]>([
fromEventNumber,
]);
const end = fdb.keySelector.firstGreaterOrEqual<fdb.TupleItem[]>([
Math.pow(2, 32),
]);
let iter = 0;
while (true) {
const {events, more, watch} = await db.doTransaction(async tn => {
const {results, more} = await tn
.scopedTo(eventsSpace)
.getRangeRaw(
start,
end,
0,
0,
fdb.StreamingMode.Iterator,
++iter,
false,
);
const events = results.map(([[eventNumber], eventData]) => ({
eventNumber: eventNumber as number,
eventData,
}));
const watch = more
? undefined
: tn.scopedTo(countersSpace).watch([stream]);
return {events, more, watch};
});
if (events.length > 0) {
start = fdb.keySelector.firstGreaterThan([
events[events.length - 1].eventNumber,
]);
}
// cancel watch on break during this yield
let continuing = false;
try {
yield {events, isLive: !more};
continuing = true;
} finally {
if (!continuing && watch != null) {
watch.cancel();
}
}
if (watch != null) {
const success = await watch.promise;
if (success) {
continue;
} else {
throw new Error('Watch cancelled');
}
}
}
}
async function testWriter() {
let i = 0;
while (true) {
// console.log(`appending event`, i);
await append('test-stream', `test-event-${i++}`);
await delay(50);
}
}
async function testReader() {
for await (const data of subscribe('test-stream', 0)) {
// console.log(data);
}
}
const promises: Promise<void>[] = [];
for (let i = 0; i < 5; i++) {
promises.push(testWriter());
}
for (let i = 0; i < 50; i++) {
promises.push(testReader());
}
await Promise.all(promises);
}
const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment