Last active
May 6, 2021 00:09
-
-
Save iamssen/85df7be9c7b93fa204f5af74e42e1628 to your computer and use it in GitHub Desktop.
typescript stream-pipe
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { ObservableInput, isObservable, Observable } from 'rxjs'; | |
/**
 * Return type an {@link Operator} is declared with for a result type `R`.
 *
 * Precedence note: in a type expression `|` binds tighter than `extends`, so
 * the condition below checks the whole union `ObservableInput<R> | R` against
 * `ObservableInput<infer U>`, not `R` on its own.
 *
 * NOTE(review): if the intent was
 * `ObservableInput<R> | (R extends ObservableInput<infer U> ? U : R)`,
 * explicit parentheses are required. Confirm before changing it, because the
 * inference of the `streamPipe` overloads depends on the current form.
 */
export type OperatorReturn<R> = ObservableInput<R> | R extends ObservableInput<
  infer U
>
  ? U
  : R;
/**
 * A single pipeline step: a function that receives `params` of type `T` and
 * returns `OperatorReturn<R>`.
 */
export type Operator<T, R> = (params: T) => OperatorReturn<R>;
/**
 * Unwraps `T` to its element type `U` when `T` is an `ObservableInput<U>`;
 * any other type is returned unchanged.
 *
 * `T` is a naked type parameter, so the conditional distributes over unions:
 * each member of a union is unwrapped independently.
 */
export type StripOperatorResult<T> = T extends ObservableInput<infer U> ? U : T;
/**
 * Builds a function that runs the given operators one after another and
 * exposes every intermediate result on a single `Observable`.
 *
 * The first operator receives the `params` passed to the returned function;
 * each following operator receives the unwrapped result of the previous one
 * (`StripOperatorResult`). The element type of the returned stream is the
 * union of the unwrapped results of all operators.
 *
 * Typed overloads are provided for one to eight operators.
 *
 * @param o1 First step; receives the `params` given to the returned function.
 * @returns A function mapping `params` to the combined result stream.
 */
export function streamPipe<Params, R1>(
  o1: Operator<Params, R1>,
): (params: Params) => Observable<R1>;
export function streamPipe<Params, R1, R2>(
  o1: Operator<Params, R1>,
  o2: Operator<StripOperatorResult<R1>, R2>,
): (
  params: Params,
) => Observable<StripOperatorResult<R1> | StripOperatorResult<R2>>;
export function streamPipe<Params, R1, R2, R3>(
  o1: Operator<Params, R1>,
  o2: Operator<StripOperatorResult<R1>, R2>,
  o3: Operator<StripOperatorResult<R2>, R3>,
): (
  params: Params,
) => Observable<
  StripOperatorResult<R1> | StripOperatorResult<R2> | StripOperatorResult<R3>
>;
export function streamPipe<Params, R1, R2, R3, R4>(
  o1: Operator<Params, R1>,
  o2: Operator<StripOperatorResult<R1>, R2>,
  o3: Operator<StripOperatorResult<R2>, R3>,
  o4: Operator<StripOperatorResult<R3>, R4>,
): (
  params: Params,
) => Observable<
  | StripOperatorResult<R1>
  | StripOperatorResult<R2>
  | StripOperatorResult<R3>
  | StripOperatorResult<R4>
>;
export function streamPipe<Params, R1, R2, R3, R4, R5>(
  o1: Operator<Params, R1>,
  o2: Operator<StripOperatorResult<R1>, R2>,
  o3: Operator<StripOperatorResult<R2>, R3>,
  o4: Operator<StripOperatorResult<R3>, R4>,
  o5: Operator<StripOperatorResult<R4>, R5>,
): (
  params: Params,
) => Observable<
  | StripOperatorResult<R1>
  | StripOperatorResult<R2>
  | StripOperatorResult<R3>
  | StripOperatorResult<R4>
  | StripOperatorResult<R5>
>;
export function streamPipe<Params, R1, R2, R3, R4, R5, R6>(
  o1: Operator<Params, R1>,
  o2: Operator<StripOperatorResult<R1>, R2>,
  o3: Operator<StripOperatorResult<R2>, R3>,
  o4: Operator<StripOperatorResult<R3>, R4>,
  o5: Operator<StripOperatorResult<R4>, R5>,
  o6: Operator<StripOperatorResult<R5>, R6>,
): (
  params: Params,
) => Observable<
  | StripOperatorResult<R1>
  | StripOperatorResult<R2>
  | StripOperatorResult<R3>
  | StripOperatorResult<R4>
  | StripOperatorResult<R5>
  | StripOperatorResult<R6>
>;
export function streamPipe<Params, R1, R2, R3, R4, R5, R6, R7>(
  o1: Operator<Params, R1>,
  o2: Operator<StripOperatorResult<R1>, R2>,
  o3: Operator<StripOperatorResult<R2>, R3>,
  o4: Operator<StripOperatorResult<R3>, R4>,
  o5: Operator<StripOperatorResult<R4>, R5>,
  o6: Operator<StripOperatorResult<R5>, R6>,
  o7: Operator<StripOperatorResult<R6>, R7>,
): (
  params: Params,
) => Observable<
  | StripOperatorResult<R1>
  | StripOperatorResult<R2>
  | StripOperatorResult<R3>
  | StripOperatorResult<R4>
  | StripOperatorResult<R5>
  | StripOperatorResult<R6>
  | StripOperatorResult<R7>
>;
export function streamPipe<Params, R1, R2, R3, R4, R5, R6, R7, R8>(
  o1: Operator<Params, R1>,
  o2: Operator<StripOperatorResult<R1>, R2>,
  o3: Operator<StripOperatorResult<R2>, R3>,
  o4: Operator<StripOperatorResult<R3>, R4>,
  o5: Operator<StripOperatorResult<R4>, R5>,
  o6: Operator<StripOperatorResult<R5>, R6>,
  o7: Operator<StripOperatorResult<R6>, R7>,
  o8: Operator<StripOperatorResult<R7>, R8>,
): (
  params: Params,
) => Observable<
  | StripOperatorResult<R1>
  | StripOperatorResult<R2>
  | StripOperatorResult<R3>
  | StripOperatorResult<R4>
  | StripOperatorResult<R5>
  | StripOperatorResult<R6>
  | StripOperatorResult<R7>
  | StripOperatorResult<R8>
>;
// Untyped implementation behind the typed overloads.
//
// Runs `operators` sequentially. Each operator is called with the previous
// step's result (the first one with `params`):
//   - if it returns an Observable, every value is forwarded to the subscriber
//     and, on completion, the last emitted value is fed to the next operator
//     (`undefined` if the Observable completed without emitting);
//   - anything else is passed through `Promise.resolve`, the settled value is
//     emitted once and then fed to the next operator.
// The stream completes after the last operator has finished; inner errors,
// promise rejections and synchronous operator throws are delivered through
// `subscriber.error`.
export function streamPipe(
  ...operators: Operator<any, any>[]
): (params: any) => Observable<any> {
  return (params: any) =>
    new Observable<any>((subscriber) => {
      const runStep = (index: number, input: unknown): void => {
        // Nothing to do once the consumer has unsubscribed or the stream has
        // already completed/errored.
        if (subscriber.closed) {
          return;
        }

        if (index >= operators.length) {
          subscriber.complete();
          return;
        }

        let operation: unknown;
        try {
          operation = operators[index](input);
        } catch (error) {
          // A step may be invoked from an inner `complete` callback; without
          // this guard a synchronous throw there would not be delivered to
          // the outer subscriber.
          subscriber.error(error);
          return;
        }

        if (isObservable(operation)) {
          let latestValue: unknown;
          // Register the inner subscription on the outer subscriber so that
          // unsubscribing from the result also tears down the running step.
          subscriber.add(
            operation.subscribe({
              next: (value) => {
                latestValue = value;
                subscriber.next(value);
              },
              error: (error) => {
                subscriber.error(error);
              },
              complete: () => {
                runStep(index + 1, latestValue);
              },
            }),
          );
        } else {
          // Promises and plain values share one path: emit the settled value
          // once, then continue with the next operator.
          Promise.resolve(operation)
            .then((value) => {
              subscriber.next(value);
              runStep(index + 1, value);
            })
            .catch((error) => {
              subscriber.error(error);
            });
        }
      };

      runStep(0, params);
    });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { streamPipe } from '../stream-pipe'; | |
import { Observable, of } from 'rxjs'; | |
describe('stream-pipe', () => {
  /**
   * Subscribes to `source` and resolves with every emitted value once it
   * completes; rejects if the stream errors. Awaiting this keeps assertions
   * inside the test's lifetime instead of in a detached subscribe callback.
   */
  const collect = <T>(source: Observable<T>): Promise<T[]> =>
    new Promise<T[]>((resolve, reject) => {
      const values: T[] = [];
      source.subscribe({
        next: (value) => {
          values.push(value);
        },
        error: reject,
        complete: () => {
          resolve(values);
        },
      });
    });

  test('typing test - empty params', async () => {
    const o = streamPipe(
      (_: void) => of({ x: 1 }),
      ({ x }) => Promise.resolve({ y: x + 1 }),
      ({ y }) => ({ z: y + 1 }),
    );

    // The annotation is the typing check: the result must be the union of
    // every step's unwrapped result.
    const s: Observable<{ x: number } | { y: number } | { z: number }> = o();

    await expect(collect(s)).resolves.toEqual([{ x: 1 }, { y: 2 }, { z: 3 }]);
  });

  test('typing test - if statement with type or', async () => {
    const o = streamPipe(
      (a: number) => of({ x: a * 2 }),
      ({ x }) => (x > 10 ? of({ y1: x * 100 }) : { y2: 0 }),
      (result) => ('y1' in result ? { z: true } : { z: false }),
    );

    const s: Observable<
      { x: number } | { y1: number } | { y2: number } | { z: boolean }
    > = o(10);

    // a = 10 -> x = 20 (> 10) -> y1 = 20 * 100 = 2000 -> z = true
    await expect(collect(s)).resolves.toEqual([
      { x: 20 },
      { y1: 2000 },
      { z: true },
    ]);

    // a = 1 -> x = 2 (<= 10) -> y2 = 0 -> z = false
    await expect(collect(o(1))).resolves.toEqual([
      { x: 2 },
      { y2: 0 },
      { z: false },
    ]);
  });

  test('typing test - complex structure', async () => {
    const o = streamPipe(
      (_: void) =>
        Math.random() > 0.5 ? of(10) : Math.random() ? Promise.resolve(10) : 10,
      (i: number) => (i > 10 ? Promise.resolve(i + 10) : i.toString()),
      (i: number | string) => i.toString(),
    );

    const s: Observable<number | string> = o();

    // Whichever branch the first step takes, it yields the number 10; the
    // following two steps each yield the string '10'.
    await expect(collect(s)).resolves.toEqual([10, '10', '10']);
  });
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment