Skip to content

Instantly share code, notes, and snippets.

@VedAustin
Last active March 24, 2018 16:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save VedAustin/5a93453793a10650943f83fad48b78ce to your computer and use it in GitHub Desktop.
Save VedAustin/5a93453793a10650943f83fad48b78ce to your computer and use it in GitHub Desktop.
How to join disparate data sources and map each customer's journey through the various marketing touchpoints
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import ArrayType, StringType
# Read data.
# NOTE(review): `spark` was previously assumed to be a notebook-provided global.
# getOrCreate() returns the already-active session when there is one, and builds
# a new one otherwise, so the script also runs outside a notebook.
spark = SparkSession.builder.getOrCreate()

# Single root for every input; point it elsewhere to run on another dataset.
DATA_ROOT = "/mnt/public-blobs/attribution-modelling/data2"

# Per-channel id maps (joined to events on `uuid` below), stored as JSON.
user_guid_email = spark.read.json(DATA_ROOT + "/id-maps/id-map-email.json")
user_guid_paid_search = spark.read.json(DATA_ROOT + "/id-maps/id-map-paid-search.json")
user_guid_social = spark.read.json(DATA_ROOT + "/id-maps/id-map-social.json")

# Per-channel touchpoint events, stored as Parquet.
guid_event_email = spark.read.parquet(DATA_ROOT + "/events-email")
guid_event_paid_search = spark.read.parquet(DATA_ROOT + "/events-paid-search")
guid_event_social = spark.read.parquet(DATA_ROOT + "/events-social")

# Purchases (conversions), stored as Parquet.
purchases_data = spark.read.parquet(DATA_ROOT + "/purchases")
# Label every row with the source it came from. Once all sources are stacked
# into one frame, the `touchpoint` column is the only thing telling them apart.
purchases_data = purchases_data.withColumn("touchpoint", F.lit("purchase"))


def _tag_channel_events(id_map, channel_events, label):
    """Join one channel's events to its id map on `uuid` and label them.

    The right join keeps every event row. Events whose `uuid` has no match in
    `id_map` get nulls in the id map's other columns (presumably `user`).
    NOTE(review): such rows are later grouped by `user` as if they were a single
    user; confirm whether they should be filtered out instead.
    """
    joined = id_map.join(channel_events, ["uuid"], "right")
    labelled = joined.withColumn("touchpoint", F.lit(label))
    return labelled.drop("uuid")


email_event = _tag_channel_events(user_guid_email, guid_event_email, "marketing_email")
paid_search_event = _tag_channel_events(user_guid_paid_search, guid_event_paid_search, "paid_search")
social_event = _tag_channel_events(user_guid_social, guid_event_social, "social_media")

# Stack the three channels; non-purchase rows carry a purchaseAmount of 0.
temp_event = (
    email_event
    .union(paid_search_event)
    .union(social_event)
    .withColumn("purchaseAmount", F.lit(0))
)
# Bring purchases into the same shape as the touchpoint events so both can be
# stacked into a single per-user timeline.
# DataFrame.union matches columns by POSITION, not by name. Select the same
# named columns, in the same order, on BOTH sides so that column order or extra
# columns coming out of the joins cannot silently shift values into the wrong column.
# NOTE(review): this assumes the event tables name their timestamp
# `eventDateTime`, which is why purchases are renamed to it; confirm.
_UNION_COLUMNS = ["user", "eventDateTime", "touchpoint", "purchaseAmount"]
temp_purchases = purchases_data.withColumnRenamed("purchaseDatetime", "eventDateTime")
temp_purchases = temp_purchases.select(*_UNION_COLUMNS)
# Now concatenate them.
event_data = temp_purchases.union(temp_event.select(*_UNION_COLUMNS))
# Split each user's timeline into journeys. A journey is every touchpoint since
# the user's previous purchase, closed by the next purchase.
event_data = event_data.withColumn(
    "purchased", F.when(F.col("touchpoint") == 'purchase', 1).otherwise(0))

# Order by time. At equal timestamps, put events (purchased=0) before purchases
# (purchased=1), so a touchpoint at the same instant as a purchase is credited
# to that purchase.
# Use a ROWS frame, not RANGE: with RANGE, every row tied on the ORDER BY key sees
# the running total of the whole tie group. That moved same-instant events into
# the next journey and split two same-instant purchases away from their journey.
win = (
    Window.partitionBy("user")
    .orderBy("eventDateTime", "purchased")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# `cumsum` = number of this user's purchases strictly before the current row.
# A purchase does not count itself, so it shares its id with the touchpoints
# that led up to it. This is equivalent to the former "cumsum - 1 on purchases".
event_data_journey_ = event_data.withColumn(
    "cumsum", F.sum(F.col("purchased")).over(win) - F.col("purchased"))
@F.udf(ArrayType(StringType()))
def remove_purchases(arr):
    """Return the elements of ``arr`` that are not the string ``'purchase'``.

    Registered as a Spark UDF with an array<string> return type. A null input
    array yields null instead of raising TypeError inside the Python worker.
    """
    if arr is None:
        return None
    return [elem for elem in arr if elem != 'purchase']
# Collapse each (user, journey id) group into one row. The row holds the
# non-purchase touchpoints of that journey, in chronological order, plus the
# purchase amount.
#  - collect_list makes no ordering guarantee after the groupBy shuffle. So we
#    collect (eventDateTime, touchpoint) structs and sort_array them. Structs
#    compare field by field: by time first, then by touchpoint name on ties.
#  - Purchase rows map to null inside the aggregate, and collect_list skips
#    nulls. The journey therefore holds non-purchase touchpoints only, without
#    a Python UDF round trip per group.
#  - Every non-purchase row has purchaseAmount 0. So, assuming purchase amounts
#    are non-negative, max() yields the amount of the purchase closing the
#    journey, or 0 when the journey has no purchase yet.
event_data_journey = (
    event_data_journey_
    .groupBy(["user", "cumsum"])
    .agg(
        F.sort_array(
            F.collect_list(
                F.when(F.col("touchpoint") != "purchase",
                       F.struct("eventDateTime", "touchpoint"))
            )
        ).alias("journey_events"),
        F.max(F.col("purchaseAmount")).alias("purchaseAmount_"),
    )
    # Project the ordered structs down to just the touchpoint names.
    .withColumn("journey_", F.col("journey_events.touchpoint"))
    .drop("cumsum", "journey_events")
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment