Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@kostrse
Last active September 2, 2017 00:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kostrse/51ad93664827402a49accc89270d5222 to your computer and use it in GitHub Desktop.
Save kostrse/51ad93664827402a49accc89270d5222 to your computer and use it in GitHub Desktop.
RxJS subscription reconnect
import { Observable, ObservableInput } from 'rxjs/Observable';
import { OuterSubscriber } from 'rxjs/OuterSubscriber';
import { InnerSubscriber } from 'rxjs/InnerSubscriber';
import { Operator } from 'rxjs/Operator';
import { subscribeToResult } from 'rxjs/util/subscribeToResult';
import { Observer } from "rxjs/Observer";
import { Subscriber } from "rxjs/Subscriber";
import { Unsubscribed } from './Unsubscribed';
import 'rxjs/add/observable/empty';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/throw';
import 'rxjs/add/observable/range';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/concat';
class ReconnectOperator<T> implements Operator<T, T> {
constructor(private handler: (err: any, lastValue?: T) => Observable<T>) {
}
call(subscriber: Subscriber<T>, source: any): any {
return source.subscribe(new ReconnectSubscriber(subscriber, this.handler));
}
}
class ReconnectSubscriber<T> extends Subscriber<T> {
private lastValue?: T;
constructor(destination: Subscriber<T>, private handler: (err: any, lastValue?: T) => Observable<T>) {
super(destination);
}
protected _next(value: T) {
this.lastValue = value;
super._next(value);
}
error(err: any) {
if (!this.isStopped) {
let result: Observable<T>;
try {
result = this.handler(err, this.lastValue);
} catch (err2) {
super.error(err2);
return;
}
// TODO: ???
result.subscribe(this._unsubscribeAndRecycle());
// this._unsubscribeAndRecycle();
//this.source.subscribe(result);
//this.add(subscribeToResult(this, result));
}
}
}
// TEST SAMPLE
class DisconnectedError extends Error {
constructor() {
super('Ups, connection lost.');
}
}
interface MyQuery {
fromId: number;
toId: number;
}
interface MyItem {
id: number;
}
function observeUnstable(query: MyQuery): Observable<MyItem> {
const failAfter = 5;
return Observable
.range(query.fromId, query.toId - query.fromId + 1)
.map(id => { return { id: id }})
.take(failAfter).concat(Observable.throw(new DisconnectedError()));
}
let startQuery = { fromId: 1, toId: 10 };
let reconnectable = observeUnstable(startQuery).lift(new ReconnectOperator<MyItem>((err, lastValue?) => {
if (err instanceof DisconnectedError) {
// The error indicates that we've been disconnected,
// reconnectiong from the place we stopped
let continueQuery = { fromId: lastValue ? lastValue.id + 1 : startQuery.fromId, toId: startQuery.toId };
return observeUnstable(continueQuery);
} else {
// Rethrowing error we don't expect
throw err;
}
}));
reconnectable.subscribe(
(value) => console.log("Next: %O", value),
(err) => console.error('Error: %O', err),
() => console.log("Complete")
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment