Skip to content

Instantly share code, notes, and snippets.

@guymers
Created June 20, 2022 11:15
Show Gist options
  • Save guymers/c73df30ac09f0a589d17c6e24ee0d615 to your computer and use it in GitHub Desktop.
Save guymers/c73df30ac09f0a589d17c6e24ee0d615 to your computer and use it in GitHub Desktop.
ZStream.distributedWith on a sum type
type DistributedSumType[E, Tup <: Tuple] <: Tuple = Tup match {
case EmptyTuple => EmptyTuple
case h *: t => Dequeue[Exit[Option[E], h]] *: DistributedSumType[E, t]
}
extension [R, E, O](v: ZStream[R, E, O]) {
inline def distributedSumType[E1 >: E](maximumLag: Int)(using
m: Mirror.SumOf[O],
): ZManaged[R, Nothing, DistributedSumType[E1, m.MirroredElemTypes]] = {
val n = constValue[Tuple.Size[m.MirroredElemTypes]]
def decide(o: O): UIO[Int => Boolean] = ZIO.succeed(_ == m.ordinal(o))
v.distributedWith[E1](n, maximumLag = maximumLag, decide).map { ls =>
Tuple.fromArray(ls.toArray).asInstanceOf[DistributedSumType[E1, m.MirroredElemTypes]]
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment