@janlisse
Last active August 29, 2015 14:21
Paginated scraper flow
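import java.net.URL
import java.nio.charset.Charset
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, MergePreferred, Unzip}
// httpClient, toDocument and Document are defined elsewhere in the original project;
// httpClient.getDocument(charset) is assumed to yield a URL => Future[...] function.
def scrape(implicit charset: Charset): Flow[URL, Document, Unit] =
  Flow[URL]
    .mapAsyncUnordered(10)(httpClient.getDocument(charset)) // up to 10 fetches in flight, emitted in completion order
    .buffer(10, OverflowStrategy.backpressure)
    .map(toDocument)

// f returns one result per page plus an optional next-page URL; next-page URLs are fed
// back into the preferred input of the merge, so they are taken before new seed URLs.
def scrapePaginated[T](f: Document => (T, Option[URL]))(implicit charset: Charset): Flow[URL, T, Unit] =
  Flow() { implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._
    val merge = b.add(MergePreferred[URL](1))
    val unzip = b.add(Unzip[T, Option[URL]]())
    merge ~> scrape(charset).map(f) ~> unzip.in
    // feedback edge: fails the stream if more than 100 next-page URLs are waiting
    unzip.out1.collect { case Some(next) => next } ~> Flow[URL].buffer(100, OverflowStrategy.fail) ~> merge.preferred
    (merge.in(0), unzip.out0)
  }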
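To make the feedback loop in scrapePaginated concrete, here is a minimal, sequential plain-Scala sketch (no Akka dependency) of the priority rule that MergePreferred enforces: whenever a next-page URL is pending it is fetched before the next seed URL, and each seed's pages are followed until the extractor returns None. PaginationSketch, paginate and the in-memory site map used below are hypothetical stand-ins for the HTTP client and the f parameter. The real stream fetches up to 10 pages concurrently and emits results in completion order, so this models only the priority rule, not the output ordering.

```scala
// Hypothetical, dependency-free analogue of scrapePaginated's feedback loop.
object PaginationSketch {
  // `extract` plays the role of `f`: one result per page plus an optional next-page URL.
  def paginate[T](seeds: List[String], extract: String => (T, Option[String])): List[T] = {
    @annotation.tailrec
    def loop(current: Option[String], seedsLeft: List[String], acc: Vector[T]): Vector[T] =
      current match {
        // A next-page link becomes `current` immediately, so it is fetched
        // before any remaining seed (like MergePreferred's preferred port).
        case Some(url) =>
          val (result, next) = extract(url)
          loop(next, seedsLeft, acc :+ result)
        // No page pending: take the next seed URL, or stop when seeds run out.
        case None =>
          seedsLeft match {
            case seed :: tail => loop(Some(seed), tail, acc)
            case Nil          => acc
          }
      }
    loop(None, seeds, Vector.empty).toList
  }
}
```

For example, with a site map where "a1" links to "a2" and "b1" has no next page, `PaginationSketch.paginate[String](List("a1", "b1"), site)` returns the two A pages before the B page, because the pending "a2" link is preferred over the seed "b1".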