Skip to content

Instantly share code, notes, and snippets.

@whamtet
Last active June 24, 2016 06:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save whamtet/29d4a82145f08b156601a6675654a8e2 to your computer and use it in GitHub Desktop.
Save whamtet/29d4a82145f08b156601a6675654a8e2 to your computer and use it in GitHub Desktop.
import clojure.lang.IFn
import org.apache.spark.sql.Row
import clojure.java.api.Clojure
import org.apache.spark.sql.types.StructType
/** Transforms every row of `df` with a Clojure function and wraps the results in a new DataFrame.
  *
  * `source` is evaluated with `clojure.core/load-string`; the value of its last form must be a
  * Clojure function (an `IFn`). That function is called once per row with the row's column values
  * as a single `Object[]` argument, and must return the new row's column values in `schema` order.
  *
  * Accepted return values: an array (object or primitive), a Scala `Seq`, or any
  * `java.lang.Iterable` — which covers Clojure vectors, lists and lazy seqs.
  *
  * SECURITY: `load-string` executes arbitrary code on every executor; never pass untrusted `source`.
  *
  * @param df     input DataFrame
  * @param source Clojure source whose last form evaluates to the row-mapping function
  * @param schema schema of the produced rows
  * @return a DataFrame built from the mapped rows and `schema` via `sqlContext.createDataFrame`
  * @throws IllegalArgumentException (at action time, on an executor) if the function returns nil
  *         or a value that is not an array, Seq or Iterable
  */
def map_clojure(df: DataFrame, source: String, schema: StructType): DataFrame = {
  // `df.rdd` works on both Spark 1.x and 2.x; `df.map` would need an Encoder[Row] on 2.x.
  val rdd = df.rdd.mapPartitions { rows =>
    if (!rows.hasNext) {
      // Nothing to map: skip compiling the Clojure source for empty partitions.
      Iterator.empty
    } else {
      // Compile once per partition, on the executor. Only `source` (a String) is shipped from the
      // driver; the function class is generated by the Clojure compiler on the executor's JVM.
      val fn = Clojure.`var`("clojure.core", "load-string").invoke(source).asInstanceOf[IFn]
      rows.map { row =>
        val values: Seq[Any] = fn.invoke(row.toSeq.toArray) match {
          case null =>
            throw new IllegalArgumentException(
              "Clojure function returned nil; expected a sequence of column values")
          case arr: Array[_] => arr.toSeq
          case seq: Seq[_]   => seq
          case items: java.lang.Iterable[_] =>
            // Clojure vectors/lists/lazy seqs all implement java.lang.Iterable.
            val out = Vector.newBuilder[Any]
            val it = items.iterator()
            while (it.hasNext) out += it.next()
            out.result()
          case other =>
            throw new IllegalArgumentException(
              s"Clojure function returned ${other.getClass.getName}; " +
                "expected an array, Seq or Iterable of column values")
        }
        Row.fromSeq(values)
      }
    }
  }
  sqlContext.createDataFrame(rdd, schema)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment