# koalas / pandas / pyspark profiling
import pandas as pd
import numpy as np
import random
import time

### create_df will create a sample pandas df with nr_rows rows
def create_df(nr_rows):
    nr_pods = int(nr_rows / 4000)
    nr_trips = int(nr_rows / 4000)
    pods = ["pod_" + str(i) for i in range(nr_pods)]
    trips = ["trip_" + str(i) for i in range(nr_trips)]
    df = pd.DataFrame({
        "pod_id": [random.choice(pods) for _ in range(nr_rows)],
        "trip_id": [random.choice(trips) for _ in range(nr_rows)],
        "timestamp": np.random.rand(nr_rows) * 35 * 60,    # seconds, up to 35 minutes
        "speed_mph": np.random.rand(nr_rows) * 670.0
    })
    return df
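# Optional sanity check (illustrative; the name sample_df is not used by the
# benchmark below): build a small frame and inspect the schema before
# profiling at full size.
sample_df = create_df(10_000)
print(sample_df.head())
print(sample_df.dtypes)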
import databricks.koalas as ks
from pyspark.sql.functions import desc
from pyspark.sql import functions as F
import gc
def do_pandas(df):
    # min/max timestamp per (pod_id, trip_id) group, then derive trip duration
    gdf = df.groupby(['pod_id', 'trip_id']).agg({'timestamp': ['min', 'max']})
    gdf.columns = ['timestamp_first', 'timestamp_last']
    gdf['trip_time_sec'] = gdf['timestamp_last'] - gdf['timestamp_first']
    gdf['trip_time_hours'] = gdf['trip_time_sec'] / 3600.0
    x = gdf.describe()
    return gdf, x
def do_koalas(df):
    # koalas mirrors the pandas API, so do_pandas() runs unchanged on a koalas
    # frame; to_pandas() forces execution and pulls the result to the driver
    gdf, x = do_pandas(df)
    return gdf.to_pandas(), x.to_pandas()
def do_spark(sdf):
    sdf2 = sdf.groupBy("pod_id", "trip_id").agg(
        F.max('timestamp').alias('timestamp_last'),
        F.min('timestamp').alias('timestamp_first'))
    sdf3 = sdf2.withColumn('trip_time_sec', sdf2['timestamp_last'] - sdf2['timestamp_first'])
    sdf4 = sdf3.withColumn('trip_time_hours', sdf3['trip_time_sec'] / 3600.0)
    stats = sdf4.select('timestamp_last', 'timestamp_first',
                        'trip_time_sec', 'trip_time_hours').summary()
    return sdf4.toPandas(), stats.toPandas()
nr_retries = 10
timings = pd.DataFrame(columns=['nr_data', 'pandas', 'pyspark', 'koalas'])
for size in [1e8, 3.2e7, 1e7, 3.2e6]:
    df = create_df(int(size))
    time1 = []
    for i in range(nr_retries):
        print(size, 'pandas', i)
        t = time.time()
        do_pandas(df)
        time1.append(time.time() - t)
        gc.collect()
    # spark is the active SparkSession (provided by the Databricks notebook)
    sdf = spark.createDataFrame(df)
    time2 = []
    for i in range(nr_retries):
        print(size, 'pyspark', i)
        t = time.time()
        do_spark(sdf)
        time2.append(time.time() - t)
        gc.collect()
    kdf = ks.from_pandas(df)
    time3 = []
    for i in range(nr_retries):
        print(size, 'koalas', i)
        t = time.time()
        do_koalas(kdf)
        time3.append(time.time() - t)
        gc.collect()
    print('\nsize :', size, 'time_pandas', time1, 'time_pyspark', time2, 'time_koalas', time3)
    timings = timings.append(pd.Series({'nr_data': size, 'pandas': time1,
                                        'pyspark': time2, 'koalas': time3}),
                             ignore_index=True)
from pyspark.sql.functions import pandas_udf, PandasUDFType, desc
import pyspark.sql.functions as F
from pyspark.sql.types import *
import gc
def do_pandas(df):
    def calc_distance_from_speed(gdf):
        gdf = gdf.sort_values('timestamp')
        gdf['time_diff'] = gdf['timestamp'].diff().fillna(0.0)
        return pd.DataFrame({
            'distance_miles': [(gdf['time_diff'] * gdf['speed_mph']).sum()],
            'travel_time_sec': [gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0]]
        })
    results = df.groupby(['pod_id', 'trip_id']).apply(calc_distance_from_speed)
    results['distance_km'] = results['distance_miles'] * 1.609
    results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
    results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
    return results.describe()
def do_koalas(df):
    def calc_distance_from_speed_ks(gdf) -> ks.DataFrame[str, str, float, float]:
        gdf = gdf.sort_values('timestamp')
        gdf['meanspeed'] = (gdf['timestamp'].diff().fillna(0.0) * gdf['speed_mph']).sum()
        gdf['triptime'] = gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0]
        return gdf[['pod_id', 'trip_id', 'meanspeed', 'triptime']].iloc[0:1]
    results = df.groupby(['pod_id', 'trip_id']).apply(calc_distance_from_speed_ks)
    # due to current limitations of the package, groupby.apply() returns c0 .. c3 column names
    results.columns = ['pod_id', 'trip_id', 'distance_miles', 'travel_time_sec']
    # spark groupby does not set the groupby cols as index and does not sort them
    results = results.set_index(['pod_id', 'trip_id']).sort_index()
    results['distance_km'] = results['distance_miles'] * 1.609
    results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
    results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
    return results.describe().to_pandas()
def do_spark(sdf):
    schema = StructType([
        StructField("pod_id", StringType()),
        StructField("trip_id", StringType()),
        StructField("distance_miles", DoubleType()),
        StructField("travel_time_sec", DoubleType())
    ])

    # grouped-map pandas UDF: each (pod_id, trip_id) group arrives as a pandas
    # DataFrame and must be returned as one row matching the schema above
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def calc_distance_from_speed(gdf):
        gdf = gdf.sort_values('timestamp')
        gdf['time_diff'] = gdf['timestamp'].diff().fillna(0.0)
        return pd.DataFrame({
            'pod_id': [gdf['pod_id'].iloc[0]],
            'trip_id': [gdf['trip_id'].iloc[0]],
            'distance_miles': [(gdf['time_diff'] * gdf['speed_mph']).sum()],
            'travel_time_sec': [gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0]]
        })

    sdf = sdf.groupby("pod_id", "trip_id").apply(calc_distance_from_speed)
    sdf = sdf.withColumn('distance_km', F.col('distance_miles') * 1.609)
    sdf = sdf.withColumn('avg_speed_mph', F.col('distance_miles') / F.col('travel_time_sec') / 60.0)
    sdf = sdf.withColumn('avg_speed_kph', F.col('avg_speed_mph') * 1.609)
    return sdf.summary().toPandas()
nr_retries = 10
timings_udf = pd.DataFrame(columns=['nr_data', 'pandas', 'pyspark', 'koalas'])
for size in [1e6, 3.2e5, 1e5, 3.2e4]:
    df = create_df(int(size))
    time1 = []
    for i in range(nr_retries):
        print(size, 'pandas', i)
        t = time.time()
        do_pandas(df)
        time1.append(time.time() - t)
        gc.collect()
    sdf = spark.createDataFrame(df)
    time2 = []
    for i in range(nr_retries):
        print(size, 'pyspark', i)
        t = time.time()
        do_spark(sdf)
        time2.append(time.time() - t)
        gc.collect()
    kdf = ks.from_pandas(df)
    time3 = []
    for i in range(nr_retries):
        print(size, 'koalas', i)
        t = time.time()
        do_koalas(kdf)
        time3.append(time.time() - t)
        gc.collect()
    print('size :', size, 'time_pandas', time1, 'time_pyspark', time2, 'time_koalas', time3)
    timings_udf = timings_udf.append(pd.Series({'nr_data': size, 'pandas': time1,
                                                'pyspark': time2, 'koalas': time3}),
                                     ignore_index=True)
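# The plotting code below references <backend>_mean / <backend>_max / <backend>_min
# columns that are not computed anywhere above. A minimal sketch to derive them
# from the per-run timing lists (assumption: since Plotly's asymmetric error_y
# expects offsets from y, the _max/_min columns hold distances from the mean
# rather than the raw extremes):
for col in ['pandas', 'pyspark', 'koalas']:
    means = timings_udf[col].apply(np.mean)
    timings_udf[col + '_mean'] = means
    timings_udf[col + '_max'] = timings_udf[col].apply(np.max) - means
    timings_udf[col + '_min'] = means - timings_udf[col].apply(np.min)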
import plotly.graph_objects as go
import plotly.offline as py

x_labels = timings_udf['nr_data'].apply(lambda x: " 1e" + str(np.log10(x))[:3])
fig = go.Figure(data=[
    go.Bar(name='pandas', x=x_labels, y=timings_udf['pandas_mean'],
           error_y=dict(
               type='data',
               symmetric=False,
               array=timings_udf['pandas_max'],
               arrayminus=timings_udf['pandas_min'],
           )),
    go.Bar(name='koalas', x=x_labels, y=timings_udf['koalas_mean'],
           error_y=dict(
               type='data',
               symmetric=False,
               array=timings_udf['koalas_max'],
               arrayminus=timings_udf['koalas_min'],
           )),
    go.Bar(name='pyspark', x=x_labels, y=timings_udf['pyspark_mean'],
           error_y=dict(
               type='data',
               symmetric=False,
               array=timings_udf['pyspark_max'],
               arrayminus=timings_udf['pyspark_min'],
           ))
])
# Change the bar mode
fig.update_layout(barmode='group')
fig.update_layout(
    title=go.layout.Title(
        text="pandas / pyspark / koalas profiling - UDF & others <br>(the lower the better)",
        xref="container",
        x=0.5
    ),
    xaxis=go.layout.XAxis(
        title=go.layout.xaxis.Title(text="rows [#]")
    ),
    yaxis=go.layout.YAxis(
        title=go.layout.yaxis.Title(text="time [s]")
    )
)
# displayHTML renders the plot div inside a Databricks notebook
displayHTML(py.plot(fig, output_type='div'))