Skip to content

Instantly share code, notes, and snippets.

@zarkone
Last active February 18, 2021 08:25
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zarkone/98eb53e4e1f0833b22faa28a1da77ed7 to your computer and use it in GitHub Desktop.
Save zarkone/98eb53e4e1f0833b22faa28a1da77ed7 to your computer and use it in GitHub Desktop.
Clojure + PostgreSQL: NOTIFY example
(ns database
(:require [hikari-cp.core :as connection-pool]
[clojure.java.jdbc :as jdbc]
;; ...
))
(defn- to-datasource-options [{:keys [host port username password database-name]}]
{:auto-commit true
:read-only false
:connection-timeout 30000
:idle-timeout 600000
:max-lifetime 1800000
:minimum-idle 10
:maximum-pool-size 10
:adapter "postgresql"
:server-name host
:port-number port
:username username
:password password
:database-name database-name})
(db/defdb db
{:datasource (->> (:db config/config)
to-datasource-options
connection-pool/make-datasource)})
;; `connect-and-listen-notifications` polls for notifications,
;; because stock jdbc doesn't allow to listen for them.
;; https://jdbc.postgresql.org/documentation/81/listennotify.html
;; To achieve true event-driven behaviour, you need to use custom jdbc:
;; http://impossibl.github.io/pgjdbc-ng/
;; https://stackoverflow.com/questions/21632243/how-do-i-get-asynchronous-event-driven-listen-notify-support-in-java-using-a-p
(defn- connect-and-listen-notifications [channel callback]
(try
(log/info "Init notification listen.")
(while true
(let [conn (-> db :pool :datasource .getConnection)
pg-conn (.unwrap conn org.postgresql.PGConnection)]
(jdbc/execute! {:connection conn} [(str "LISTEN " channel)])
(->> pg-conn
.getNotifications
(map #(.getParameter %))
callback)
(.close conn)
(Thread/sleep 1000)))
(catch Exception e
(log/error "Listen notify error: \n" e)))
(recur channel callback))
(defn listen [channel callback]
(.start
(Thread.
#(connect-and-listen-notifications channel callback))))
;; ......
;; ----------------------------------------------------------------------
;; USAGE
(defn do-index [db-task]
(let [{:keys [query_entity id query_type file writeup file_card writeup_card removed_on folder user]} db-task
operation_type (if removed_on :delete query_type) ;; <-- because we dont DELETE, we set remove_on
]
(condp = [operation_type query_entity]
[:insert "users"] (index-user-with-id id)
[:update "users"] (update-user id)
[:delete "users"] (delete-user id)
[:insert "files"] (index-file-with-id id)
[:update "files"] (index-file-with-id id)
[:delete "files"] (doseq [fc-id (files/get-file-card-ids-by-file id)]
(delete :file fc-id))
;; and etc..
;; and etc..
;; and etc..
;; and etc..
;; otherwise
:do-nothing)))
(defn- parse-and-do-index [message]
(-> message
(json/parse-string case/->snake_case_keyword)
(update :query_type (comp keyword str/lower-case))
do-index))
(defn on-notify [messages]
(async/go
(let [do-index-results (->> messages (map parse-and-do-index) set)
need-to-refresh-index? (not-every? #(= % :do-nothing) do-index-results)]
(when need-to-refresh-index?
(es/refresh-index)
(clean-search-cache!)
(live/mark-handler-updated :api.cards/query-all {})))))
(defonce _ (database/listen "index_update" #'on-notify))
;; -----------------------------------------------------------------------------------
;; MIGRATIONS
;; util.clj
(ns util
(:require [camel-snake-kebab.core :as case]
;; ...
))
(defn- get-notify-trigger-name [table]
(str "notify_" (case/->snake_case (name table)) "_trigger"))
(defn drop-notify-index-trigger [table]
(let [trigger-name (get-notify-trigger-name table)
table (str "\"" (name table) "\"")]
(exec-raw
(str "DROP TRIGGER IF EXISTS " trigger-name " ON " table))))
(defn notify-index-trigger [table]
(drop-notify-index-trigger table)
(let [trigger-name (get-notify-trigger-name table)
table (str "\"" (name table) "\"")]
(exec-raw
(str "CREATE TRIGGER " trigger-name " AFTER UPDATE OR INSERT OR DELETE ON " table " FOR EACH ROW EXECUTE PROCEDURE notify_index_update();"))))
;; ......
;; migrations.clj
(defmigration notify-triggers
(up
(exec-raw
"CREATE OR REPLACE FUNCTION notify_index_update()
RETURNS TRIGGER AS $$
DECLARE
json_record jsonb;
BEGIN
IF (TG_OP = 'DELETE') THEN
json_record := to_jsonb(OLD) || jsonb_build_object('query_entity', TG_TABLE_NAME) || jsonb_build_object('query_type', TG_OP);
PERFORM pg_notify('index_update', json_record::text);
ELSE
json_record := to_jsonb(NEW) || jsonb_build_object('query_entity', TG_TABLE_NAME) || jsonb_build_object('query_type', TG_OP);
PERFORM pg_notify('index_update', json_record::text);
END IF;
-- [entity id type file writeup file_card writeup_card]
RETURN NULL;
END;
$$ language 'plpgsql';")
(util/notify-index-trigger :users)
(util/notify-index-trigger :files)
(util/notify-index-trigger :file-cards)
(util/notify-index-trigger :writeups)
(util/notify-index-trigger :writeup-cards)
(util/notify-index-trigger :uploads)
(util/notify-index-trigger :folders)
(util/notify-index-trigger :file-card-downloads)
(util/notify-index-trigger :file-ratings)
(util/notify-index-trigger :writeup-card-downloads)
(util/notify-index-trigger :writeup-ratings))
(down
(util/drop-notify-index-trigger :users)
(util/drop-notify-index-trigger :files)
(util/drop-notify-index-trigger :file-cards)
(util/drop-notify-index-trigger :writeups)
(util/drop-notify-index-trigger :writeup-cards)
(util/drop-notify-index-trigger :uploads)
(util/drop-notify-index-trigger :folders)
(util/drop-notify-index-trigger :file-card-downloads)
(util/drop-notify-index-trigger :file-ratings)
(util/drop-notify-index-trigger :writeup-card-downloads)
(util/drop-notify-index-trigger :writeup-ratings)
(exec-raw "DROP FUNCTION IF EXISTS notify_index_update()")))
;; Also see jsonb protocols, you might need to eval it before:
(ns utils.postgres
(:require [clojure.java.jdbc :as proto]
[cheshire.core :as json]
[clj-time.coerce :as time-coerce]
)
(:import [org.postgresql.util PGobject])) ;; from [org.postgresql/postgresql "9.4-1200-jdbc41" :exclusions [org.slf4j/slf4j-simple]]
(extend-protocol proto/ISQLValue
clojure.lang.IPersistentMap
(sql-value [val]
(doto (PGobject.)
(.setType "jsonb")
(.setValue (json/generate-string val))))
org.joda.time.DateTime
(sql-value [val]
(doto (PGobject.)
(.setType "timestamp")
(.setValue (time-coerce/to-string val)))))
(extend-protocol proto/ISQLParameter
clojure.lang.IPersistentMap
(set-parameter [val stmt idx]
(.setObject stmt idx (proto/sql-value val)))
org.joda.time.DateTime
(set-parameter [val stmt idx]
(.setObject stmt idx (proto/sql-value val)))
clojure.lang.IPersistentVector
(set-parameter [val stmt idx]
(let [conn (.getConnection stmt)
meta (.getParameterMetaData stmt)
type-name (.getParameterTypeName meta idx)]
(if-let [elem-type (when (= (first type-name) \_) (apply str (rest type-name)))]
(.setObject stmt idx (.createArrayOf conn elem-type (to-array val)))
(.setObject stmt idx val)))))
(extend-protocol proto/IResultSetReadColumn
PGobject
(result-set-read-column [pgobj metadata idx]
(let [type (.getType pgobj)
value (.getValue pgobj)]
(case type
"jsonb" (json/parse-string value true) ; with keywords
:else value)))
java.sql.Timestamp
(result-set-read-column [value metadata idx]
(time-coerce/from-sql-time value))
org.postgresql.jdbc4.Jdbc4Array
(result-set-read-column [value metadata idx]
(vec (.getArray value))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment