Skip to content

Instantly share code, notes, and snippets.

@tomwadeson
Last active November 15, 2019 00:35
Show Gist options
  • Save tomwadeson/82f3e5996cf81ca0cc2e9a8eaea45f3a to your computer and use it in GitHub Desktop.
Save tomwadeson/82f3e5996cf81ca0cc2e9a8eaea45f3a to your computer and use it in GitHub Desktop.
Streaming generation of arbitrary (and optionally distinct) data, with a scary-looking `toIterator` to integrate with Gatling
package newbeeper.performance
import cats.effect.concurrent.{MVar, Ref}
import cats.effect.syntax.concurrent._
import cats.effect.{Concurrent, ConcurrentEffect, Sync}
import cats.syntax.flatMap._
import cats.syntax.functor._
import fs2.Stream
import org.scalacheck.Arbitrary
/** Streaming generation of arbitrary (and optionally distinct) data from
  * ScalaCheck `Arbitrary` instances, plus syntax for exposing an fs2 `Stream`
  * as a plain `Iterator`.
  */
object Random {

  /** Extension methods that bridge a `Stream[F, A]` to `Iterator`-based APIs. */
  implicit final class StreamOps[F[_], A](private val stream: Stream[F, A]) extends AnyVal {

    /** Like [[toIterator]], but each element is run synchronously when it is
      * pulled from the iterator.
      *
      * Every call to `next()` blocks the calling thread (`unsafeRunSync()`) until
      * the stream has produced the next element. If the stream failed, `next()`
      * throws that failure; once the stream has ended, `next()` throws a
      * `NoSuchElementException`.
      */
    def unsafeToIterator(implicit F: ConcurrentEffect[F]): F[Iterator[A]] =
      toIterator.map(effects => effects.map(pull => F.toIO(pull).unsafeRunSync()))

    /** Starts draining the stream on a background fiber and returns an endless
      * iterator of effects, each of which yields the next element of the stream.
      *
      * Elements are handed over through a single `MVar`. The background fiber is
      * never joined or cancelled by this method.
      *
      * A failure of the stream is delivered to the next pull instead of being
      * lost on the background fiber. After the stream has ended (normally or
      * with a failure), every further pull fails with a `NoSuchElementException`
      * rather than blocking forever. `hasNext` is always `true`.
      */
    def toIterator(implicit F: Concurrent[F]): F[Iterator[F[A]]] = {
      // Published repeatedly once the source is finished, so that consumers fail
      // fast instead of waiting on a cell that will never be filled again.
      val exhausted: Stream[F, Either[Throwable, A]] =
        Stream
          .eval(
            F.delay[Either[Throwable, A]](
              Left(new NoSuchElementException("source stream has terminated"))
            )
          )
          .repeat

      for {
        cell <- MVar[F].empty[Either[Throwable, A]]
        // `attempt` turns a stream failure into a final `Left`, so it reaches the consumer.
        _ <- (stream.attempt ++ exhausted).evalMap(cell.put).compile.drain.start
      } yield Iterator.continually(cell.take.flatMap(result => F.fromEither(result)))
    }
  }

  /** An endless stream of values sampled from the implicit `Arbitrary[A]`.
    *
    * `Gen.sample` returns an `Option`; samples that come back empty are dropped.
    */
  def of[F[_], A](implicit F: Sync[F], A: Arbitrary[A]): Stream[F, A] =
    Stream.eval(F.delay(A.arbitrary.sample)).repeat.unNone

  /** Like [[of]], but never emits the same value twice. See [[distinctBy]]. */
  def distinct[F[_], A](implicit F: Sync[F], A: Arbitrary[A]): Stream[F, A] =
    distinctBy[F, A, A](identity)

  /** Like [[of]], but emits a value only if its key `f(a)` has not been seen before.
    *
    * Every key emitted so far is kept in a `Set`, so memory grows with the number
    * of distinct elements produced. If the generator can only produce finitely
    * many distinct keys, the stream stops emitting once all of them have been
    * seen, but keeps sampling.
    *
    * @param f derives the key used to decide whether two values are duplicates
    */
  def distinctBy[F[_], A, B](f: A => B)(implicit F: Sync[F], A: Arbitrary[A]): Stream[F, A] =
    Stream.eval(Ref[F].of(Set.empty[B])).flatMap { seen =>
      of[F, A].evalFilter { candidate =>
        val key = f(candidate)
        // Atomically record the key and report whether it was new.
        seen.modify(keys => (keys + key, !keys.contains(key)))
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment