Skip to content

Instantly share code, notes, and snippets.

@taozhuo
Last active December 3, 2021 17:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taozhuo/d9d5d58b5f82529130c195b21f9450b7 to your computer and use it in GitHub Desktop.
Save taozhuo/d9d5d58b5f82529130c195b21f9450b7 to your computer and use it in GitHub Desktop.
Returning multiple output columns from a PySpark UDF #pyspark
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.types import FloatType, StructField, StructType
def append_payer_spend(context_ts, collected_col, window_days=7):
    """Summarise the entries of ``collected_col`` that fall in a window after ``context_ts``.

    Meant to be wrapped with ``F.udf`` and a two-field struct return type
    (``is_payer``, ``spend``).

    Args:
        context_ts: Start of the window. Subtracting it from an entry's
            ``txTimestamp`` must yield a ``datetime.timedelta`` (presumably a
            ``datetime`` from a Spark timestamp column -- confirm). ``None``
            yields an all-zero result.
        collected_col: Iterable of ``Row`` objects exposing ``txTimestamp`` and
            ``receiptUsdAmount``. May be ``None`` or empty. Entries that are
            ``None`` or have a null ``txTimestamp`` (such as a
            ``Row(None, None)`` placeholder) are ignored.
        window_days: Window width in whole days. An entry counts when
            ``0 <= (txTimestamp - context_ts).days < window_days``. Defaults to
            7, the value this function previously hard-coded.

    Returns:
        ``Row(is_payer, spend)`` where ``is_payer`` is 1.0 if any entry falls in
        the window (else 0.0), and ``spend`` is the sum of the non-null
        ``receiptUsdAmount`` values of those entries (0.0 if none).
    """
    make_result = Row('is_payer', 'spend')
    if context_ts is None or not collected_col:
        # Nothing can fall inside an undefined or empty window. The UDF's
        # struct fields are declared non-nullable, so return zeros, not None.
        return make_result(0.0, 0.0)

    # Filter out null placeholders before sorting: a None timestamp cannot be
    # ordered against real timestamps and would raise TypeError.
    entries = [
        entry for entry in collected_col
        if entry is not None and entry.txTimestamp is not None
    ]
    # Sort by time so the float sum below is accumulated in a fixed order.
    # The input order is presumably not guaranteed (e.g. if it was built with
    # collect_list) -- confirm upstream.
    entries.sort(key=lambda entry: entry.txTimestamp)

    is_payer = 0.0
    total_spend = 0.0
    for entry in entries:
        # timedelta.days is floored, so any instant before context_ts gives a
        # negative value and is excluded by the lower bound.
        diff = (entry.txTimestamp - context_ts).days
        if 0 <= diff < window_days:
            is_payer = 1.0
            if entry.receiptUsdAmount is not None:
                total_spend += entry.receiptUsdAmount
    return make_result(is_payer, total_spend)
# Struct type returned by the UDF: one field per output column. The names must
# match the Row built in append_payer_spend. The fields are declared
# non-nullable, so the UDF must always return numbers.
# NOTE(review): FloatType is 32-bit, so summed USD amounts are rounded to about
# 7 significant digits. DoubleType would keep full precision but changes the
# output schema seen downstream -- confirm before switching.
schema_added = StructType([
    StructField("is_payer", FloatType(), False),
    StructField("spend", FloatType(), False),
])

append_payer_spend_udf = F.udf(append_payer_spend, schema_added)

# Compute the struct once into `output`, then flatten it into top-level
# `is_payer` / `spend` columns next to the original ones. Finally drop the raw
# `collected_col` array.
# df_likely_payer is defined elsewhere; it is presumed to have `ts` and
# `collected_col` columns (confirm).
# NOTE(review): the optimizer may inline the UDF into each expanded `output.*`
# field and evaluate it once per field. If profiling shows that, wrapping the
# UDF with `.asNondeterministic()` is the usual workaround.
new_df = (
    df_likely_payer
    .withColumn(
        "output",
        append_payer_spend_udf(df_likely_payer["ts"], df_likely_payer["collected_col"]),
    )
    .select(*df_likely_payer.columns, "output.*")
    .drop("collected_col")
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment