Created
March 23, 2019 17:32
-
-
Save L7R7/46615daa6d1a41d0499c261db8a2f3bb to your computer and use it in GitHub Desktop.
Sample Reader for reading a paginated HTTP resource
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.l7r7.lab.feed.client | |
import akka.actor.ActorSystem | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.Multipart.General | |
import akka.http.scaladsl.model._ | |
import akka.http.scaladsl.model.headers.Link | |
import akka.http.scaladsl.unmarshalling.Unmarshal | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{ Flow, Source } | |
import scala.concurrent.{ ExecutionContextExecutor, Future } | |
/**
 * Sample reader that walks a paginated HTTP resource page by page and prints the
 * content type of every multipart body part it receives.
 */
object Reader {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  /**
   * Builds a source that emits the response of `request` followed by the responses of
   * every follow-up request produced by [[Crawl.crawl]], until no further request is
   * returned.
   *
   * @param request the request for the first page
   * @return a source of the responses, in the order the pages were fetched
   */
  def read(request: HttpRequest): Source[HttpResponse, _] =
    Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

  /** Unmarshals each response as `Multipart.General` and emits its body parts one by one. */
  val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
    .flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
    .flatMapConcat(_.parts)

  def main(args: Array[String]): Unit = {
    // Placeholder: must be replaced with the request for the first page.
    val initialRequest: HttpRequest = ???
    read(initialRequest)
      .via(parse)
      .runForeach { p =>
        println(p.entity.contentType)
        // Only the content type is used here; drain the part's data so that a streamed
        // multipart body is not back-pressured by an entity nobody reads.
        p.entity.discardBytes()
      }
      // Shut the actor system down once the stream has completed (or failed),
      // otherwise its threads keep the JVM alive forever.
      .onComplete(_ => actorSystem.terminate())
  }
}
/**
 * Step function for `Source.unfoldAsync`: executes a request and derives the request
 * for the next page from the `Link` header of the response.
 */
object Crawl {
  import Reader.{ actorSystem, executionContext }

  /**
   * Executes the given request, if there is one.
   *
   * @param reqOption the request to run, or `None` when there is nothing left to fetch
   * @return `None` when `reqOption` is empty; otherwise the response paired with the
   *         optional follow-up request. A response with a failure status is still
   *         returned, but never with a follow-up request.
   */
  def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
    reqOption match {
      case Some(req) =>
        Http().singleRequest(req).map { response =>
          if (response.status.isFailure()) Some((None, response))
          else nextRequest(response, HttpMethods.GET)
        }
      case None => Future.successful(None)
    }

  // Placeholder: scheme and authority used for every follow-up request.
  val baseUrl: Uri = ???

  /**
   * Pairs the response with the request for the next page, if the response links to one.
   *
   * @param r the response that was just received
   * @param m the HTTP method to use for the follow-up request
   * @return always `Some`; the first element is `None` when `r` has no "next" link
   */
  def nextRequest(r: HttpResponse, m: HttpMethod): Option[(Option[HttpRequest], HttpResponse)] = {
    val followUp = nextLink(r).map { url =>
      // Take path and query from the link (pagination is frequently expressed as
      // `?page=N`); scheme and authority always come from baseUrl. When the link has
      // no query, baseUrl's query (if any) is kept.
      val target = baseUrl.copy(
        path = url.path,
        rawQueryString = url.rawQueryString.orElse(baseUrl.rawQueryString))
      HttpRequest(method = m, uri = target)
    }
    Some((followUp, r))
  }

  /**
   * Finds the target of the first link with relation type "next" in the `Link` headers.
   *
   * All values of all `Link` headers are inspected, and the `rel` parameter may appear
   * at any position. A `rel` value may list several space-separated relation types.
   *
   * @param r the response whose headers are searched
   * @return the URI of the "next" link, or `None` if there is none
   */
  def nextLink(r: HttpResponse): Option[Uri] =
    r.headers[Link]
      .flatMap(_.values)
      .find(_.params.exists { p =>
        p.key == "rel" && p.value().toString.split("\\s+").exists(_.equalsIgnoreCase("next"))
      })
      .map(_.uri)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment