Skip to content

Instantly share code, notes, and snippets.

View DirectParquetOutputCommitter.clj
(let [ctx (spark/spark-context conf)
hadoop-conf (.hadoopConfiguration ^JavaSparkContext ctx)]
(.set hadoop-conf "spark.sql.parquet.output.committer.class" "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"))
View DirectOutputCommiter.clj
(spark-conf/set "spark.hadoop.mapred.output.committer.class" "com.appsflyer.spark.DirectOutputCommitter")
View loadParquet.scala
val file ="mergeSchema", "false").parquet(path)
View load-parquet.clj
(-> ^SQLContext sqtx
(.format "parquet")
(.options (java.util.HashMap. {"mergeSchema" "false" "path" path}))
View rdd_to_parquet.clj
(let [..
schema (trans/extract-dataframe-schema (record-builder nil))
rdd (spark/map record-builder some-rdd-we-have)
rows (spark/map trans/as-rows rdd)
dataframe (spark/create-data-frame sql-context rows schema)
(spark/save-parquert dataframe output-path :overwrite))
View transformations.clj
(defn extract-dataframe-schema
(let [fields (reduce (fn [lst schema-line]
(let [k (first schema-line)
t (if (= (count schema-line) 3) (last schema-line) DataTypes/StringType) ]
(conj lst (DataTypes/createStructField (name k) t NULLABLE)))) [] rec)
arr (ArrayList. fields)]
(DataTypes/createStructType arr)))
(defn as-rows
View schema_sample.clj
(defn record-builder
(let [..
raw-device-params (extract event-record "raw_device_params")
result [...
[:operator (get raw-device-params "operator")]
[:model (get raw-device-params "model")]
[:launch_counter counter DataTypes/LongType]]]
View json-parquet.scala
val inputPath = "../data/json"
val outputPath = "../data/parquet"
val data =
View gist:b6d53e7740864d2d8b15
import sqlContext.createSchemaRDD
val weeklyAggregates =
View gist:c8db8f45b3d077e5f6b5
val entryBuilder=PhoneEntry.newBuilder()
val aggregateBuilder = Aggregate.newBuilder()
val phoneBook = new mutable.HashMap[Long,PhoneEntry]()
val balances = new ListBuffer[Double]()
val hoursBreakdown = new Array[Int](24)
val sample = calls.head
var sumDuration = 0L
var sumPricing = 0.0
var minDuration = Long.MaxValue
var maxDuration = Long.MinValue