Created
March 18, 2019 10:26
-
-
Save aikoven/b6788f3ce0d146302d1a9d9201576bb7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as fdb from 'foundationdb'; | |
// Register the handler exported by the `segfault-handler` package
// (presumably so native crashes produce a stack trace — confirm against the package docs).
require('segfault-handler').registerHandler();

// Select FoundationDB API version 600; this runs before `main` opens the database.
fdb.setAPIVersion(600);

// Start the script and log any rejection coming out of `main`.
main().catch((error: unknown) => {
  console.error(error);
});
async function main() { | |
// Open the database and make every key tuple-encoded.
const connection = await fdb.open();
const db = connection.withKeyEncoding(fdb.encoders.tuple);

// Events live under the ['streams'] prefix and use the `string` value encoder.
const streamsSpace = db.at(['streams']).withValueEncoding(fdb.encoders.string);

// Per-stream counters live under the ['counters'] prefix and use the
// `int32BE` value encoder.
const countersSpace = db.at(['counters']).withValueEncoding(fdb.encoders.int32BE);
/**
 * Appends one event to `stream`.
 *
 * Inside a single transaction this reads the stream's counter, derives the
 * next event number (0 when no counter is stored yet, otherwise counter + 1),
 * writes that number back as the new counter value, and stores the event under
 * the key `[stream, eventNumber]`.
 *
 * @param stream Stream name; used as the counter key and as the first element
 *   of the event key.
 * @param eventData Payload written as the event's value.
 * @returns The promise returned by `db.doTransaction`.
 */
function append(stream: string, eventData: string): Promise<void> {
  return db.doTransaction(async tn => {
    const counters = tn.scopedTo(countersSpace);

    const lastEventNumber = await counters.get([stream]);
    const eventNumber = lastEventNumber == null ? 0 : lastEventNumber + 1;

    counters.set([stream], eventNumber);
    tn.scopedTo(streamsSpace).set([stream, eventNumber], eventData);
  });
}
/**
 * Async generator that reads events of `stream` in batches, starting at
 * `fromEventNumber`, and keeps following the stream indefinitely.
 *
 * Each loop pass runs one transaction that range-reads events from the current
 * cursor. When the read reports no further results, the same transaction also
 * sets a watch on the stream's counter key (the key `append` writes). The
 * batch is then yielded; `isLive` is the negation of the read's `more` flag.
 * If a watch was set, the generator waits for it before reading again.
 *
 * If the consumer ends iteration (or throws into the generator) while it is
 * suspended at the yield, the pending watch is cancelled.
 *
 * @param stream Stream name.
 * @param fromEventNumber First event number to read (inclusive).
 * @throws Error('Watch cancelled') when the watch promise resolves to a falsy value.
 */
async function* subscribe(
  stream: string,
  fromEventNumber: number,
): AsyncIterableIterator<{
  events: Array<{eventNumber: number; eventData: string}>;
  isLive: boolean;
}> {
  const eventsSpace = streamsSpace.at([stream]);

  // Moving lower bound of the range read; advanced past the last event seen.
  let cursor = fdb.keySelector.firstGreaterOrEqual<fdb.TupleItem[]>([
    fromEventNumber,
  ]);
  // Fixed upper bound of the range read.
  const upperBound = fdb.keySelector.firstGreaterOrEqual<fdb.TupleItem[]>([
    2 ** 32,
  ]);

  // Passed to getRangeRaw as its iteration argument; never reset.
  let iteration = 0;

  for (;;) {
    const batch = await db.doTransaction(async tn => {
      const scoped = tn.scopedTo(eventsSpace);
      iteration += 1;
      const page = await scoped.getRangeRaw(
        cursor,
        upperBound,
        0,
        0,
        fdb.StreamingMode.Iterator,
        iteration,
        false,
      );

      const events = page.results.map(([key, value]) => ({
        eventNumber: key[0] as number,
        eventData: value,
      }));

      // Only watch the counter once the read has caught up with the stream.
      const watch = page.more
        ? undefined
        : tn.scopedTo(countersSpace).watch([stream]);

      return {events, more: page.more, watch};
    });

    const {events, more, watch} = batch;

    if (events.length > 0) {
      cursor = fdb.keySelector.firstGreaterThan([
        events[events.length - 1].eventNumber,
      ]);
    }

    // No watch to manage: hand over the batch and read again.
    if (watch == null) {
      yield {events, isLive: !more};
      continue;
    }

    // cancel watch on break during this yield
    let resumed = false;
    try {
      yield {events, isLive: !more};
      resumed = true;
    } finally {
      if (!resumed) {
        watch.cancel();
      }
    }

    const fired = await watch.promise;
    if (!fired) {
      throw new Error('Watch cancelled');
    }
  }
}
/**
 * Endless writer: appends `test-event-<n>` (n = 0, 1, 2, …) to 'test-stream',
 * pausing via `delay(50)` after each append. Never returns normally.
 */
async function testWriter() {
  for (let sequence = 0; ; sequence++) {
    // console.log(`appending event`, sequence);
    await append('test-stream', `test-event-${sequence}`);
    await delay(50);
  }
}
/**
 * Subscribes to 'test-stream' from event 0 and drains every yielded batch
 * without doing anything with it.
 */
async function testReader() {
  const batches = subscribe('test-stream', 0);
  for await (const _batch of batches) {
    // console.log(_batch);
  }
}
// Start 5 writers first, then 50 readers, and wait on all of them together.
const writers: Promise<void>[] = Array.from({length: 5}, () => testWriter());
const readers: Promise<void>[] = Array.from({length: 50}, () => testReader());
await Promise.all([...writers, ...readers]);
} | |
/**
 * Returns a promise that resolves, with no value, when a `setTimeout` of `ms`
 * milliseconds fires.
 *
 * The return type is spelled out as `Promise<void>` so callers do not get an
 * inferred `Promise<unknown>`.
 *
 * @param ms Timeout passed to `setTimeout`, in milliseconds.
 */
const delay = (ms: number): Promise<void> =>
  new Promise<void>(resolve => {
    setTimeout(resolve, ms);
  });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment