Skip to content

Instantly share code, notes, and snippets.

@skogsmaskin
Created January 27, 2023 10:49
Show Gist options
  • Save skogsmaskin/5cf68813cf1040fe92d70d6dfeb90878 to your computer and use it in GitHub Desktop.
Save skogsmaskin/5cf68813cf1040fe92d70d6dfeb90878 to your computer and use it in GitHub Desktop.
import {switchMap, tap} from 'rxjs/operators'
import {EMPTY, Observable, OperatorFunction, defer, of} from 'rxjs'
/**
 * Collects values from the source into an array and emits that array once
 * `emitWhen` reports that the collected values are ready to be released.
 *
 * After each source value is appended, `emitWhen` is called with the current
 * buffer. When it returns `true` the buffer is emitted downstream and a new,
 * empty buffer is started; when it returns `false` nothing is emitted and the
 * values keep accumulating.
 *
 * Nothing in this pipeline reacts to source completion, so values still held
 * in the buffer when the source completes are not emitted.
 *
 * @param emitWhen - Predicate called with the live buffer (including the value
 *   that was just added) after every source emission.
 * @returns An operator that turns a stream of `T` into a stream of `T[]` batches.
 */
export function bufferUntil<T>(
  emitWhen: (currentBuffer: T[]) => boolean
): OperatorFunction<T, T[]> {
  return (source: Observable<T>) =>
    defer(() => {
      // Declared inside `defer` so every subscription accumulates into its own array.
      let pending: T[] = []

      return source.pipe(
        // Record the incoming value before the predicate gets to see the buffer.
        tap((value) => {
          pending.push(value)
        }),
        switchMap(() => {
          if (!emitWhen(pending)) {
            // Not ready yet: stay silent and keep accumulating.
            return EMPTY
          }
          // Hand the filled array downstream and start a new one first, so the
          // emitted batch is never mutated by later source values.
          const ready = pending
          pending = []
          return of(ready)
        })
      )
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment