Skip to content

Instantly share code, notes, and snippets.

@abhsrivastava
Last active July 1, 2017 23:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abhsrivastava/5acd709b435fe780eaf9f827888b8c97 to your computer and use it in GitHub Desktop.
Save abhsrivastava/5acd709b435fe780eaf9f827888b8c97 to your computer and use it in GitHub Desktop.
This is a graph stage source for cassandra data database
import com.datastax.driver.core._
import akka.stream.stage._
import akka.stream._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
object AkkaStreamTest2 extends App {
val system = ActorSystem.create("StreamsExamples")
import system.dispatcher
implicit val mat = ActorMaterializer.create(system)
val session = getSession()
val query = """
|select a, b, c, from foo
| where token(id) >= ?
| and token(id) <= ?;
""".stripMargin
val rows = Source.fromGraph(new CassandraSource(session, query))
val runnableGraph = rows.take(2).toMat(Sink.foreach(r => println(r.getLong(0))))(Keep.right)
runnableGraph.run().onComplete(_ => system.terminate())
System.exit(0)
}
class CassandraSource(session: Session, query: String) extends GraphStage[SourceShape[Row]] {
val out : Outlet[Row] = Outlet.create[Row]("CassandraSource.out")
val sourceShape = SourceShape.of(out)
val scanStmt = session.prepare(query)
var start = System.currentTimeMillis()
val ps = scanStmt.bind(new java.lang.Long(Long.MinValue), new java.lang.Long(Long.MaxValue))
.setFetchSize(1000)
.setConsistencyLevel(ConsistencyLevel.QUORUM)
val rs = session.execute(ps)
val iter = rs.iterator()
override def shape() : SourceShape[Row] = {
sourceShape
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(sourceShape) {
val abstractOutHandler = new AbstractOutHandler {
override def onPull() = {
if (!rs.isExhausted) {
rs.fetchMoreResults()
val row = iter.next()
push[Row](out, row)
}
}
}
setHandler(out, abstractOutHandler)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment