// Load the raw CSV call records and parse each line into a Call.
val data = sc.textFile("src/test/resources/*.csv")
val calls = data.map(Call(_)).cache()

// Key every call by its hourly and weekly bucket, then group per bucket.
val hourlyPairs = calls.map(c => (c.getHourly, c))
val weeklyPairs = calls.map(c => (c.getWeekly, c))
val groupedHourly = hourlyPairs.groupByKey()
val groupedWeekly = weeklyPairs.groupByKey()
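The Call class referenced above is not shown in the snippet. A hypothetical sketch, assuming epoch-millisecond timestamps and a caller,timestamp,duration,pricing CSV layout (none of which is confirmed by the original), could look like this:

// Hypothetical sketch of the Call record assumed by the snippet above;
// the real field names and CSV layout are not shown in the original.
case class Call(caller: Long, timestamp: Long, duration: Long, pricing: Double) {
  // Bucket keys used for the hourly / weekly groupings.
  def getHourly: Long = timestamp / (60 * 60 * 1000)
  def getWeekly: Long = timestamp / (7 * 24 * 60 * 60 * 1000)
}

object Call {
  // Parses one CSV line; the column order here is an assumption.
  def apply(line: String): Call = {
    val Array(caller, ts, duration, pricing) = line.split(",")
    Call(caller.toLong, ts.toLong, duration.toLong, pricing.toDouble)
  }
}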
// Configure the job to write Aggregate protobuf messages through parquet-protobuf.
ParquetOutputFormat.setWriteSupportClass(job, classOf[ProtoWriteSupport[Aggregate]])
ProtoParquetOutputFormat.setProtobufClass(job, classOf[Aggregate])

hourlyAggregates.saveAsNewAPIHadoopFile(outputDir,
  classOf[Void],
  classOf[Aggregate],
  classOf[ParquetOutputFormat[Aggregate]],
  job.getConfiguration)
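Neither job nor hourlyAggregates is defined in the snippet above. A minimal sketch of the assumed setup: a Hadoop Job to carry the output configuration, and an RDD of (Void, Aggregate) pairs, which is what saveAsNewAPIHadoopFile needs with a Void key class. The construction below is an assumption, not the original code.

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

// Sketch: a Job object just to hold the Parquet output configuration.
val job = Job.getInstance(sc.hadoopConfiguration)

// Sketch: one Aggregate per hourly group, keyed by a null Void as Parquet expects.
val hourlyAggregates: RDD[(Void, Aggregate)] =
  groupedHourly.values.map(calls => (null.asInstanceOf[Void], CalcAggregations(calls)))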
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

// Builders and accumulators for computing one Aggregate per group of calls.
val entryBuilder = PhoneEntry.newBuilder()
val aggregateBuilder = Aggregate.newBuilder()
val phoneBook = new mutable.HashMap[Long, PhoneEntry]()
val balances = new ListBuffer[Double]()
val hoursBreakdown = new Array[Int](24)
val sample = calls.head
var sumDuration = 0L
var sumPricing = 0.0
var minDuration = Long.MaxValue
var maxDuration = Long.MinValue
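The original only declares this per-group state; how it is filled is not shown. A hedged sketch of the folding step over one group of calls, assuming the Call fields (duration, pricing, timestamp) from the earlier sketch:

// Sketch only: fold the group of calls into the accumulators declared above.
// The Call field names are assumptions, not taken from the original.
for (c <- calls) {
  sumDuration += c.duration
  sumPricing += c.pricing
  minDuration = math.min(minDuration, c.duration)
  maxDuration = math.max(maxDuration, c.duration)
  // Count calls per hour of day, assuming epoch-millisecond timestamps.
  hoursBreakdown(((c.timestamp / (60 * 60 * 1000)) % 24).toInt) += 1
}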
// Spark SQL route: the implicit turns an RDD of case classes into a SchemaRDD,
// which can then be written directly as Parquet.
import sqlContext.createSchemaRDD

val weeklyAggregates = groupedWeekly.values.map(CalcAggregations(_))
weeklyAggregates.saveAsParquetFile("weekly.parquet")
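saveAsParquetFile comes from the Spark 1.x SchemaRDD API; for completeness, a small usage sketch of reading the same file back with that era's SQLContext (the table name and query are purely illustrative):

// Usage sketch (Spark 1.x API, matching saveAsParquetFile above):
// read the Parquet file back and query it with SQL.
val weekly = sqlContext.parquetFile("weekly.parquet")
weekly.registerTempTable("weekly_aggregates")
sqlContext.sql("SELECT COUNT(*) FROM weekly_aggregates").collect()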
(defn record-builder
  [event-record]
  (let [..
        raw-device-params (extract event-record "raw_device_params")
        result [...
                [:operator (get raw-device-params "operator")]
                [:model (get raw-device-params "model")]
                ...
                [:launch_counter counter DataTypes/LongType]]]
    result))
(defn extract-dataframe-schema
  [rec]
  (let [fields (reduce (fn [lst schema-line]
                         (let [k (first schema-line)
                               ;; default to StringType when no explicit type is given
                               t (if (= (count schema-line) 3)
                                   (last schema-line)
                                   DataTypes/StringType)]
                           (conj lst (DataTypes/createStructField (name k) t NULLABLE))))
                       []
                       rec)
        arr (ArrayList. fields)]
    (DataTypes/createStructType arr)))
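For readers less used to the Clojure/Java interop, here is a rough Scala parallel of extract-dataframe-schema. The names, the Seq[Seq[Any]] shape, and the always-nullable choice are illustrative assumptions, not the original code:

import org.apache.spark.sql.types.{DataType, DataTypes}
import scala.collection.JavaConverters._

// Sketch: each record line is (key, value) or (key, value, type); lines
// without an explicit type default to StringType, every field is nullable.
def extractDataFrameSchema(rec: Seq[Seq[Any]]) = {
  val fields = rec.map { line =>
    val dataType =
      if (line.size == 3) line.last.asInstanceOf[DataType] else DataTypes.StringType
    DataTypes.createStructField(line.head.toString, dataType, true)
  }
  DataTypes.createStructType(fields.asJava)
}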
(defn as-rows
  ..)

(let [..
      schema (trans/extract-dataframe-schema (record-builder nil))
      ..
      rdd (spark/map record-builder some-rdd-we-have)
      rows (spark/map trans/as-rows rdd)
      dataframe (spark/create-data-frame sql-context rows schema)]
  (spark/save-parquet dataframe output-path :overwrite))
(-> ^SQLContext sqtx
    (.read)
    (.format "parquet")
    (.options (java.util.HashMap. {"mergeSchema" "false" "path" path}))
    (.load))

val file = sqx.read.option("mergeSchema", "false").parquet(path)
(spark-conf/set "spark.hadoop.mapred.output.committer.class" "com.appsflyer.spark.DirectOutputCommitter")
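The committer class name above is taken from the snippet itself; as a usage sketch, the same property can be set from Scala on a SparkConf before the SparkContext is created:

import org.apache.spark.SparkConf

// Usage sketch: apply the direct output committer setting through SparkConf.
val conf = new SparkConf()
  .setAppName("parquet-writer")
  .set("spark.hadoop.mapred.output.committer.class",
       "com.appsflyer.spark.DirectOutputCommitter")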