Skip to content

Instantly share code, notes, and snippets.

@arleighdickerson
Created March 11, 2024 19:49
Show Gist options
  • Save arleighdickerson/7d1fd1a31fc8ac381a48afbeb9a2f546 to your computer and use it in GitHub Desktop.
Save arleighdickerson/7d1fd1a31fc8ac381a48afbeb9a2f546 to your computer and use it in GitHub Desktop.
import { Channel, NotUndefined } from '@redux-saga/types';
import { Buffer, channel, EventChannel, Saga } from 'redux-saga';
import * as effects from 'redux-saga/effects';
import _ from 'lodash';
import { cast, isInteger } from 'src/util/invariant';
/**
 * A worker saga that is handed a `Channel<T>` as its only argument.
 * `createPipe` forks it with the pipe's internal destination channel.
 */
export type THandleRequest<T extends NotUndefined> = Saga<[Channel<T>]>;

/**
 * Shape of a pipe-creating saga: takes the upstream `EventChannel<T>` and the
 * worker saga that will be run against the downstream channel.
 */
export type TCreatePipe<T extends NotUndefined> = Saga<
  [source: EventChannel<T>, handleRequest: THandleRequest<T>]
>;
/**
 * Options accepted by `createPipe`.
 */
export interface CreatePipeOptions {
/**
 * How many times `createPipe` forks `handleRequest` against the destination
 * channel. The value is run through `cast(isInteger)` before use.
 */
threads: number;
/**
 * Buffer passed to `channel(...)` when `createPipe` creates its destination
 * channel. When omitted, `channel` receives `undefined` (presumably falling
 * back to the library's default buffer — confirm against redux-saga docs).
 */
buffer?: Buffer<any>;
}
/**
 * Forwards every message taken from `source` into an internal channel that is
 * consumed by `options.threads` forked instances of `handleRequest`.
 *
 * Lifecycle:
 * - The internal (destination) channel is created here, so it is closed on
 *   every exit path: cancellation, an error, or `source` closing. Closing it
 *   lets workers blocked on `take(dest)` observe the end of the stream instead
 *   of waiting forever, which would keep this saga from ever completing.
 * - `source` belongs to the caller and is closed only when this saga is
 *   cancelled.
 *
 * @param source - Upstream event channel whose messages are forwarded.
 * @param handleRequest - Worker saga; forked `options.threads` times, each
 *   receiving the destination channel.
 * @param options - `threads` (validated with `isInteger`) and an optional
 *   `buffer` for the destination channel. Defaults to a single worker.
 * @throws Rethrows any error raised while taking from `source` or putting to
 *   the destination channel (logged first when NODE_ENV is 'development').
 */
export default function* createPipe<T extends NotUndefined>(
  source: EventChannel<T>,
  handleRequest: THandleRequest<T>,
  options: CreatePipeOptions = { threads: 1 },
) {
  const threads = cast(isInteger).value(options.threads);
  const dest: Channel<T> = yield effects.call(channel, options.buffer);
  for (let i = 0; i < threads; i += 1) {
    yield effects.fork(handleRequest, dest);
  }
  // The try wraps the whole loop so the cleanup below runs once, when the
  // loop is left, rather than after every forwarded message.
  try {
    while (true) {
      // A non-"maybe" take on a closed channel terminates this saga; that
      // termination still passes through the `finally` block.
      const payload: T = yield effects.take(source);
      yield effects.put(dest, payload);
    }
  } catch (e) {
    if (process.env.NODE_ENV === 'development') {
      console.error('[util/createPipe] (main loop)', e);
    }
    throw e;
  } finally {
    // Close first and unconditionally: nothing else holds a reference that
    // could close this channel, and messages already buffered remain
    // available to the workers after close.
    dest.close();
    const value: unknown = yield effects.cancelled();
    const cancelled: boolean = cast(_.isBoolean).value(value);
    if (cancelled) {
      source.close();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment