@EnricoPicci
Created March 18, 2022 12:58
bufferConcatMap operator that also works with concurrent subscriptions
import { defer, EMPTY, Observable } from 'rxjs';
import { concatMap, tap } from 'rxjs/operators';

export function bufferConcatMap<T, R>(project: (val: T[], index: number, bufferIndex: number) => Observable<R>) {
  return (sourceObservable: Observable<T>) => {
    // build the Observable returned by this operator
    return defer(() => {
      // this function is called each time the returned Observable is subscribed to.
      // initialize the state - these variables hold the state of one subscription, so
      // concurrent subscriptions do not share any state
      let bufferedNotifications = [] as T[];
      let processing = false;
      let _index = -1; // index of the latest notification received from upstream
      let _bufferIndex = 0; // index of the buffer passed to the project function
      return sourceObservable.pipe(
        tap((val) => {
          // every value notified by the source is stored in the buffer
          bufferedNotifications.push(val);
          _index++;
        }),
        concatMap(() => {
          // if processing is in progress or the buffer is empty, complete without notifying anything
          if (processing || bufferedNotifications.length === 0) {
            return EMPTY;
          }
          // create a copy of the buffer to pass to the project function so that the
          // bufferedNotifications array can be safely reset
          const _buffer = [...bufferedNotifications];
          // update the state: processing starts now and bufferedNotifications is reset
          processing = true;
          bufferedNotifications = [];
          // return the result of the project function invoked with the buffered values
          return project(_buffer, _index, _bufferIndex).pipe(
            tap({
              // when the Observable returned by the project function completes,
              // no processing is in flight any more
              complete: () => {
                _bufferIndex++;
                processing = false;
              },
            }),
          );
        }),
      );
    });
  };
}
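
The buffering behaviour described in the comments above can be traced with a small, dependency-free model. This is only a sketch and not part of the operator: it uses no RxJS, it replaces the asynchronous completion of the projected Observable with an explicit method call, and the names `BufferModel`, `next`, `projectDone` and `flushed` are hypothetical, introduced purely for illustration. It assumes, as with `concatMap`, that every upstream value waits for its own turn and that a turn which finds an empty buffer produces nothing.

```typescript
// Simplified, synchronous model of the buffering logic (no RxJS involved).
// All names here are hypothetical and exist only for this illustration.
class BufferModel<T> {
  private buffered: T[] = [];   // values received since the last flush
  private pendingTurns = 0;     // values waiting for their turn, as concatMap would queue them
  private processing = false;   // true while a projected Observable is "running"
  readonly flushed: T[][] = []; // buffers handed to the project function, in order

  // A value arrives from upstream: store it, then let it take its turn if idle.
  next(value: T): void {
    this.buffered.push(value);
    this.pendingTurns++;
    this.drain();
  }

  // The projected Observable completes: queued values may now take their turn.
  projectDone(): void {
    this.processing = false;
    this.drain();
  }

  private drain(): void {
    while (!this.processing && this.pendingTurns > 0) {
      this.pendingTurns--;
      if (this.buffered.length === 0) continue; // this turn corresponds to EMPTY
      this.flushed.push(this.buffered);          // this turn corresponds to project(buffer)
      this.buffered = [];
      this.processing = true;
    }
  }
}

const model = new BufferModel<string>();
model.next("a");      // idle: "a" is processed alone
model.next("b");      // busy: "b" is buffered
model.next("c");      // busy: "c" is buffered
model.projectDone();  // "b" takes its turn and carries "c" along
model.projectDone();  // "c" takes its turn, finds an empty buffer, does nothing
console.log(JSON.stringify(model.flushed)); // [["a"],["b","c"]]
```

Values that arrive while a buffer is being processed are grouped into the next buffer, so a slow project function receives fewer, larger batches instead of one call per value.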