One of the cool things PostgreSQL gives you is a simple notification system that you can use to let clients know that something interesting has happened. For instance, you can set up rules that broadcast notifications when a table is updated, and client applications can update their displays in response. The JDBC driver provides access to this API, so I thought I would see what it would look like in doobie.
The program below constructs a Process[ConnectionIO, PGNotification] that registers for events, polls for them periodically (this is the best we can do with current driver support), and unregisters when the stream terminates for any reason. We use a Transactor to replace ConnectionIO with Task, which gives us something we can actually run.
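Before the doobie version, it may help to see the same three steps (register, poll, unregister) against raw JDBC. This is only a rough sketch, not part of doobie: the object name JdbcNotifySketch and the helpers pending, exec, and render are made up for illustration, the connection details are assumed to match the example database used in this post, and the SELECT 1 round trip is one common way to give the driver a chance to read pending notifications from the socket.

```scala
import java.sql.{Connection, DriverManager}
import org.postgresql.{PGConnection, PGNotification}

// Hypothetical names throughout; a sketch of listen/poll/unlisten in plain JDBC.
object JdbcNotifySketch {

  /** The driver may hand back null when nothing is pending, so normalize to a List. */
  def pending(pg: PGConnection): List[PGNotification] =
    Option(pg.getNotifications()).map(_.toList).getOrElse(Nil)

  /** Run a statement that takes no parameters, discarding any result. */
  def exec(conn: Connection, sql: String): Unit = {
    val st = conn.createStatement()
    try { st.execute(sql); () } finally st.close()
  }

  /** Format one notification as "pid channel payload". */
  def render(pid: Int, name: String, param: String): String =
    s"$pid $name $param"

  def main(args: Array[String]): Unit = {
    // Assumed connection details (database "world", user "postgres", empty password).
    val conn = DriverManager.getConnection("jdbc:postgresql:world", "postgres", "")
    try {
      val pg = conn.unwrap(classOf[PGConnection])
      exec(conn, "LISTEN foo")                 // register
      try {
        var seen = 0
        while (seen < 5) {                     // poll until five notifications have been printed
          Thread.sleep(1000)
          exec(conn, "SELECT 1")               // round trip so the driver reads from the socket
          pending(pg).take(5 - seen).foreach { n =>
            println(render(n.getPID, n.getName, n.getParameter))
            seen += 1
          }
        }
      } finally exec(conn, "UNLISTEN foo")     // unregister, even if polling fails
    } finally conn.close()
  }
}
```

The nested try/finally blocks play the role that onComplete plays in the stream version: the UNLISTEN runs however the polling loop ends.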
package doobie.example

import doobie.imports._
import org.postgresql._
import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._, Process.{ eval, eval_, repeatEval, emitAll }

/**
 * Example exposing PostgreSQL NOTIFY as a Process[ConnectionIO, PGNotification]. This will
 * likely be provided as a standard service in doobie-contrib-postgresql in a future version.
 * To play with this program, run it and then in another window do:
 *
 * > psql -d world -U postgres -c "notify foo, 'abc'"
 *
 * to send a notification. The program will exit after reading five notifications.
 */
object PostgresNotify {

  /** Program that retrieves the underlying PGConnection */
  val getPGConnection: ConnectionIO[PGConnection] =
    FC.unwrap(classOf[PGConnection])

  /** Program that gets all new notifications. */
  val getNotifications: ConnectionIO[List[PGNotification]] =
    getPGConnection.flatMap(c => HC.delay(c.getNotifications).map {
      case null => Nil
      case as   => as.toList
    })

  /** Construct a program that execs a no-param statement and discards the return value */
  def execVoid(sql: String): ConnectionIO[Unit] =
    HC.prepareStatement(sql)(HPS.executeUpdate).void

  /** Construct a program that starts listening on the given channel. */
  def listen(channel: String): ConnectionIO[Unit] =
    execVoid("LISTEN " + channel)

  /** Construct a program that stops listening on the given channel. */
  def unlisten(channel: String): ConnectionIO[Unit] =
    execVoid("UNLISTEN " + channel)

  /**
   * Construct a program that pauses the current thread. This doesn't scale, but neither do
   * long-running connection-bound operations like NOTIFY/LISTEN. So the approach here is to
   * burn a thread reading the events and multiplex somewhere downstream.
   */
  def sleep(ms: Long): ConnectionIO[Unit] =
    HC.delay(Thread.sleep(ms))

  /**
   * Construct a stream of PGNotifications on the specified channel, polling at the specified
   * rate. Note that this process, when run, will commit the current transaction.
   */
  def notificationStream(channel: String, ms: Long): Process[ConnectionIO, PGNotification] =
    (for {
      _  <- eval(listen(channel) *> HC.commit)
      ns <- repeatEval(sleep(ms) *> getNotifications <* HC.commit)
      n  <- emitAll(ns)
    } yield n).onComplete(eval_(unlisten(channel) *> HC.commit))

  /** A transactor that knows how to connect to a PostgreSQL database. */
  val xa: Transactor[Task] =
    DriverManagerTransactor("org.postgresql.Driver", "jdbc:postgresql:world", "postgres", "")

  /**
   * Construct a stream of PGNotifications that prints to the console. Transform it to a
   * runnable process using the transactor above, and run it.
   */
  def main(args: Array[String]): Unit =
    notificationStream("foo", 1000)
      .map(n => s"${n.getPID} ${n.getName} ${n.getParameter}")
      .take(5)
      .sink(s => HC.delay(Console.println(s)))
      .transact(xa)
      .void
      .run

}
Running this program in the REPL displays the first five notifications on channel foo.
> runMain doobie.example.pgnotify
[info] Compiling 1 Scala source to /Users/rnorris/Scala/doobie/core/target/scala-2.11/classes...
[info] Running doobie.example.pgnotify
35257 foo hi
35257 foo there
35257 foo this
35257 foo is
35257 foo some
[success] Total time: 28 s, completed Mar 15, 2015 10:50:31 PM
>
Produced by doing this in another window:
doobie (pg-fun *)$ psql -d world -U postgres
psql (9.4.1)
Type "help" for help.
world=> notify foo, 'hi';
NOTIFY
world=> notify foo, 'there';
NOTIFY
world=> notify foo, 'this';
NOTIFY
world=> notify foo, 'is';
NOTIFY
world=> notify foo, 'some';
NOTIFY
world=> notify foo, 'crazy shit';
NOTIFY
world=>
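One caveat about listen and unlisten in the program above: they build SQL by string concatenation, and a channel name is an identifier, so it can't be passed as a statement parameter. If the channel name ever comes from outside the program, it probably wants quoting first. Here is a minimal sketch of one way to do that, using PostgreSQL's rule for delimited identifiers (wrap in double quotes, double any embedded double quote). The object ChannelNames and its methods are hypothetical names, not part of doobie or the driver.

```scala
// Hypothetical helper; not part of doobie or the PostgreSQL driver.
object ChannelNames {

  /**
   * Quote a channel name as a PostgreSQL delimited identifier: wrap it in double quotes
   * and double any embedded double quote. Identifiers cannot contain the NUL character.
   */
  def quoteIdent(name: String): String = {
    require(name.nonEmpty, "channel name must not be empty")
    require(!name.exists(_ == '\u0000'), "channel name must not contain NUL")
    "\"" + name.replace("\"", "\"\"") + "\""
  }

  /** SQL text that starts listening on the given channel. */
  def listenSql(channel: String): String =
    "LISTEN " + quoteIdent(channel)

  /** SQL text that stops listening on the given channel. */
  def unlistenSql(channel: String): String =
    "UNLISTEN " + quoteIdent(channel)

  def main(args: Array[String]): Unit = {
    println(listenSql("foo"))                  // LISTEN "foo"
    println(listenSql("foo; drop table x"))    // LISTEN "foo; drop table x"
  }
}
```

The second call shows the point: the whole string becomes one (odd-looking) channel name rather than two statements. Keep in mind that quoted identifiers are case-sensitive, while unquoted ones are folded to lower case, so a listener on "Foo" will not see a plain notify Foo sent from psql.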