@jiffyclub
Last active September 24, 2022 16:05
Do the same thing in Spark and Pandas

Three short scripts: hdf5_to_parquet.py converts Pandas DataFrames stored in HDF5 to Parquet, a Pandas script runs a merge/filter/groupby workflow, and spark_run.py runs the same workflow in Spark.

hdf5_to_parquet.py:
"""
Convert Pandas DFs in an HDFStore to parquet files for better compatibility
with Spark.
Run from the command line with:
spark-submit --driver-memory 4g --master 'local[*]' hdf5_to_parquet.py
"""
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Read the tables out of the HDF5 store. reset_index() turns the index
# into a regular column so it survives the conversion to a Spark DataFrame.
store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
persons = store['persons'].reset_index()
households = store['households'].reset_index()
store.close()
spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
print(spark_conf.toDebugString())
sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

# Convert the Pandas DataFrames to Spark DataFrames and write them out
# as Parquet files.
hh_spark = sql.createDataFrame(households)
p_spark = sql.createDataFrame(persons)
hh_spark.write.parquet('households.parquet')
p_spark.write.parquet('persons.parquet')
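
To sanity-check the conversion, the Parquet files can be read back and inspected. A minimal sketch, assuming it runs in the same Spark application and directory as the script above (the variable name is illustrative):

# Read the Parquet output back and inspect the schema and row count.
hh_check = sql.read.parquet('households.parquet')
hh_check.printSchema()
print(hh_check.count())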

The same workflow in plain Pandas:

import time

import pandas as pd

# Load the tables; the indexes stay in place this time because the
# merge below joins persons to the households table's index.
store = pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5')
persons = store['persons']
households = store['households']
store.close()
t1 = time.time()
# Join each person to their household; household_id in persons matches
# the households table's index.
persons = persons.merge(households, left_on='household_id', right_index=True)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))

persons = persons.query('age >= 18 and income >= 10000')
assert len(persons) > 0, 'no people left after query'
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))

income_by_sex = persons.groupby('sex').income.mean()
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))
print('total time: {}'.format(t4 - t1))
print(income_by_sex)
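
The repeated time.time() bookkeeping in this script could be factored into a small helper. A minimal sketch (the timed helper is illustrative, not part of the original gist):

import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    # Print how long the wrapped block of code took.
    start = time.time()
    yield
    print('{}: {}'.format(label, time.time() - start))

# Usage:
# with timed('time to merge'):
#     persons = persons.merge(households, left_on='household_id', right_index=True)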
"""
Ran from the command line with:
time spark-submit --driver-memory 4g --master 'local[*]' spark_run.py 2> spark.log
"""
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
print(spark_conf.toDebugString())
sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

# Load the Parquet files written by hdf5_to_parquet.py.
hh_spark = sql.read.parquet('households.parquet')
p_spark = sql.read.parquet('persons.parquet')
t1 = time.time()
# Join each person to their household. The households table keeps its
# HHID column through the Parquet round trip.
merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))

# filtered = merged.filter((merged.age <= 18) & (merged.income >= 100000))
filtered = merged.filter('age >= 18 and income >= 10000')
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))

income_by_sex = filtered.groupby('sex').agg({'income': 'mean'})
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))

# Spark is lazy: the join/filter/agg steps above mostly build up the
# execution plan, and collect() is what actually triggers the work.
print(income_by_sex.collect())
t5 = time.time()
print('time to collect: {}'.format(t5 - t4))
print('total time: {}'.format(t5 - t1))
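
These scripts target the Spark 1.x API (SparkConf plus SQLContext). On Spark 2 and later the same setup is usually written with a SparkSession; a minimal sketch under that assumption (spark.storage.memoryFraction was deprecated in later Spark versions, so it is omitted):

from pyspark.sql import SparkSession

# Spark 2+ equivalent of the SparkConf/SQLContext setup above.
spark = (
    SparkSession.builder
    .appName('SparkRunDemo')
    .config('spark.executor.memory', '8g')
    .getOrCreate())
hh_spark = spark.read.parquet('households.parquet')
p_spark = spark.read.parquet('persons.parquet')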