Created
November 13, 2017 19:22
-
-
Save joefromct/138d64217f070a57ddf07fdef60d9ecf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns db-async-test.core-test | |
(:require [jdbc.core :as j] ;; this is the namespace for funcool/clojure.jdbc | |
;;[clojure.java.jdbc :as j] | |
[while-let.core :refer [while-let]] | |
[clojure.java.io :as io] | |
[clojure.data.csv :as csv] | |
[clojure.core.async :as a :refer [>! >!! <! <!! go-loop go chan thread]] | |
[clojure.string :as str])) | |
(defn db->chan
  "Run `sql` against the database described by `db-spec` and put each fetched
  row onto channel `ch` with a blocking put, then close `ch`.

  Arguments:
    ch                 - core.async channel that receives the rows.
    {:keys [sql db-spec]} - map holding the query string and the connection
                            spec handed to `j/connection`.

  `ch` is always closed when this function exits, including when connecting or
  fetching throws, so that a consumer blocked on `<!!` is released instead of
  hanging forever. Any exception is still propagated to the caller.

  Prints a progress line to stdout every 100 rows. Returns nil."
  [ch {:keys [sql db-spec]}]
  (println "starting reader...")
  (let [row-count (atom 0) ; number of rows pushed so far, for progress output
        row-fn    (fn [r]
                    ;; Blocking put: with a small buffer this provides
                    ;; back-pressure against the consumer.
                    (>!! ch r)
                    ;; Everything below is progress reporting only.
                    (let [n (swap! row-count inc)]
                      (when (zero? (mod n 100))
                        (println "Fetched " n " rows."))))]
    (try
      (with-open [conn (j/connection db-spec)]
        ;; The lazy cursor is consumed inside j/atomic, as in the original.
        ;; NOTE(review): server-side cursors presumably require an open
        ;; transaction on PostgreSQL -- confirm against the driver docs.
        (j/atomic conn
          (with-open [cursor (j/fetch-lazy conn sql)]
            (doseq [row (j/cursor->lazyseq cursor)]
              (row-fn row)))))
      (finally
        ;; Close on every exit path so the reader side terminates.
        (a/close! ch)))))
(defn chan->csv
  "Take values off channel `ch` until it is closed and write each one as a
  single CSV record to `csv-file`.

  Arguments:
    ch       - core.async channel to read from (blocking takes).
    csv-file - destination passed to `io/writer`; opened with :append false
               and UTF-8 encoding.

  Quoting is disabled for every cell. The writer and stdout are flushed, and a
  progress line is printed, every 100 rows. The writer is closed when the
  channel is drained or an exception is thrown."
  [ch csv-file]
  (println "starting writer...")
  ;; The counter is local to this call; a `def` here would create (and reset)
  ;; a namespace-global var each time the function runs.
  (let [row-count (atom 0)]
    (with-open [^java.io.Writer writer (io/writer csv-file :append false :encoding "UTF-8")]
      ;; `<!!` yields nil once `ch` is closed and empty, which ends the loop.
      (while-let [data (<!! ch)]
        ;; NOTE(review): if `data` is a map, each cell is presumably written as
        ;; a stringified map entry -- confirm this is the intended output.
        (csv/write-csv writer [data] :quote? (fn [x] false))
        (let [n (swap! row-count inc)]
          (when (zero? (mod n 100))
            (println "Wrote " n " rows.")
            (.flush writer)
            (flush)))))))
;; Job configuration: JDBC connection spec plus the query to export.
;; The PostgreSQL JDBC driver class is `org.postgresql.Driver` and it only
;; accepts URLs of the form `jdbc:postgresql:...`, hence the subprotocol.
;; Host, database, user and password are placeholders.
(def config
  {:db-spec {:classname   "org.postgresql.Driver"
             :subprotocol "postgresql"
             :subname     "//my-database-host:5432/mydb"
             :user        "me"
             :password    "****"}
   :sql "select row_id, giant_xml_column::text as xml_column from public.big_toasty_table limit 50000"})
;; Wire up the pipeline: a channel with a buffer of one row, a producer
;; thread streaming rows from the database, and a consumer thread writing
;; them to ./test.csv.
(do
  (def ch (a/chan 1))
  (a/thread
    (db->chan ch config))
  (a/thread
    (chan->csv ch "./test.csv")))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment