Skip to content

Instantly share code, notes, and snippets.

@iamssen
Last active May 6, 2021 00:09
Show Gist options
  • Save iamssen/85df7be9c7b93fa204f5af74e42e1628 to your computer and use it in GitHub Desktop.
Save iamssen/85df7be9c7b93fa204f5af74e42e1628 to your computer and use it in GitHub Desktop.
typescript stream-pipe
import { ObservableInput, isObservable, Observable } from 'rxjs';
export type OperatorReturn<R> = ObservableInput<R> | R extends ObservableInput<
infer U
>
? U
: R;
export type Operator<T, R> = (params: T) => OperatorReturn<R>;
export type StripOperatorResult<T> = T extends ObservableInput<infer U> ? U : T;
export function streamPipe<Params, R1>(
o1: Operator<Params, R1>,
): (params: Params) => Observable<R1>;
export function streamPipe<Params, R1, R2>(
o1: Operator<Params, R1>,
o2: Operator<StripOperatorResult<R1>, R2>,
): (
params: Params,
) => Observable<StripOperatorResult<R1> | StripOperatorResult<R2>>;
export function streamPipe<Params, R1, R2, R3>(
o1: Operator<Params, R1>,
o2: Operator<StripOperatorResult<R1>, R2>,
o3: Operator<StripOperatorResult<R2>, R3>,
): (
params: Params,
) => Observable<
StripOperatorResult<R1> | StripOperatorResult<R2> | StripOperatorResult<R3>
>;
export function streamPipe<Params, R1, R2, R3, R4>(
o1: Operator<Params, R1>,
o2: Operator<StripOperatorResult<R1>, R2>,
o3: Operator<StripOperatorResult<R2>, R3>,
o4: Operator<StripOperatorResult<R3>, R4>,
): (
params: Params,
) => Observable<
| StripOperatorResult<R1>
| StripOperatorResult<R2>
| StripOperatorResult<R3>
| StripOperatorResult<R4>
>;
export function streamPipe<Params, R1, R2, R3, R4, R5>(
o1: Operator<Params, R1>,
o2: Operator<StripOperatorResult<R1>, R2>,
o3: Operator<StripOperatorResult<R2>, R3>,
o4: Operator<StripOperatorResult<R3>, R4>,
o5: Operator<StripOperatorResult<R4>, R5>,
): (
params: Params,
) => Observable<
| StripOperatorResult<R1>
| StripOperatorResult<R2>
| StripOperatorResult<R3>
| StripOperatorResult<R4>
| StripOperatorResult<R5>
>;
export function streamPipe<Params, R1, R2, R3, R4, R5, R6>(
o1: Operator<Params, R1>,
o2: Operator<StripOperatorResult<R1>, R2>,
o3: Operator<StripOperatorResult<R2>, R3>,
o4: Operator<StripOperatorResult<R3>, R4>,
o5: Operator<StripOperatorResult<R4>, R5>,
o6: Operator<StripOperatorResult<R5>, R6>,
): (
params: Params,
) => Observable<
| StripOperatorResult<R1>
| StripOperatorResult<R2>
| StripOperatorResult<R3>
| StripOperatorResult<R4>
| StripOperatorResult<R5>
| StripOperatorResult<R6>
>;
export function streamPipe<Params, R1, R2, R3, R4, R5, R6, R7>(
o1: Operator<Params, R1>,
o2: Operator<StripOperatorResult<R1>, R2>,
o3: Operator<StripOperatorResult<R2>, R3>,
o4: Operator<StripOperatorResult<R3>, R4>,
o5: Operator<StripOperatorResult<R4>, R5>,
o6: Operator<StripOperatorResult<R5>, R6>,
o7: Operator<StripOperatorResult<R6>, R7>,
): (
params: Params,
) => Observable<
| StripOperatorResult<R1>
| StripOperatorResult<R2>
| StripOperatorResult<R3>
| StripOperatorResult<R4>
| StripOperatorResult<R5>
| StripOperatorResult<R6>
| StripOperatorResult<R7>
>;
export function streamPipe<Params, R1, R2, R3, R4, R5, R6, R7, R8>(
o1: Operator<Params, R1>,
o2: Operator<StripOperatorResult<R1>, R2>,
o3: Operator<StripOperatorResult<R2>, R3>,
o4: Operator<StripOperatorResult<R3>, R4>,
o5: Operator<StripOperatorResult<R4>, R5>,
o6: Operator<StripOperatorResult<R5>, R6>,
o7: Operator<StripOperatorResult<R6>, R7>,
o8: Operator<StripOperatorResult<R7>, R8>,
): (
params: Params,
) => Observable<
| StripOperatorResult<R1>
| StripOperatorResult<R2>
| StripOperatorResult<R3>
| StripOperatorResult<R4>
| StripOperatorResult<R5>
| StripOperatorResult<R6>
| StripOperatorResult<R7>
| StripOperatorResult<R8>
>;
export function streamPipe(
...operators: Operator<any, any>[]
): (params: any) => Observable<any> {
return (params: any) =>
new Observable<any>((subscriber) => {
let i = -1;
const run = (input: any) => {
if (subscriber.closed) {
return;
}
i += 1;
if (i >= operators.length) {
subscriber.complete();
} else {
const operation = operators[i](input);
if (isObservable(operation)) {
let latestValue: any;
operation.subscribe(
(r) => {
latestValue = r;
subscriber.next(r);
},
(error) => {
subscriber.error(error);
},
() => {
run(latestValue);
},
);
} else {
Promise.resolve(operation)
.then((value) => {
subscriber.next(value);
run(value);
})
.catch((error) => {
subscriber.error(error);
});
}
}
};
run(params);
});
}
import { streamPipe } from '../stream-pipe';
import { Observable, of } from 'rxjs';
describe('stream-pipe', () => {
test('typing test - empty params', () => {
const o = streamPipe(
(_: void) => of({ x: 1 }),
({ x }) => Promise.resolve({ y: x + 1 }),
({ y }) => ({ z: y + 1 }),
);
const s: Observable<{ x: number } | { y: number } | { z: number }> = o();
s.subscribe((result) => {
expect(
'x' in result
? result.x === 1
: 'y' in result
? result.y === 2
: result.z === 3,
).toBeTruthy();
});
});
test('typing test - if statement with type or', () => {
const o = streamPipe(
(a: number) => of({ x: a * 2 }),
({ x }) => (x > 10 ? of({ y1: x * 100 }) : { y2: 0 }),
(result) => ('y1' in result ? { z: true } : { z: false }),
);
const s: Observable<
{ x: number } | { y1: number } | { y2: number } | { z: boolean }
> = o(10);
s.subscribe((result) => {
expect(
'x' in result
? result.x === 20
: 'y1' in result
? result.y1 === 200
: 'z' in result
? result.z
: false,
).toBeTruthy();
});
o(1).subscribe((result) => {
expect(
'x' in result
? result.x === 2
: 'y2' in result
? result.y2 === 0
: 'z' in result
? result.z === false
: false,
).toBeTruthy();
});
});
test('typing test - complex structure', () => {
const o = streamPipe(
(_: void) =>
Math.random() > 0.5 ? of(10) : Math.random() ? Promise.resolve(10) : 10,
(i: number) => (i > 10 ? Promise.resolve(i + 10) : i.toString()),
(i: number | string) => i.toString(),
);
const s: Observable<number | string> = o();
s.subscribe((result) => {
expect(result).toBe('10');
});
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment