Skip to content

Instantly share code, notes, and snippets.

@MBrouns
Created Feb 26, 2021
Embed
What would you like to do?
timeseers_data_gen
def trend_data(n_changepoints, location="spaced", noise=0.001):
delta = np.random.laplace(size=n_changepoints)
t = np.linspace(0, 1, 1000)
if location == "random":
s = np.sort(np.random.choice(t, n_changepoints, replace=False))
elif location == "spaced":
s = np.linspace(0, np.max(t), n_changepoints + 2)[1:-1]
else:
raise ValueError('invalid `location`, should be "random" or "spaced"')
A = (t[:, None] > s) * 1
k, m = 0, 0
growth = k + A @ delta
gamma = -s * delta
offset = m + A @ gamma
trend = growth * t + offset + np.random.randn(len(t)) * noise
return (
pd.DataFrame({"t": pd.date_range("2018-1-1", periods=len(t)), "value": trend}),
delta,
)
def seasonal_data(n_components, noise=0.001):
def X(t, p=365.25, n=10):
x = 2 * np.pi * (np.arange(n) + 1) * t[:, None] / p
return np.concatenate((np.cos(x), np.sin(x)), axis=1)
t = np.linspace(0, 1, 1000)
beta = np.random.normal(size=2 * n_components)
seasonality = X(t, 365.25 / len(t), n_components) @ beta + np.random.randn(len(t)) * noise
return (
pd.DataFrame(
{"t": pd.date_range("2018-1-1", periods=len(t)), "value": seasonality}
),
beta,
)
def X(t, p=365.25, n=10):
x = 2 * np.pi * (np.arange(n) + 1) * t[:, None] / p
return np.concatenate((np.cos(x), np.sin(x)), axis=1)
def make_series(s, delta, k, m, yearly_beta, weekly_beta, yearly_ho, noise):
t = np.linspace(0, 1, 1000)
A = (t[:, None] > s) * 1
growth = k + A @ delta
gamma = -s * delta
offset = m + A @ gamma
trend = growth * t + offset + np.random.randn(len(t)) * noise
yearly_seasonality = X(t, 365.25 / len(t), len(yearly_beta) // 2) @ yearly_beta + np.random.randn(len(t)) * noise
yearly_ho_seasonality = X(t, 365.25 / len(t), len(yearly_ho) // 2) @ yearly_ho + np.random.randn(len(t))
weekly_seasonality = X(t, 7 / len(t), len(weekly_beta) // 2) @ weekly_beta + np.random.randn(len(t)) * noise
return (
pd.DataFrame({"t": pd.date_range("2018-1-1", periods=len(t)), "value": 10000 + trend + yearly_seasonality + weekly_seasonality + yearly_ho_seasonality}),
delta,
)
n_changepoints = 12
n_yearly_components = 1
n_weekly_components = 2
t = np.linspace(0, 1, 1000)
summer_s = np.sort(np.random.choice(t, n_changepoints, replace=False))
yearly_summer_beta = np.random.normal(size=2 * n_yearly_components) * 2000
yearly_winter_beta = -1 * yearly_summer_beta
weekly_beta = np.random.normal(size=2 * n_weekly_components) * 100
parameters = {
'summer_1': {
'k': 0,
'm': 1000,
's': summer_s,
'delta': np.random.laplace(size=n_changepoints) * 4000,
'yearly_beta': yearly_summer_beta,
'yearly_ho': np.random.normal(size=12) * 200,
'weekly_beta': weekly_beta * 2.2,
'noise': 50
},
'summer_2': {
'k': 0,
'm': 100,
's': summer_s,
'delta': np.random.laplace(size=n_changepoints) * 1000,
'yearly_beta': yearly_summer_beta * 1.1 + np.random.normal(size=2 * n_yearly_components) * 500,
'yearly_ho': np.random.normal(size=12) * 300,
'weekly_beta': weekly_beta,
'noise': 50
},
'summer_3': {
'k': 0,
'm': 5000,
's': summer_s,
'delta': np.random.laplace(size=n_changepoints) * 1000,
'yearly_beta': yearly_summer_beta * 0.93,
'yearly_ho': np.random.normal(size=12) * 300,
'weekly_beta': weekly_beta * 0.4,
'noise': 150
},
'winter_1': {
'k': 0,
'm': 2000,
's': summer_s,
'delta': np.random.laplace(size=n_changepoints) * 1000,
'yearly_beta': yearly_winter_beta * 1.3,
'yearly_ho': np.random.normal(size=12) * 300,
'weekly_beta': weekly_beta,
'noise': 50
},
'winter_2': {
'k': 0,
'm': 6000,
's': summer_s,
'delta': np.random.laplace(size=n_changepoints) * 1000,
'yearly_beta': yearly_winter_beta,
'yearly_ho': np.random.normal(size=12) * 500,
'weekly_beta': weekly_beta * 0.4,
'noise': 50
},
'all_year': {
'k': 0,
'm': 2500,
's': summer_s,
'delta': np.random.laplace(size=n_changepoints) * 1000,
'yearly_beta': yearly_summer_beta * 0.1,
'yearly_ho': np.random.normal(size=12) * 200,
'weekly_beta': weekly_beta,
'noise': 50
}
}
def add_promos(df, promos):
df = df.assign(promo_multiplier=1, days_in_promo=0).copy()
for product, promo_start, promo_length, effect in promos:
promo_pattern = st.lognorm(1, 0, 15).pdf(np.linspace(0, 30, 30)) * effect
df.loc[(
df['series'] == product) &
(df['t'] >= promo_start) &
(df['t'] < promo_start + promo_length), 'days_in_promo'] = np.linspace(0, promo_length.days - 1, promo_length.days)
df.loc[(
df['series'] == product) &
(df['t'] >= promo_start) &
(df['t'] < promo_start + promo_length), 'promo_multiplier'] += promo_pattern
return df
promos = [
('summer_1', datetime(2018, 4, 1), timedelta(days=30), 7),
('summer_2', datetime(2018, 6, 1), timedelta(days=30), 7),
('summer_1', datetime(2019, 5, 1), timedelta(days=30), 7),
('winter_2', datetime(2019, 7, 1), timedelta(days=30), 7),
('summer_3', datetime(2020, 7, 1), timedelta(days=30), 7),
]
np.random.seed(42)
df = (
pd.concat([make_series(**parameter)[0].assign(series=series) for series, parameter in parameters.items()])
.loc[lambda d: d['series'].str.startswith('summer') | d['series'].str.startswith('winter')]
.assign(
group=lambda d: pd.Categorical(d['series'].str.split('_').str[0]),
series=lambda d: pd.Categorical(d['series']),
).sort_values('t')
.loc[lambda d: ~((d['series'] == 'summer_3') & (d['t'] < datetime(2020, 5, 1)))]
.loc[lambda d: ~((d['series'] == 'summer_2') & (d['t'] > datetime(2019, 9, 1)))]
.loc[lambda d: ~((d['series'] == 'winter_1') & (d['t'] < datetime(2020, 5, 1)))]
.loc[lambda d: ~((d['series'] == 'winter_1') & (d['t'] > datetime(2020, 9, 1)))]
.loc[lambda d: ~((d['series'] == 'winter_2') & ((d['t'] < datetime(2019, 4, 1)) | (d['t'] > datetime(2020, 4, 1))))]
.assign(value=(lambda d: np.where(np.random.randint(0, 100, size=len(d)) == 10, 0, d['value'])))
.assign(value=lambda d: d['value'] + np.random.randn(len(d['value'])) * 800)
.pipe(add_promos, promos)
.assign(value=lambda d: d['value'] * d['promo_multiplier'])
.assign(days_in_promo=lambda d: pd.Categorical(d['days_in_promo']))
.reset_index()
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment