Skip to content

Instantly share code, notes, and snippets.

;; Obtain the Hadoop configuration from the Spark context and set the
;; "spark.sql.parquet.output.committer.class" property to the
;; DirectParquetOutputCommitter class name.
(let [ctx (spark/spark-context conf)
hadoop-conf (.hadoopConfiguration ^JavaSparkContext ctx)]
(.set hadoop-conf "spark.sql.parquet.output.committer.class" "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"))
;; Set the "spark.hadoop.mapred.output.committer.class" key to the
;; com.appsflyer.spark.DirectOutputCommitter class name via spark-conf/set.
(spark-conf/set "spark.hadoop.mapred.output.committer.class" "com.appsflyer.spark.DirectOutputCommitter")
// Read the Parquet data at `path` with the "mergeSchema" reader option set to "false".
val file = sqx.read.option("mergeSchema", "false").parquet(path)
;; Load through the SQLContext reader using the "parquet" format; the
;; "mergeSchema" = "false" setting and the path are both passed as reader
;; options in a java.util.HashMap.
(-> ^SQLContext sqtx
(.read)
(.format "parquet")
(.options (java.util.HashMap. {"mergeSchema" "false" "path" path}))
(.load))
;; Outline of the write path (bindings elided with `..`): derive a schema,
;; map an existing RDD through record-builder and trans/as-rows, create a
;; DataFrame from the rows and schema, then hand it to spark/save-parquert
;; with output-path and :overwrite.
(let [..
;; The schema is extracted from the result of calling record-builder with nil.
schema (trans/extract-dataframe-schema (record-builder nil))
..
rdd (spark/map record-builder some-rdd-we-have)
rows (spark/map trans/as-rows rdd)
dataframe (spark/create-data-frame sql-context rows schema)
]
;; NOTE(review): `save-parquert` looks like a misspelling of `save-parquet` --
;; confirm against the spark wrapper namespace before relying on it.
(spark/save-parquert dataframe output-path :overwrite))
(defn extract-dataframe-schema
  "Builds a StructType from `rec`, a sequence of schema lines.

  The first element of each line is the field key. A line with exactly three
  elements supplies its DataType as the last element; any other line falls
  back to DataTypes/StringType. Every field is created with the NULLABLE flag."
  [rec]
  (let [->field (fn [schema-line]
                  (let [field-key (first schema-line)
                        data-type (if (= 3 (count schema-line))
                                    (last schema-line)
                                    DataTypes/StringType)]
                    (DataTypes/createStructField (name field-key) data-type NULLABLE)))
        struct-fields (mapv ->field rec)]
    ;; createStructType is given a java.util.List, so wrap the vector in an ArrayList.
    (DataTypes/createStructType (ArrayList. struct-fields))))
(defn as-rows
;; Builds the vector of field entries for an event record (parts of the body
;; are elided with `..` / `...`). Each entry is a vector of [key value] or
;; [key value data-type].
(defn record-builder
[event-record]
(let [..
;; Pull the "raw_device_params" value out of the event record.
raw-device-params (extract event-record "raw_device_params")
result [...
[:operator (get raw-device-params "operator")]
[:model (get raw-device-params "model")]
...
;; This entry carries an explicit DataType as its third element.
[:launch_counter counter DataTypes/LongType]]]
result))
// Convert JSON input into Parquet output using sqlContext.
// Location the JSON input is read from.
val inputPath = "../data/json"
// Location the Parquet output is written to.
val outputPath = "../data/parquet"
// Load the JSON input into `data`.
val data = sqlContext.read.json(inputPath)
// Write the loaded `data` as Parquet (the snippet previously referenced an undefined `date`).
data.write.parquet(outputPath)
import sqlContext.createSchemaRDD
// Apply CalcAggregations to each value of groupedWeekly.
val weeklyAggregates = groupedWeekly.values.map(CalcAggregations(_))
// Save the aggregates to "weekly.parquet".
// NOTE(review): saveAsParquetFile presumably comes from the sqlContext.createSchemaRDD import -- confirm.
weeklyAggregates.saveAsParquetFile("weekly.parquet")
// Builders obtained from PhoneEntry and Aggregate.
val entryBuilder=PhoneEntry.newBuilder()
val aggregateBuilder = Aggregate.newBuilder()
// Mutable working state: PhoneEntry values keyed by Long, a buffer of Double
// balances, and a 24-slot Int array (presumably one slot per hour -- TODO confirm).
val phoneBook = new mutable.HashMap[Long,PhoneEntry]()
val balances = new ListBuffer[Double]()
val hoursBreakdown = new Array[Int](24)
// NOTE(review): if `calls` is a Scala collection, `.head` throws when it is empty -- confirm callers guarantee non-empty input.
val sample = calls.head
// Running accumulators; min starts at Long.MaxValue and max at Long.MinValue.
var sumDuration = 0L
var sumPricing = 0.0
var minDuration = Long.MaxValue
var maxDuration = Long.MinValue