Li Jin (icexelloss)
from pyspark.sql.functions import udf

# Use udf to define a row-at-a-time UDF.
# Input/output are both a single double value.
@udf('double')
def plus_one(v):
    return v + 1

df.withColumn('v2', plus_one(df.v))
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Use pandas_udf to define a vectorized (Pandas) UDF.
# Input/output are both a pandas.Series of doubles.
@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

df.withColumn('v2', pandas_plus_one(df.v))
import pandas as pd
from scipy import stats

# Scalar Pandas UDF: compute the standard normal CDF of each value.
@pandas_udf('double')
def cdf(v):
    return pd.Series(stats.norm.cdf(v))

df.withColumn('cumulative_probability', cdf(df.v))
# Grouped map Pandas UDF: input/output are both a pandas.DataFrame.
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').apply(subtract_mean)
import statsmodels.api as sm

# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

# Input/output are both a pandas.DataFrame
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
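The UDF definition is truncated at this point in the gist. A minimal sketch of a grouped map function matching the setup above, fitting an ordinary least squares regression of y on x1 and x2 within each group and returning one row of coefficients per group; the function name ols and the exact output layout are assumptions, not taken from the original:

import pandas as pd

# Sketch only: fit y ~ x1 + x2 separately for each group and return
# one row per group containing the group key and the fitted coefficients.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = sm.add_constant(pdf[x_columns])
    model = sm.OLS(y, X).fit()
    return pd.DataFrame([[group_key] + [model.params[x] for x in x_columns]],
                        columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)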
# Take one group and pull it into pandas
sample = df.filter(df.id == 1).toPandas()

# Run as a standalone function on a pandas.DataFrame and verify the result
subtract_mean.func(sample)

# Now run with Spark
df.groupby('id').apply(subtract_mean)
icexelloss / gist:88f6c6fdaf04aac39d68d74cd0942c07 (Groupby apply group key benchmark)
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType

# 10 million rows, roughly 1,000 rows per distinct id
df = spark.range(0, 10 * 1000 * 1000).withColumn('id', (col('id') / 1000).cast(IntegerType()))
df.cache()
df.count()

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
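The UDF being benchmarked is cut off here. A minimal sketch of a grouped map UDF that fits this setup, passing each group through unchanged so the timing mostly measures the groupby/apply and group-key handling; the name identity and the final count() trigger are assumptions:

# Sketch only: pass-through grouped map UDF for benchmarking the
# cost of groupby().apply() itself.
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def identity(pdf):
    return pdf

df.groupby('id').apply(identity).count()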
from pyspark.sql import Window
from pyspark.sql.functions import sum, mean, count
from pyspark.sql.functions import pandas_udf, PandasUDFType

# 1 million rows with a single column v
df = spark.range(0, 1000 * 1000).toDF('v')
df.cache()
df.count()

# Window covering the previous 1,000 rows up to and including the current row
w = Window.rowsBetween(-1000, 0)

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
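The aggregation function is cut off here. A minimal sketch of a grouped aggregate Pandas UDF that could complete the snippet; the name pandas_mean is an assumption, and applying it over the bounded window w requires a Spark version that supports Pandas UDFs with bounded window frames:

# Sketch only: reduce a pandas.Series of values to a single double.
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def pandas_mean(v):
    return v.mean()

# Aggregate over the whole DataFrame
df.agg(pandas_mean(df['v'])).show()

# The same UDF can be used as a window function, e.g.
# df.withColumn('mean_v', pandas_mean(df['v']).over(w)),
# alongside the built-in sum/mean/count for comparison.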
icexelloss / flint1.py
from ts.flint import FlintContext
from pyspark.sql.functions import col, from_utc_timestamp

flintContext = FlintContext(sqlContext)

df = spark.createDataFrame(
    [('2018-08-20', 1.0), ('2018-08-21', 2.0), ('2018-08-24', 3.0)],
    ['time', 'v']
).withColumn('time', from_utc_timestamp(col('time'), 'UTC'))

# Convert to a Flint time-series DataFrame
flint_df = flintContext.read.dataframe(df)
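Not part of the original gist, but as a quick illustration of what the Flint DataFrame enables, Flint's built-in summarizers can be applied per time cycle (rows sharing a timestamp); a minimal sketch:

from ts.flint import summarizers

# Sketch only: count the rows in each cycle (identical timestamp).
flint_df.summarizeCycles(summarizers.count()).show()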
icexelloss / flint2.py
left = ...
# time, v1
# 20180101, 100
# 20180102, 50
# 20180104, -50
# 20180105, 100
right = ...
# time, v2
# 20171231, 100.0
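The right-hand data is cut off here. These two frames line up with Flint's as-of join examples; a minimal sketch of how they might be joined, assuming left and right are Flint TimeSeriesDataFrames and using an assumed tolerance of one day:

# Sketch only: for each left row, join the most recent right row whose
# timestamp is at most 1 day earlier (as-of join with tolerance).
joined = left.leftJoin(right, tolerance='1day')
joined.show()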