Created
May 20, 2014 20:21
-
-
Save robertsosinski/40855253798319f1cdbb to your computer and use it in GitHub Desktop.
Listening to Asynchronous Listen/Notify with Scala and Akka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.reactive.sandbox.actors | |
import java.sql.DriverManager | |
import org.postgresql.PGConnection | |
import akka.actor.{Actor, ActorLogging, ActorRef} | |
case class Subscription(channel: String) | |
case class UnSubscription(channel: String) | |
class Notifier extends Actor with ActorLogging { | |
val conn = new NotifierConnection(self) | |
def receive = { | |
case Subscription(channel) => conn.listen(channel) | |
case UnSubscription(channel) => conn.unListen(channel) | |
case Notification(channel, payload, pid) => log.info(s"$channel sent $payload from $pid") | |
case exception: Exception => log.error(s"${exception.getClass}: ${exception.getMessage}") | |
} | |
override def preStart() { | |
conn.start() | |
} | |
override def postStop() { | |
conn.shutdown() | |
} | |
// Notification Connection to PostgreSQL | |
protected case class Notification(channel: String, payload: String, pid: Int) | |
protected class NotifierConnection(notifier: ActorRef) extends Thread { | |
var isRunning = true | |
val javaConn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/sandbox", "sandbox", "") | |
val psqlConn = javaConn.asInstanceOf[PGConnection] | |
def listen(channel: String) { | |
execute(s"LISTEN $channel") | |
} | |
def unListen(channel: String) { | |
execute(s"UNLISTEN $channel") | |
} | |
def shutdown() { | |
this.isRunning = false | |
} | |
private def execute(statement: String) { | |
val stmt = javaConn.createStatement | |
stmt.execute(statement) | |
stmt.close() | |
} | |
override def run() { | |
while (isRunning) { | |
try { | |
val notifications = psqlConn.getNotifications | |
if (notifications != null) { | |
for (notification <- notifications) { | |
notifier ! Notification(notification.getName, notification.getParameter, notification.getPID) | |
} | |
} | |
Thread.sleep(10) | |
} | |
catch { | |
case exception: Exception => notifier ! exception | |
} | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.reactive.sandbox | |
import akka.actor.{ActorSystem, Props} | |
import io.reactive.sandbox.actors.{Notifier, Subscription} | |
object Sandbox { | |
def main(args: Array[String]) { | |
val system = ActorSystem("sandbox") | |
val notifier = system.actorOf(Props(new Notifier), "notifier") | |
notifier ! Subscription("mychannel") | |
system.awaitTermination() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment