Skip to content

Instantly share code, notes, and snippets.

@soujiro32167
Created January 13, 2023 20:19
Show Gist options
  • Save soujiro32167/8c913df449702bfbf24ab0c7514cee01 to your computer and use it in GitHub Desktop.
Save soujiro32167/8c913df449702bfbf24ab0c7514cee01 to your computer and use it in GitHub Desktop.
Doobie postgres listen + notify CDC
import cats.effect.{IO, IOApp, Resource}
import doobie.postgres.PHC
import doobie.{ConnectionIO, HC, LogHandler, Transactor}
import fs2.Pipe
import org.postgresql.PGNotification
import scala.concurrent.duration.*
import cats.syntax.all.*
import doobie.implicits.*
import fs2.Stream
/**
 * Small change-data-capture demo built on PostgreSQL `LISTEN` / `NOTIFY` via doobie.
 *
 * A background fiber polls for notifications on the `state_change` channel while the
 * main fiber creates a table, inserts a row and updates it; the update's `returning`
 * clause calls `pg_notify`, and the first notification received is printed.
 */
object pgnotify extends IOApp.Simple {

  /** Transactor for the local `postgres` database, obtained through the JDBC `DriverManager`. */
  val xa: Transactor.Aux[IO, Unit] = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver",
    "jdbc:postgresql://localhost:5432/postgres",
    "postgres",
    "postgres"
  )

  /**
   * A resource that issues `LISTEN` on the channel when acquired and `UNLISTEN` when
   * released. Both steps are followed by a commit.
   *
   * @param name the notification channel to listen on
   */
  def channel(name: String): Resource[ConnectionIO, Unit] =
    Resource.make(PHC.pgListen(name) *> HC.commit)(_ => PHC.pgUnlisten(name) *> HC.commit)

  /**
   * Stream of PGNotifications on the specified channel, polling at the specified
   * rate. Note that this stream, when run, will commit the current transaction.
   *
   * @param channelName     the notification channel to listen on
   * @param pollingInterval how often pending notifications are fetched
   * @return every notification fetched, in the order the driver reports them
   */
  def notificationStream(
      channelName: String,
      pollingInterval: FiniteDuration
  ): Stream[IO, PGNotification] = {
    // Runs entirely in ConnectionIO: start listening once, then on every incoming tick
    // fetch whatever notifications are pending, commit, and emit them one by one.
    val inner: Pipe[ConnectionIO, FiniteDuration, PGNotification] = ticks =>
      for {
        _  <- Stream.resource(channel(channelName))
        _  <- ticks
        ns <- Stream.eval(PHC.pgGetNotifications <* HC.commit)
        n  <- Stream.emits(ns)
      } yield n

    Stream.awakeEvery[IO](pollingInterval).through(inner.transact(xa))
  }

  /** Creates the demo table `t` with a single `state` text column, if it is missing. */
  val create: ConnectionIO[Int] =
    sql"""
create table if not exists t (
state text
)
""".update.run

  /** Inserts one row with state `'b'`, logging the statement through the JDK log handler. */
  val insert: ConnectionIO[Int] =
    sql"insert into t (state) values ('b')".updateWithLogHandler(LogHandler.jdkLogHandler).run

  /**
   * Appends `'1'` to the state of every row whose state is `'b'`. The `returning` clause
   * evaluates `pg_notify('state_change', ...)` with a payload describing the old and new
   * state, which is what publishes the notification.
   */
  val update: ConnectionIO[Option[Unit]] =
    sql"""
update t set state = state || '1' where state = 'b'
returning (select pg_notify('state_change', 'old: ' || old.state || ', new: ' || t.state) from t as old where state = 'b' limit 1)
"""
      .updateWithLogHandler(LogHandler.jdkLogHandler)
      // NOTE(review): "1" is passed as the returned column name, presumably only to force
      // evaluation of the `returning` expression; the value itself is discarded as Unit.
      .withGeneratedKeys[Unit]("1")
      .compile
      .last

  /**
   * Starts the listener in the background, runs create / insert / update in a single
   * transaction, then waits until the listener has printed its first notification.
   */
  val app: IO[Unit] = for {
    // NOTE(review): the listener is started concurrently and nothing here waits for its
    // LISTEN to be registered before the update below sends NOTIFY. If the notification
    // is sent first it will presumably be missed and the final join will not complete.
    listener <- notificationStream("state_change", 500.millis)
      .map(n => show"${n.getPID} ${n.getName} ${n.getParameter}")
      .take(1)
      .evalTap(IO.println)
      .compile
      .drain
      .start
    _ <- (create *> insert *> update).transact(xa)
    // joinWithNever re-raises a failure of the listener fiber instead of discarding it
    // inside an ignored Outcome, so a broken listener no longer looks like success.
    _ <- listener.joinWithNever
  } yield ()

  override def run: IO[Unit] = app
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment