Created
March 18, 2022 12:58
-
-
Save EnricoPicci/2cdf63260a5d79cbd7d241eb458116e3 to your computer and use it in GitHub Desktop.
bufferConcatMap operator that works also with concurrent subscriptions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export function bufferConcatMap<T, R>(project: (val: T[], index: number, bufferIndex: number) => Observable<R>) { | |
return (sourceObservable: Observable<T>) => { | |
// buld the Observable returned by this operator | |
return defer(() => { | |
// this function will be called each time this Observable is subscribed to. | |
// initialize the state - these variables will hold the state for every subscripition of the returned Observable | |
let bufferedNotifications = [] as T[]; | |
let processing = false; | |
let _index = -1; // index of the notification from upstream | |
let _bufferIndex = 0; // index of the buffer passed to the project function | |
return sourceObservable.pipe( | |
tap((val) => { | |
// every value notified by the source is stored in the buffer | |
bufferedNotifications.push(val); | |
_index++; | |
}), | |
concatMap(() => { | |
// if processing or if there are no items in the buffer just complete without notifying anything | |
if (processing || bufferedNotifications.length === 0) { | |
return EMPTY; | |
} | |
// create a copy of the buffer to be passed to the project function so that the bufferedNotifications array | |
// can be safely reset | |
const _buffer = [...bufferedNotifications]; | |
// update the state: now processing start and the bufferedNotifications needs to be reset | |
processing = true; | |
bufferedNotifications = []; | |
// return the result of the project function invoked with the buffer of values stored | |
return project(_buffer, _index, _bufferIndex).pipe( | |
tap({ | |
// when the Observable returned by the project function completes it | |
// means that there is no processing on the fly | |
complete: () => { | |
_bufferIndex++; | |
processing = false; | |
}, | |
}), | |
); | |
}), | |
); | |
}); | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment