Created
April 16, 2019 19:56
-
-
Save Daenyth/a3190525c7163a8724d79c486dc46479 to your computer and use it in GitHub Desktop.
Jsoniter fs2 pipe with streaming
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.implicits._ | |
import cats.effect.{ConcurrentEffect, ContextShift, IO, Sync} | |
import cats.implicits._ | |
import com.github.plokhotnyuk.jsoniter_scala.core.{ | |
JsonValueCodec, | |
scanJsonValuesFromStream, | |
writeToArray | |
} | |
import com.github.plokhotnyuk.jsoniter_scala.macros.{ | |
CodecMakerConfig, | |
JsonCodecMaker | |
} | |
import fs2.concurrent.Queue | |
import fs2.{Chunk, Pipe, Stream} | |
import org.scalatest.{FlatSpec, Matchers} | |
import scala.concurrent.ExecutionContext | |
/** Round-trip spec for a streaming jsoniter-scala / fs2 integration.
  *
  * Besides the test itself, the class hosts [[parse]], a `Pipe` that decodes
  * a stream of bytes into a stream of values without first collecting the
  * whole input into memory.
  */
class JsoniterSpec extends FlatSpec with Matchers {
  import ExecutionContext.Implicits.global

  implicit val CS: ContextShift[IO] = IO.contextShift(global)

  behavior of "jsoniter"

  it should "parse streaming values" in {
    case class Foo(name: String, num: Int)
    implicit val codec: JsonValueCodec[Foo] =
      JsonCodecMaker.make[Foo](CodecMakerConfig())

    val input = Stream(Foo("fred", 1), Foo("wilma", 2))

    // Serialise each value on its own and flatten the resulting arrays into
    // a single byte stream, which is what `parse` consumes.
    val bytes = input
      .map(writeToArray(_))
      .flatMap(bs => Stream.chunk(Chunk.bytes(bs)))
      .covary[IO]

    val results = bytes.through(parse[IO, Foo](global))

    results.compile.toList
      .map(result => result shouldEqual input.toList)
      .unsafeRunSync()
  }

  /** Decodes a byte stream into a stream of `A` values using jsoniter-scala.
    *
    * The incoming bytes are exposed as a `java.io.InputStream`, which is
    * scanned by `scanJsonValuesFromStream` on `blockingExecutionContext`.
    * Every decoded value is pushed through a bounded queue and emitted
    * downstream.
    *
    * @param blockingExecutionContext execution context on which the scan of
    *                                 the input stream is evaluated
    * @param maxBuffered              capacity of the queue between the parser
    *                                 and the output stream
    * @param shouldParseNext          called with each decoded value; its
    *                                 result is handed back to
    *                                 `scanJsonValuesFromStream` as the scan
    *                                 callback's return value
    * @tparam F effect type
    * @tparam A type of the decoded values
    * @return a pipe from bytes to decoded values
    */
  def parse[F[_]: ConcurrentEffect: ContextShift, A: JsonValueCodec](
      blockingExecutionContext: ExecutionContext,
      maxBuffered: Int = 1,
      shouldParseNext: A => Boolean = (_: A) => true
  ): Pipe[F, Byte, A] = { in =>
    Stream.eval(Queue.boundedNoneTerminated[F, A](maxBuffered)).flatMap { q =>
      in.through(fs2.io.toInputStream[F]).flatMap { inputStream =>
        // Callback invoked by the scanner for each decoded value. It runs the
        // enqueue synchronously, so the scanning thread waits here until the
        // value has been accepted by the queue.
        def eachA(a: A): Boolean =
          q.enqueue1(Some(a)).as(shouldParseNext(a)).toIO.unsafeRunSync()

        // `enqueue1` (rather than `offer1`) is used for the terminating `None`:
        // `offer1` gives up and returns `false` when the bounded queue is
        // full, which would drop the terminator and leave `q.dequeue` waiting
        // forever whenever the last value has not been consumed yet.
        val parseBytes = ContextShift[F].evalOn(blockingExecutionContext) {
          Sync[F].delay(scanJsonValuesFromStream(inputStream)(eachA))
        } *> q.enqueue1(None)

        val emitValues = q.dequeue

        emitValues concurrently Stream.eval(parseBytes)
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The example jsoniter/fs2 integration for http4s here uses the `Array[Byte]` encoding rather than something streaming, so here's one that streams instead of holding everything in memory.