Skip to content

Instantly share code, notes, and snippets.

@CremboC
Created October 17, 2019 08:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CremboC/e0515b5f15f17ee40a92659db78cbfd3 to your computer and use it in GitHub Desktop.
Save CremboC/e0515b5f15f17ee40a92659db78cbfd3 to your computer and use it in GitHub Desktop.
fs2.Stream.mergeWith
import cats.data.Ior
import cats.Monad
import cats.syntax.all._
import fs2.{Pipe, Pull, Stream}
/** Merges two streams pairwise, generating a stand-in element whenever one side
  * has nothing that matches the other side's current head.
  *
  * At every step the heads of both streams are pulled and handled as follows:
  *  - both present and `ok(a, b)` is true: `f(a, b)` is emitted and both heads are consumed;
  *  - both present and `ok(a, b)` is false: `gen(Ior.Both(a, b))` picks the side to fill in.
  *    `Left(a_)` emits `f(a_, b)` and keeps `a` as the next left head;
  *    `Right(b_)` emits `f(a, b_)` and keeps `b` as the next right head;
  *  - only the left head is present: `gen(Ior.Left(a))` must answer `Right(b_)`,
  *    and `f(a, b_)` is emitted;
  *  - only the right head is present: `gen(Ior.Right(b))` must answer `Left(a_)`,
  *    and `f(a_, b)` is emitted;
  *  - both streams exhausted: the merged stream ends.
  *
  * @param ls  left input stream
  * @param rs  right input stream
  * @param ok  decides whether the two current heads belong together
  * @param gen produces the missing counterpart: `Left` carries a generated left element,
  *            `Right` a generated right element
  * @param f   combines a left element and a right element (either may be generated)
  * @return the stream of combined elements
  * @throws NotImplementedError while the stream is evaluated, if `gen` answers a one-sided
  *         `Ior` with an element for the side that is already present. The error type is
  *         the one the former `???` placeholders produced; it now carries a message.
  */
def mergeWith[F[_]: Monad, A, B, C](
  ls: Stream[F, A],
  rs: Stream[F, B],
)(
  ok: (A, B) => Boolean,
  gen: (A Ior B) => Either[A, B],
)(
  f: (A, B) => C,
): Stream[F, C] = {
  // The type of `gen` cannot rule these answers out, so they are rejected at runtime
  // with a message that names the broken invariant.
  def invalidGen(input: String, got: String, wanted: String): Nothing =
    throw new NotImplementedError(
      s"mergeWith: gen returned $got for $input; expected $wanted"
    )

  def loop(lefts: Stream[F, A], rights: Stream[F, B]): Pull[F, C, Unit] =
    // Pull one element from each side (left first, then right) before deciding what to emit.
    lefts.pull.uncons1.flatMap { leftHead =>
      rights.pull.uncons1.flatMap { rightHead =>
        (leftHead, rightHead) match {
          case (Some((a, lnext)), Some((b, rnext))) =>
            if (ok(a, b)) Pull.output1(f(a, b)) >> loop(lnext, rnext)
            else
              gen(Ior.Both(a, b)) match {
                case Left(a_) =>
                  // The left side lacks a partner for `b`: pair the generated element
                  // with `b` and push `a` back so it is compared again next round.
                  Pull.output1(f(a_, b)) >> loop(Stream.emit(a) ++ lnext, rnext)
                case Right(b_) =>
                  // Mirror image: the right side lacks a partner for `a`.
                  Pull.output1(f(a, b_)) >> loop(lnext, Stream.emit(b) ++ rnext)
              }

          case (Some((a, lnext)), None) =>
            // Right side is exhausted: every remaining left element needs a generated partner.
            gen(Ior.Left(a)) match {
              case Right(b_) =>
                Pull.output1(f(a, b_)) >> loop(lnext, Stream.empty)
              case Left(_) =>
                invalidGen("Ior.Left (right stream exhausted)", "Left", "Right")
            }

          case (None, Some((b, rnext))) =>
            // Left side is exhausted: every remaining right element needs a generated partner.
            gen(Ior.Right(b)) match {
              case Left(a_) =>
                Pull.output1(f(a_, b)) >> loop(Stream.empty, rnext)
              case Right(_) =>
                invalidGen("Ior.Right (left stream exhausted)", "Right", "Left")
            }

          case (None, None) =>
            Pull.done
        }
      }
    }

  loop(ls, rs).stream
}
import java.time.Instant
import cats.Id
import cats.data.Ior
import org.scalatest.{FlatSpec, Matchers}
import fs2.Stream
/** Checks `Streams.mergeWith` on two streams of timestamped readings.
  *
  * All fixtures list their elements newest first. Elements are paired when their
  * timestamps are equal; a missing counterpart is replaced by an element with an
  * empty `value`.
  */
class StreamsSpec extends FlatSpec with Matchers {
  behavior.of("Streams")

  /** Input element: a value observed at `time`, tagged with the `id` of its source. */
  private case class Example(
    time: Instant,
    id: String,
    value: String,
  )

  /** Merged element: both source ids and the concatenated values for one `time`. */
  private case class Atom(
    time: Instant,
    ids: List[String],
    value: String,
  )

  private val nineAm = Instant.parse("2019-10-16T09:00:00Z")
  private val eightAm = Instant.parse("2019-10-16T08:00:00Z")
  private val sevenAm = Instant.parse("2019-10-16T07:00:00Z")

  /** The `gen` argument shared by all tests: builds an empty-valued stand-in for the
    * side that has no element at the later of the two timestamps.
    */
  private def fillGap(heads: Example Ior Example): Either[Example, Example] =
    heads match {
      case Ior.Left(a) =>
        Right(Example(a.time, "id2", ""))
      case Ior.Right(b) =>
        Left(Example(b.time, "id1", ""))
      case Ior.Both(a, b) =>
        // Compare with isBefore instead of matching on exact compareTo results:
        // only the sign of compareTo is specified, so `case -1` / `case 1` can miss.
        if (a.time.isBefore(b.time)) Left(Example(b.time, "id1", "")) // a is earlier
        else if (b.time.isBefore(a.time)) Right(Example(a.time, "id2", "")) // b is earlier
        else fail("gen was called for equal timestamps, which ok should have paired")
    }

  /** Runs `Streams.mergeWith` with the shared `ok`, `gen` and combine functions. */
  private def merge(left: Stream[Id, Example], right: Stream[Id, Example]): List[Atom] =
    Streams
      .mergeWith[Id, Example, Example, Atom](left, right)(
        ok = _.time == _.time,
        gen = fillGap,
      ) { (a, b) =>
        Atom(a.time, List(a.id, b.id), a.value + b.value)
      }
      .compile
      .toList

  it should "mergeWith correctly, missing right" in {
    val left =
      Stream.emits(
        List(
          Example(time = nineAm, id = "id1", value = "guzzguzz"),
          Example(time = eightAm, id = "id1", value = "barbar"),
          Example(time = sevenAm, id = "id1", value = "foofoo"),
        )
      )
    val right =
      Stream.emits(
        List(
          Example(time = nineAm, id = "id2", value = "hoho"),
          Example(time = sevenAm, id = "id2", value = "fizzfizz"),
        )
      )

    merge(left, right) should ===(
      List(
        Atom(time = nineAm, ids = List("id1", "id2"), value = "guzzguzzhoho"),
        Atom(time = eightAm, ids = List("id1", "id2"), value = "barbar"),
        Atom(time = sevenAm, ids = List("id1", "id2"), value = "foofoofizzfizz"),
      )
    )
  }

  it should "mergeWith correctly, missing left" in {
    val left =
      Stream.emits(
        List(
          Example(time = nineAm, id = "id1", value = "guzzguzz"),
          Example(time = sevenAm, id = "id1", value = "foofoo"),
        )
      )
    val right =
      Stream.emits(
        List(
          Example(time = nineAm, id = "id2", value = "hoho"),
          Example(time = eightAm, id = "id2", value = "geegee"),
          Example(time = sevenAm, id = "id2", value = "fizzfizz"),
        )
      )

    merge(left, right) should ===(
      List(
        Atom(time = nineAm, ids = List("id1", "id2"), value = "guzzguzzhoho"),
        Atom(time = eightAm, ids = List("id1", "id2"), value = "geegee"),
        Atom(time = sevenAm, ids = List("id1", "id2"), value = "foofoofizzfizz"),
      )
    )
  }

  // The two tests below cover the branches where one stream ends before the other.
  it should "mergeWith correctly, right exhausted first" in {
    val left =
      Stream.emits(
        List(
          Example(time = nineAm, id = "id1", value = "guzzguzz"),
          Example(time = eightAm, id = "id1", value = "barbar"),
        )
      )
    val right =
      Stream.emits(
        List(
          Example(time = nineAm, id = "id2", value = "hoho"),
        )
      )

    merge(left, right) should ===(
      List(
        Atom(time = nineAm, ids = List("id1", "id2"), value = "guzzguzzhoho"),
        Atom(time = eightAm, ids = List("id1", "id2"), value = "barbar"),
      )
    )
  }

  it should "mergeWith correctly, left exhausted first" in {
    val left =
      Stream.emits(
        List(
          Example(time = nineAm, id = "id1", value = "guzzguzz"),
        )
      )
    val right =
      Stream.emits(
        List(
          Example(time = nineAm, id = "id2", value = "hoho"),
          Example(time = eightAm, id = "id2", value = "geegee"),
        )
      )

    merge(left, right) should ===(
      List(
        Atom(time = nineAm, ids = List("id1", "id2"), value = "guzzguzzhoho"),
        Atom(time = eightAm, ids = List("id1", "id2"), value = "geegee"),
      )
    )
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment