Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active March 2, 2020 12:02
Show Gist options
  • Save atamborrino/c09fad3001dcd15af2ec723842c9cf8d to your computer and use it in GitHub Desktop.
Save atamborrino/c09fad3001dcd15af2ec723842c9cf8d to your computer and use it in GitHub Desktop.
Cassandra query results as an Akka Streams Source, built directly on the DataStax Java driver with no other dependencies
import akka.stream.scaladsl.Source
import com.datastax.driver.core.{Row, Session, Statement}
import com.google.common.util.concurrent.{Futures, FutureCallback, ListenableFuture}
import scala.concurrent.{Promise, Future, ExecutionContext}
/**
 * Helpers for exposing the result of a Cassandra query (DataStax Java driver) as an
 * Akka Streams `Source` of rows, fetching result pages asynchronously as the stream
 * is consumed.
 */
object StreamCassandraHelper {
  // Explicit `.asScala` conversions instead of the deprecated implicit `JavaConversions`.
  import scala.collection.JavaConverters._

  /**
   * Builds a `Source` that executes `stmt` when the stream is materialized and emits
   * the resulting rows page by page.
   *
   * The statement is executed once per materialization, so every run of the returned
   * `Source` works on its own result set.
   *
   * @param stmt    the statement to execute
   * @param ec      execution context used to transform the driver's futures
   * @param session session on which the statement is executed
   * @return a source emitting the rows of the result set, in result-set order
   */
  def executeAsStream(stmt: Statement)(implicit ec: ExecutionContext, session: Session): Source[Row, Unit] = {
    // A `def` (not a `lazy val`): a cached future would hand the same, already
    // consumed, stateful result set to every later materialization of the Source.
    def futRs = guavaFutToScalaFut(session.executeAsync(stmt))

    seededLazyAsync(futRs) { rs =>
      // Takes only the rows that are already in memory, so iterating never
      // triggers a synchronous fetch of the next page.
      def drainAvailable(): List[Row] =
        rs.iterator().asScala.take(rs.getAvailableWithoutFetching).toList

      val firstPage = drainAvailable()

      val nextPagesSource = Source.unfoldAsync(()) { _ =>
        // `isExhausted` is deliberately avoided here: once the current page is
        // drained it may block the calling thread while fetching the next page.
        // `isFullyFetched` together with the in-memory row count never blocks.
        if (rs.isFullyFetched && rs.getAvailableWithoutFetching == 0) {
          Future.successful(None)
        } else {
          guavaFutToScalaFut(rs.fetchMoreResults()).map { _ =>
            // A fetched page may be empty while more pages remain; emitting an
            // empty list simply leads to the next fetch.
            Some(() -> drainAvailable())
          }
        }
      }
        .mapConcat(identity)

      Source(firstPage) ++ nextPagesSource
    }
  }

  /**
   * Adapts a Guava `ListenableFuture` to a Scala `Future` that completes with the
   * same value, or fails with the same error.
   *
   * @param guavaFut the Guava future to adapt
   * @return a Scala future mirroring the outcome of `guavaFut`
   */
  private def guavaFutToScalaFut[A](guavaFut: ListenableFuture[A]): Future[A] = {
    val p = Promise[A]()
    val callback = new FutureCallback[A] {
      def onSuccess(a: A): Unit = p.success(a)
      def onFailure(err: Throwable): Unit = p.failure(err)
    }
    Futures.addCallback(guavaFut, callback)
    p.future
  }

  /**
   * Defers the evaluation of `fut` until the stream is materialized, then streams
   * the elements of the source that `f` builds from the future's result.
   *
   * This is what allows the Cassandra query to be performed only at
   * materialization of the Akka stream rather than when the `Source` is built.
   *
   * @param fut by-name future, evaluated when the single seed element is processed
   * @param f   builds the source to emit from the future's result
   * @return a source emitting the elements of `f(result of fut)`
   */
  private def seededLazyAsync[A, B](fut: => Future[A])(f: A => Source[B, Unit]): Source[B, Unit] = {
    // Wrapping the by-name future in a thunk keeps it unevaluated until
    // `mapAsync` applies it while the stream is running.
    Source.single(() => fut)
      .mapAsync(1)(_.apply())
      .flatMapConcat(f)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment