Skip to content

Instantly share code, notes, and snippets.

@khanetor
Created March 4, 2022 10:03
Show Gist options
  • Save khanetor/8aa9bb992adc439e8489cb737c1d6c07 to your computer and use it in GitHub Desktop.
Save khanetor/8aa9bb992adc439e8489cb737c1d6c07 to your computer and use it in GitHub Desktop.
Akka HTTP Client nested JSON streaming
name := "di-demo"
version := "0.0.1"
scalaVersion := "2.13.8"
Compile / scalacOptions ++= Seq(
"-deprecation", // Warning and location for usages of deprecated APIs.
"-encoding",
"utf-8", // Specify character encoding used by source files.
"-explaintypes", // Explain type errors in more detail.
"-feature", // For features that should be imported explicitly.
"-unchecked", // Generated code depends on assumptions.
"-Xcheckinit", // Wrap field accessors to throw an exception on uninitialized access.
// "-Ypartial-unification", // Enable partial unification in type constructor inference
// "-Ywarn-dead-code", // Warn when dead code is identified.
"-Ywarn-extra-implicit", // More than one implicit parameter section is defined.
// "-Ywarn-numeric-widen", // Numerics are implicitly widened.
"-Ywarn-unused:implicits", // An implicit parameter is unused.
"-Ywarn-unused:imports", // An import selector is not referenced.
"-Ywarn-unused:locals", // A local definition is unused.
// "-Ywarn-unused:params", // A value parameter is unused.
"-Ywarn-unused:patvars", // A variable bound in a pattern is unused.
// "-Ywarn-value-discard", // Non-Unit expression results are unused.
"-Ywarn-unused:privates" // A private member is unused.
)
val AkkaVersion = "2.6.18"
val AkkaHttpVersion = "10.2.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
"com.lightbend.akka" %% "akka-stream-alpakka-json-streaming" % "3.0.4"
)
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import scala.util.Success
import scala.util.Failure
import akka.http.scaladsl.common.EntityStreamingSupport
import spray.json._
import DefaultJsonProtocol._
import akka.stream.alpakka.json.scaladsl.JsonReader
object Main extends App {
implicit val system = ActorSystem(Behaviors.empty, "SingleRequest")
implicit val ec = system.executionContext
implicit val jsonStreamingSupport = EntityStreamingSupport.json()
case class User(name: String, id: Int)
implicit val userFormat = jsonFormat(User.apply, "name", "id")
/*
```
# Response JSON
{
"data": [
{
"name": "A",
"id": 1
},
{
"name": "B",
"id": 2
},
{
"name": "C",
"id": 3
},
{
"name": "D",
"id": 4
},
{
"name": "E",
"id": 5
}
]
}
```
*/
val request = HttpRequest(uri = "https://mocki.io/v1/9f7f5c39-c6b9-4e9b-9c04-84d8c80c52c5")
val response = Http().singleRequest(request)
response andThen {
case Success(res) =>
val resp = res.entity.dataBytes.via(JsonReader.select("$.data[*]")).map { bs =>
bs.utf8String.parseJson.convertTo[User]
}
resp.runForeach(println)
// val resp = res.entity.dataBytes.map { bs =>
// val data = bs.utf8String.parseJson.asJsObject.getFields("data").head
// data.convertTo[List[User]]
// }
// resp.runForeach{println}
case Failure(exception) => println(exception)
} onComplete { _ =>
system.terminate()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment