@pyetras
Created September 9, 2015 15:04
chunkUntil for scalaz-stream
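import scalaz.stream.{Process, Process1}

// Groups input into chunks; a chunk is closed by (and includes) each element for which `emit` is true.
def chunkUntil[I](emit: I => Boolean): Process1[I, Vector[I]] = {
  def go(acc: Vector[I]): Process1[I, Vector[I]] =
    // On end of input, flush whatever has accumulated (possibly an empty Vector).
    Process.receive1Or[I, Vector[I]](Process.emit(acc)) { i =>
      val chunk = acc :+ i
      if (emit(i)) Process.emit(chunk) ++ go(Vector())
      else go(chunk)
    }
  go(Vector())
}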
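
To make the chunking behavior easier to check without pulling in scalaz-stream, here is a rough model of the same logic over a strict List. `ChunkUntilModel` and `chunkUntilList` are hypothetical names used only for illustration and are not part of the gist or of any library. The sketch assumes the semantics read off the code above: the delimiter element stays in the chunk it closes, and the leftover accumulator is always emitted at end of input.

```scala
// Hypothetical helper, not part of scalaz-stream: models chunkUntil on a strict List.
object ChunkUntilModel {
  def chunkUntilList[I](input: List[I])(emit: I => Boolean): Vector[Vector[I]] = {
    // Fold state: (completed chunks, chunk currently being accumulated).
    val (done, acc) =
      input.foldLeft((Vector.empty[Vector[I]], Vector.empty[I])) {
        case ((chunks, cur), i) =>
          val chunk = cur :+ i
          if (emit(i)) (chunks :+ chunk, Vector.empty[I]) // delimiter closes the chunk
          else (chunks, chunk)
      }
    // Mirrors the end-of-input fallback: the remaining accumulator is emitted even when empty.
    done :+ acc
  }
}
```

With this model, `ChunkUntilModel.chunkUntilList(List(1, 2, 3, 4, 5))(_ % 2 == 0)` gives `Vector(Vector(1, 2), Vector(3, 4), Vector(5))`. When the input ends on a delimiter, the last chunk is an empty Vector, so callers who do not want it may need to filter it out.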