Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Created April 16, 2019 19:56
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Daenyth/a3190525c7163a8724d79c486dc46479 to your computer and use it in GitHub Desktop.
Save Daenyth/a3190525c7163a8724d79c486dc46479 to your computer and use it in GitHub Desktop.
Jsoniter fs2 pipe with streaming
import cats.effect.implicits._
import cats.effect.{ConcurrentEffect, ContextShift, IO, Sync}
import cats.implicits._
import com.github.plokhotnyuk.jsoniter_scala.core.{
JsonValueCodec,
scanJsonValuesFromStream,
writeToArray
}
import com.github.plokhotnyuk.jsoniter_scala.macros.{
CodecMakerConfig,
JsonCodecMaker
}
import fs2.concurrent.Queue
import fs2.{Chunk, Pipe, Stream}
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.ExecutionContext
class JsoniterSpec extends FlatSpec with Matchers {
import ExecutionContext.Implicits.global
implicit val CS: ContextShift[IO] = IO.contextShift(global)
behavior of "jsoniter"
it should "parse streaming values" in {
case class Foo(name: String, num: Int)
implicit val codec: JsonValueCodec[Foo] =
JsonCodecMaker.make[Foo](CodecMakerConfig())
val input = Stream(Foo("fred", 1), Foo("wilma", 2))
val bytes = input
.map(writeToArray(_))
.flatMap(bs => Stream.chunk(Chunk.bytes(bs)))
.covary[IO]
val results = bytes.through(parse[IO, Foo](global))
results.compile.toList
.map(result => result shouldEqual input.toList)
.unsafeRunSync()
}
def parse[F[_]: ConcurrentEffect: ContextShift, A: JsonValueCodec](
blockingExecutionContext: ExecutionContext,
maxBuffered: Int = 1,
shouldParseNext: A => Boolean = (_: A) => true
): Pipe[F, Byte, A] = { in =>
Stream.eval(Queue.boundedNoneTerminated[F, A](maxBuffered)).flatMap { q =>
in.through(fs2.io.toInputStream[F]).flatMap { inputStream =>
def eachA(a: A): Boolean =
q.enqueue1(Some(a)).as(shouldParseNext(a)).toIO.unsafeRunSync()
val parseBytes = ContextShift[F].evalOn(blockingExecutionContext) {
Sync[F].delay(scanJsonValuesFromStream(inputStream)(eachA))
} *> q.offer1(None)
val emitValues = q.dequeue
emitValues concurrently Stream.eval(parseBytes)
}
}
}
}
@Daenyth
Copy link
Author

Daenyth commented Apr 16, 2019

The example jsoniter/fs2 integration for http4s here uses the Array[Byte] encoding rather than something streaming, so here's one streaming instead of in memory

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment