Skip to content

Instantly share code, notes, and snippets.

@shlomiassaf
Created May 3, 2018 19:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save shlomiassaf/7b2b2b5155c0d5f7ef18328fa72a8e40 to your computer and use it in GitHub Desktop.
Save shlomiassaf/7b2b2b5155c0d5f7ef18328fa72a8e40 to your computer and use it in GitHub Desktop.
RxJS cached polling operator with cache invalidation support in 10 LOC.
const reset$ = new Subject();
// the API call we want to poll
const poller$ = myApi.getData()
.pipe(poll(reset$, 30000)); // 30 secs
// no polling yet...
const sub1 = poller$.subsbribe(data => console.log('POLL TICK 1') );
const sub2 = poller$.subsbribe(data => console.log(' & 2') );
// console: POLL TICK 1 & 2
// 30 secs passed ...
// console: POLL TICK 1 & 2
// 5 secs passed...
reset$.next();
// console: POLL TICK 1 & 2
// 30 secs passed ...
// console: POLL TICK 1 & 2
// 5 secs passed...
reset$.next();
// no console output, call cancelled.
reset$.next();
// console: POLL TICK 1 & 2
sub1.unsubscribe();
// 30 secs passed ...
// console: & 2
sub2.unsubscribe();
// NO MORE PULLING
export const poll = (reset: Observable<any>, interval = 5000) => {
return (source: Observable<any>) => source.pipe(
delay(interval),
repeat(),
retryWhen( errors => errors.pipe(delay(interval)) ),
takeUntil(reset),
repeat()
publishReplay(1), // a weak shareReplay()
refCount()
);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment