from pyspark.sql.functions import udf

# Use udf to define a row-at-a-time udf
@udf('double')
# Input/output are both a single double value
def plus_one(v):
    return v + 1

df.withColumn('v2', plus_one(df.v))
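These snippets assume an active SparkSession named spark and a DataFrame df with an integer id column and a double v column. A minimal sketch of that setup (the names spark and df match the snippets; the data itself is illustrative, not from the original gists):

# Hypothetical setup assumed by the snippets in this file
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pandas-udf-examples').getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ['id', 'v'])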
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Use pandas_udf to define a Pandas UDF
@pandas_udf('double', PandasUDFType.SCALAR)
# Input/output are both a pandas.Series of doubles
def pandas_plus_one(v):
    return v + 1

df.withColumn('v2', pandas_plus_one(df.v))
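Unlike plus_one, which Spark invokes once per row, pandas_plus_one receives whole pandas.Series batches over Arrow; that batching is where the speedup comes from. On Spark 3.0+ the same UDF is normally written with Python type hints instead of PandasUDFType (a sketch of the newer spelling, not part of the original gist):

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Spark infers SCALAR from the Series -> Series type hints
@pandas_udf('double')
def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v + 1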
import pandas as pd
from scipy import stats

# Cumulative distribution function of the standard normal, one batch at a time
@pandas_udf('double')
def cdf(v):
    return pd.Series(stats.norm.cdf(v))

df.withColumn('cumulative_probability', cdf(df.v))
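Because stats.norm.cdf already accepts array-likes, each batch is evaluated in a single vectorized call; wrapping the result in pd.Series is needed only because scipy returns a plain numpy array while a scalar pandas UDF must return a Series.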
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').apply(subtract_mean)
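Note that a grouped map UDF materializes all rows of a group as a single pandas.DataFrame in executor memory before calling the function, so very large or heavily skewed groups can cause out-of-memory failures.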
import statsmodels.api as sm

# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    model = sm.OLS(pdf[y_column], sm.add_constant(pdf[x_columns])).fit()
    # One row per group: the group key followed by the fitted coefficients
    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]], columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)
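The resulting beta DataFrame has one row per id: the group key followed by the fitted x1 and x2 coefficients for that group.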
sample = df.filter(df.id == 1).toPandas()
# Run as a standalone function on a pandas.DataFrame and verify the result
subtract_mean.func(sample)

# Now run with Spark
df.groupby('id').apply(subtract_mean)
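This works because pandas_udf keeps a reference to the wrapped Python function on the .func attribute, so the UDF body can be unit-tested against a plain pandas.DataFrame without starting Spark at all.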
from pyspark.sql.functions import col, pandas_udf, rand
from pyspark.sql.types import IntegerType

# 10M rows, ~1000 rows per id; the random value column is an assumption here,
# added so the grouped-map UDF below has something to aggregate
df = spark.range(0, 10 * 1000 * 1000).withColumn('id', (col('id') / 1000).cast(IntegerType())).withColumn('v', rand())
df.cache()
df.count()
from pyspark.sql.functions import PandasUDFType
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())
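A minimal wall-clock timing harness for the benchmark this cell sets up (the timing code is illustrative, not from the original snippet):

import time
start = time.time()
df.groupby('id').apply(subtract_mean).count()
print('grouped-map pandas UDF: %.1fs' % (time.time() - start))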
df = spark.range(0, 1000 * 1000).toDF('v')
df.cache()
df.count()

from pyspark.sql import Window
w = Window.rowsBetween(-1000, 0)

from pyspark.sql.functions import sum, mean, count
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Rolling mean over the trailing 1000 rows via a grouped-agg pandas UDF
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

df.withColumn('mean_v', mean_udf(df['v']).over(w))
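The sum, mean and count imports above point at the natural baseline: the same rolling mean expressed with a built-in window aggregate, e.g.:

# Built-in equivalent of mean_udf over the same window, for comparison
df.withColumn('mean_v', mean(df['v']).over(w))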
from ts.flint import FlintContext
from pyspark.sql.functions import col, from_utc_timestamp

flintContext = FlintContext(sqlContext)

df = spark.createDataFrame(
    [('2018-08-20', 1.0), ('2018-08-21', 2.0), ('2018-08-24', 3.0)],
    ['time', 'v']
).withColumn('time', from_utc_timestamp(col('time'), 'UTC'))

# Convert to a Flint time-series DataFrame
flint_df = flintContext.read.dataframe(df)
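flintContext.read.dataframe wraps the Spark DataFrame in a ts.flint TimeSeriesDataFrame, which relies on rows being sorted by the time column so that time-aware operations such as asof joins can be performed efficiently.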
left = ...
# time, v1
# 20180101, 100
# 20180102, 50
# 20180104, -50
# 20180105, 100

right = ...
# time, v2
# 20171231, 100.0
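Given two such time-indexed frames, the tolerance-based asof join this setup builds toward looks like the sketch below; leftJoin and its tolerance parameter follow the ts.flint API, but the call is an assumed continuation rather than part of the original snippet.

# For each left row, join the most recent right row no older than one day
# (assumed continuation; '1day' is a ts.flint tolerance string)
joined = left.leftJoin(right, tolerance='1day')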