Skip to content

Instantly share code, notes, and snippets.

@L7R7
Created April 1, 2019 06:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save L7R7/2601a71300dbd4700d3eda5acdad048e to your computer and use it in GitHub Desktop.
Save L7R7/2601a71300dbd4700d3eda5acdad048e to your computer and use it in GitHub Desktop.
package com.l7r7.lab.feed.client
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Link
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.Source
import akka.stream.{ ActorMaterializer, Materializer }
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future }
/**
  * Entry point that crawls a paginated multipart feed and prints the content type of every body part.
  *
  * The object owns the `ActorSystem`, materializer and execution context that are implicitly
  * handed to [[Crawl.crawl]].
  */
object Reader {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  /**
    * Builds a stream that emits one `(response, multipart)` pair per fetched page.
    *
    * The stream starts with `initialRequest` and keeps unfolding through [[Crawl.crawl]], which
    * yields the follow-up request (if any) together with the element to emit.
    *
    * @param initialRequest the request for the first page
    * @return a source of responses paired with their unmarshalled multipart entity
    */
  def read(initialRequest: HttpRequest): Source[(HttpResponse, Multipart.General), _] =
    Source.unfoldAsync[Option[HttpRequest], (HttpResponse, Multipart.General)](Some(initialRequest))(Crawl.crawl)

  /**
    * Runs the crawl, prints each part's content type, then shuts the actor system down.
    *
    * @param args command line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Placeholder from the original sketch: `???` throws NotImplementedError until a real request is supplied.
    val initialRequest: HttpRequest = ???
    read(initialRequest)
      .map(_._2)
      .flatMapConcat(_.parts)
      // NOTE(review): only the content type is read here; the part entities are not drained.
      // Confirm whether streamed (non-strict) parts need their bytes consumed or discarded.
      .runForeach(p => println(p.entity.contentType))
      .onComplete { outcome =>
        // Surface a failed crawl instead of dropping the failed Future on the floor.
        outcome.failed.foreach(cause => Console.err.println(s"Feed crawl failed: $cause"))
        // Without terminating the system its non-daemon threads keep the JVM alive forever.
        Http().shutdownAllConnectionPools().onComplete(_ => actorSystem.terminate())
      }
  }
}
/**
  * State-transition logic for crawling a feed page by page.
  *
  * [[crawl]] has the shape expected by `Source.unfoldAsync`: given the optional next request it
  * returns the following state together with the element to emit, or `None` to end the stream.
  */
object Crawl {

  /**
    * Executes one crawl step.
    *
    * @param reqOption the request to perform, or `None` when there is nothing left to fetch
    * @return `None` when `reqOption` is empty; otherwise `Some((next, (response, multipart)))`
    *         where `next` is the request for the following page. `next` is `None` when the
    *         response status is a failure or the response carries no `rel="next"` link.
    */
  def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] =
    reqOption match {
      case None => Future.successful(None)
      case Some(request) =>
        for {
          response <- Http().singleRequest(request)
          // NOTE(review): unmarshalling happens before the status check below, so a failure
          // response that cannot be read as Multipart.General fails the Future before that
          // check is reached. Confirm this is the intended behaviour.
          multipart <- Unmarshal(response).to[Multipart.General]
        } yield {
          if (response.status.isFailure()) Some((None, (response, multipart))) // emit, then stop crawling
          else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
        }
    }

  // Placeholder from the original sketch: `???` throws NotImplementedError when this object is initialised.
  val baseUrl: Uri = ???

  /**
    * Derives the follow-up request from a response.
    *
    * The next link's path and query string are applied on top of [[baseUrl]], so scheme and
    * authority always come from `baseUrl`. If the link has no query string, the one from
    * `baseUrl` (if any) is kept.
    *
    * @param r the response whose `Link` headers are inspected
    * @param m the HTTP method to use for the follow-up request
    * @return always `Some`; the first tuple element is `None` when `r` has no `rel="next"` link
    */
  def nextRequest(r: HttpResponse, m: HttpMethod): Option[(Option[HttpRequest], HttpResponse)] = {
    val followUp = nextLink(r).map { link =>
      val target = baseUrl.copy(
        path = link.path,
        // Keep pagination parameters such as `?page=2`; dropping them would re-request the same resource.
        rawQueryString = link.rawQueryString.orElse(baseUrl.rawQueryString)
      )
      HttpRequest(method = m, uri = target)
    }
    Some((followUp, r))
  }

  /**
    * Finds the URI of the first link value marked `rel="next"` in the response's `Link` headers.
    *
    * All values of all `Link` headers are considered, and the `rel` parameter may appear at any
    * position among a value's parameters.
    *
    * @param r the response to inspect
    * @return the target of the first `rel="next"` link, or `None` if there is none
    */
  def nextLink(r: HttpResponse): Option[Uri] =
    r.headers[Link]
      .flatMap(_.values)
      .find(_.params.exists(isRelNext))
      .map(_.uri)

  /** True when the parameter is a `rel` whose whitespace-separated relation types include `next`. */
  private def isRelNext(param: headers.LinkParam): Boolean =
    param.key == "rel" && param.value().toString.split("\\s+").contains("next")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment