Skip to content

Instantly share code, notes, and snippets.

@MatthiasKunnen
Last active April 18, 2018 12:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MatthiasKunnen/bc82adb90068072fed1031045f245074 to your computer and use it in GitHub Desktop.
Save MatthiasKunnen/bc82adb90068072fed1031045f245074 to your computer and use it in GitHub Desktop.
throwIf pipeable operator for RxJs
import { MonoTypeOperatorFunction } from 'rxjs/interfaces';
import { Observable } from 'rxjs/Observable';
import { Operator } from 'rxjs/Operator';
import { Subscriber } from 'rxjs/Subscriber';
import { TeardownLogic } from 'rxjs/Subscription';
class ThrowIfSubscriber<T> extends Subscriber<T> {
constructor(
private myDestination: Subscriber<T>,
private predicate: (value: T) => boolean,
private errorMessage?: string,
) {
super(myDestination);
}
protected _next(value: T) {
let result: boolean;
try {
result = this.predicate(value);
} catch (err) {
this.myDestination.error(err);
return;
}
if (result) {
this.myDestination.error(this.errorMessage);
} else {
this.myDestination.next(value);
}
}
}
class ThrowIfOperator<T> implements Operator<T, T> {
constructor(
private predicate: (value: T) => boolean,
private errorMessage?: string,
) {
}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new ThrowIfSubscriber(
subscriber,
this.predicate,
this.errorMessage,
));
}
}
export function throwIf<T>(
predicate: (value: T) => boolean,
errorMessage?: string,
): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) =>
source.lift(new ThrowIfOperator(predicate, errorMessage));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment