Skip to content

Instantly share code, notes, and snippets.

@trajakovic
Last active March 22, 2021 04:23
  • Star 11 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save trajakovic/3b0239cae11e23c76b80 to your computer and use it in GitHub Desktop.
RxJs extension implementation for cache results with time expiration
var slowJob = Rx.Observable.defer(function () {
return Rx.Observable.return(Math.random() * 1000).delay(2000);
});
var cached = slowJob.cacheWithExpiration(5000);
var last = Date.now();
function repeat() {
last = Date.now();
cached.subscribe(function (data) {
console.log("number:", data, 'took:', Date.now() - last, '[ms]');
setTimeout(repeat, 1000);
});
}
setTimeout(repeat, 1000);
Rx.Observable.prototype.cacheWithExpiration = function (expirationMs, scheduler) {
var source = this;
var cachedData = null;
// Use timeout scheduler if scheduler not supplied
scheduler = scheduler || Rx.Scheduler.timeout;
return Rx.Observable.createWithDisposable(function (observer) {
if (!cachedData) {
// The data is not cached.
// create a subject to hold the result
cachedData = new Rx.AsyncSubject();
// subscribe to the query
source.subscribe(cachedData);
// when the query completes, start a timer which will expire the cache
cachedData.subscribe(function () {
scheduler.scheduleWithRelative(expirationMs, function () {
// clear the cache
cachedData = null;
});
});
}
// subscribe the observer to the cached data
return cachedData.subscribe(observer);
});
};
@MoLow
Copy link

MoLow commented Feb 20, 2017

Here is a working typescript version, in case anyone needs it:

import { Observable } from 'rxjs/Observable';
import { Scheduler as IScheduler } from 'rxjs/Scheduler';
import { AsyncSubject, Scheduler } from 'rxjs/Rx';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        cacheWithExpiration: (expirationMs: number, scheduler?: IScheduler) => Observable<T>;
    }
}

Observable.prototype.cacheWithExpiration = function (expirationMs: number, scheduler?: IScheduler) {
    var cachedData: AsyncSubject<any>,
        source = this;

    scheduler = scheduler || Scheduler.async;

    return Observable.create(observer => {
        if (!cachedData) {
            cachedData = new AsyncSubject();
            source.subscribe(cachedData);
            cachedData.subscribe(data => {
                scheduler.schedule(() => {
                    cachedData = null;
                }, expirationMs);
            });
        }

        return cachedData.subscribe(observer);
    });
}

@knuijver
Copy link

knuijver commented Nov 23, 2020

and in case anyone needs it compile correctly in v6, then try this one:

import { asyncScheduler, AsyncSubject, Observable, SchedulerLike } from 'rxjs';

export const fromCacheWithExperation = <T>(expirationMs: number, scheduler: SchedulerLike = asyncScheduler) => (
    source: Observable<T>,
) => {
    let cached: AsyncSubject<T> | null;
    return new Observable<T>((observer) => {
        if (!cached) {
            cached = new AsyncSubject();
            source.subscribe(cached);
            cached.subscribe(() => {
                scheduler.schedule(() => {
                    cached = null;
                }, expirationMs);
            });
        }
        return cached.subscribe(observer);
    });
};

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment