Skip to content

Instantly share code, notes, and snippets.

@tpolecat
Last active August 29, 2015 14:17
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tpolecat/b5ec288eca42140b6286 to your computer and use it in GitHub Desktop.
Save tpolecat/b5ec288eca42140b6286 to your computer and use it in GitHub Desktop.

doobie experiment - postgres NOTIFY as scalaz-stream

One of the cool things PostgreSQL gives you is a simple notification system that you can use to let clients know that something interesting has happened. For instance you can set up rules that broadcast notifications when a table is updated, and client applications can update displays in response. The JDBC driver provides access to this API so I thought I would see what it would look like in doobie.

The program below constructs a Process[ConnectionIO, PGNotification] that registers for events, polls for them periodically (this is the best we can do with current driver support), and unregisters when the stream terminates for any reason. We use a Transactor to replace ConnectionIO with Task which gives us something we can actually run.

package doobie.example

import doobie.imports._

import org.postgresql._

import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._, Process.{ eval, eval_, repeatEval, emitAll}

/**
 * Example exposing PostrgreSQL NOTIFY as a Process[ConnectionIO, PGNotification]. This will 
 * likely be provided as a standard service in doobie-contrib-postgresql in a future version. 
 * To play with this program, run it and then in another window do:
 *
 * > psql -d world -U postgres -c "notify foo, 'abc'"
 *
 * to send a notification. The program will exit after reading five notifications.
 */
object PostgresNotify {

  /** Program that retrieves the underlying PGConnection */
  val getPGConnection: ConnectionIO[PGConnection] =
    FC.unwrap(classOf[PGConnection])

  /** Program that gets all new notifications. */
  val getNotifications: ConnectionIO[List[PGNotification]] =
    getPGConnection.flatMap(c => HC.delay(c.getNotifications).map {
      case null => Nil
      case as   => as.toList  
    })

  /** Construct a program that execs a no-param statement and discards the return value */
  def execVoid(sql: String): ConnectionIO[Unit] =
    HC.prepareStatement(sql)(HPS.executeUpdate).void

  /** Construct a program that starts listening on the given channel. */
  def listen(channel: String): ConnectionIO[Unit] = 
    execVoid("LISTEN " + channel)

  /** Construct a program that stops listening on the given channel. */
  def unlisten(channel: String): ConnectionIO[Unit] = 
    execVoid("UNLISTEN " + channel)

  /** 
   * Construct a program that pauses the current thread. This doesn't scale, but neither do 
   * long- running connection-bound operations like NOTIFY/LISTEN. So the approach here is to
   * burn a thread reading the events and multplex somewhere downstream.
   */
  def sleep(ms: Long): ConnectionIO[Unit] =
    HC.delay(Thread.sleep(ms))

  /** 
   * Construct a stream of PGNotifications on the specified channel, polling at the specified
   * rate. Note that this process, when run, will commit the current transaction.
   */
  def notificationStream(channel: String, ms: Long): Process[ConnectionIO, PGNotification] =
    (for {
      _  <- eval(listen(channel) *> HC.commit)
      ns <- repeatEval(sleep(ms) *> getNotifications <* HC.commit)
      n  <- emitAll(ns)
    } yield n).onComplete(eval_(unlisten(channel) *> HC.commit))

  /** A transactor that knows how to connect to a PostgreSQL database. */
  val xa: Transactor[Task] = 
    DriverManagerTransactor("org.postgresql.Driver", "jdbc:postgresql:world", "postgres", "")

  /** 
   * Construct a stream of PGNotifications that prints to the console. Transform it to a 
   * runnable process using the transcactor above, and run it.
   */
  def main(args: Array[String]): Unit =
    notificationStream("foo", 1000)
      .map(n => s"${n.getPID} ${n.getName} ${n.getParameter}")
      .take(5)
      .sink(s => HC.delay(Console.println(s)))
      .transact(xa)
      .void
      .run

}

Running this program in the REPL displays the first five notifications on channel foo.

> runMain doobie.example.pgnotify
[info] Compiling 1 Scala source to /Users/rnorris/Scala/doobie/core/target/scala-2.11/classes...
[info] Running doobie.example.pgnotify 
35257 foo hi
35257 foo there
35257 foo this
35257 foo is
35257 foo some
[success] Total time: 28 s, completed Mar 15, 2015 10:50:31 PM
> 

Produced by doing this in another window:

doobie (pg-fun *)$ psql -d world -U postgres
psql (9.4.1)
Type "help" for help.

world=> notify foo, 'hi';
NOTIFY
world=> notify foo, 'there';
NOTIFY
world=> notify foo, 'this';
NOTIFY
world=> notify foo, 'is';
NOTIFY
world=> notify foo, 'some';
NOTIFY
world=> notify foo, 'crazy shit';
NOTIFY
world=> 
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment