Skip to content

Instantly share code, notes, and snippets.

@josdirksen
Created October 4, 2015 18:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save josdirksen/97c6d7deb8012272a0af to your computer and use it in GitHub Desktop.
Save josdirksen/97c6d7deb8012272a0af to your computer and use it in GitHub Desktop.
import akka.actor.{Props, ActorSystem, Actor}
import org.apache.commons.dbcp.{PoolingDataSource, DelegatingConnection}
import org.json4s.DefaultFormats
import org.postgresql.{PGNotification, PGConnection}
import scalikejdbc._
import org.json4s.native.JsonMethods._
import scala.concurrent.duration._
/**
 * Simple case class that a received notification payload is unmarshalled into.
 *
 * @param id       identifier of the product
 * @param name     name of the product
 * @param quantity quantity of the product
 */
final case class Product(id: Long, name: String, quantity: Long)
/**
 * Main runner. Sets up the connection pool and the actor system, waits until
 * the user presses enter, then shuts everything down again.
 */
object PostgresNotifications extends App {

  // initialize JDBC driver & connection pool
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton("jdbc:postgresql://localhost:5432/triggers", "jos", "######")

  // The poller unwraps the pooled connection down to the driver connection,
  // so access to the underlying connection has to be allowed on the pool.
  ConnectionPool.dataSource() match {
    case pooled: PoolingDataSource =>
      pooled.setAccessToUnderlyingConnectionAllowed(true)
    case other =>
      throw new IllegalStateException(
        s"Expected a commons-dbcp PoolingDataSource but got ${other.getClass.getName}")
  }

  // initialize the actor system
  val system = ActorSystem("Hello")
  val a = system.actorOf(Props[Poller], "poller")

  // wait for the user to stop the server
  println("Press <enter> to exit.")
  Console.in.read()

  // Wait for the actor system to stop (the poller hands its connection back in
  // postStop) before shutting the pool down; close the pool even on a timeout.
  try scala.concurrent.Await.ready(system.terminate(), 10.seconds)
  finally ConnectionPool.closeAll()
}
/**
 * Actor that polls a dedicated PostgreSQL connection for notifications on the
 * `events` channel.
 *
 * A connection is borrowed from the pool when the actor is constructed and is
 * kept open until the actor stops. Every "tick" message (scheduled every
 * 1000 ms after an initial 500 ms delay) fetches the pending notifications,
 * prints them and tries to convert the `data` field of each JSON payload into
 * a `Product`.
 */
class Poller extends Actor {

  import scala.util.control.NonFatal
  // execution context for the ticks
  import context.dispatcher

  // json4s formats used when extracting the payload; created once per actor
  private implicit val formats: DefaultFormats.type = DefaultFormats

  val connection = ConnectionPool.borrow()
  val db: DB = DB(connection)
  val tick = context.system.scheduler.schedule(500.millis, 1000.millis, self, "tick")

  /** Registers this actor's connection as a listener on the `events` channel. */
  override def preStart(): Unit = {
    // make sure the connection isn't closed after executing a query:
    // the same connection is polled for notifications on every tick
    db.autoClose(false)
    db.localTx { implicit session =>
      sql"LISTEN events".execute().apply()
    }
  }

  /** Stops the periodic tick and closes the database handle. */
  override def postStop(): Unit = {
    tick.cancel()
    db.close()
  }

  def receive: Receive = {
    case "tick" =>
      db.readOnly { implicit session =>
        pendingNotifications().foreach(handle)
      }
  }

  /** Fetches the notifications that are pending on the underlying driver connection. */
  private def pendingNotifications(): Array[PGNotification] = {
    // unwrap the pooled connection down to the PostgreSQL driver connection
    val pgConnection = connection
      .asInstanceOf[DelegatingConnection]
      .getInnermostDelegate
      .asInstanceOf[PGConnection]
    // guard against a null result so callers can always iterate
    Option(pgConnection.getNotifications).getOrElse(Array.empty[PGNotification])
  }

  /**
   * Prints a single notification and its payload converted to a `Product`.
   *
   * A payload that cannot be parsed or extracted is reported on stderr instead
   * of being thrown, so one bad payload does not discard the rest of the batch
   * that was already fetched from the driver.
   */
  private def handle(notification: PGNotification): Unit = {
    println(s"Received for: ${notification.getName} from process with PID: ${notification.getPID}")
    println(s"Received data: ${notification.getParameter} ")
    try {
      // convert to object
      val json = parse(notification.getParameter) \\ "data"
      val prod = json.extract[Product]
      println(s"Received as object: $prod\n")
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"Could not convert notification payload to a Product: ${e.getMessage}")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment