Skip to content

Instantly share code, notes, and snippets.

@L7R7
Created March 23, 2019 17:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save L7R7/46615daa6d1a41d0499c261db8a2f3bb to your computer and use it in GitHub Desktop.
Save L7R7/46615daa6d1a41d0499c261db8a2f3bb to your computer and use it in GitHub Desktop.
Sample Reader for reading a paginated HTTP resource
package com.l7r7.lab.feed.client
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.Multipart.General
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Link
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Source }
import scala.concurrent.{ ExecutionContextExecutor, Future }
/**
 * Streams every page of a paginated HTTP resource and splits each page into its multipart
 * body parts. Also owns the implicit ActorSystem / materializer / ExecutionContext that
 * [[Crawl]] imports.
 */
object Reader {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  /**
   * Emits the response to `request`, then the response for every follow-up request that
   * [[Crawl.crawl]] derives from the previous response, in order.
   *
   * @param request request for the first page
   */
  def read(request: HttpRequest): Source[HttpResponse, _] =
    Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

  /**
   * Unmarshals each response entity as `Multipart.General` and flattens it into its body
   * parts. At most one unmarshalling is in flight, so parts keep page order; a failed
   * unmarshalling (e.g. a non-multipart entity) fails the stream.
   */
  val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
    .mapAsync(parallelism = 1)(r => Unmarshal(r).to[Multipart.General])
    .flatMapConcat(_.parts)

  def main(args: Array[String]): Unit = {
    val initialRequest: HttpRequest = ??? // TODO: supply the request for the first page

    val done = read(initialRequest)
      .via(parse)
      .runForeach(p => println(p.entity.contentType))

    // By default the ActorSystem's threads keep the JVM running until the system is
    // terminated, and nobody else observes `done`, so a stream failure would otherwise
    // vanish silently. Report it and shut down in a single callback, so the dispatcher
    // is not torn down before the report is printed.
    done.onComplete { result =>
      result.failed.foreach(e => System.err.println(s"Reading failed: $e"))
      actorSystem.terminate()
    }
  }
}
/**
 * Page-by-page crawling logic used by [[Reader.read]]. It issues each request and works out
 * the follow-up request from the response's `Link` headers.
 */
object Crawl {
  import Reader.{ actorSystem, executionContext }

  /**
   * One `Source.unfoldAsync` step.
   *
   * @param reqOption request for the next page, or `None` once there is nothing left to fetch
   * @return `None` to end the stream; otherwise the fetched response paired with the request
   *         for the following page (`None` when this response has no usable next link or
   *         has a failure status)
   */
  def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
    reqOption match {
      case Some(req) =>
        Http().singleRequest(req).map { response =>
          // A failed response is still emitted downstream, but crawling stops after it.
          if (response.status.isFailure()) Some((None, response))
          else nextRequest(response, HttpMethods.GET)
        }
      case None => Future.successful(None)
    }

  /** Scheme/authority that every follow-up request is sent to. TODO: supply a real value. */
  val baseUrl: Uri = ???

  /**
   * Pairs `r` with the request for the page its `rel="next"` link points to, if any.
   *
   * The link's path and query are rebased onto [[baseUrl]], and the link's own scheme and
   * authority are ignored. NOTE(review): presumably the advertised host is not the one this
   * client should call; confirm.
   *
   * The query must be carried over: pagination cursors such as `?page=2` usually live there.
   * Dropping them could re-request the same page indefinitely.
   *
   * @param r response that was just received
   * @param m HTTP method to use for the follow-up request
   */
  def nextRequest(r: HttpResponse, m: HttpMethod): Option[(Option[HttpRequest], HttpResponse)] = {
    val next = nextLink(r).map { url =>
      val target = baseUrl.copy(
        path = url.path,
        // Fall back to baseUrl's own query so links without a query behave as before.
        rawQueryString = url.rawQueryString.orElse(baseUrl.rawQueryString))
      HttpRequest(method = m, uri = target)
    }
    Some((next, r))
  }

  /**
   * Returns the URI of the first link-value whose `rel` parameter contains the relation type
   * `next`, searching all `Link` headers of `r` in order.
   *
   * Every link-value is inspected, because a single header often carries several, e.g.
   * `<...?page=2>; rel="next", <...?page=9>; rel="last"`. The `rel` parameter may appear at
   * any position in the parameter list. Per RFC 8288 its value is a space-separated list of
   * relation types that are compared case-insensitively.
   */
  def nextLink(r: HttpResponse): Option[Uri] =
    r.headers[Link]
      .flatMap(_.values)
      .find(_.params.exists { param =>
        param.key == "rel" &&
        param.value().toString.split(' ').exists(_.equalsIgnoreCase("next"))
      })
      .map(_.uri)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment