pyspark-rdd.py: common PySpark RDD code snippets
# Initializing SparkContext
>>> from pyspark import SparkContext
>>> sc = SparkContext(master = 'local[2]')
# Configuration with SparkConf
>>> from pyspark import SparkConf, SparkContext
>>> conf = (SparkConf()
         .setMaster("local[2]")
         .setAppName("Edureka CheatSheet")
         .set("spark.executor.memory", "1g"))
>>> sc = SparkContext(conf=conf)
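Only one SparkContext can be active per process, so the configured context above can only be created after the first `sc` has been stopped. A minimal sketch of the two usual options (both reuse the conf built above):
# Option 1: stop the earlier context, then create the configured one
>>> sc.stop()
>>> sc = SparkContext(conf=conf)
# Option 2: reuse an active context, creating one only if needed
>>> sc = SparkContext.getOrCreate(conf)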
# SparkContext Version
>>> sc.version
# Python Version
>>> sc.pythonVer
# Application Name
>>> sc.appName
# Application ID
>>> sc.applicationId
# Master URL
>>> sc.master
# Installed Spark Path
>>> str(sc.sparkHome)
# Retrieve the Spark user currently using the SparkContext
>>> str(sc.sparkUser())
# Get default level of Parallelism
>>> sc.defaultParallelism
# Get minimum number of Partitions
>>> sc.defaultMinPartitions
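As a quick check, a minimal sketch (the format string is illustrative, not part of the API) prints several of these attributes together:
# One-line summary of the current context
>>> print("Spark %s (Python %s) on %s, app '%s', %d default partitions"
          % (sc.version, sc.pythonVer, sc.master, sc.appName, sc.defaultParallelism))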
# Using Parallelized Collections
>>> rdd = sc.parallelize([('Jim',24),('Hope', 25),('Sue', 26)])
>>> rdd = sc.parallelize([('a',9),('b',7),('c',10)])
>>> num_rdd = sc.parallelize(range(1,5000))
# From other RDDs
>>> new_rdd = rdd.groupByKey()
>>> new_rdd = rdd.map(lambda x: (x,1))
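Both transformations are lazy; a minimal sketch (using the pair RDD defined above, output shown for illustration) forces evaluation to show what they produce:
# groupByKey() gives (key, iterable) pairs; mapValues(list) makes them printable
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [9]), ('b', [7]), ('c', [10])]
# map() wraps each original element in a new tuple
>>> rdd.map(lambda x: (x, 1)).first()
(('a', 9), 1)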
# From a text file
>>> tfile_rdd = sc.textFile("/path/of_file/*.txt")
# Reading a directory of text files
>>> tfile_rdd = sc.wholeTextFiles("/path/of_directory/")
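The two readers return differently shaped records; a minimal sketch (the paths are the same placeholders as above) shows the difference:
# textFile() yields one record per line of text
>>> sc.textFile("/path/of_file/*.txt").first()
# wholeTextFiles() yields (file_path, file_content) pairs; keys() gives just the paths
>>> sc.wholeTextFiles("/path/of_directory/").keys().take(2)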
# Maximum value of RDD elements
>>> num_rdd.max()
# Minimum value of RDD elements
>>> num_rdd.min()
# Mean value of RDD elements
>>> num_rdd.mean()
# Standard deviation of RDD elements
>>> num_rdd.stdev()
# Summary statistics: count, mean, stdev, max & min
>>> num_rdd.stats()
# Number of partitions
>>> num_rdd.getNumPartitions()
# map
Return a new RDD by applying a function to each element of this RDD.
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
# flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
# mapPartitions
Return a new RDD by applying a function to each partition of this RDD.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
# filter
Return a new RDD containing only the elements that satisfy a predicate.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
# distinct
Return a new RDD containing the distinct elements in this RDD.
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
# reduce
Reduces the elements of this RDD using the specified commutative
and associative binary operator. Currently reduces partitions locally.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10
# count
Return the number of elements in this RDD.
>>> sc.parallelize([2, 3, 4]).count()
3
# first
Return the first element in this RDD.
>>> sc.parallelize([2, 3, 4]).first()
2
# take
Take the first n elements of the RDD.
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]
# countByValue
Return the count of each unique value in this RDD as a
dictionary of (value, count) pairs.
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]
# sortBy
Sorts this RDD by the given keyfunc.
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# sortByKey
Sorts this RDD, which is assumed to consist of (key, value) pairs.
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey(True, 1).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# groupBy
Return an RDD of grouped items.
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
# groupByKey
Group the values for each key in the RDD into a single sequence.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(x.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
# fold
Aggregate the elements of each partition, and then the results for
all the partitions, using a given associative function and a neutral "zero value."
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
# __add__ (the + operator)
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> (rdd + rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
# subtract
Return each value in self that is not contained in other.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
# union
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
# intersection
Return the intersection of this RDD and another one.
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3]
# cartesian
Return the Cartesian product of this RDD and another one.
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
# saveAsTextFile
Save this RDD as a text file, using string representations of elements.
>>> rdd.saveAsTextFile("rdd.txt")
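A minimal round-trip sketch ("out_dir" is a placeholder path): saveAsTextFile writes a directory of part files, and textFile reads them back as strings:
>>> sc.parallelize([1, 2, 3]).saveAsTextFile("out_dir")
>>> sorted(sc.textFile("out_dir").collect())
['1', '2', '3']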
# saveAsHadoopFile
Output a Python RDD of key-value pairs (of the form RDD[(K, V)]) to any Hadoop file system.
>>> rdd.saveAsHadoopFile("hdfs://namenodehost/parent_folder/child_folder",'org.apache.hadoop.mapred.TextOutputFormat')
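Data written with TextOutputFormat can be read back with sc.hadoopFile and the matching input format; a sketch, assuming the same placeholder HDFS path as above (TextInputFormat yields (byte offset, line) pairs):
>>> pairs = sc.hadoopFile("hdfs://namenodehost/parent_folder/child_folder",
                          "org.apache.hadoop.mapred.TextInputFormat",
                          "org.apache.hadoop.io.LongWritable",
                          "org.apache.hadoop.io.Text")
# Drop the byte-offset keys and look at the text lines
>>> pairs.values().take(2)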
# saveAsPickleFile
Save this RDD as a SequenceFile of serialized (pickled) objects.
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
>>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
['1', '2', 'rdd', 'spark']