@pbuyle
Last active September 2, 2021 13:54
Untested Scala code to group com.twitter.concurrent.AsyncStream elements together. I wrote it thinking I needed it but was wrong. Maybe I will eventually need it. Maybe not. Maybe I will not remember this gist when I do. Maybe I got it all wrong.
import com.twitter.concurrent.AsyncStream
import com.twitter.util.Future

object AsyncStreamHelpers {

  implicit class RichAsyncStream[T](val stream: AsyncStream[T]) {

    /**
     * Partitions this stream into sequences of adjacent elements grouped according to a discriminator function.
     *
     * This method forces evaluation of this stream up to the first element that does not belong to the first group
     * (i.e. if the first element of the returned stream holds `n` elements, then the first `n + 1` elements of this
     * stream are evaluated). Accessing each subsequent element of the resulting stream evaluates another group of
     * elements from this stream.
     *
     * {{{
     * val stream = AsyncStream(("a", 1), ("a", 2), ("b", 1), ("a", 3), ("b", 2), ("b", 3))
     * stream.groupBy(_._1) // conceptually equal to:
     * AsyncStream(
     *   "a" -> Seq(("a", 1), ("a", 2)),
     *   "b" -> Seq(("b", 1)),
     *   "a" -> Seq(("a", 3)),
     *   "b" -> Seq(("b", 2), ("b", 3))
     * )
     * }}}
     *
     * @param f the discriminator function.
     * @tparam K the type of keys returned by the discriminator function.
     */
    def groupBy[K](f: T => K): AsyncStream[(K, Seq[T])] = {
      // Evaluate the first group: the first element plus every adjacent element sharing its key.
      val firstGroup: AsyncStream[Option[(K, Seq[T])]] = AsyncStream.fromFuture(stream.uncons.flatMap {
        case Some((first, tail)) =>
          val key = f(first)
          // `tail()` does not contain `first`, so prepend it to keep the group (and its size) complete.
          tail().takeWhile(f(_) == key).toSeq().map(rest => Some(key -> (first +: rest)))
        case None => Future.None
      })
      firstGroup.flatMap {
        // `mk` takes its tail by name, so the remaining groups are only computed on demand.
        case Some((key, seq)) => AsyncStream.mk((key, seq), stream.drop(seq.size).groupBy(f))
        case None => AsyncStream.empty
      }
    }
  }
}
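
Since the code above is untested, it may help to have a reference for the intended grouping semantics that runs without Finagle on the classpath. The following is a minimal sketch using only the Scala standard library on a strict `List`; `AdjacentGrouping` and `groupAdjacentBy` are hypothetical names introduced here for illustration and are not part of the gist or of `AsyncStream`.

```scala
object AdjacentGrouping {

  // Hypothetical helper (not part of the gist): groups runs of adjacent
  // elements that share the same key, using only the standard library.
  def groupAdjacentBy[T, K](xs: List[T])(f: T => K): List[(K, List[T])] = {
    @annotation.tailrec
    def loop(rest: List[T], acc: List[(K, List[T])]): List[(K, List[T])] =
      rest match {
        case Nil => acc.reverse
        case head :: tail =>
          val key = f(head)
          // `span` splits the tail at the first element whose key differs.
          val (same, others) = tail.span(f(_) == key)
          // The head is part of its own group, so prepend it.
          loop(others, (key -> (head :: same)) :: acc)
      }
    loop(xs, Nil)
  }

  def main(args: Array[String]): Unit = {
    val input = List(("a", 1), ("a", 2), ("b", 1), ("a", 3), ("b", 2), ("b", 3))
    // Prints one (key, group) pair per line, in encounter order.
    groupAdjacentBy(input)(_._1).foreach(println)
  }
}
```

Assuming the `AsyncStream` version behaves as its Scaladoc describes, collecting its result with `toSeq()` should produce the same keys and groups as this strict version for the same input, which makes the sketch usable as an oracle in a test.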