@jeans11

jeans11/core.clj Secret

Created May 15, 2023 13:57
Missionary & Datomic
(ns core
  (:require
   [datomic.api :as d]
   [missionary.core :as m])
  (:import (java.util.concurrent LinkedBlockingQueue)))

(defn poll-queue
  "Missionary task that takes the next transaction report from the queue.
  The .take method blocks the calling thread until a new element is
  available. To avoid blocking the caller, the call is wrapped in m/via
  with the m/blk executor, so it runs on a thread pool intended for
  blocking operations."
  [queue]
  (m/via m/blk (.take ^LinkedBlockingQueue queue)))

(defn polling
  "Discrete missionary flow of transaction reports.
  m/ap builds the discrete flow; the loop emits each result of
  poll-queue, then recurs to wait for the next one."
  [queue]
  (m/ap
    (loop []
      (m/amb (m/? (poll-queue queue))
             (recur)))))
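
(defn latest-db>
  "Returns a missionary flow of the latest db snapshot
  of the connection !conn."
  [!conn]
  (let [queue (d/tx-report-queue !conn)]
    (->> (polling queue)
         ;; start from nil so the flow has an initial value before any report
         (m/reductions {} nil)
         ;; keep only the most recent value if the consumer lags behind
         (m/relieve {})
         ;; ignore the report itself and read a fresh db snapshot
         (m/latest (fn [_] (d/db !conn))))))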
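
A note on the `{}` passed to `m/reductions` and `m/relieve` above, since it can look like a typo. A Clojure map called with two arguments behaves like `get` with a not-found default, so the empty map acts as a reducing function that always returns its second argument, i.e. "keep the latest value". The sketch below shows this with plain `reduce`; the name `keep-latest` is made up for illustration and is not part of the gist.

```clojure
;; A map called with two arguments behaves like (get m key not-found).
;; The empty map never contains the key, so ({} a b) always returns b.
(def keep-latest {})

;; Called directly, the second argument wins.
(def direct (keep-latest :old :new))

;; As a reducing function it discards the accumulator and keeps each new item.
(def reduced (reduce keep-latest nil [1 2 3]))

;; reductions shows the intermediate states, starting from the nil seed.
(def steps (reductions keep-latest nil [1 2 3]))
```

Here `direct` is `:new`, `reduced` is `3`, and `steps` is `(nil 1 2 3)`, which mirrors how the flow starts at `nil` and then always carries the most recent transaction report.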
dustingetz commented Jun 5, 2023

This is idiomatic, but take care that the flow has only a single instance: Datomic's tx-report-queue notifies only once per transaction, so all connected sessions must share the same singleton tx-report-queue flow. See the discussion and example usage, which has a top-level entrypoint for a flow like this: https://clojurians.slack.com/archives/CL85MBPEF/p1684166929251079
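
To illustrate why a single shared instance matters, here is a minimal sketch using a plain `LinkedBlockingQueue` as a stand-in for the queue returned by `d/tx-report-queue`. The maps `{:tx 1}` and `{:tx 2}` are placeholder data, not real Datomic transaction reports. Each element of a blocking queue is handed to exactly one `.take` call, so two independent consumers split the events between them instead of each seeing all of them.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Stand-in for the transaction report queue (placeholder data only).
(def ^LinkedBlockingQueue queue (LinkedBlockingQueue.))
(.put queue {:tx 1})
(.put queue {:tx 2})

;; Two independent consumers draining the same queue:
;; each .take removes the element it returns.
(def seen-by-a (.take queue))
(def seen-by-b (.take queue))

;; Nothing is left for either consumer to re-read.
(def drained? (.isEmpty queue))
```

Consumer A only ever sees `{:tx 1}` and consumer B only ever sees `{:tx 2}`. If every session ran its own copy of the polling flow over the same queue, each session would miss the transactions taken by the others, which is why one flow should read the queue and all sessions should subscribe to that one.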