from pyspark import SparkContext, StorageLevel as SL
from timeit import repeat
from time import sleep as delay

data = list(range(1000000))
sc = SparkContext('local[*]')  # local mode, using all available cores
# Build a lazy chain of transformations; nothing is computed until an
# action (collect() below) triggers it. Note the integer division (//)
# so the bucket keys stay ints on Python 3 as well.
rdd = sc.parallelize(data) \
    .map(lambda x: 2 * x - 1) \
    .filter(lambda x: x > 100000) \
    .map(lambda x: (x // 10000, x)) \
    .groupByKey()


def f(x):
    # x is a (key, ResultIterable) pair produced by groupByKey()
    return {
        'id': x[0],
        'value': x[1]
    }

print("Without persistance")
print(repeat('rdd.map(lambda x: f(x)).collect()', 'from __main__ import rdd, f', repeat=5, number=1))
rdd.unpersist()
delay(2)  # give Spark a moment to drop any cached blocks

# Cache the RDD on disk; the first timed run below also pays the cost of
# materializing the cache, so expect it to be slower than the rest.
rdd.persist(SL.DISK_ONLY)
print("Persisting in disk")
print(repeat('rdd.map(lambda x: f(x)).collect()', 'from __main__ import rdd, f', repeat=5, number=1))
rdd.unpersist()
delay(2)

rdd.persist(SL.MEMORY_ONLY)  # cache the RDD in memory instead
print("Persisting in main memory")
print(repeat('rdd.map(lambda x: f(x)).collect()', 'from __main__ import rdd, f', repeat=5, number=1))
rdd.unpersist()
delay(2)
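
# A minimal sanity check (a sketch, not part of the original gist): before
# trusting a timing run, it can help to confirm which storage level is
# actually in effect. RDD.getStorageLevel() is a standard PySpark call; right
# after unpersist() it prints e.g. StorageLevel(False, False, False, False, 1),
# i.e. no disk, no memory, replication 1.
print(rdd.getStorageLevel())
sc.stop()  # release the local Spark context once the benchmark is done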