Skip to content

Instantly share code, notes, and snippets.

@AlexDaSoul
Last active January 20, 2020 20:15
Show Gist options
  • Save AlexDaSoul/02ba751fc7d5807d48e318473e13a53b to your computer and use it in GitHub Desktop.
Save AlexDaSoul/02ba751fc7d5807d48e318473e13a53b to your computer and use it in GitHub Desktop.
nga-29
export function grpcStream<T>(client: ClientReadableStream<T>): Observable<T> {
let stream: ClientReadableStream<T> = null;
let subscriptionCounter = 0;
const data: Observable<any> = new Observable((observer: Observer<T>) => {
if (subscriptionCounter === 0) {
stream = client;
}
subscriptionCounter++;
stream.on(StreamType.DATA, (response: jspb.Message) => {
observer.next(response.toObject());
});
stream.on(StreamType.STATUS, (status: Status) => {
if (status.code === StatusCode.UNAUTHENTICATED) {
jwtAuthError$.next();
}
if (status.code !== StatusCode.OK) {
observer.error(status);
}
});
});
return data.pipe(
finalize(() => {
subscriptionCounter--;
if (subscriptionCounter === 0) {
stream.cancel();
}
}),
share(),
retryWhen(errors =>
errors.pipe(
tap(val => console.warn(`Stream will be reconnected in 30 seconds`)),
delayWhen(val => timer(30000)),
),
),
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment