Skip to content

Instantly share code, notes, and snippets.

@joefromct
Created November 13, 2017 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joefromct/138d64217f070a57ddf07fdef60d9ecf to your computer and use it in GitHub Desktop.
Save joefromct/138d64217f070a57ddf07fdef60d9ecf to your computer and use it in GitHub Desktop.
(ns db-async-test.core-test
(:require [jdbc.core :as j] ;; this is the namespace for funcool/clojure.jdbc
;;[clojure.java.jdbc :as j]
[while-let.core :refer [while-let]]
[clojure.java.io :as io]
[clojure.data.csv :as csv]
[clojure.core.async :as a :refer [>! >!! <! <!! go-loop go chan thread]]
[clojure.string :as str]))
(defn db->chan
  "Run `sql` against the database described by `db-spec` and put every row of
  the result onto channel `ch` with a blocking put, then close `ch`.

  Arguments:
    ch                 - core.async channel that receives one value per row.
    {:keys [sql db-spec]} - map holding the query text and the connection spec
                            handed to `j/connection`.

  The channel is closed in a `finally` clause, so a consumer blocked on `ch`
  is released even when connecting or querying throws; the exception itself
  still propagates to the caller.

  Progress is printed to stdout every 100 rows."
  [ch {:keys [sql db-spec]}]
  (println "starting reader...")
  (let [row-count (atom 0) ; number of rows pushed so far, used only for logging
        row-fn    (fn [r]
                    ;; Blocking put: with a small channel buffer this is what
                    ;; applies back-pressure from the writer to the reader.
                    (>!! ch r)
                    ;; Everything below is just progress output on stdout.
                    (let [n (swap! row-count inc)]
                      (when (zero? (mod n 100))
                        (println "Fetched " n " rows."))))]
    (try
      (with-open [conn (j/connection db-spec)]
        ;; NOTE(review): the lazy cursor is opened inside `j/atomic`;
        ;; presumably the driver needs a transaction for cursor-based
        ;; fetching -- confirm against the clojure.jdbc docs.
        (j/atomic conn
          (with-open [cursor (j/fetch-lazy conn sql)]
            (doseq [row (j/cursor->lazyseq cursor)]
              (row-fn row)))))
      (finally
        ;; Always signal end-of-stream, including on failure, so the
        ;; consumer does not block forever.
        (a/close! ch)))))
(defn chan->csv
  "Take values off channel `ch` until it is closed and write each one as a
  single CSV record to `csv-file`.

  Arguments:
    ch       - core.async channel to read from (blocking takes).
    csv-file - anything `clojure.java.io/writer` accepts; the file is
               overwritten (`:append false`) and written as UTF-8.

  Cells are never quoted. Every 100 rows a progress line is printed and both
  the file writer and stdout are flushed. Returns nil.

  The row counter is a local loop binding, so calling this function does not
  define or reset any namespace-level var."
  [ch csv-file]
  (println "starting writer...")
  (with-open [^java.io.Writer writer (io/writer csv-file :append false :encoding "UTF-8")]
    (loop [n 1]
      ;; `<!!` yields nil once the channel is closed and drained, which ends
      ;; the loop.
      (when-let [data (<!! ch)]
        ;; NOTE(review): `data` is passed to write-csv as one record; if the
        ;; producer emits hash-maps the cells will be the map's entries --
        ;; confirm that is the intended output format.
        (csv/write-csv writer [data] :quote? (constantly false))
        (when (zero? (mod n 100))
          (println "Wrote " n " rows.")
          (.flush writer)
          (flush))
        (recur (inc n))))))
;; Connection settings and query for the streaming test.
;; :db-spec is the connection spec read by `db->chan` and passed to
;; `j/connection`; :sql is the query whose rows are streamed.
;; The PostgreSQL JDBC driver class is `org.postgresql.Driver` and its JDBC
;; subprotocol is `postgresql` (URL form jdbc:postgresql://host:port/db).
;; Host, user and password below are placeholders.
(def config {:db-spec {:classname "org.postgresql.Driver"
                       :subprotocol "postgresql"
                       :subname "//my-database-host:5432/mydb"
                       :user "me"
                       :password "****"}
             :sql "select row_id, giant_xml_column::text as xml_column from public.big_toasty_table limit 50000"})
;; Wire the pipeline together: a channel with a buffer of one row connects a
;; reader thread (database -> channel) to a writer thread (channel -> CSV).
;; Each stage runs on its own thread because both use blocking channel ops.
(do
  (def ch (a/chan 1))
  (a/thread
    (db->chan ch config))
  (a/thread
    (chan->csv ch "./test.csv")))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment