Last active
February 18, 2021 08:25
-
-
Save zarkone/98eb53e4e1f0833b22faa28a1da77ed7 to your computer and use it in GitHub Desktop.
Clojure + PostgreSQL: NOTIFY example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns database | |
(:require [hikari-cp.core :as connection-pool] | |
[clojure.java.jdbc :as jdbc] | |
;; ... | |
)) | |
(defn- to-datasource-options
  "Builds the option map handed to `connection-pool/make-datasource`.

  Takes a map with :host, :port, :username, :password and :database-name and
  combines those values with the fixed pool settings below."
  [{:keys [host port username password database-name]}]
  (let [pool-settings {:auto-commit        true
                       :read-only          false
                       :connection-timeout 30000
                       :idle-timeout       600000
                       :max-lifetime       1800000
                       :minimum-idle       10
                       :maximum-pool-size  10
                       :adapter            "postgresql"}
        ;; Connection target, taken verbatim from the caller's config map.
        target        {:server-name   host
                       :port-number   port
                       :username      username
                       :password      password
                       :database-name database-name}]
    (merge pool-settings target)))
;; Defines `db` with a pooled datasource built from the :db section of
;; `config/config`.
(db/defdb db
  {:datasource (connection-pool/make-datasource
                (to-datasource-options (:db config/config)))})
;; `connect-and-listen-notifications` polls for notifications,
;; because the stock JDBC driver does not deliver them asynchronously.
;; https://jdbc.postgresql.org/documentation/81/listennotify.html | |
;; To achieve true event-driven behaviour, you need to use custom jdbc: | |
;; http://impossibl.github.io/pgjdbc-ng/ | |
;; https://stackoverflow.com/questions/21632243/how-do-i-get-asynchronous-event-driven-listen-notify-support-in-java-using-a-p | |
(defn- connect-and-listen-notifications
  "Polls PostgreSQL for notifications on `channel`, forever.

  Once per second the payloads of all pending notifications are collected
  and passed to `callback` as a (possibly empty) vector of strings.

  One connection is borrowed from the pool and held for the whole polling
  session. Borrowing a fresh pooled connection per poll (and returning it
  while still LISTENing) would leave several sessions subscribed to the same
  channel, so one notification could reach `callback` more than once.

  On any exception the error is logged, the connection is released, and after
  a short pause a new session is started. This function never returns
  normally."
  [channel callback]
  (try
    (log/info "Init notification listen.")
    ;; with-open guarantees the connection goes back to the pool even when
    ;; LISTEN, the driver or the callback throws.
    (with-open [conn (-> db :pool :datasource .getConnection)]
      (let [pg-conn (.unwrap conn org.postgresql.PGConnection)]
        (try
          (while true
            ;; Re-issuing LISTEN is harmless when already subscribed, and the
            ;; round trip lets the driver pick up pending notifications.
            (jdbc/execute! {:connection conn} [(str "LISTEN " channel)])
            (->> pg-conn
                 .getNotifications
                 ;; eager: payloads are extracted before the callback runs
                 (mapv #(.getParameter %))
                 callback)
            (Thread/sleep 1000))
          (finally
            ;; Best effort: do not hand a still-subscribed session back to the
            ;; pool. The connection may already be broken, so failures here
            ;; are ignored.
            (try
              (jdbc/execute! {:connection conn} [(str "UNLISTEN " channel)])
              (catch Exception _ nil))))))
    (catch Exception e
      (log/error "Listen notify error: \n" e)
      ;; Pause before reconnecting so a persistent failure does not turn
      ;; into a tight retry loop.
      (Thread/sleep 1000)))
  (recur channel callback))
(defn listen
  "Starts a new thread that runs `connect-and-listen-notifications` for
  `channel` with `callback`. Returns nil."
  [channel callback]
  (let [worker (Thread.
                (fn []
                  (connect-and-listen-notifications channel callback)))]
    (.start worker)))
;; ...... | |
;; ---------------------------------------------------------------------- | |
;; USAGE | |
(defn do-index
  "Dispatches one parsed notification (`db-task`) to the matching index
  operation, chosen by the pair [operation query_entity].

  Returns whatever the selected operation returns, or :do-nothing when no
  clause matches."
  [db-task]
  (let [{:keys [query_entity id query_type file writeup file_card writeup_card removed_on folder user]} db-task
        ;; Rows are not DELETEd; a set removed_on marks them as removed, so
        ;; it overrides the reported query type.
        op (if removed_on :delete query_type)]
    (case [op query_entity]
      [:insert "users"] (index-user-with-id id)
      [:update "users"] (update-user id)
      [:delete "users"] (delete-user id)
      ;; inserting and updating a file are handled identically
      ([:insert "files"] [:update "files"]) (index-file-with-id id)
      [:delete "files"] (doseq [fc-id (files/get-file-card-ids-by-file id)]
                          (delete :file fc-id))
      ;; and etc..
      ;; and etc..
      ;; and etc..
      ;; and etc..
      ;; otherwise
      :do-nothing)))
(defn- parse-and-do-index
  "Parses a JSON notification payload and passes it to `do-index`.

  Keys are converted with `case/->snake_case_keyword`, and :query_type is
  lower-cased and turned into a keyword before dispatch."
  [message]
  (let [task           (json/parse-string message case/->snake_case_keyword)
        normalize-type (comp keyword str/lower-case)]
    (do-index (update task :query_type normalize-type))))
(defn on-notify
  "Handles one batch of notification payloads (`messages`, JSON strings).

  Every message is run through `parse-and-do-index`; if at least one result is
  something other than :do-nothing, the search index is refreshed, the search
  cache is cleaned and the :api.cards/query-all handler is marked as updated.

  The work runs on a dedicated thread via `async/thread` rather than inside a
  `go` block, because it performs blocking calls and go blocks share a small
  fixed thread pool. Like `go`, `async/thread` returns a channel, so the
  return value remains a channel."
  [messages]
  (async/thread
    (let [do-index-results       (->> messages (map parse-and-do-index) set)
          need-to-refresh-index? (not-every? #(= % :do-nothing) do-index-results)]
      (when need-to-refresh-index?
        (es/refresh-index)
        (clean-search-cache!)
        (live/mark-handler-updated :api.cards/query-all {})))))
;; Start the "index_update" listener only once, even if this namespace is
;; reloaded. Passing the var (#'on-notify) rather than the function value
;; means a redefined `on-notify` is picked up by the running listener.
(defonce _
  (database/listen "index_update" #'on-notify))
;; ----------------------------------------------------------------------------------- | |
;; MIGRATIONS | |
;; util.clj | |
(ns util | |
(:require [camel-snake-kebab.core :as case] | |
;; ... | |
)) | |
(defn- get-notify-trigger-name
  "Returns the trigger name for `table`: the table's name in snake_case,
  wrapped as notify_<name>_trigger."
  [table]
  (let [snake-name (case/->snake_case (name table))]
    (str "notify_" snake-name "_trigger")))
(defn drop-notify-index-trigger
  "Drops the notify trigger of `table` if it exists. The table name is
  wrapped in double quotes in the generated statement."
  [table]
  (let [quoted-table (str "\"" (name table) "\"")
        statement    (str "DROP TRIGGER IF EXISTS "
                          (get-notify-trigger-name table)
                          " ON "
                          quoted-table)]
    (exec-raw statement)))
(defn notify-index-trigger
  "(Re)creates the notify trigger on `table`.

  Any existing trigger of the same name is dropped first; the new row-level
  trigger then runs notify_index_update() after every UPDATE, INSERT or
  DELETE on the table."
  [table]
  (drop-notify-index-trigger table)
  (let [quoted-table (str "\"" (name table) "\"")
        statement    (str "CREATE TRIGGER "
                          (get-notify-trigger-name table)
                          " AFTER UPDATE OR INSERT OR DELETE ON "
                          quoted-table
                          " FOR EACH ROW EXECUTE PROCEDURE notify_index_update();")]
    (exec-raw statement)))
;; ...... | |
;; migrations.clj | |
;; Tables that get a notify trigger, in the order the triggers are created
;; and dropped.
(def ^:private notify-index-tables
  [:users
   :files
   :file-cards
   :writeups
   :writeup-cards
   :uploads
   :folders
   :file-card-downloads
   :file-ratings
   :writeup-card-downloads
   :writeup-ratings])

;; up:   installs the notify_index_update() trigger function, which publishes
;;       the affected row (OLD for DELETE, NEW otherwise) as JSON on the
;;       'index_update' channel together with the table name and operation,
;;       then attaches a trigger to every table listed above.
;; down: removes those triggers and then the function.
(defmigration notify-triggers
  (up
   (exec-raw
    "CREATE OR REPLACE FUNCTION notify_index_update()
RETURNS TRIGGER AS $$
DECLARE
json_record jsonb;
BEGIN
IF (TG_OP = 'DELETE') THEN
json_record := to_jsonb(OLD) || jsonb_build_object('query_entity', TG_TABLE_NAME) || jsonb_build_object('query_type', TG_OP);
PERFORM pg_notify('index_update', json_record::text);
ELSE
json_record := to_jsonb(NEW) || jsonb_build_object('query_entity', TG_TABLE_NAME) || jsonb_build_object('query_type', TG_OP);
PERFORM pg_notify('index_update', json_record::text);
END IF;
-- [entity id type file writeup file_card writeup_card]
RETURN NULL;
END;
$$ language 'plpgsql';")
   (doseq [table notify-index-tables]
     (util/notify-index-trigger table)))
  (down
   (doseq [table notify-index-tables]
     (util/drop-notify-index-trigger table))
   (exec-raw "DROP FUNCTION IF EXISTS notify_index_update()")))
;; See also the jsonb protocol extensions below; you may need to evaluate them first:
(ns utils.postgres | |
(:require [clojure.java.jdbc :as proto] | |
[cheshire.core :as json] | |
[clj-time.coerce :as time-coerce] | |
) | |
(:import [org.postgresql.util PGobject])) ;; from [org.postgresql/postgresql "9.4-1200-jdbc41" :exclusions [org.slf4j/slf4j-simple]] | |
;; Conversion of Clojure/Joda values into PGobject SQL values:
;; maps are sent as jsonb, Joda DateTime instances as timestamp.
(extend-protocol proto/ISQLValue
  clojure.lang.IPersistentMap
  (sql-value [m]
    (let [pg-obj (PGobject.)]
      (.setType pg-obj "jsonb")
      (.setValue pg-obj (json/generate-string m))
      pg-obj))

  org.joda.time.DateTime
  (sql-value [date-time]
    (let [pg-obj (PGobject.)]
      (.setType pg-obj "timestamp")
      (.setValue pg-obj (time-coerce/to-string date-time))
      pg-obj)))
;; How maps, Joda DateTimes and vectors are bound as statement parameters.
(extend-protocol proto/ISQLParameter
  clojure.lang.IPersistentMap
  (set-parameter [m stmt idx]
    ;; bound via its sql-value conversion
    (.setObject stmt idx (proto/sql-value m)))

  org.joda.time.DateTime
  (set-parameter [date-time stmt idx]
    ;; bound via its sql-value conversion
    (.setObject stmt idx (proto/sql-value date-time)))

  clojure.lang.IPersistentVector
  (set-parameter [v stmt idx]
    (let [conn       (.getConnection stmt)
          param-meta (.getParameterMetaData stmt)
          type-name  (.getParameterTypeName param-meta idx)]
      ;; A parameter type name starting with "_" is treated as an array type
      ;; whose element type is the rest of the name; the vector is then bound
      ;; as a SQL array. Anything else is bound as-is.
      (if (= \_ (first type-name))
        (let [elem-type (subs type-name 1)]
          (.setObject stmt idx (.createArrayOf conn elem-type (to-array v))))
        (.setObject stmt idx v)))))
;; How column values are converted when reading result sets:
;; - PGobject of type "jsonb": parsed JSON with keyword keys
;; - any other PGobject:       its raw string value
;; - java.sql.Timestamp:       converted with time-coerce/from-sql-time
;; - Jdbc4Array:               a vector of the array's elements
(extend-protocol proto/IResultSetReadColumn
  PGobject
  (result-set-read-column [pgobj metadata idx]
    (let [pg-type (.getType pgobj)
          value   (.getValue pgobj)]
      (case pg-type
        "jsonb" (json/parse-string value true) ; with keywords
        ;; `case` takes its default as a trailing expression with no test.
        ;; Writing `:else value` would only match the keyword :else, so every
        ;; non-jsonb PGobject would throw "No matching clause".
        value)))

  java.sql.Timestamp
  (result-set-read-column [value metadata idx]
    (time-coerce/from-sql-time value))

  org.postgresql.jdbc4.Jdbc4Array
  (result-set-read-column [value metadata idx]
    (vec (.getArray value))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment