Skip to content

Instantly share code, notes, and snippets.

@preetham-salehundam
Created May 9, 2021 21:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save preetham-salehundam/012f5e0bcc1ebbdbd4244e81ecdce7a4 to your computer and use it in GitHub Desktop.
Save preetham-salehundam/012f5e0bcc1ebbdbd4244e81ecdce7a4 to your computer and use it in GitHub Desktop.
pyspark actions
# Demonstration of common PySpark RDD *actions* (operations that trigger
# computation and return a value to the driver). Expected outputs are noted
# next to each call; they assume the sample data defined below.
from operator import add

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Sample data: an RDD of (key, value) pairs and an RDD of plain integers.
data = [("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60)]
inputRDD = spark.sparkContext.parallelize(data)
listRdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 3, 2])

# aggregate: seqOp folds elements within a partition, combOp merges the
# per-partition results. Here both simply add, giving the sum.
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
agg = listRdd.aggregate(0, seqOp, combOp)
print(agg)  # output 20

# aggregate 2: accumulate a (sum, count) pair in a single pass.
seqOp2 = (lambda x, y: (x[0] + y, x[1] + 1))
combOp2 = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
agg2 = listRdd.aggregate((0, 0), seqOp2, combOp2)
print(agg2)  # output (20, 7)

# treeAggregate: same result as aggregate, combined in a multi-level tree.
# NOTE: this rebinds agg2 (kept for compatibility with the original script).
agg2 = listRdd.treeAggregate(0, seqOp, combOp)
print(agg2)  # output 20

# fold: like aggregate, but a single operator is used for both steps.
foldRes = listRdd.fold(0, add)
print(foldRes)  # output 20

# reduce
redRes = listRdd.reduce(add)
print(redRes)  # output 20

# treeReduce: similar to reduce, but combines results in a tree pattern.
# operator.add is reused here; redefining `add` as a lambda is unnecessary.
redRes = listRdd.treeReduce(add)
print(redRes)  # output 20

# collect: bring every element back to the driver as a Python list.
# NOTE: this rebinds `data`, which previously held the input tuples.
data = listRdd.collect()
print(data)  # output [1, 2, 3, 4, 5, 3, 2]

# count, countApprox, countApproxDistinct
print("Count : " + str(listRdd.count()))
# Output: Count : 7
# countApprox takes a timeout in milliseconds; PySpark returns a plain int.
print("countApprox : " + str(listRdd.countApprox(1200)))
# Output: countApprox : 7
# countApproxDistinct is an estimate; for data this small it is expected
# to match the exact distinct count.
print("countApproxDistinct : " + str(listRdd.countApproxDistinct()))
# Output: countApproxDistinct : 5
print("countApproxDistinct : " + str(inputRDD.countApproxDistinct()))
# Output: countApproxDistinct : 5

# countByValue: mapping of each distinct element to its occurrence count.
print("countByValue : " + str(listRdd.countByValue()))
# Output: counts are {1: 1, 2: 2, 3: 2, 4: 1, 5: 1} (printed as a defaultdict)

# first
print("first : " + str(listRdd.first()))
# Output: first : 1
print("first : " + str(inputRDD.first()))
# Output: first : ('Z', 1)

# top: the N largest elements, in descending order.
print("top : " + str(listRdd.top(2)))
# Output: top : [5, 4]
print("top : " + str(inputRDD.top(2)))
# Output: top : [('Z', 1), ('C', 40)]

# min (tuples compare element by element, so the key decides first)
print("min : " + str(listRdd.min()))
# Output: min : 1
print("min : " + str(inputRDD.min()))
# Output: min : ('A', 20)

# max
print("max : " + str(listRdd.max()))
# Output: max : 5
print("max : " + str(inputRDD.max()))
# Output: max : ('Z', 1)

# take, takeOrdered, takeSample
print("take : " + str(listRdd.take(2)))
# Output: take : [1, 2]
print("takeOrdered : " + str(listRdd.takeOrdered(2)))
# Output: takeOrdered : [1, 2]
# takeSample requires withReplacement and num; calling it with no arguments
# raises TypeError. Sample 2 elements without replacement (result is random).
print("takeSample : " + str(listRdd.takeSample(False, 2)))
# Output: takeSample : two randomly chosen elements, e.g. [3, 5]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment