def BayesianMMM(splits="W"):
if splits == "Q":
time_series = pd.PeriodIndex(dates, freq='Q').astype(str).str[-1].astype(int).values
elif splits == "H":
time_series = pd.PeriodIndex(dates, freq='Q').astype(str).str[-1].map({'1':1, '2':1, '3':2, '4':2}).values
elif splits == "YoY":
time_series = np.array([1]*52 + [2]*52)
else:
time_series = np.arange(104)
def scale(x, y):
return x * y.sum() / x.sum()

    with pm.Model(coords={"time": time_series}) as mmm:
        # revenue scaled by its own mean
        target = media_transformed['REVENUE'] / media_transformed['REVENUE'].mean()

        # std of the random walk shared by all channel coefficients
        sigma_walk = pm.Uniform("sigma_walk", lower=0, upper=1)

        media_contributions = []
        for channel in channel_priors.keys():
            # time-varying coefficient, initialised at the channel prior
            channel_prior = channel_priors[channel]
            rolling_channel_coefficient = pm.GaussianRandomWalk(
                f"coefficient_{channel}",
                sigma=sigma_walk,
                init_dist=pm.Normal.dist(channel_prior, 0.01),
                dims="time"
            )

            # saturation (Hill transform) of the mean-normalised spend
            alpha = pm.Uniform(f"alpha_{channel}", lower=0.5, upper=2)
            gamma = pm.Uniform(f"gamma_{channel}", lower=0.5, upper=1.5)
            saturated_media = hill_transform(
                pt.as_tensor_variable(media_transformed[channel] / media_transformed[channel].mean()),
                alpha,
                gamma
            )
            scaled_media = scale(saturated_media, target)
            scaled_media /= scaled_media.mean()

            # channel contribution: time-varying coefficient * saturated, scaled spend
            channel_contribution = pm.Deterministic(f"contribution_{channel}", rolling_channel_coefficient * scaled_media)
            media_contributions.append(channel_contribution)

        # control: holiday dummy, coefficient constrained to be non-negative
        holiday_coefficient = pm.TruncatedNormal("coefficient_holiday", mu=holiday_prior, sigma=0.0001, lower=0)
        controls = pm.Deterministic("contribution_holiday", holiday_coefficient * scale(pt.as_tensor_variable(bias['holiday_period']), target))

        # trend: coefficient on the one-step-lagged target
        trend_coefficient = pm.Normal("coefficient_trend", mu=trend_prior, sigma=0.0001)
        trend = pm.Deterministic("contribution_trend", trend_coefficient * pt.as_tensor_variable(target.shift(1).bfill()))

        # seasonality: d pairs of Fourier (cos/sin) terms
        seasonality = []
        for i in np.arange(1, d + 1):
            coeff_cos = pm.Normal(f"coefficient_seasonality_cos_{i}", mu=seasonality_prior, sigma=0.0001)
            coeff_sin = pm.Normal(f"coefficient_seasonality_sin_{i}", mu=seasonality_prior, sigma=0.0001)
            cos_term = pm.Deterministic(f"contribution_seasonality_cos_{i}", coeff_cos * pt.as_tensor_variable(bias[f"cos_{i}"]) * target.sum() / 26)
            sin_term = pm.Deterministic(f"contribution_seasonality_sin_{i}", coeff_sin * pt.as_tensor_variable(bias[f"sin_{i}"]) * target.sum() / 26)
            seasonality.extend([cos_term, sin_term])

        # observation noise and intercept
        noise = pm.Uniform("sigma", lower=0, upper=0.02)
        intercept_coefficient = pm.TruncatedNormal("coefficient_intercept", mu=0.5, sigma=0.0001, lower=0)
        intercept = pm.Deterministic("contribution_intercept", intercept_coefficient * target.mean())

        # likelihood: expected revenue is the sum of all contributions
        likelihood = pm.Normal("revenue",
                               mu=intercept + trend + controls + sum(seasonality) + sum(media_contributions),
                               sigma=noise,
                               observed=target)

        # inference
        trace = pm.sample(tune=1000, chains=1)

    return mmm, trace
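
# `hill_transform` is called in BayesianMMM but is not defined in this gist.
# The sketch below is an assumption: a standard Hill saturation curve as
# commonly used in MMMs, written with plain power/division ops so it works on
# NumPy arrays as well as PyTensor tensors. The author's actual helper may differ.
def hill_transform(x, alpha, gamma):
    # alpha controls the slope of the curve, gamma the half-saturation point
    return x ** alpha / (x ** alpha + gamma ** alpha)


# Hypothetical usage, assuming `dates`, `media_transformed`, `channel_priors`,
# `bias`, the prior scalars and `d` (number of Fourier pairs) are already set up:
#   mmm, trace = BayesianMMM(splits="Q")
#   import arviz as az; az.summary(trace)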