Last active
September 8, 2021 16:20
-
-
Save frwdrik/56b64ccdd0493c6ee0f0e0c20628daf7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns fredrik.datomic | |
(:require [clojure.core.async :as a] | |
[datomic.api :as d]) | |
(:import (java.util.concurrent LinkedBlockingQueue TimeUnit))) | |
;;; Add the attribute :tx/request-made? onto transactions to signal | |
;;; they need additional processing at some point. | |
;;; Create test database and connection.
;;; These forms run when the namespace is loaded, in this order: the
;;; in-memory database is created first, then a connection to it is opened.
(def db-mem-uri "datomic:mem://deebee")
(d/create-database db-mem-uri)
(def conn-mem (d/connect db-mem-uri))
;;; Install the schema: a sample :sensor/humidity attribute and the
;;; :tx/request-made? flag that is asserted on transaction entities.
;;; d/transact returns a future; dereferencing it blocks until the
;;; transaction has completed and rethrows any failure, so a broken
;;; schema is reported at load time instead of being silently dropped.
@(d/transact conn-mem [{:db/ident :sensor/humidity
                        :db/valueType :db.type/float
                        :db/cardinality :db.cardinality/one}
                       {:db/ident :tx/request-made?
                        :db/valueType :db.type/boolean
                        :db/cardinality :db.cardinality/one}])
;;; Create a report queue for conn-mem. The var is type-hinted as a
;;; LinkedBlockingQueue so that Java interop calls on it avoid reflection.
(def ^LinkedBlockingQueue rq (d/tx-report-queue conn-mem))
;;; Read values from the report queue onto a channel. Later we'll create a
;;; multiplexer on that channel that other channels can tap (subscribe
;;; to).
(defn queue->channel
  "Starts a thread that moves values from the blocking queue q onto channel.

  Returns abort-channel (a promise channel). Putting any non-nil value on
  it stops the thread, even while the thread is blocked trying to put onto
  a full channel. The thread also stops on its own when channel has been
  closed. channel itself is never closed by this function; it stays owned
  by the caller."
  [^LinkedBlockingQueue q channel]
  (let [abort-channel (a/promise-chan)]
    (a/thread
      (loop []
        ;; some? rather than truthiness, so that putting false also aborts.
        (if (some? (a/poll! abort-channel))
          (println "stopping queue->channel")
          (let [;; Poll with a timeout so the abort channel is re-checked
                ;; regularly even when the queue is idle.
                val (try (.poll q 100 TimeUnit/MILLISECONDS)
                         (catch InterruptedException _e nil))
                continue? (or (nil? val)
                              ;; Race the put against the abort signal so a
                              ;; full channel cannot make the thread
                              ;; unstoppable. A put result of false means
                              ;; channel is closed. If abort wins, val is
                              ;; dropped.
                              (let [[result port] (a/alts!! [[channel val] abort-channel])]
                                (and (identical? port channel) result)))]
            (if continue?
              (recur)
              (println "stopping queue->channel"))))))
    abort-channel))
;; (defn report-mult-from-queue | |
;; "Returns [mult abort-chan], where mult is a multiplexer reading from | |
;; the report queue of conn. Putting a value on abort-chan closes the | |
;; mult channel and stops polling the report queue." | |
;; [queue] | |
;; (queue->mult queue)) | |
(defn tap-and-run-handler
  "Taps the multiplexer m and calls handler with every value received.

  The handler runs on a dedicated thread (a/thread) rather than inside a go
  block, so it may perform blocking work such as I/O without tying up the
  go dispatch pool. When the loop ends, either because the tapped channel
  was closed or because handler threw, the tap is removed from m and the
  channel is closed, so a dead consumer cannot stall the mult.

  Returns the channel produced by a/thread, which closes when the loop
  exits."
  [m handler]
  (let [c (a/chan)] ;; if handler is slow, use buffering
    (a/tap m c)
    (a/thread
      (try
        (loop []
          ;; nil means c was closed; every other value goes to the handler.
          (when-some [v (a/<!! c)]
            (handler v)
            (recur)))
        (finally
          (a/untap m c)
          (a/close! c))))))
;;; Functions for finding transactions needing http requests to be | |
;;; made. This can be used on eg. startup of app to find transactions | |
;;; which didn't get processed before the app closed. | |
(defn datom-tx
  "Returns the entity, looked up in db with d/entity, whose id is the
  value of datom's tx field."
  [db datom]
  (let [tx-id (.tx datom)]
    (d/entity db tx-id)))
(defn tx-needs-req?
  "True only when tx carries :tx/request-made? with the value false.
  A missing attribute, or any other value, yields false."
  [tx]
  (-> tx
      :tx/request-made?
      false?))
(defn txs-needing-http-calls
  "Returns the entity ids of transactions needing http calls.

  db is first filtered down to the datoms whose transaction entity
  satisfies tx-needs-req?; the query then collects every entity in that
  filtered view that has a :db/txInstant."
  [db]
  (let [pending? (fn [unfiltered-db datom]
                   (tx-needs-req? (datom-tx unfiltered-db datom)))
        pending-db (d/filter db pending?)]
    (d/q '[:find [?t ...]
           :where [?t :db/txInstant]]
         pending-db)))
(defn tx-handler
  "Handles one value from the report queue. Do side effects here.

  Takes a transaction report map and uses its :db-after value to look up
  the transaction entity at that database's basis. Failures inside the
  side-effecting section are reported on *err* instead of being silently
  discarded; the handler still returns nil so the caller keeps running."
  [{:keys [db-after]}]
  (let [tx (d/t->tx (d/basis-t db-after))
        tx-entity (d/entity db-after tx)]
    ;;;; DO STUFF HERE, LIKE MAKING HTTP REQUESTS
    (try
      (println "needs request?")
      (println (tx-needs-req? tx-entity))
      (catch Exception e
        ;; Best-effort handler: log the failure and carry on.
        (binding [*out* *err*]
          (println "tx-handler failed:" (.getMessage e)))
        nil))))
(comment
  ;;; Trying out the above. Evaluate the forms one at a time, in order,
  ;;; at the REPL.
  ;; Create a report queue. Only transactions happening from this
  ;; point and on will be put on the queue.
  (def rq (d/tx-report-queue conn-mem))
  ;; Create a channel for transaction reports and a mult on top of it.
  (def tx-report-channel (a/chan 10))
  (def tx-report-mult (a/mult tx-report-channel))
  ;; Taps the mult and runs the handler on values from the report queue.
  (tap-and-run-handler tx-report-mult tx-handler)
  ;; Now connect the report queue to the channel feeding the mult.
  (def report-mult-abort (queue->channel rq tx-report-channel))
  ;; Transact a sample value and flag the transaction itself as still
  ;; needing a request.
  (d/transact conn-mem [{:sensor/humidity 123.0}
                        {:db/id "datomic.tx"
                         :tx/request-made? false}])
  ;; Evaluate the form below to stop polling the report queue.
  ;; Note: queue->channel does not close tx-report-channel.
  (a/put! report-mult-abort true)
  )
(comment
  ;; REPL helper: a channel whose values are printed as they arrive.
  ;; The go-loop ends, and closes the channel, once a take yields nil.
  (def printer
    (let [c (a/chan)]
      (a/go-loop [v (a/<! c)]
        (if (some? v)
          (do (println v)
              (recur (a/<! c)))
          (a/close! c)))
      c))
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment