Skip to content

Instantly share code, notes, and snippets.

@frwdrik
Last active September 8, 2021 16:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frwdrik/56b64ccdd0493c6ee0f0e0c20628daf7 to your computer and use it in GitHub Desktop.
Save frwdrik/56b64ccdd0493c6ee0f0e0c20628daf7 to your computer and use it in GitHub Desktop.
(ns fredrik.datomic
(:require [clojure.core.async :as a]
[datomic.api :as d])
(:import (java.util.concurrent LinkedBlockingQueue TimeUnit)))
;;; Add the attribute :tx/request-made? onto transactions to signal
;;; they need additional processing at some point.
;;; Create test database and connection
;; URI of the in-memory database used for experimenting.
(def db-mem-uri "datomic:mem://deebee")
(d/create-database db-mem-uri)
;; Connection shared by everything below.
(def conn-mem (d/connect db-mem-uri))
;; d/transact returns a future. Dereference it so that the schema is
;; known to be installed before anything else runs, and so that a
;; failed schema transaction throws here instead of being lost.
@(d/transact conn-mem [{:db/ident :sensor/humidity
                        :db/valueType :db.type/float
                        :db/cardinality :db.cardinality/one}
                       {:db/ident :tx/request-made?
                        :db/valueType :db.type/boolean
                        :db/cardinality :db.cardinality/one}])
;;; Create a report queue
(def ^LinkedBlockingQueue rq (d/tx-report-queue conn-mem))
;;; Read values from the report queue onto a channel. Later we'll create
;;; a multiplexer on that channel that other channels can tap
;;; (subscribe to).
(defn queue->channel
  "Starts a thread that polls q and puts every value it receives onto
  channel. Returns abort-channel, a promise channel: putting any value
  on it, or closing it, stops the thread.

  The thread also stops on its own once channel has been closed, since
  nothing more can be delivered. channel itself is never closed here."
  [^java.util.concurrent.BlockingQueue q channel]
  (let [abort-channel (a/promise-chan)
        ;; Non-blocking check. Unlike a truthiness test on a/poll!, this
        ;; also notices a closed abort-channel and a falsey abort value.
        aborted? (fn []
                   (let [[_ port] (a/alts!! [abort-channel] :default nil)]
                     (= port abort-channel)))]
    (a/thread
      (loop []
        (when-not (aborted?)
          (let [;; Poll with a timeout so the abort check runs regularly
                ;; even when the queue is idle.
                val (try (.poll q 100 TimeUnit/MILLISECONDS)
                         (catch InterruptedException _e nil))
                keep-going? (or (nil? val)
                                ;; Race the put against abort-channel so a
                                ;; full channel cannot block an abort.
                                (let [[sent? port] (a/alts!! [[channel val]
                                                              abort-channel])]
                                  ;; A put that completes with false means
                                  ;; channel is closed: stop polling.
                                  (or (not= port channel) sent?)))]
            (when keep-going?
              (recur)))))
      (println "stopping queue->channel"))
    abort-channel))
;; (defn report-mult-from-queue
;; "Returns [mult abort-chan], where mult is a multiplexer reading from
;; the report queue of conn. Putting a value on abort-chan closes the
;; mult channel and stops polling the report queue."
;; [queue]
;; (queue->mult queue))
(defn tap-and-run-handler
  "Taps the multiplexer m with a fresh channel and calls handler with
  every value received from it. Returns the channel of the go-loop,
  which closes once the tapped channel has been closed.

  An exception thrown by handler is reported on *err* and the loop
  carries on with the next value. If the loop died instead, the tapped
  channel would never be drained again and m would block for every
  other tap as well."
  [m handler]
  (let [c (a/chan) ;; if handler is slow, use buffering
        ;; A plain fn keeps the try/catch out of the go macro's rewriting.
        safely (fn [v]
                 (try
                   (handler v)
                   (catch Exception e
                     (binding [*out* *err*]
                       (println "tap-and-run-handler: handler threw:" e)))))]
    (a/tap m c)
    (a/go-loop []
      ;; Only nil signals a closed channel; when-some lets a false value
      ;; through to the handler instead of ending the loop.
      (when-some [v (a/<! c)]
        (safely v)
        (recur)))))
;;; Functions for finding transactions needing http requests to be
;;; made. This can be used on eg. startup of app to find transactions
;;; which didn't get processed before the app closed.
(defn datom-tx
  "Returns the entity, as seen in db, of the transaction that datom
  belongs to."
  [db datom]
  (let [tx-id (.tx datom)]
    (d/entity db tx-id)))
(defn tx-needs-req?
  "True only when tx carries :tx/request-made? with the value false.
  A missing attribute, or any other value, yields false."
  [tx]
  (let [request-made (:tx/request-made? tx)]
    (false? request-made)))
(defn txs-needing-http-calls
  "Returns the entity ids of transactions needing http calls.

  db is first filtered down to the datoms whose transaction entity
  satisfies tx-needs-req?; every entity that still has a :db/txInstant
  in that filtered view is returned."
  [db]
  (let [pending? (fn [unfiltered-db datom]
                   (tx-needs-req? (datom-tx unfiltered-db datom)))]
    (d/q '[:find [?t ...]
           :where [?t :db/txInstant]]
         (d/filter db pending?))))
(defn tx-handler
  "Handles one value from the report queue. Do side effects here.

  Looks up the transaction entity for the report's :db-after and prints
  whether it still needs a request. Always returns nil. Any exception,
  including one caused by a malformed report, is reported on *err*
  rather than thrown or silently dropped."
  [{:keys [db-after]}]
  (try
    (let [;; The basis t of db-after identifies the transaction that
          ;; produced this report.
          tx (d/t->tx (d/basis-t db-after))
          tx-entity (d/entity db-after tx)]
      ;;;; DO STUFF HERE, LIKE MAKING HTTP REQUESTS
      (println "needs request?")
      (println (tx-needs-req? tx-entity)))
    (catch Exception e
      (binding [*out* *err*]
        (println "tx-handler failed:" e)))))
(comment
;;; Trying out the above. Evaluate the forms one at a time, in order.
;; Create a report queue. Only transactions happening from this
;; point on will be put on the queue.
(def rq (d/tx-report-queue conn-mem))
;; Create a buffered channel for the reports and a mult on top of it.
(def tx-report-channel (a/chan 10))
(def tx-report-mult (a/mult tx-report-channel))
;; Tap the mult and run the handler on every value that arrives.
(tap-and-run-handler tx-report-mult tx-handler)
;; Now connect the report queue to the channel behind the mult.
(def report-mult-abort (queue->channel rq tx-report-channel))
;; Transact a reading and flag the transaction itself (through the
;; "datomic.tx" tempid) as still needing a request.
(d/transact conn-mem [{:sensor/humidity 123.0}
{:db/id "datomic.tx"
:tx/request-made? false}])
;; Evaluate the form below to stop polling the report queue. Note that
;; queue->channel does not close tx-report-channel itself.
(a/put! report-mult-abort true)
)
(comment
  ;; A channel whose values are printed as they arrive. The printing
  ;; loop ends, closing the channel, once a take yields nil.
  (def printer
    (let [c (a/chan)]
      (a/go-loop []
        (if-some [v (a/<! c)]
          (do (println v)
              (recur))
          (a/close! c)))
      c))
  )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment