Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Get actionable insights on your channel performance by building custom attribution models using Google Analytics 4 data in BigQuery. Full article on stacktonic.com
###################################################
# Author Krisjan Oldekamp / Stacktonic.com
# Email krisjan@stacktonic.com
# Article https://stacktonic.com/article/build-a-data-driven-attribution-model-using-google-analytics-4-big-query-and-python
####################################################
#pip install marketing_attribution_models
#pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]'
#pip install pyarrow -> newest version!
import pandas as pd
import numpy as np
import pyarrow
import pyarrow.parquet as pq
from google.cloud import bigquery
from google.oauth2 import service_account
from marketing_attribution_models import MAM
####################################################
# Settings
####################################################
# Google Cloud project ID
GBQ_PROJECT = "<your-project>"

# Path to the service account JSON keyfile
GBQ_KEYPATH = "./<your-keyfile>.json"

# Path to the SQL file that queries the conversion paths (see tutorial)
GBQ_SQL_PATHS = "./<your-sql-file>.sql"

# Table name for storing attributed conversions on journey level
GBQ_TABLE_CHANNELS = "<your-project>.<your-dataset>.conversions_attr_paths"

# Table name for storing conversions grouped on channel and date level
GBQ_TABLE_CHANNELS_GROUPED = (
    "<your-project>.<your-dataset>.conversions_attr_channels"
)
####################################################
# Write dataframe to BigQuery
def write_to_bigquery(df, table):
    """Load a dataframe into a BigQuery table, replacing its current contents.

    The dataframe is serialized to an in-memory Parquet buffer and uploaded
    with a load job issued through the module-level ``client``.

    Args:
        df: pandas DataFrame to upload.
        table: Destination table, passed to ``client.load_table_from_file``.

    Raises:
        Any exception raised by ``job.result()`` when the load job fails.
    """
    # Serialize to Parquet in memory; list columns are written with the
    # compliant nested layout and loaded with list inference enabled below.
    buffer = pyarrow.BufferOutputStream()
    pyarrow.parquet.write_table(
        pyarrow.Table.from_pandas(df),
        buffer,
        use_compliant_nested_type=True,
    )
    reader = pyarrow.BufferReader(buffer.getvalue())

    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.write_disposition = "WRITE_TRUNCATE"  # overwrite the table
    job_config.parquet_options = parquet_options

    job = client.load_table_from_file(reader, table, job_config=job_config)
    # Wait for the load job to finish so that a failed load raises here
    # instead of being silently ignored.
    job.result()
# Group journeys to conversion date and channel
def group_by_channel(df, conversion_date, path, conversion_value, models):
    """Aggregate attributed conversions per conversion date and channel.

    Every journey (row of ``df``) is exploded into one record per channel in
    its path. For each model the channel credits and the credited conversion
    value (credit * journey value) are summed per (date, channel).

    Args:
        df: DataFrame with one row per journey.
        conversion_date: Name of the column holding the conversion date.
        path: Name of the column holding the list of channels of the journey.
        conversion_value: Name of the column holding the journey's value.
        models: Iterable of column names; each column holds, per journey, the
            list of credits for the channels in ``path`` (same order).

    Returns:
        DataFrame with the columns ``conversion_date``, ``channels`` and one
        column per model containing ``[conversions, conversions_value]``;
        ``None`` when ``models`` is empty.
    """
    key_columns = ["conversion_date", "channels"]
    channels_grouped = None

    for model in models:
        dates_list = []
        channels_list = []
        conversions_list = []
        values_list = []

        # Single pass over the journeys: flatten the per-journey lists into
        # one record per (journey, channel).
        journeys = zip(df[conversion_date], df[path], df[model], df[conversion_value])
        for date, channels, credits, value in journeys:
            dates_list.extend([date] * len(credits))
            channels_list.extend(channels)
            conversions_list.extend(credits)
            values_list.extend(credit * value for credit in credits)

        frame = pd.DataFrame({
            "conversion_date": dates_list,
            "channels": channels_list,
            "conversions": conversions_list,
            "conversions_value": values_list,
        })
        frame = frame.groupby(key_columns).agg("sum")

        # Pack both metrics into one list-valued column named after the model.
        pairs = frame[["conversions", "conversions_value"]].to_numpy().tolist()
        frame[model] = pd.Series(pairs, index=frame.index, dtype=object)
        frame = frame.drop(columns=["conversions", "conversions_value"]).reset_index()

        if channels_grouped is None:
            channels_grouped = frame
        else:
            # NOTE(review): the key columns come from the same df columns for
            # every model, so the outer merge presumably never introduces
            # gaps; fillna(0) is kept as in the original as a safeguard.
            channels_grouped = pd.merge(
                channels_grouped, frame, how="outer", on=key_columns
            ).fillna(0)

    return channels_grouped
# Create the BigQuery client from the service account keyfile
credentials = service_account.Credentials.from_service_account_file(
    GBQ_KEYPATH,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
    project=credentials.project_id,
    credentials=credentials,
)

# Read the conversion-paths query from the .sql file
with open(GBQ_SQL_PATHS, "r") as sql_file:
    sql = sql_file.read()

# Run the query on BigQuery and fetch the result as a dataframe
query_job = client.query(sql)
df_conversion_paths = query_job.to_dataframe()

# (Optional) - Save dataframe to CSV and read from CSV (speeds up testing instead of fetching data from BigQuery)
#df_conversion_paths.to_csv("conversion_paths.csv", index=False)
#df_conversion_paths = pd.read_csv("conversion_paths.csv")

# Print a summary of the queried dataframe
df_conversion_paths.info()
# Create and configure the attribution library instance on the queried paths
attributions = MAM(
    df_conversion_paths,
    group_channels=False,
    channels_colname="path_channels",
    time_till_conv_colname="path_timestamps",
    group_channels_by_id_list=["journey_id"],
)

# Models and model settings, as output column name -> (model method, settings)
model_specs = {
    "attr_first_click": (attributions.attribution_first_click, {}),
    "attr_last_click": (attributions.attribution_last_click, {}),
    "attr_last_non_direct_click": (
        attributions.attribution_last_click_non,
        {"but_not_this_channel": 'direct'},
    ),
    "attr_position": (
        attributions.attribution_position_based,
        {"list_positions_first_middle_last": [0.3, 0.3, 0.4]},
    ),
    "attr_time_decay": (
        attributions.attribution_time_decay,
        {"decay_over_time": 0.6, "frequency": 7},
    ),
    "attr_linear": (attributions.attribution_linear, {}),
    "attr_markov": (
        attributions.attribution_markov,
        {"transition_to_same_state": False},
    ),
    #"attr_shapley": attributions.attribution_shapley(size=4, order=True, values_col='conversions'), -> Not available on path level
}

# Run the models in the order declared above and keep their results by name
models = {
    name: run_model(**settings)
    for name, (run_model, settings) in model_specs.items()
}

# View results grouped by channel for complete conversion period
print(attributions.group_by_channels_models)
# Take the last column of the first item returned by each model and name it
# after the model
model_columns = []
for model_name, model_result in models.items():
    last_column = pd.DataFrame(model_result[0]).iloc[:, -1:]
    model_columns.append(
        last_column.rename(columns={last_column.columns[0]: model_name})
    )

# Join the model columns onto the queried conversion paths
df_attributions = df_conversion_paths.join(model_columns)

# Turn the " > "-separated path strings into lists
for path_column in ("path_channels", "path_timestamps"):
    df_attributions[path_column] = df_attributions[path_column].apply(
        lambda joined: joined.split(" > ")
    )

# View resulting dataframe
df_attributions.info()
print(df_attributions.head())
# Group conversions and value for every model by date and channel
channels_grouped = group_by_channel(
    df_attributions,
    conversion_date="conversion_date",
    path="path_channels",
    conversion_value="conversion_value",
    models=list(models),
)

# View grouped results
print(channels_grouped.head())

# Save both the journey-level and the grouped results to Google BigQuery
for frame, destination in (
    (df_attributions, GBQ_TABLE_CHANNELS),
    (channels_grouped, GBQ_TABLE_CHANNELS_GROUPED),
):
    write_to_bigquery(frame, destination)
-- Author: Krisjan Oldekamp
-- https://stacktonic.com/article/build-a-data-driven-attribution-model-using-google-analytics-4-big-query-and-python
--
-- Purpose: return one row per converting session (journey) with the ordered
-- channel path and the hours-until-conversion of every session of the same
-- ga_client_id that started at most <lookback_window> days before it
-- (the converting session itself included).
-- Depends on the UDF `<your-project>.<your-dataset>.channel_grouping`, which
-- is not defined in this script.
declare conversion_period int64 default 90; -- select conversions in last x days
declare lookback_window int64 default 30; -- how many days to lookback from the moment the conversion occurred;
with
-- group event level google analytics 4 data to sessions (visits)
-- output: one row per user_pseudo_id + ga_session_id combination
sessions as (
select
user_pseudo_id as ga_client_id,
--user_id as custom_user_id, -- use a custom user-id instead, like a customer-id
concat(user_pseudo_id,'.',(select cast(value.int_value as string) from unnest(event_params) where key = 'ga_session_id')) as session_id, -- combine user_pseudo_id and session_id for a unique session-id
timestamp_micros(min(event_timestamp)) as session_start,
-- collect the traffic-source parameters of every page_view / user_engagement / scroll event;
-- other events yield null and are removed by `ignore nulls`
array_agg(
if(event_name in('page_view','user_engagement','scroll'), struct(
event_timestamp,
lower((select value.string_value from unnest(event_params) where key = 'source')) as source,
lower((select value.string_value from unnest(event_params) where key = 'medium')) as medium,
lower((select value.string_value from unnest(event_params) where key = 'campaign')) as campaign,
(select value.int_value from unnest(event_params) where key = 'entrances') as is_entrance,
(select value.int_value from unnest(event_params) where key = 'ignore_referrer') as ignore_referrer
), null)
ignore nulls) as channels_in_session,
--countif(event_name = '<name-of-some-other-conversion-event>') as conversions_in_session,
-- a conversion is a 'purchase' event; its value is the summed purchase revenue of the session
countif(event_name = 'purchase') as conversions,
sum(ecommerce.purchase_revenue) as conversion_value
from `<your-project>.analytics_<your-dataset>.events_*`
where
-- select conversions based on <conversion_period> + additional daterange to construct the path of a conversion (based on <lookback_window>)
_table_suffix between
format_date('%Y%m%d', date_sub(current_date(), interval (conversion_period + lookback_window) day))
and format_date('%Y%m%d', date_sub(current_date(), interval 1 day))
group by 1, 2
),
-- build conversion paths for all sessions with at least 1 conversion within the last <conversion_period> days
-- s = the converting session, s_lb = the sessions in its lookback window
sessions_converted as (
select
s.session_start,
s.session_id,
string_agg(
-- select first channel / campaign within session
-- (source and medium of the earliest collected event where ignore_referrer is null; the third UDF argument is always null)
-- NOTE(review): if channel_grouping can return NULL for a session, path_channels
-- may presumably contain fewer items than path_timestamps -- confirm
`<your-project>.<your-dataset>.channel_grouping`(
(select t.source from unnest(s_lb.channels_in_session) as t where t.ignore_referrer is null order by t.event_timestamp asc limit 1),
(select t.medium from unnest(s_lb.channels_in_session) as t where t.ignore_referrer is null order by t.event_timestamp asc limit 1),
null
),
' > '
order by s_lb.session_start asc
) as path_channels,
string_agg(cast(timestamp_diff(timestamp(s.session_start), timestamp(s_lb.session_start), hour) as string), ' > ' order by s_lb.session_start asc) as path_timestamps, -- hours till conversion
string_agg(cast(s_lb.session_start as string), ' > ' order by s_lb.session_start asc) as path_timestamps_check,
-- max() instead of sum(): the join repeats the converting session's totals once per joined lookback session
max(s.conversions) as conversions_in_session,
max(s.conversion_value) as conversion_value
from sessions as s
left join
-- joining historical sessions to construct the conversion path (with a max path length of <lookback_window>)
sessions as s_lb
on s.ga_client_id = s_lb.ga_client_id
and s.session_start >= s_lb.session_start -- only join current session and sessions before current session
and datetime(s_lb.session_start) >= date_sub(datetime(s.session_start), interval lookback_window day) -- only join sessions not older than <lookback_window> days counted from conversion
where
s.conversions > 0
and date(s.session_start) >= date_sub(current_date(), interval conversion_period day)
group by 1, 2
-- NOTE(review): an order by inside a CTE presumably does not guarantee the order of the final result -- confirm
order by
s.session_start asc
)
-- query data on user (journey) level
select
date(session_start) as conversion_date,
session_start as conversion_timestamp,
session_id as journey_id,
path_channels,
path_timestamps,
-- always true: sessions_converted only contains sessions with conversions > 0
true as conversion,
conversions_in_session,
conversion_value as conversion_value
from sessions_converted
@dataUnNest
Copy link

dataUnNest commented Dec 21, 2021

Hi,
For the SQL code, line 53/54: shouldn't it be max() instead of sum()? If we look at what the join does without grouping, the conversions_in_session and the conversion_value are duplicated once for every session that happened before the conversion, or am I missing something?

To illustrate my point with a specific ga_client_id that I know has converted twice for a total value of $349 (the input table being the result of the first CTE):

Before grouping
image

Once it's grouped
image

The values are indeed taken into account too many times. I know the tracking isn't perfect so it's not helping but I was wondering if it's the sole reason for the discrepancies.

Thanks and happy holidays!

@krisjan-oldekamp
Copy link
Author

krisjan-oldekamp commented Dec 29, 2021

Hi, you're right indeed. Good to know: the attribution models take into account the conversion flag (not the number of conversions), so the attribution was calculated correctly; only the values were not. Thanks for checking, much appreciated! I've adjusted the code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment