Skip to content

Instantly share code, notes, and snippets.

@robertsosinski
Created May 20, 2014 20:21
Show Gist options
  • Save robertsosinski/40855253798319f1cdbb to your computer and use it in GitHub Desktop.
Save robertsosinski/40855253798319f1cdbb to your computer and use it in GitHub Desktop.
Listening to Asynchronous Listen/Notify with Scala and Akka
package io.reactive.sandbox.actors
import java.sql.DriverManager
import org.postgresql.PGConnection
import akka.actor.{Actor, ActorLogging, ActorRef}
case class Subscription(channel: String)
case class UnSubscription(channel: String)
class Notifier extends Actor with ActorLogging {
val conn = new NotifierConnection(self)
def receive = {
case Subscription(channel) => conn.listen(channel)
case UnSubscription(channel) => conn.unListen(channel)
case Notification(channel, payload, pid) => log.info(s"$channel sent $payload from $pid")
case exception: Exception => log.error(s"${exception.getClass}: ${exception.getMessage}")
}
override def preStart() {
conn.start()
}
override def postStop() {
conn.shutdown()
}
// Notification Connection to PostgreSQL
protected case class Notification(channel: String, payload: String, pid: Int)
protected class NotifierConnection(notifier: ActorRef) extends Thread {
var isRunning = true
val javaConn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/sandbox", "sandbox", "")
val psqlConn = javaConn.asInstanceOf[PGConnection]
def listen(channel: String) {
execute(s"LISTEN $channel")
}
def unListen(channel: String) {
execute(s"UNLISTEN $channel")
}
def shutdown() {
this.isRunning = false
}
private def execute(statement: String) {
val stmt = javaConn.createStatement
stmt.execute(statement)
stmt.close()
}
override def run() {
while (isRunning) {
try {
val notifications = psqlConn.getNotifications
if (notifications != null) {
for (notification <- notifications) {
notifier ! Notification(notification.getName, notification.getParameter, notification.getPID)
}
}
Thread.sleep(10)
}
catch {
case exception: Exception => notifier ! exception
}
}
}
}
}
package io.reactive.sandbox
import akka.actor.{ActorSystem, Props}
import io.reactive.sandbox.actors.{Notifier, Subscription}
object Sandbox {
def main(args: Array[String]) {
val system = ActorSystem("sandbox")
val notifier = system.actorOf(Props(new Notifier), "notifier")
notifier ! Subscription("mychannel")
system.awaitTermination()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment