Skip to content

Instantly share code, notes, and snippets.

@abh006
Last active June 18, 2022 05:59
Show Gist options
  • Save abh006/6c8cd44803b7f24f7fb58caffb5f7d04 to your computer and use it in GitHub Desktop.
Save abh006/6c8cd44803b7f24f7fb58caffb5f7d04 to your computer and use it in GitHub Desktop.
RxJS pipeline with zero backpressure.
import { map, Subject, tap } from "rxjs";
function getBalancedPipeline<T>(
generator: AsyncGenerator<T, void, void>,
pipeline = new Subject<T>()
) {
pipeline.subscribe({
next: async () => {
const nextInput = await generator.next();
if (!nextInput.done) {
pipeline.next(nextInput.value);
} else {
console.log("Generator exhausted");
}
},
});
return pipeline;
}
async function* getData(): AsyncGenerator<string, void, void> {
const values = ["one", "two", "three", "four"];
for (const value of values) {
yield value;
}
}
(async () => {
const generator = getData();
const pipeline = await getBalancedPipeline(generator);
pipeline
.pipe(
map((value: string) => {
return value.toUpperCase();
}),
tap((value) => console.log(value))
)
.subscribe();
const initial = await generator.next();
if (!initial.done) {
pipeline.next(initial.value);
}
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment