secret
Created

Demonstrates a Korma upsert and the hack needed to make it work on Heroku

  • Download Gist
example.clj
Clojure
1 2 3 4 5 6 7 8 9 10 11 12 13
(ns scraper.db
(:require [scraper.korma-extended])
(:use [korma.core]
[korma.db]))
 
; Define the `db` connection from a Postgres spec: database "korma",
; user "db", password "dbpass".
; NOTE(review): credentials are hard-coded here; presumably example values only -- confirm
; before reusing this outside a demo.
(defdb db (postgres {:db "korma"
:user "db"
:password "dbpass"}))
 
; Declare the rss_apps entity that the upsert call in this file operates on.
; NOTE(review): presumably maps to a table of the same name -- confirm against the schema.
(defentity rss_apps)
 
; Insert the row, or update it when a matching row already exists.
; The row is given as a single map with :id and :store values.
(upsert rss_apps (values {:id "106" :store "OZ"}))
korma_extended.clj
Clojure
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
(ns scraper.korma-extended)
 
; Hack into clojure.java.jdbc.internal to make it work on OpenJDK (Heroku)
; https://github.com/clojure/java.jdbc/blob/master/src/main/clojure/clojure/java/jdbc/internal.clj#L314
(ns clojure.java.jdbc.internal
(:use clojure.java.jdbc.internal))
 
(defn prepare-statement*
  "Build a PreparedStatement for `sql` on connection `con`. Optional
  keyword arguments:
    :return-keys true | false - default false
    :result-type :forward-only | :scroll-insensitive | :scroll-sensitive
    :concurrency :read-only | :updatable
    :cursors     key looked up in result-set-holdability
    :fetch-size n
    :max-rows n

  HACK: when :return-keys is truthy this override does NOT ask the driver
  for generated keys. It prints a warning and creates a plain prepared
  statement instead, because requesting RETURN_GENERATED_KEYS was breaking
  the query on OpenJDK (Heroku)."
  [^Connection con ^String sql & {:keys [return-keys result-type concurrency cursors fetch-size max-rows]}]
  (let [typed-result-set? (and result-type concurrency)
        ^PreparedStatement stmt
        (cond
          ;; Hack branch: deliberately ignore the generated-keys request.
          return-keys
          (do
            (println "WARNING: hack is active here")
            (.prepareStatement con sql))

          ;; Result-set type + concurrency + holdability.
          (and typed-result-set? cursors)
          (.prepareStatement con sql
                             (result-type result-set-type)
                             (concurrency result-set-concurrency)
                             (cursors result-set-holdability))

          ;; Result-set type + concurrency only.
          typed-result-set?
          (.prepareStatement con sql
                             (result-type result-set-type)
                             (concurrency result-set-concurrency))

          :else
          (.prepareStatement con sql))]
    ;; Optional tuning knobs are applied only when supplied.
    (when fetch-size
      (.setFetchSize stmt fetch-size))
    (when max-rows
      (.setMaxRows stmt max-rows))
    stmt))
 
;*****************************************************
; Korma.core
;*****************************************************
 
(ns korma.core
"Core querying and entity functions"
(:use korma.core))
 
(defn upsert*
  "Create an empty upsert query. `ent` can either be an entity defined by
  defentity, or a string of the table name.

  When `ent` already has a :type it is returned untouched. Otherwise the
  empty query for `ent` is merged with:
    :type        :upsert
    :values      []
    :upsert-keys a one-element vector holding (:pk ent)
    :results     :keys

  Note: keyword lookup on a string yields nil, so when `ent` is a table-name
  string :upsert-keys starts out as [nil]; callers should then supply the
  match columns with set-upsert-keys."
  [ent]
  (if (:type ent)
    ent
    (merge (empty-query ent)
           {:type :upsert
            :values []
            :upsert-keys [(:pk ent)]
            :results :keys})))
(defmacro upsert
  "Builds an upsert query for `ent` (a string or an entity created by
  defentity), threads it through the modifying forms in `body`, and then
  executes it with `exec`.

  ex: (upsert users
        (values {:name \"chris\"})
        (set-upsert-keys :user_id))

  The SQL text is produced by korma.sql.engine/sql-upsert and has this
  shape (shown for table rss_apps, columns store and id, upsert key id):

  LOCK TABLE rss_apps IN SHARE ROW EXCLUSIVE MODE;
  WITH new_values (store, id) as ( VALUES (<row values>) ),
  upsert as (
  UPDATE rss_apps m SET <col> = nv.<col>, ...
  FROM new_values nv
  WHERE m.id = nv.id
  RETURNING m.*)
  INSERT INTO rss_apps (store, id)
  SELECT store, id
  FROM new_values
  WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE up.id = new_values.id);"
  [ent & body]
  `(exec (-> (upsert* ~ent)
             ~@body)))
(defn set-upsert-keys
  "Set the column(s) an upsert query uses to decide whether a row already
  exists.

  `ks` is either a single key or a sequential collection of keys (vector,
  list, seq). The keys are always stored as a vector under :upsert-keys,
  replacing any previous value. Returns the updated query."
  [query ks]
  ;; Any sequential collection is treated as several keys; previously only
  ;; vectors were, so a list of keys was wrapped as one bogus key.
  (merge query {:upsert-keys (if (sequential? ks) (vec ks) [ks])}))
 
;;*****************************************************
;; korma.sql.engine
;;*****************************************************
 
(ns korma.sql.engine
(:require [clojure.string :as string]
[korma.sql.utils :as utils]
[korma.config :as conf]
[clojure.walk :as walk]))
 
; http://stackoverflow.com/questions/1109061/insert-on-duplicate-update-postgresql
(defn sql-upsert
  "Build the SQL text for an upsert query and store it under :sql-str.

  Reads from `query`:
    :table       table name, spliced into the SQL as-is
    :values      seq of row maps; the keys of the FIRST map define the columns
    :upsert-keys columns used to match an existing row

  The generated text locks the table in SHARE ROW EXCLUSIVE mode, then uses
  a writable CTE: rows from new_values that match on the upsert keys are
  UPDATEd, and rows with no match are INSERTed.

  Only non-key columns appear in the SET clause. When every column is an
  upsert key, the key columns are used instead (a no-op assignment) so the
  statement stays syntactically valid.

  Table and column names are concatenated without quoting, so they must
  come from trusted code, not from user input.

  Returns `query` with :sql-str added."
  [query]
  (let [key-names (map name (:upsert-keys query))
        ;; \"a.k1 = b.k1 AND a.k2 = b.k2\" for the two given table aliases
        key-match (fn [t1 t2]
                    (clojure.string/join
                      " AND "
                      (map #(str t1 "." % " = " t2 "." %) key-names)))
        ins-keys (keys (first (:values query)))
        ins-names (map name ins-keys)
        ;; Compare by name so keyword and string keys both match, and keep
        ;; the column order of the row map. (disj with the whole key vector
        ;; as a single element never removed anything.)
        non-key-names (remove (set key-names) ins-names)
        upd-names (if (seq non-key-names) non-key-names key-names)
        upd-clause (clojure.string/join ", " (map #(str % " = nv." %) upd-names))
        keys-clause (utils/comma ins-names)
        values-clause (utils/comma (insert-values-clause ins-keys (:values query)))
        table (:table query)
        neue-sql (str "LOCK TABLE " table " IN SHARE ROW EXCLUSIVE MODE;\n"
                      "WITH new_values " (utils/wrap keys-clause) " as ( VALUES " values-clause " ),\n"
                      "upsert as (\n"
                      "UPDATE " table " m SET " upd-clause "\n"
                      "FROM new_values nv\n"
                      "WHERE " (key-match "m" "nv") "\n"
                      "RETURNING m.*"
                      ")\n"
                      "INSERT INTO " table " " (utils/wrap keys-clause) "\n"
                      "SELECT " keys-clause "\n"
                      "FROM new_values\n"
                      "WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE " (key-match "up" "new_values") ");")]
    (assoc query :sql-str neue-sql)))
 
; Add upsert type
(defmethod ->sql :upsert [query]
  ;; Generate the upsert SQL inside bind-params.
  (bind-params (sql-upsert query)))

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.