Do the same thing in Spark and Pandas
"""
Convert Pandas DFs in an HDFStore to parquet files for better compatibility
with Spark.
Run from the command line with:
spark-submit --driver-memory 4g --master 'local[*]' hdf5_to_parquet.py
"""
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
# reset_index() moves the index (e.g. HHID for households) into a regular
# column, since Spark's createDataFrame ignores the pandas index
persons = store['persons'].reset_index()
households = store['households'].reset_index()
store.close()
spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # driver memory must be set before the JVM starts, so it is passed to
    # spark-submit as --driver-memory rather than set here
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    # lower the cache fraction (default 0.6 in Spark 1.x) to leave more
    # memory for execution, since little is cached here
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
print(spark_conf.toDebugString())
sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)
hh_spark = sql.createDataFrame(households)
p_spark = sql.createDataFrame(persons)
hh_spark.write.parquet('households.parquet')
p_spark.write.parquet('persons.parquet')
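# Sanity check (a sketch, assuming the writes above succeeded): read the
# Parquet files back and confirm the schemas and row counts look right.
hh_check = sql.read.parquet('households.parquet')
p_check = sql.read.parquet('persons.parquet')
hh_check.printSchema()
print(hh_check.count(), p_check.count())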
"""
Pandas version of the same merge/filter/groupby workflow, reading directly
from the HDFStore, for comparison with the Spark run below.
"""
import time

import pandas as pd

store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
persons = store['persons']
households = store['households']
store.close()
t1 = time.time()
# join each person to their household (households is indexed by household id)
persons = persons.merge(households, left_on='household_id', right_index=True)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))
persons = persons.query('age >= 18 and income >= 10000')
assert len(persons) > 0, 'no people left after query'
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))
income_by_sex = persons.groupby('sex').income.mean()
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))
print('total time: {}'.format(t4 - t1))
print(income_by_sex)
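# Note: pandas executes eagerly, so the timings above measure the actual
# merge/filter/aggregation work. The Spark timings below mostly measure
# query-plan construction, because Spark transformations are lazy until an
# action such as collect() runs.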
"""
Ran from the command line with:
time spark-submit --driver-memory 4g --master 'local[*]' spark_run.py 2> spark.log
"""
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
print(spark_conf.toDebugString())
sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)
hh_spark = sql.read.parquet('households.parquet')
p_spark = sql.read.parquet('persons.parquet')
t1 = time.time()
# join on the household id; HHID became a column via reset_index() during conversion
merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))
# equivalent Column-expression form:
# filtered = merged.filter((merged.age >= 18) & (merged.income >= 10000))
filtered = merged.filter('age >= 18 and income >= 10000')
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))
income_by_sex = filtered.groupby('sex').agg({'income': 'mean'})
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))
print(income_by_sex.collect())
t5 = time.time()
print('time to collect: {}'.format(t5 - t4))
print('total time: {}'.format(t5 - t1))
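# Note: t2 - t1 and t3 - t2 above mostly time query-plan construction, since
# the join and filter are lazy; the real work happens at collect(). To
# inspect the physical plan Spark will run (a sketch using the standard
# DataFrame.explain() method):
income_by_sex.explain()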