Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Odd Slick streams
package com.examples.streaming_one
import akka.Done
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.PostgresProfile.api._
import slick.jdbc.{ResultSetConcurrency, ResultSetType}
import scala.concurrent._
/** Demo entry point: streams users updated within the last ten minutes (at most ten rows) out of
  * PostgreSQL through Slick's reactive publisher, runs them through a small Akka Streams
  * enrichment flow and prints the first five enriched elements.
  *
  * When the stream finishes, a failure (if any) is logged, the database is closed and the actor
  * system is terminated so the JVM can exit.
  */
object Boot5 extends App {
  implicit val system: ActorSystem = ActorSystem("HelloStreaming")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  // Connection settings are read from the `postgresql` section of the application config.
  val db = Database.forConfig("postgresql")

  // Plain-SQL query yielding (id, user_uuid) pairs.
  // The Slick documentation lists transactional execution, a forward-only / read-only result set
  // and an explicit fetch size as the prerequisites for real row-by-row streaming on PostgreSQL.
  // The continuation lines of the SQL literal are kept flush-left so the statement text sent to
  // the database is exactly the original one.
  val action =
    sql"""SELECT id as user_id, user_uuid FROM users
WHERE updated_at >= now() - INTERVAL '10 MINUTES'
ORDER BY users.updated_at DESC LIMIT 10
""".as[(Int, String)]
      .transactionally
      .withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 1000)

  // Slick exposes the streaming result as a Reactive Streams publisher; wrap it as an Akka source.
  val source = Source.fromPublisher(db.stream(action))

  /* Some enrichment flow */
  val enrich = Flow[(Int, String)].named("hello")
    // Up to five futures in flight at once; results are emitted in completion order.
    .mapAsyncUnordered(5) { case (id, userUuid) => Future((id, userUuid.toUpperCase())) }
    .zipWithIndex
    .map { case ((id, userUuid), index) => (id, userUuid, index) }
    // Only the first five enriched elements are passed on; upstream is cancelled afterwards.
    .take(5)

  val r: Future[Done] = source.via(enrich).runWith(Sink.foreach(println))

  r.onComplete { outcome =>
    // Surface stream failures instead of dropping them silently.
    outcome.failed.foreach(cause => system.log.error(cause, "User stream failed"))
    // Release the connection pool and Slick's executor; always terminate the actor system,
    // even if closing the database throws.
    try db.close()
    finally system.terminate()
  }
}
package com.examples.streaming_one
import akka.Done
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.PostgresProfile.api._
import slick.jdbc.{ResultSetConcurrency, ResultSetType}
import scala.concurrent._
/** Demo entry point: streams the ten most recently updated users out of PostgreSQL through
  * Slick's reactive publisher, runs them through a small Akka Streams enrichment flow and prints
  * the first five enriched elements.
  *
  * When the stream finishes, a failure (if any) is logged, the database is closed and the actor
  * system is terminated so the JVM can exit.
  */
object Boot5 extends App {
  implicit val system: ActorSystem = ActorSystem("HelloStreaming")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  // Connection settings are read from the `postgresql` section of the application config.
  val db = Database.forConfig("postgresql")

  // Plain-SQL query yielding (id, user_uuid) pairs.
  // The Slick documentation lists transactional execution, a forward-only / read-only result set
  // and an explicit fetch size as the prerequisites for real row-by-row streaming on PostgreSQL.
  // The continuation lines of the SQL literal are kept flush-left so the statement text sent to
  // the database is exactly the original one.
  val action =
    sql"""SELECT id as user_id, user_uuid FROM users
ORDER BY users.updated_at DESC LIMIT 10
""".as[(Int, String)]
      .transactionally
      .withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 1000)

  // Slick exposes the streaming result as a Reactive Streams publisher; wrap it as an Akka source.
  val source = Source.fromPublisher(db.stream(action))

  /* Some enrichment flow */
  val enrich = Flow[(Int, String)].named("hello")
    // Up to five futures in flight at once; results are emitted in completion order.
    .mapAsyncUnordered(5) { case (id, userUuid) => Future((id, userUuid.toUpperCase())) }
    .zipWithIndex
    .map { case ((id, userUuid), index) => (id, userUuid, index) }
    // Only the first five enriched elements are passed on; upstream is cancelled afterwards.
    .take(5)

  val r: Future[Done] = source.via(enrich).runWith(Sink.foreach(println))

  r.onComplete { outcome =>
    // Surface stream failures instead of dropping them silently.
    outcome.failed.foreach(cause => system.log.error(cause, "User stream failed"))
    // Release the connection pool and Slick's executor; always terminate the actor system,
    // even if closing the database throws.
    try db.close()
    finally system.terminate()
  }
}
# Slick database settings for the `postgresql` config path, the section that
# `Database.forConfig("postgresql")` loads in the Scala example.
# Read the references linked inside the block before changing these values.
postgresql = {
# Connection pooling reference: https://github.com/brettwooldridge/HikariCP
# JDBC driver class used to open connections.
driver = "org.postgresql.Driver"
# Please read this documentation
# - https://devcenter.heroku.com/articles/connecting-to-relational-databases-on-heroku-with-java#using-the-database_url-in-plain-jdbc
# - http://slick.lightbend.com/doc/3.2.0/gettingstarted.html#quick-introduction
# Required HOCON substitution: resolved from a DATABASE_URL config key or, failing that, from the
# environment variable of the same name; config resolution fails if neither is set.
# NOTE(review): confirm the value is in a URL format the driver/pool accepts (see Heroku link).
url = ${DATABASE_URL}
# Size of Slick's thread pool for executing database actions.
numThreads = 5
# Upper bound on pooled connections.
maxConnections = 5
# Name given to the connection pool.
poolName = "my-pool"
# Default auto-commit mode for pooled connections. The streaming query in the Scala example
# is wrapped in `.transactionally` -- presumably so it does not depend on this default; verify.
autoCommit = true
# Optional pool settings, currently disabled:
# readOnly = false
# connectionTimeout = 50000
# validationTimeout = 50000
# initializationFailFast = true
# allowPoolSuspension = true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.