Created
April 1, 2019 06:43
-
-
Save L7R7/2601a71300dbd4700d3eda5acdad048e to your computer and use it in GitHub Desktop.
Updated/fixed version for: https://gist.github.com/L7R7/46615daa6d1a41d0499c261db8a2f3bb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.l7r7.lab.feed.client | |
import akka.actor.ActorSystem | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model._ | |
import akka.http.scaladsl.model.headers.Link | |
import akka.http.scaladsl.unmarshalling.Unmarshal | |
import akka.stream.scaladsl.Source | |
import akka.stream.{ ActorMaterializer, Materializer } | |
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future } | |
/**
 * Entry point that crawls a paginated multipart feed and prints the content type
 * of every body part it encounters.
 *
 * The implicit actor system, materializer and execution context declared here are
 * the ones picked up by [[Crawl.crawl]] when it is passed to `Source.unfoldAsync`.
 */
object Reader {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  /**
   * Builds a source that starts with `initialRequest` and keeps requesting follow-up
   * pages for as long as [[Crawl.crawl]] yields a next request.
   *
   * @param initialRequest the request for the first page
   * @return a source of each response paired with its unmarshalled multipart body
   */
  def read(initialRequest: HttpRequest): Source[(HttpResponse, Multipart.General), _] =
    Source.unfoldAsync[Option[HttpRequest], (HttpResponse, Multipart.General)](Some(initialRequest))(Crawl.crawl)

  /**
   * Runs the crawl, prints each part's content type, and shuts the actor system
   * down once the stream has finished (successfully or not).
   */
  def main(args: Array[String]): Unit = {
    // Placeholder from the original snippet: must be replaced with a real request before running.
    val initialRequest: HttpRequest = ???

    val done = read(initialRequest)
      .map(_._2)
      .flatMapConcat(_.parts)
      .runForeach(p => println(p.entity.contentType))

    // Without this the completion Future is dropped: failures vanish silently and the
    // actor system is never terminated after the crawl ends.
    done.onComplete { result =>
      result.failed.foreach(e => Console.err.println(s"Feed crawl failed: $e"))
      actorSystem.terminate()
    }
  }
}
/**
 * Step function and helpers for following a paginated feed through `Link` headers
 * with `rel="next"`.
 */
object Crawl {

  /**
   * Performs one crawl step, shaped for use with `Source.unfoldAsync`.
   *
   * @param reqOption the request to execute in this step, or `None` when there is no further page
   * @return `None` when `reqOption` is empty (which ends the unfold); otherwise the next state
   *         (the follow-up request, if any) together with the response and its multipart body.
   *         A response with a failure status is still emitted, but with no follow-up request.
   */
  def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] =
    reqOption match {
      case Some(request) =>
        // NOTE(review): the body is unmarshalled before the status is inspected, so an error
        // response that is not multipart presumably fails the Future here instead of reaching
        // the isFailure branch below — confirm whether that is intended.
        for {
          response  <- Http().singleRequest(request)
          multipart <- Unmarshal(response).to[Multipart.General]
        } yield {
          if (response.status.isFailure()) Some((None, (response, multipart)))
          else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
        }
      case None => Future.successful(None)
    }

  // Placeholder from the original snippet: scheme and authority of every follow-up request are
  // taken from this URI. Evaluating `???` throws, so it must be replaced before Crawl is used.
  val baseUrl: Uri = ???

  /**
   * Derives the follow-up request from a response.
   *
   * @param r the response whose `Link` headers are inspected
   * @param m the HTTP method to use for the follow-up request
   * @return always `Some`; the first element is the follow-up request when the response
   *         advertises a next link, and the second element is `r` itself
   */
  def nextRequest(r: HttpResponse, m: HttpMethod): Option[(Option[HttpRequest], HttpResponse)] = {
    val followUp = nextLink(r).map { link =>
      // Keep baseUrl's scheme/authority, but carry over the link's query string as well as its
      // path so query-based pagination is not lost. Fall back to baseUrl's own query when the
      // link has none, which matches the previous behaviour.
      val target = baseUrl.copy(
        path = link.path,
        rawQueryString = link.rawQueryString.orElse(baseUrl.rawQueryString)
      )
      HttpRequest(method = m, uri = target)
    }
    Some((followUp, r))
  }

  /**
   * Finds the URI of the first link value marked `rel="next"` in the response's `Link` headers.
   *
   * All link values of all `Link` headers are considered, and any parameter of a link value
   * may carry the `rel`, so neither an empty parameter list nor a multi-valued header causes
   * a failure or a missed link.
   *
   * @param r the response to inspect
   * @return the next link's URI, or `None` when no link value is marked as next
   */
  def nextLink(r: HttpResponse): Option[Uri] =
    r.headers[Link]
      .flatMap(_.values)
      .find(_.params.exists(param => param.key == "rel" && param.value() == "next"))
      .map(_.uri)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment