Created
May 7, 2024 23:01
-
-
Save kenteross/0d49b07709282897bac267c6dff0b92b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
import arviz as az | |
import matplotlib.pyplot as plt | |
from datetime import datetime, timedelta | |
import pymc as pm | |
import seaborn as sns | |
import xarray as xr | |
n_rows = 500 | |
# Dates | |
start_date = pd.to_datetime('2023-01-01') | |
dates = [start_date + timedelta(days=i) for i in range(n_rows)] | |
# Revenue and spend data | |
np.random.seed(42) # Set seed for reproducibility | |
revenue = np.random.normal(loc=1000, scale=300, size=n_rows) | |
spend_1 = np.random.normal(loc=200, scale=50, size=n_rows) | |
spend_2 = np.random.normal(loc=150, scale=40, size=n_rows) | |
df_burn_in = pd.DataFrame({'date': dates, 'revenue': revenue, 'spend_1': spend_1, 'spend_2': spend_2}) | |
burn_in = 30 | |
df = df_burn_in[burn_in:] | |
# From the PYMC MMM repository | |
import numpy as np | |
from typing import Union | |
import numpy.typing as npt | |
import pytensor.tensor as pt | |
from pytensor.tensor.random.utils import params_broadcast_shapes | |
def batched_convolution(x, w, axis: int = 0): | |
import pytensor.tensor as pt | |
from pytensor.tensor.random.utils import params_broadcast_shapes | |
"""Apply a 1D convolution in a vectorized way across multiple batch dimensions. | |
Parameters | |
---------- | |
x : | |
The array to convolve. | |
w : | |
The weight of the convolution. The last axis of ``w`` determines the number of steps | |
to use in the convolution. | |
axis : int | |
The axis of ``x`` along witch to apply the convolution | |
Returns | |
------- | |
y : | |
The result of convolving ``x`` with ``w`` along the desired axis. The shape of the | |
result will match the shape of ``x`` up to broadcasting with ``w``. The convolved | |
axis will show the results of left padding zeros to ``x`` while applying the | |
convolutions. | |
""" | |
# We move the axis to the last dimension of the array so that it's easier to | |
# reason about parameter broadcasting. We will move the axis back at the end | |
orig_ndim = x.ndim | |
axis = axis if axis >= 0 else orig_ndim + axis | |
w = pt.as_tensor(w) | |
x = pt.moveaxis(x, axis, -1) | |
l_max = w.type.shape[-1] | |
if l_max is None: | |
try: | |
l_max = w.shape[-1].eval() | |
except Exception: | |
pass | |
# Get the broadcast shapes of x and w but ignoring their last dimension. | |
# The last dimension of x is the "time" axis, which doesn't get broadcast | |
# The last dimension of w is the number of time steps that go into the convolution | |
x_shape, w_shape = params_broadcast_shapes([x.shape, w.shape], [1, 1]) | |
x = pt.broadcast_to(x, x_shape) | |
w = pt.broadcast_to(w, w_shape) | |
x_time = x.shape[-1] | |
shape = (*x.shape, w.shape[-1]) | |
# Make a tensor with x at the different time lags needed for the convolution | |
padded_x = pt.zeros(shape, dtype=x.dtype) | |
if l_max is not None: | |
for i in range(l_max): | |
padded_x = pt.set_subtensor( | |
padded_x[..., i:x_time, i], x[..., : x_time - i] | |
) | |
else: # pragma: no cover | |
raise NotImplementedError( | |
"At the moment, convolving with weight arrays that don't have a concrete shape " | |
"at compile time is not supported." | |
) | |
# The convolution is treated as an element-wise product, that then gets reduced | |
# along the dimension that represents the convolution time lags | |
conv = pt.sum(padded_x * w[..., None, :], axis=-1) | |
# Move the "time" axis back to where it was in the original x array | |
return pt.moveaxis(conv, -1, axis + conv.ndim - orig_ndim) | |
def delayed_adstock( | |
x, | |
alpha: float = 0.0, | |
theta: int = 0, | |
l_max: int = 12, | |
normalize: bool = False, | |
axis: int = 0, | |
): | |
import pytensor.tensor as pt | |
from pytensor.tensor.random.utils import params_broadcast_shapes | |
"""Delayed adstock transformation. | |
This transformation is similar to geometric adstock transformation, but it | |
allows for a delayed peak of the effect. The peak is assumed to occur at `theta`. | |
Parameters | |
---------- | |
x : tensor | |
Input tensor. | |
alpha : float, by default 0.0 | |
Retention rate of ad effect. Must be between 0 and 1. | |
theta : float, by default 0 | |
Delay of the peak effect. Must be between 0 and `l_max` - 1. | |
l_max : int, by default 12 | |
Maximum duration of carryover effect. | |
normalize : bool, by default False | |
Whether to normalize the weights. | |
Returns | |
------- | |
tensor | |
Transformed tensor. | |
References | |
---------- | |
.. [1] Jin, Yuxue, et al. "Bayesian methods for media mix modeling | |
with carryover and shape effects." (2017). | |
""" | |
w = pt.power( | |
pt.as_tensor(alpha)[..., None], | |
(pt.arange(l_max, dtype=x.dtype) - pt.as_tensor(theta)[..., None]) ** 2, | |
) | |
w = w / pt.sum(w, axis=-1, keepdims=True) if normalize else w | |
return batched_convolution(x, w, axis=axis) | |
def geometric_adstock(x, alpha: float = 0.0, l_max: int = 12, normalize: bool = False, axis: int = 0): | |
"""Geometric adstock transformation. | |
Adstock with geometric decay assumes advertising effect peaks at the same | |
time period as ad exposure. The cumulative media effect is a weighted average | |
of media spend in the current time-period (e.g. week) and previous `l_max` - 1 | |
periods (e.g. weeks). `l_max` is the maximum duration of carryover effect. | |
Parameters | |
---------- | |
x : tensor | |
Input tensor. | |
alpha : float, by default 0.0 | |
Retention rate of ad effect. Must be between 0 and 1. | |
l_max : int, by default 12 | |
Maximum duration of carryover effect. | |
normalize : bool, by default False | |
Whether to normalize the weights. | |
Returns | |
------- | |
tensor | |
Transformed tensor. | |
References | |
---------- | |
.. [1] Jin, Yuxue, et al. "Bayesian methods for media mix modeling | |
with carryover and shape effects." (2017). | |
""" | |
w = pt.power(pt.as_tensor(alpha)[..., None], pt.arange(l_max, dtype=x.dtype)) | |
w = w / pt.sum(w, axis=-1, keepdims=True) if normalize else w | |
return batched_convolution(x, w, axis=axis) | |
def logistic_saturation(x, lam: Union[npt.NDArray[np.float_], float] = 0.5): | |
from typing import Union | |
import numpy.typing as npt | |
"""Logistic saturation transformation. | |
Parameters | |
---------- | |
x : tensor | |
Input tensor. | |
lam : float or array-like, optional, by default 0.5 | |
Saturation parameter. | |
Returns | |
------- | |
tensor | |
Transformed tensor. | |
""" | |
return (1 - pt.exp(-lam * x)) / (1 + pt.exp(-lam * x)) | |
model = None | |
testing = True | |
l_max = 50 | |
with pm.Model() as model: | |
# Muteable Coords | |
model.add_coord( | |
name='dates_burn_in', | |
values=df_burn_in['date'], | |
mutable=True | |
) | |
model.add_coord( | |
name='dates', | |
values=df['date'], | |
mutable=True | |
) | |
model.add_coord( | |
name='x_columns', | |
values=spend_cols, | |
mutable=True | |
) | |
# Data | |
X = pm.MutableData( | |
name='X', | |
value=df_burn_in[spend_cols].values, | |
dims=('dates_burn_in', 'x_columns') | |
) | |
y = pm.MutableData( | |
name='observed', | |
value=df['revenue'], | |
dims='dates' | |
) | |
## Regression | |
X_beta = pm.Normal( | |
name='X_beta', | |
mu=0, | |
sigma=0.5, | |
dims=('x_columns') | |
) | |
epsilon = pm.Exponential( | |
name='epsilon', | |
lam=1 | |
) | |
lam = pm.Gamma( | |
name="lam", | |
alpha=2, | |
beta=2, | |
dims=('x_columns') | |
) | |
X_saturated = logistic_saturation( | |
X, | |
lam=lam | |
) | |
alpha = pm.Beta( | |
name='alpha', | |
alpha=10, | |
beta=0.1, | |
dims=('x_columns') | |
) | |
theta = pm.Uniform( | |
name='theta', | |
lower=0, | |
upper=7, | |
dims=('x_columns') | |
) # delay in adstock | |
X_saturated_adstock = delayed_adstock( | |
X_saturated, | |
alpha=alpha, | |
theta=theta, | |
l_max=l_max, | |
normalize=False, | |
axis=0 | |
) | |
X_saturated_adstock_complete = X_saturated_adstock[burn_in:, :] | |
X_effect = X_beta * X_saturated_adstock_complete | |
y_estimate = X_effect.sum(axis=-1) | |
# Model | |
y = pm.Normal('y', mu = y_estimate, sigma=epsilon, observed=y, dims='dates') | |
if testing == True: | |
trace_settings = { | |
'chains': 1, | |
'draws': 10, | |
'tune': 10 | |
} | |
else: | |
trace_settings = { | |
'chains': 4, | |
'draws': 1000, | |
'tune': 2000, | |
'target_accept': 0.95 | |
} | |
trace = pm.sample( | |
**trace_settings | |
) | |
posterior = pm.sample_posterior_predictive(trace, return_inferencedata=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment