Last active
September 2, 2017 00:21
-
-
Save kostrse/51ad93664827402a49accc89270d5222 to your computer and use it in GitHub Desktop.
RxJS subscription reconnect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, ObservableInput } from 'rxjs/Observable'; | |
import { OuterSubscriber } from 'rxjs/OuterSubscriber'; | |
import { InnerSubscriber } from 'rxjs/InnerSubscriber'; | |
import { Operator } from 'rxjs/Operator'; | |
import { subscribeToResult } from 'rxjs/util/subscribeToResult'; | |
import { Observer } from "rxjs/Observer"; | |
import { Subscriber } from "rxjs/Subscriber"; | |
import { Unsubscribed } from './Unsubscribed'; | |
import 'rxjs/add/observable/empty'; | |
import 'rxjs/add/observable/of'; | |
import 'rxjs/add/observable/throw'; | |
import 'rxjs/add/observable/range'; | |
import 'rxjs/add/operator/take'; | |
import 'rxjs/add/operator/map'; | |
import 'rxjs/add/operator/catch'; | |
import 'rxjs/add/operator/concat'; | |
class ReconnectOperator<T> implements Operator<T, T> { | |
constructor(private handler: (err: any, lastValue?: T) => Observable<T>) { | |
} | |
call(subscriber: Subscriber<T>, source: any): any { | |
return source.subscribe(new ReconnectSubscriber(subscriber, this.handler)); | |
} | |
} | |
class ReconnectSubscriber<T> extends Subscriber<T> { | |
private lastValue?: T; | |
constructor(destination: Subscriber<T>, private handler: (err: any, lastValue?: T) => Observable<T>) { | |
super(destination); | |
} | |
protected _next(value: T) { | |
this.lastValue = value; | |
super._next(value); | |
} | |
error(err: any) { | |
if (!this.isStopped) { | |
let result: Observable<T>; | |
try { | |
result = this.handler(err, this.lastValue); | |
} catch (err2) { | |
super.error(err2); | |
return; | |
} | |
// TODO: ??? | |
result.subscribe(this._unsubscribeAndRecycle()); | |
// this._unsubscribeAndRecycle(); | |
//this.source.subscribe(result); | |
//this.add(subscribeToResult(this, result)); | |
} | |
} | |
} | |
// TEST SAMPLE | |
class DisconnectedError extends Error { | |
constructor() { | |
super('Ups, connection lost.'); | |
} | |
} | |
interface MyQuery { | |
fromId: number; | |
toId: number; | |
} | |
interface MyItem { | |
id: number; | |
} | |
function observeUnstable(query: MyQuery): Observable<MyItem> { | |
const failAfter = 5; | |
return Observable | |
.range(query.fromId, query.toId - query.fromId + 1) | |
.map(id => { return { id: id }}) | |
.take(failAfter).concat(Observable.throw(new DisconnectedError())); | |
} | |
let startQuery = { fromId: 1, toId: 10 }; | |
let reconnectable = observeUnstable(startQuery).lift(new ReconnectOperator<MyItem>((err, lastValue?) => { | |
if (err instanceof DisconnectedError) { | |
// The error indicates that we've been disconnected, | |
// reconnectiong from the place we stopped | |
let continueQuery = { fromId: lastValue ? lastValue.id + 1 : startQuery.fromId, toId: startQuery.toId }; | |
return observeUnstable(continueQuery); | |
} else { | |
// Rethrowing error we don't expect | |
throw err; | |
} | |
})); | |
reconnectable.subscribe( | |
(value) => console.log("Next: %O", value), | |
(err) => console.error('Error: %O', err), | |
() => console.log("Complete") | |
); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment