Last active
October 3, 2024 15:58
-
-
Save krisjan-oldekamp/5961dc1ea876700e9e64a5ca54b6e3d0 to your computer and use it in GitHub Desktop.
Get actionable insights on your channel performance by building custom attribution models using Google Analytics 4 data in BigQuery. Full article on stacktonic.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
################################################### | |
# Author Krisjan Oldekamp / Stacktonic.com | |
# Email krisjan@stacktonic.com | |
# Article https://stacktonic.com/article/build-a-data-driven-attribution-model-using-google-analytics-4-big-query-and-python | |
#################################################### | |
#pip install marketing_attribution_models | |
#pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]' | |
#pip install pyarrow -> newest version! | |
import pandas as pd | |
import numpy as np | |
import pyarrow | |
import pyarrow.parquet as pq | |
from google.cloud import bigquery | |
from google.oauth2 import service_account | |
from marketing_attribution_models import MAM | |
#################################################### | |
# Settings | |
#################################################### | |
GBQ_PROJECT = "<your-project>" # Google Cloud Project-ID | |
GBQ_KEYPATH = "./<your-keyfile>.json" # Path to Service Account JSON keyfile | |
GBQ_SQL_PATHS = "./<your-sql-file>.sql" # Path to SQL file -> querying the conversions paths (see tutorial) | |
GBQ_TABLE_CHANNELS = "<your-project>.<your-dataset>.conversions_attr_paths" # Table name for storing attributed conversions on journey level | |
GBQ_TABLE_CHANNELS_GROUPED = "<your-project>.<your-dataset>.conversions_attr_channels" # Table name for storing conversions grouped on channel and date level | |
#################################################### | |
# Write dataframe to BigQuery | |
def write_to_bigquery(df, table): | |
writer = pyarrow.BufferOutputStream() | |
pyarrow.parquet.write_table( | |
pyarrow.Table.from_pandas(df), | |
writer, | |
use_compliant_nested_type=True | |
) | |
reader = pyarrow.BufferReader(writer.getvalue()) | |
parquet_options = bigquery.format_options.ParquetOptions() | |
parquet_options.enable_list_inference = True | |
job_config = bigquery.LoadJobConfig() | |
job_config.source_format = bigquery.SourceFormat.PARQUET | |
job_config.write_disposition = "WRITE_TRUNCATE" | |
job_config.parquet_options = parquet_options | |
job = client.load_table_from_file( | |
reader, table, job_config=job_config | |
) | |
# Group journeys to conversion date and channel | |
def group_by_channel(df, conversion_date, path, conversion_value, models): | |
channels_grouped = None | |
for model in models: | |
dates = df.apply(lambda row: [row[conversion_date] for credit in row[model]], axis=1) | |
#channels = df[path].apply(lambda x: x.split(" > ")) | |
channels = df[path] | |
channels_conversions = df[model] | |
channels_values = df.apply(lambda row: [credit * row[conversion_value] for credit in row[model]], axis=1) | |
dates_list = [] | |
channels_list = [] | |
conversions_list = [] | |
values_list = [] | |
dates.apply(dates_list.extend) | |
channels.apply(channels_list.extend) | |
channels_conversions.apply(conversions_list.extend) | |
channels_values.apply(values_list.extend) | |
frame = pd.DataFrame({ | |
"conversion_date": dates_list, | |
"channels": channels_list, | |
"conversions": conversions_list, | |
"conversions_value": values_list | |
}) | |
frame = frame.groupby(["conversion_date","channels"]).agg('sum') | |
frame[model] = frame.apply(lambda row: [row["conversions"], row["conversions_value"]], axis=1) | |
frame = frame.drop(columns=["conversions", "conversions_value"], axis=1) | |
if isinstance(channels_grouped, pd.DataFrame): | |
frame = frame.reset_index() | |
frame.columns = ["conversion_date","channels", model] | |
channels_grouped = pd.merge( | |
channels_grouped, frame, how="outer", on=["conversion_date","channels"] | |
).fillna(0) | |
else: | |
channels_grouped = frame.reset_index() | |
channels_grouped.columns = ["conversion_date","channels", model] | |
return channels_grouped | |
# Create BigQuey Client | |
credentials = service_account.Credentials.from_service_account_file( | |
GBQ_KEYPATH, scopes=["https://www.googleapis.com/auth/cloud-platform"], | |
) | |
client = bigquery.Client(credentials=credentials, project=credentials.project_id) | |
# Open the .sql file | |
with open(GBQ_SQL_PATHS,"r") as f: | |
sql = f.read() | |
# Query BigQuery and convert to dataframe | |
df_conversion_paths = client.query(sql).to_dataframe() | |
# (Optional) - Save dataframe to CSV and read from CSV (speeds up testing instead of fetching data from BigQuery) | |
#df_conversion_paths.to_csv("conversion_paths.csv", index=False) | |
#df_conversion_paths = pd.read_csv("conversion_paths.csv") | |
# View information | |
df_conversion_paths.info() | |
# Create and configure the attribution lib instance | |
attributions = MAM(df_conversion_paths, | |
group_channels=False, | |
channels_colname = "path_channels", | |
time_till_conv_colname = "path_timestamps", | |
group_channels_by_id_list=["journey_id"] | |
) | |
# Models and model settings + running the models | |
models = { | |
"attr_first_click": attributions.attribution_first_click(), | |
"attr_last_click": attributions.attribution_last_click(), | |
"attr_last_non_direct_click": attributions.attribution_last_click_non(but_not_this_channel='direct'), | |
"attr_position": attributions.attribution_position_based(list_positions_first_middle_last=[0.3, 0.3, 0.4]), | |
"attr_time_decay": attributions.attribution_time_decay(decay_over_time=0.6, frequency=7), | |
"attr_linear": attributions.attribution_linear(), | |
"attr_markov": attributions.attribution_markov(transition_to_same_state=False), | |
#"attr_shapley": attributions.attribution_shapley(size=4, order=True, values_col='conversions'), -> Not available on path level | |
} | |
# View results grouped by channel for complete conversion period | |
print(attributions.group_by_channels_models) | |
# Join models into one dataframe | |
models_list = [df_conversion_paths] | |
for model in models: | |
attr = pd.DataFrame(models[model][0]).iloc[:,-1:] | |
attr = attr.rename(columns={attr.columns[0]: model}) | |
models_list.append(attr) | |
df_attributions = models_list[0].join(models_list [1:]) | |
df_attributions["path_channels"] = df_attributions["path_channels"].apply(lambda x: x.split(" > ")) | |
df_attributions["path_timestamps"] = df_attributions["path_timestamps"].apply(lambda x: x.split(" > ")) | |
# View resulting dataframe | |
df_attributions.info() | |
print(df_attributions.head()) | |
# Group conversions and value for every model by date and channel | |
channels_grouped = group_by_channel( | |
df_attributions, | |
"conversion_date", # Conversion date column | |
"path_channels", # Conversion path column | |
"conversion_value", # Conversion value column | |
list(models) # Models to group | |
) | |
# View grouped results | |
print(channels_grouped.head()) | |
# Save to Google BigQuery | |
write_to_bigquery(df_attributions, GBQ_TABLE_CHANNELS) | |
write_to_bigquery(channels_grouped, GBQ_TABLE_CHANNELS_GROUPED) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Author: Krisjan Oldekamp | |
-- https://stacktonic.com/article/build-a-data-driven-attribution-model-using-google-analytics-4-big-query-and-python | |
declare conversion_period int64 default 90; -- select conversions in last x days | |
declare lookback_window int64 default 30; -- how many days to lookback from the moment the conversion occurred; | |
with | |
-- group event level google analytics 4 data to sessions (visits) | |
sessions as ( | |
select | |
user_pseudo_id as ga_client_id, | |
--user_id as custom_user_id, -- use a custom user-id instead, like a customer-id | |
concat(user_pseudo_id,'.',(select cast(value.int_value as string) from unnest(event_params) where key = 'ga_session_id')) as session_id, -- combine user_pseudo_id and session_id for a unique session-id | |
timestamp_micros(min(event_timestamp)) as session_start, | |
array_agg( | |
if(event_name in('page_view','user_engagement','scroll'), struct( | |
event_timestamp, | |
lower((select value.string_value from unnest(event_params) where key = 'source')) as source, | |
lower((select value.string_value from unnest(event_params) where key = 'medium')) as medium, | |
lower((select value.string_value from unnest(event_params) where key = 'campaign')) as campaign, | |
(select value.int_value from unnest(event_params) where key = 'entrances') as is_entrance, | |
(select value.int_value from unnest(event_params) where key = 'ignore_referrer') as ignore_referrer | |
), null) | |
ignore nulls) as channels_in_session, | |
--countif(event_name = '<name-of-some-other-conversion-event>') as conversions_in_session, | |
countif(event_name = 'purchase') as conversions, | |
sum(ecommerce.purchase_revenue) as conversion_value | |
from `<your-project>.analytics_<your-dataset>.events_*` | |
where | |
-- select conversions based on <conversion_period> + additional daterange to construct the path of a conversion (based on <lookback_window>) | |
_table_suffix between | |
format_date('%Y%m%d', date_sub(current_date(), interval (conversion_period + lookback_window) day)) | |
and format_date('%Y%m%d', date_sub(current_date(), interval 1 day)) | |
group by 1, 2 | |
), | |
-- build conversion paths for all sessions with at least 1 conversion within the last <conversion_period> days | |
sessions_converted as ( | |
select | |
s.session_start, | |
s.session_id, | |
string_agg( | |
-- select first channel / campaign within session | |
`<your-project>.<your-dataset>.channel_grouping`( | |
(select t.source from unnest(s_lb.channels_in_session) as t where t.ignore_referrer is null order by t.event_timestamp asc limit 1), | |
(select t.medium from unnest(s_lb.channels_in_session) as t where t.ignore_referrer is null order by t.event_timestamp asc limit 1), | |
null | |
), | |
' > ' | |
order by s_lb.session_start asc | |
) as path_channels, | |
string_agg(cast(timestamp_diff(timestamp(s.session_start), timestamp(s_lb.session_start), hour) as string), ' > ' order by s_lb.session_start asc) as path_timestamps, -- hours till conversion | |
string_agg(cast(s_lb.session_start as string), ' > ' order by s_lb.session_start asc) as path_timestamps_check, | |
max(s.conversions) as conversions_in_session, | |
max(s.conversion_value) as conversion_value | |
from sessions as s | |
left join | |
-- joining historical sessions to construct the conversion path (with a max path length of <lookback_window>) | |
sessions as s_lb | |
on s.ga_client_id = s_lb.ga_client_id | |
and s.session_start >= s_lb.session_start -- only join current session and sessions before current session | |
and datetime(s_lb.session_start) >= date_sub(datetime(s.session_start), interval lookback_window day) -- only join sessions not older than <lookback_window> days counted from conversion | |
where | |
s.conversions > 0 | |
and date(s.session_start) >= date_sub(current_date(), interval conversion_period day) | |
group by 1, 2 | |
order by | |
s.session_start asc | |
) | |
-- query data on user (journey) level | |
select | |
date(session_start) as conversion_date, | |
session_start as conversion_timestamp, | |
session_id as journey_id, | |
path_channels, | |
path_timestamps, | |
true as conversion, | |
conversions_in_session, | |
conversion_value as conversion_value | |
from sessions_converted |
Hi, indeed you're right. Good to know, the attribution models take into account the conversion flag (not the number of conversions), so attribution was calculated correctly, only the values were not. Thanks for checking, much appreciated! I've adjusted the code.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
For the SQL code, line 53/54. Shouldn't it be max() instead of sum()? Because if we look at what the join does without grouping: the conversions_in_session and the conversion_value are going to be duplicated by the number of sessions that happened before the conversion, or am I missing something ?
To illustrate my point with a specific ga_client_id that I know has converted twice for a total value of 349$(the input table being the result of the first CTE):
Before grouping
Once it's grouped
The values are indeed taken into account too many times. I know the tracking isn't perfect so it's not helping but I was wondering if it's the sole reason for the discrepancies.
Thanks and happy holidays!