Skip to content

Instantly share code, notes, and snippets.

@acthp
Created November 25, 2014 01:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save acthp/d60cd29e4b23a4c39326 to your computer and use it in GitHub Desktop.
Save acthp/d60cd29e4b23a4c39326 to your computer and use it in GitHub Desktop.
experimental clojure wrapper for SparkSQL
(ns try-spark.spark
(:refer-clojure :exclude [map filter reduce first name take min distinct max count])
(:require [clojure.core :as core])
(:require [clojure.reflect :as r])
(:import org.apache.spark.sql.api.java.JavaSchemaRDD))
; Parameter classes that we have to wrap in order to
; call rdd methods with plain clojure functions.
(def func-types
(into #{}
(core/map #(symbol (str "org.apache.spark.api.java.function." %))
["Function" "DoubleFunction" "FlatMapFunction2" "FlatMapFunction"
"Function2" "DoubleFlatMapFunction" "VoidFunction"])))
; Arities of the 'call' methods of the above function classes.
(def func-arities
(zipmap
func-types
(core/map
#(core/count (:parameter-types
(core/first (core/filter (fn [m] (= (:name m) 'call))
(:members (r/reflect (Class/forName (str %))))))))
func-types)))
; true if the given method should be wrapped. Skipping private and
; inner class methods.
(defn should-wrap [m]
(and
((:flags m) :public)
(= (.indexOf (str (:name m)) "$") -1)))
(defn arg-list [n]
(core/map #(symbol (str "x" %)) (range n)))
(defn wrap-param [param type]
(if-let [interface (func-types type)] ; Wrap a fn parameter in a proxy class.
(let [args (arg-list (func-arities interface))] ; Build arg list for the proxy.
`(proxy [~type] [] (~'call [~@args] (~param ~@args))))
param))
(defn wrap-arity [{:keys [name parameter-types]}]
(let [params (core/map #(symbol (str "arg" %)) (range (core/count parameter-types)))
param-list (conj (vec params) 'rdd)
call-list (cons 'rdd (core/map wrap-param params parameter-types))
interop (symbol (str "." name))]
`(~param-list (~interop ~@call-list))))
(defn valid-arity [overloads]
(apply = (core/map #(core/map func-types (:parameter-types %)) overloads)))
; We can wrap an arity if func-types appear in
; the same positions for all overloads. If this isn't
; true, we will need run-time checks of parameter types.
; For now, throw if we would need run-time checks.
(defn wrap-member [name overloads]
(let [arities (group-by #(core/count (:parameter-types %)) overloads)
pick-first (core/map (fn [[name arity-overloads]]
(when (not (valid-arity arity-overloads))
(throw (Exception. (str "Arity with ambiguous types:" name arity-overloads))))
(core/first arity-overloads)) arities) ; Assuming they're all the same, use the first.
declare-arities (core/map wrap-arity pick-first)]
`(defn ~name ~@declare-arities)))
(def rdd-methods
(let [rdd-methods (->> (r/reflect JavaSchemaRDD)
(:members)
(core/filter should-wrap)
(group-by :name))]
(core/map #(apply wrap-member %) rdd-methods)))
(defmacro wrap-rdd-methods []
`(do
~@rdd-methods))
(wrap-rdd-methods)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment