Skip to content

Instantly share code, notes, and snippets.

@khanetor
Last active March 22, 2022 18:38
Show Gist options
  • Save khanetor/5a2471001adcf8ee3da235b3ca635180 to your computer and use it in GitHub Desktop.
Save khanetor/5a2471001adcf8ee3da235b3ca635180 to your computer and use it in GitHub Desktop.
Play-WS streaming JSON body
// Project identity and the Scala version the build compiles against.
name         := "di-demo"
version      := "0.0.1"
scalaVersion := "2.13.8"

// Compiler flags for the Compile configuration. Flags that are currently switched
// off stay in the list as comments so they can be re-enabled in place.
val compileFlags: Seq[String] = Seq(
  "-deprecation",            // Warning and location for usages of deprecated APIs.
  "-encoding",
  "utf-8",                   // Character encoding used by the source files.
  "-explaintypes",           // Explain type errors in more detail.
  "-feature",                // Warn about features that should be imported explicitly.
  "-unchecked",              // Warn where generated code depends on assumptions.
  "-Xcheckinit",             // Field accessors throw on uninitialized access.
  // "-Ypartial-unification", // Partial unification in type constructor inference.
  // "-Ywarn-dead-code",      // Warn when dead code is identified.
  "-Ywarn-extra-implicit",   // Warn when more than one implicit parameter section is defined.
  // "-Ywarn-numeric-widen",  // Warn when numerics are implicitly widened.
  "-Ywarn-unused:implicits", // Warn when an implicit parameter is unused.
  "-Ywarn-unused:imports",   // Warn when an import selector is not referenced.
  "-Ywarn-unused:locals",    // Warn when a local definition is unused.
  // "-Ywarn-unused:params",  // Warn when a value parameter is unused.
  "-Ywarn-unused:patvars",   // Warn when a variable bound in a pattern is unused.
  // "-Ywarn-value-discard",  // Warn when non-Unit expression results are unused.
  "-Ywarn-unused:privates"   // Warn when a private member is unused.
)

Compile / scalacOptions ++= compileFlags

// Macro-annotation support is added at Global scope rather than per configuration.
Global / scalacOptions += "-Ymacro-annotations"
// Library versions, pinned in one place so modules of the same family share a version.
val AkkaVersion             = "2.6.18"
// NOTE(review): not referenced by any dependency visible in this build snippet.
val AkkaHttpVersion         = "10.2.8"
val PlayWSStandAloneVersion = "2.2.0-M1"

// Akka modules, all resolved at AkkaVersion.
val akkaModules = Seq("akka-actor-typed", "akka-stream", "akka-slf4j")
  .map(artifact => "com.typesafe.akka" %% artifact % AkkaVersion)

// Alpakka JSON streaming, which carries its own version number.
val alpakkaModules = Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-json-streaming" % "3.0.4"
)

// Standalone Play-WS client plus its JSON support, both at PlayWSStandAloneVersion.
val playWsModules = Seq("play-ahc-ws-standalone", "play-ws-standalone-json")
  .map(artifact => "com.typesafe.play" %% artifact % PlayWSStandAloneVersion)

libraryDependencies ++= (akkaModules ++ alpakkaModules ++ playWsModules)
import play.api.libs.ws.StandaloneWSClient
import play.api.libs.ws.ahc.StandaloneAhcWSClient
import akka.stream.SystemMaterializer
import akka.actor.typed.ActorSystem
import akka.actor.typed.javadsl.Behaviors
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import akka.NotUsed
import scala.concurrent.ExecutionContext
/** Demo entry point: fetches a JSON document over HTTP and prints the parsed
  * elements of its `data` array, first in groups of three and then one by one.
  *
  * The actor system is terminated (and the JVM exits) once both passes have
  * finished, whether they succeeded or failed.
  */
object Main extends App {
  // Explicit types on implicits keep implicit resolution predictable.
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty[Nothing], "system")
  implicit val ec: ExecutionContext = system.executionContext
  implicit val mat: akka.stream.Materializer = SystemMaterializer(system).materializer

  // Force the JVM down once the actor system has stopped.
  system.whenTerminated.foreach(_ => System.exit(0))

  // Keep a handle on the client so it can be closed during shutdown.
  val wsClient = StandaloneAhcWSClient()
  val api = new API(wsClient)
  val dataSource =
    api.fetchDataAsSource("https://mocki.io/v1/9f7f5c39-c6b9-4e9b-9c04-84d8c80c52c5")

  // Run the two passes sequentially; the second starts only after the first succeeds.
  val finished = for {
    _ <- dataSource.grouped(3).runForeach(println)
    _ <- dataSource.runForeach(println)
  } yield ()

  // Shut down on success AND on failure: a failed request or stream would
  // otherwise be swallowed and leave the process running forever.
  finished.onComplete { outcome =>
    outcome.failed.foreach(error => Console.err.println(s"Streaming failed: $error"))
    try wsClient.close()
    finally system.terminate()
  }
}
/** Thin HTTP client wrapper that exposes a JSON endpoint as an Akka Streams source.
  *
  * @param ws the Play-WS client used to issue requests; its lifecycle is owned by the caller
  */
class API(ws: StandaloneWSClient) {
  import play.api.libs.json.JsResult
  import akka.stream.scaladsl.Source
  import API._

  /** Streams the elements of the `data` array found in the JSON document at `url`.
    *
    * The request is issued with `stream()` so the body is consumed incrementally
    * instead of being buffered in memory first. It is issued lazily, once per
    * materialization: running the returned source twice performs two requests,
    * each with its own fresh body stream.
    *
    * @param url address of the JSON document to fetch
    * @param ec  execution context used to transform the response future
    * @return a source emitting one [[play.api.libs.json.JsResult]] per array element;
    *         elements that do not match [[API.Data]] are emitted as errors, not dropped
    */
  def fetchDataAsSource(
      url: String
  )(implicit ec: ExecutionContext): Source[JsResult[Data], _] =
    Source
      .lazyFutureSource(() => ws.url(url).stream().map(_.bodyAsSource))
      .via(extractDataFlow)
      .via(parseData)
}
/** Data model and reusable stream stages for the [[API]] client. */
object API {
  import play.api.libs.json.Json
  import play.api.libs.json.JsResult
  import play.api.libs.json.Reads
  import akka.stream.alpakka.json.scaladsl.JsonReader

  /** One element of the `data` array in the fetched JSON document. */
  final case class Data(name: String, id: Int)

  /** JSON decoder for [[Data]]; explicitly typed so implicit resolution does not
    * depend on the inferred type of the macro expansion.
    */
  implicit final val dataReads: Reads[Data] = Json.reads[Data]

  /** Selects each element matched by the JSONPath `$.data[*]` from the incoming
    * bytes and emits it as its own `ByteString`.
    */
  val extractDataFlow: Flow[ByteString, ByteString, NotUsed] = JsonReader.select("$.data[*]")

  /** Parses each incoming chunk as JSON and validates it against [[Data]].
    * Validation problems are carried in the emitted `JsResult`.
    */
  val parseData: Flow[ByteString, JsResult[Data], NotUsed] =
    Flow[ByteString].map(bytes => Json.parse(bytes.utf8String).validate[Data](dataReads))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment