
The difference between map and flatMap is a little confusing for beginners - this example might help.
It can be tested in the Spark shell or the Scala REPL:
scala> val l = List(1,2,3,4,5)
scala> l.map(x => List(x-1, x, x+1))
res1: List[List[Int]] = List(List(0, 1, 2), List(1, 2, 3), List(2, 3, 4), List(3, 4, 5), List(4, 5, 6))
scala> l.flatMap(x => List(x-1, x, x+1))
res2: List[Int] = List(0, 1, 2, 1, 2, 3, 2, 3, 4, 3, 4, 5, 4, 5, 6)
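To see how the two relate (a small sketch continuing the same session): flatMap is simply map followed by flatten:
scala> l.map(x => List(x-1, x, x+1)).flatten
res3: List[Int] = List(0, 1, 2, 1, 2, 3, 2, 3, 4, 3, 4, 5, 4, 5, 6)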
idris75 / RDD-To-DF-To-DS
Created March 2, 2020 13:37 — forked from jamesrajendran/RDD-To-DF-To-DS
Rdd --> DF --> Table --> SQL --> DS
val lrdd = sc.parallelize(List(1,2,3,4,5,3,5))
// without a case class
val namedDF = sqlContext.createDataFrame(lrdd.map(Tuple1.apply)).toDF("Id")
// with a case class
case class Dummy(Id: Int)
val namedDF = lrdd.map(x => Dummy(x)).toDF()
// one-liner DF
val ldf = List(1,2,3,4,5,3,5).toDS().toDF()
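The remaining steps in the title (Table --> SQL --> DS) are not shown above; a minimal sketch, assuming Spark 2.x in spark-shell (SparkSession named spark, implicits imported) and the Dummy case class defined above - the view name is just a placeholder:
namedDF.createOrReplaceTempView("dummy_table")
val sqlDF = spark.sql("select Id from dummy_table where Id > 2")
val ds = sqlDF.as[Dummy]   // Dataset[Dummy]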
idris75 / reduceVsreduceByKey
Created March 2, 2020 13:37 — forked from jamesrajendran/reduceVsreduceByKey
Demystifying reduce and reduceByKey
val lrdd = sc.parallelize(List(1,2,3,4,5,3,5))
lrdd.reduce((a,b) => a+b)
// shorthand for the syntax above
lrdd.reduce(_ + _)
// changing reduce to reduceByKey will fail here,
// as reduceByKey applies only to key-value (pair) RDDs
lrdd.reduceByKey(_ + _)
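On a pair RDD it does work; a small sketch (the key-value list here is only for illustration):
val pairRdd = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
pairRdd.reduceByKey(_ + _).collect()   // e.g. Array((a,4), (b,2))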
idris75 / SparkStreaming
Created March 2, 2020 13:37 — forked from jamesrajendran/SparkStreaming
Spark Streaming - quick tips
# Analysing data from a Kafka topic:
# The script below can be saved as a Scala script (name.scala) and run from spark-shell.
# It can also be built as a Scala project: remove these comments, add the dependencies below to build.sbt and compile.
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.sql._
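Given those imports, a minimal sketch of the (old) Kafka 0.8 direct-stream API, run from spark-shell - broker address and topic name are placeholders:
val ssc = new StreamingContext(sc, Seconds(5))   // reuse the shell's SparkContext
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.map(_._2).print()   // print the message values of each micro-batch
ssc.start()
ssc.awaitTermination()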
idris75 / Hive performance Tuning
Created March 2, 2020 13:36 — forked from jamesrajendran/Hive performance Tuning
hive tuning hints mapjoin bucketmapjoin - partition-bucket design
1. MapJoin:
   Small tables can be loaded in memory and joined with bigger tables.
   1. use the hint /*+ MAPJOIN(table_name) */
   2. 'better' option - let Hive do it automatically by setting these properties:
      hive.auto.convert.join = true
      hive.mapjoin.smalltable.filesize = <>   (default is 25MB)
   (see the sketch after this list)
2. Partition design
   Choose a low-cardinality column - e.g. region, year
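As an illustration only (not part of the original notes): the same settings and hint can be exercised from a Spark shell with Hive support enabled; the table names are placeholders:
spark.sql("SET hive.auto.convert.join=true")
spark.sql("SET hive.mapjoin.smalltable.filesize=25000000")   // ~25MB, the default
spark.sql("SELECT /*+ MAPJOIN(s) */ b.id, s.name FROM big_table b JOIN small_table s ON b.id = s.id")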
idris75 / kafka gist
Created March 2, 2020 13:35 — forked from jamesrajendran/kafka gist
kafka notes concepts points to remember
-------------kafka notes-----------
Why Kafka?
  - better throughput
  - replication
  - built-in partitioning
  - fault tolerance
Topic names are unique.
Location of a message -> topic -> partition -> offset
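A small sketch of that addressing with the Kafka consumer API (assuming a kafka-clients 2.x dependency and a broker on localhost:9092; group and topic names are placeholders) - every record carries its topic, partition and offset:
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "demo-group")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Arrays.asList("test"))
val records = consumer.poll(java.time.Duration.ofSeconds(1))
for (r <- records.asScala)
  println(s"${r.topic()} / ${r.partition()} / ${r.offset()} -> ${r.value()}")
consumer.close()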
idris75 / Kafka performance tuning
Created March 2, 2020 13:35 — forked from jamesrajendran/Kafka performance tuning
kafka producer - consumer - broker tuning
1. Producer
   1. request.required.acks = [0, 1, all/-1]: 0 - no acknowledgement but very fast; 1 - acknowledged once the leader commits; all/-1 - acknowledged once all replicas commit.
   2. Use the async producer (property producer.type=async) and a callback for the acknowledgement - see the sketch after this list.
   3. Batch data - send multiple messages together:
      batch.num.messages
      queue.buffering.max.ms
   4. Compression for large messages - gzip and snappy are supported.
      Very large files can be stored in a shared location and just the file path sent by the Kafka producer.
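A sketch of the same knobs with the newer KafkaProducer API (property names differ from the legacy producer discussed above; broker and topic are placeholders):
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("acks", "all")                   // 0 / 1 / all, as in point 1
props.put("batch.size", "16384")           // batch messages together (point 3)
props.put("linger.ms", "5")                // wait a few ms to fill a batch
props.put("compression.type", "snappy")    // point 4
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("test", "key", "value"), new Callback {
  // async send: the callback fires when the broker acknowledges (point 2)
  override def onCompletion(md: RecordMetadata, e: Exception): Unit =
    if (e != null) e.printStackTrace() else println(s"acked at offset ${md.offset()}")
})
producer.close()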
idris75 / LinuxCommands
Created March 2, 2020 13:35 — forked from jamesrajendran/LinuxCommands
Useful linux commands
env # to get all env variables
# Find which classes live in which jar under /usr/hdp/current
all_hdp_classes () {
  find -L /usr/hdp/current -maxdepth 20 -name "*.jar" -print | while read line; do
    for i in `jar tf "$line" | grep '\.class'`
    do
      echo "$line : $i"
    done
  done
}
# usage: all_hdp_classes | grep <ClassName>   # find which jar contains a class
idris75 / Spark Tuning
Created March 2, 2020 13:34 — forked from jamesrajendran/Spark Tuning
Spark performance Tuning
1. mapPartitions() instead of map() - when an expensive initialization (e.g. a DB connection) is needed once per partition rather than once per record. See the sketch after this list.
2. RDD parallelism: for RDDs with no parent (e.g. sc.parallelize(..., 4)), unless specified, Spark will try to use as many CPU cores as YARN makes available.
   This can be tuned with the spark.default.parallelism property.
   - to find the default parallelism use sc.defaultParallelism
   - to check a particular RDD:
     val rdd = sc.parallelize(<value>, 4)
     rdd.getNumPartitions   // returns 4
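A minimal sketch of point 1 (ConnectionPool and its methods are hypothetical placeholders, not a real library):
val rdd = sc.parallelize(1 to 10, 4)
val doubled = rdd.mapPartitions { iter =>
  val conn = ConnectionPool.getConnection()   // hypothetical helper: one connection per partition, not per record
  val out = iter.map(x => x * 2).toList       // placeholder work that would normally use conn
  conn.close()
  out.iterator
}
doubled.collect()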
# to work as root
su -
# ifconfig synonyms
ip address show    # or: ip a s / ip a s eth0
# file name with a date suffix
cp a.txt a_$(date +%F).txt