@jeroenvandijk
Created July 19, 2012 09:39
Cascalog JDBC example
(ns queries.postgres
  (:gen-class)
  (:require [cascalog.ops :as c])
  (:use [cascalog.api])
  (:import [com.twitter.maple.jdbc JDBCTap JDBCScheme TableDesc]
           [cascading.tap SinkMode]))
(defn db-tap [table]
  (let [tap (com.twitter.maple.jdbc.JDBCTap.
             "jdbc:postgresql://my-db-uri?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory"
             "username"
             "password"
             "org.postgresql.Driver"
             (com.twitter.maple.jdbc.TableDesc.
              table
              (into-array ["id" "timestamp"])        ;; column names
              (into-array ["bigserial" "timestamp"]) ;; column definitions
              (into-array ["id"]))                   ;; primary keys
             (com.twitter.maple.jdbc.JDBCScheme.
              (into-array ["id" "timestamp"])
              (str "select id, timestamp from " table " order by id DESC")
              (str "select id from " table " order by id DESC limit 1")
              10000) ;; SET LIMIT
             cascading.tap.SinkMode/UPDATE)]
    (.setConcurrentReads tap 100) ;; 100 concurrent reads of 100 rows each (limit / concurrent reads)
    tap))
(defn -main [& args]
  (?<-
   (hfs-textline "output.txt")
   [?id ?timestamp]
   ((db-tap "my_table") ?id ?timestamp)
   (c/count ?count)))
(defproject my-analysis "0.1.0-SNAPSHOT"
  :description "TODO: Project Description."
  :source-paths ["src/clj"]
  :jvm-opts ["-Xmx768m" "-server"]
  :repositories {"conjars" "http://conjars.org/repo"}
  :dependencies [[org.clojure/clojure "1.3.0"]
                 [cascalog "1.9.0"]
                 ;; Cascalog JDBC tap
                 [com.twitter/maple "0.2.2"]
                 ;; JDBC driver for postgres
                 [postgresql/postgresql "9.1-901.jdbc4"]
                 ;; Command line args processing
                 [org.clojure/tools.cli "0.2.1"]]
  :profiles {:dev {:dependencies [[org.apache.hadoop/hadoop-core "0.20.2-dev"]
                                  [midje-cascalog "0.4.0"]
                                  [lein-midje "1.0.10"]]}})