Skip to content

Instantly share code, notes, and snippets.

@vmuriart
Forked from jiffyclub/hdf_to_parquet.py
Created May 18, 2017 00:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vmuriart/43f3754cae58c014300bc68b11d09409 to your computer and use it in GitHub Desktop.
Save vmuriart/43f3754cae58c014300bc68b11d09409 to your computer and use it in GitHub Desktop.
Do the same thing in Spark and Pandas
"""
Convert Pandas DFs in an HDFStore to parquet files for better compatibility
with Spark.
Run from the command line with:
spark-submit --driver-memory 4g --master 'local[*]' hdf_to_parquet.py
"""
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
# Load both tables from the HDF5 store.  The `with` block closes the store
# even if reading one of the tables raises.
# reset_index() moves each frame's index into an ordinary column --
# presumably so the index values survive the conversion to a Spark
# DataFrame below (TODO confirm).
with pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5') as store:
    persons = store['persons'].reset_index()
    households = store['households'].reset_index()

# Spark settings for the conversion job.  The commented-out entries are the
# in-code equivalents of the --master / --driver-memory options shown in the
# spark-submit command line.
spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
# A parenthesised single-argument print is valid on both Python 2 and 3.
print(spark_conf.toDebugString())

sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

# Convert each pandas frame to a Spark DataFrame and write it out as parquet
# (paths are relative to the working directory of the job).
hh_spark = sql.createDataFrame(households)
p_spark = sql.createDataFrame(persons)
hh_spark.write.parquet('households.parquet')
p_spark.write.parquet('persons.parquet')
import time
import pandas as pd
# Pandas version of the benchmark: merge persons with households, filter,
# then compute mean income by sex, printing the wall-clock time of each step.

# Read both tables; the `with` block closes the store even if a read fails.
with pd.HDFStore('/Users/jiffyclub/synth/spark-demo/mtc_asim.h5') as store:
    persons = store['persons']
    households = store['households']

# Step 1: attach household columns to every person by matching
# persons.household_id against the households index.
t1 = time.time()
persons = persons.merge(households, left_on='household_id', right_index=True)
t2 = time.time()
# A parenthesised single-argument print is valid on both Python 2 and 3.
print('time to merge: {}'.format(t2 - t1))

# Step 2: keep rows with age >= 18 and income >= 10000.
persons = persons.query('age >= 18 and income >= 10000')
# Explicit raise (same exception type and message as the former `assert`)
# so the sanity check is not stripped when Python runs with -O.
if len(persons) == 0:
    raise AssertionError('no people left after query')
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))

# Step 3: mean income per value of the `sex` column.
income_by_sex = persons.groupby('sex').income.mean()
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))

print('total time: {}'.format(t4 - t1))
print(income_by_sex)
"""
Ran from the command line with:
time spark-submit --driver-memory 4g --master 'local[*]' spark_run.py 2> spark.log
"""
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
# Spark version of the benchmark: join households with persons, filter,
# then compute mean income by sex, printing the wall-clock time of each step.

# The commented-out entries are the in-code equivalents of the
# --master / --driver-memory options shown in the spark-submit command line.
spark_conf = (
    SparkConf()
    .setAppName('SparkRunDemo')
    # .setMaster('local[*]')
    # .set('spark.driver.memory', '8g')
    .set('spark.executor.memory', '8g')
    .set('spark.python.worker.memory', '8g')
    .set('spark.storage.memoryFraction', 0.2)
    .set('spark.logConf', True))
# A parenthesised single-argument print is valid on both Python 2 and 3.
print(spark_conf.toDebugString())

sc = SparkContext(conf=spark_conf)
sql = SQLContext(sc)

# Inputs are parquet datasets read from the job's working directory.
hh_spark = sql.read.parquet('households.parquet')
p_spark = sql.read.parquet('persons.parquet')

# NOTE: Spark DataFrame transformations (join, filter, groupby/agg) are lazy.
# The three calls below only build the query plan, so their timings measure
# plan construction; the actual computation runs when collect() is called.
t1 = time.time()
merged = hh_spark.join(p_spark, hh_spark.HHID == p_spark.household_id)
t2 = time.time()
print('time to merge: {}'.format(t2 - t1))

filtered = merged.filter('age >= 18 and income >= 10000')
t3 = time.time()
print('time to filter: {}'.format(t3 - t2))

income_by_sex = filtered.groupby('sex').agg({'income': 'mean'})
t4 = time.time()
print('time to groupby agg: {}'.format(t4 - t3))

# collect() is the action that triggers execution and returns the result
# rows to the driver.
print(income_by_sex.collect())
t5 = time.time()
print('time to collect: {}'.format(t5 - t4))

print('total time: {}'.format(t5 - t1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment