@debasishg
Forked from rrodseth/UnfoldPages.scala
Created June 14, 2021 18:36
Create an akka-stream Source from paginated results, using Source.unfoldAsync
// Inspired by a tweet from @trautonen 1/13/2016
// Use Source.unfoldAsync to turn paginated database results into an akka-streams Source
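// unfold is the inverse of fold: fold collapses a sequence into one value,
// while unfold grows a sequence from a seed state, one step at a time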
// Setup this snippet relies on but does not show. The system name is an arbitrary placeholder.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
implicit val system = ActorSystem("unfold-pages")
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for Future { ... }, map and onComplete
case class Page[T](pageNumber: Long, totalPages: Long, contents: List[T])
case class Thing(id: Long, name: String = "foo")
val totalPages = 5
val pageSize = 3
// Imagine instead a Slick 3 Database call. The total pages would come from the base query, rather than being supplied
def fetchPage(pageNumber: Long, pageSize: Long, totalPages: Long): Future[Page[Thing]] = Future {
  val start = pageNumber * pageSize
  Page(pageNumber, totalPages, List(Thing(start), Thing(start + 1), Thing(start + 2)))
}
val sink: Sink[Any, Future[Unit]] = Sink.foreach(println)
// Create a Source using Source.unfoldAsync
val startPage = 1
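// The function returns Future[Option[(nextState, element)]]; None completes the stream
val source = Source.unfoldAsync(startPage) { pageNum =>
  val futurePage: Future[Page[Thing]] = fetchPage(pageNum, pageSize, totalPages)
  // A page numbered past totalPages ends the stream, at the cost of one extra fetch
  futurePage.map(page => if (page.pageNumber > page.totalPages) None else Some((pageNum + 1, page)))
}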
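val result: Future[Unit] = source.runWith(sink)
// Shut the actor system down once the stream completes.
// ActorSystem.shutdown() is deprecated from Akka 2.4 on, where system.terminate() replaces it.
result.onComplete { _ => system.shutdown() }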
/*
Page(1,5,List(Thing(3,foo), Thing(4,foo), Thing(5,foo)))
Page(2,5,List(Thing(6,foo), Thing(7,foo), Thing(8,foo)))
Page(3,5,List(Thing(9,foo), Thing(10,foo), Thing(11,foo)))
Page(4,5,List(Thing(12,foo), Thing(13,foo), Thing(14,foo)))
Page(5,5,List(Thing(15,foo), Thing(16,foo), Thing(17,foo)))
*/
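// A stdlib-only sketch of the same unfold idea (Scala 2.13+, no Akka), in case it helps to
// see the pattern in isolation. Assumptions: fetchPageSync, fetchCount and UnfoldSketch are
// hypothetical names, not part of the gist, and the fetch is synchronous. The state is an
// Option[Long] holding the next page to request, so the last page's own totalPages decides
// when to stop and no page past the end is fetched. Note that List.unfold expects
// (element, nextState), the reverse of the (nextState, element) order used by Source.unfoldAsync.

```scala
object UnfoldSketch {
  final case class Page[T](pageNumber: Long, totalPages: Long, contents: List[T])

  val totalPages = 5L
  val pageSize = 3L

  // Counts fetches so the "no extra fetch" behaviour can be checked.
  var fetchCount = 0

  // Hypothetical synchronous stand-in for the gist's fetchPage.
  def fetchPageSync(pageNumber: Long): Page[Long] = {
    fetchCount += 1
    val start = pageNumber * pageSize
    Page(pageNumber, totalPages, List.tabulate(pageSize.toInt)(i => start + i))
  }

  // unfold: grow a list of pages from a seed state (the next page number, if any).
  val pages: List[Page[Long]] =
    List.unfold[Page[Long], Option[Long]](Option(1L)) {
      case None => None // no next page: stop
      case Some(pageNum) =>
        val page = fetchPageSync(pageNum)
        // Decide the next state from the page just fetched.
        val nextState = if (page.pageNumber < page.totalPages) Some(pageNum + 1) else None
        Some((page, nextState)) // (element, nextState)
    }

  // fold: collapse the pages back into a single value, the total item count.
  val itemCount: Int = pages.foldLeft(0)((acc, page) => acc + page.contents.size)
}
```

// The same Option-as-state trick should carry over to Source.unfoldAsync by wrapping each
// step in a Future and swapping the tuple order.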