Skip to content

Instantly share code, notes, and snippets.

@pcurrivan
Last active March 4, 2020 02:16
Show Gist options
  • Save pcurrivan/73872aa87d41fcaeda804b962041d61d to your computer and use it in GitHub Desktop.
Save pcurrivan/73872aa87d41fcaeda804b962041d61d to your computer and use it in GitHub Desktop.
Replacement for removed RxJS operator inspectTime
import { Observable } from 'rxjs';
/**
 * Custom RxJS operator that rate-limits emissions while always forwarding the
 * most recent source value.
 *
 * Intended as a stand-in for the `inspectTime` operator that was removed from
 * RxJS.
 * NOTE(review): the first value is forwarded immediately (leading edge);
 * confirm this matches the timing of the operator being replaced.
 *
 * Behaviour:
 * - Each source value is remembered as the latest value and a throttled
 *   emission is requested.
 * - The throttle emits immediately when idle; otherwise it emits once, with
 *   the latest value, when the current `delay` window ends.
 * - On source error or completion any pending emission is discarded and the
 *   notification is forwarded right away.
 * - Unsubscribing discards any pending emission and unsubscribes from the
 *   source.
 *
 * @param delay - minimum spacing between emissions, in milliseconds.
 * @returns a function mapping a source observable to the throttled observable.
 */
export function inspectTime<T>(delay: number) {
  return (source: Observable<T>): Observable<T> =>
    new Observable<T>(observer => {
      // Most recent value seen from the source; read only when the throttle fires.
      let latest: T;
      const limiter = ActionThrottle(() => observer.next(latest), delay);

      const upstream = source.subscribe({
        next: value => {
          latest = value;
          limiter.enqueue();
        },
        error: err => {
          // Drop the pending emission so nothing is delivered after the error.
          limiter.dequeue();
          observer.error(err);
        },
        complete: () => {
          // Drop the pending emission so nothing is delivered after completion.
          limiter.dequeue();
          observer.complete();
        }
      });

      // Teardown: cancel the pending emission, then release the source.
      return () => {
        limiter.dequeue();
        upstream.unsubscribe();
      };
    });
}
/**
 * Limits the frequency at which `action` is executed.
 *
 * - `enqueue()` runs the action immediately when no cooldown is active and
 *   starts a cooldown of `minPeriod_ms`. While a cooldown is active it only
 *   marks one run as pending; any number of `enqueue()` calls during a single
 *   cooldown collapse into that one pending run, which fires as soon as the
 *   cooldown expires (and starts the next cooldown).
 * - `dequeue()` cancels the pending run, if any. It does not end the current
 *   cooldown and does not clear the cooldown timer, so that timer stays
 *   scheduled until it fires.
 *
 * @param action - callback to rate-limit; invoked with no arguments.
 * @param minPeriod_ms - minimum time between two executions, in milliseconds
 *   (handed to `setTimeout`).
 * @returns an object exposing `enqueue` and `dequeue`.
 */
function ActionThrottle(
  action: () => void,
  minPeriod_ms: number
): { enqueue: () => void; dequeue: () => void } {
  // True while a cooldown is running, i.e. the action ran less than minPeriod_ms ago.
  let blocked = false;
  // True when an execution was requested during the cooldown and is still wanted.
  let queued = false;

  function enqueue(): void {
    if (!blocked) {
      blockAndExecute();
    } else {
      queued = true;
    }
  }

  function dequeue(): void {
    queued = false;
  }

  function blockAndExecute(): void {
    blocked = true;
    // Arm the cooldown before running the action so the cooldown still ends
    // even if the action throws.
    setTimeout(unblock, minPeriod_ms);
    action();
  }

  function unblock(): void {
    if (queued) {
      // A run was requested during the cooldown: consume it and start a new cooldown.
      dequeue();
      blockAndExecute();
    } else {
      blocked = false;
    }
  }

  return { enqueue, dequeue };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment