Skip to content

Instantly share code, notes, and snippets.

@znwang25
Last active March 23, 2023 16:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save znwang25/490545dbe762dcfdae5329b2e2fa120d to your computer and use it in GitHub Desktop.
Save znwang25/490545dbe762dcfdae5329b2e2fa120d to your computer and use it in GitHub Desktop.
BanditCVR
import numpy as np
import pandas as pd
import datetime
from collections import Counter
class Arm(object):
    """
    Estimate the CVR (theta) and delay distribution parameter (lambda) from the conversion logs.

    The estimate is refined iteratively: ``prepare_weight`` assigns every
    click a weight given the current (theta, lambda), and
    ``get_estimate_one_cycle`` turns those weights into new estimates.
    ``get_estimate`` repeats that cycle and then refreshes the Beta(alpha,
    beta) parameters used by ``draw_beta_sample``.

    The conversion log is expected to be a DataFrame with the columns
    ``click_id``, ``click_timestamp``, ``conversion_flag``,
    ``conversion_timestamp`` and ``last_update_timestamp``.
    """

    def __init__(
        self,
        data_generator,
        theta_init=0.1,
        lambda_init=1 / 100,
        alpha_init=1,
        beta_init=1,
    ):
        """
        @param data_generator: object exposing ``get_conversion_log()``
        @param theta_init float: starting value for the CVR estimate
        @param lambda_init float: starting value for the delay rate (per hour)
        @param alpha_init float: initial alpha of the Beta distribution
        @param beta_init float: initial beta of the Beta distribution
        """
        self.last_theta = theta_init
        self.last_lambda = lambda_init
        self.alpha = alpha_init
        self.beta = beta_init
        self.data_generator = data_generator

    def get_latest_conversion_log_for_arm(self):
        """Fetch the current conversion log and store it on ``self.conversion_log``."""
        self.conversion_log = self.data_generator.get_conversion_log()

    def prepare_weight(self, e_theta, e_lambda):
        """
        Add ``elapsed_time`` (hours) and ``weight`` columns to the conversion log.

        ``elapsed_time`` is click-to-conversion for converted clicks and
        click-to-last-update otherwise. ``weight`` is 1 for converted clicks;
        for the others it is
        ``theta * exp(-lambda * t) / (1 - theta + theta * exp(-lambda * t))``,
        i.e. the probability that a click still unconverted after ``t`` hours
        converts eventually, under an exponential delay with rate ``lambda``.

        @param e_theta float: current CVR estimate
        @param e_lambda float: current delay-rate estimate (per hour)
        """
        log = self.conversion_log
        # A log without any observed conversion may carry an all-NaN column
        # that is not datetime-typed; coerce so the subtraction stays valid.
        conversion_time = pd.to_datetime(log["conversion_timestamp"])
        log["elapsed_time"] = np.where(
            log["conversion_flag"],
            conversion_time - log["click_timestamp"],
            log["last_update_timestamp"] - log["click_timestamp"],
        )
        log["elapsed_time"] = log["elapsed_time"].dt.total_seconds() / 3600

        pending = e_theta * np.exp(-e_lambda * log["elapsed_time"])
        log["weight"] = np.where(
            log["conversion_flag"],
            1,
            pending / (1 - e_theta + pending),
        )

    def get_estimate_one_cycle(self):
        """
        Compute new estimates from the weights set by ``prepare_weight``.

        @return (e_theta, e_lambda, N_cum_conversion): both estimates are NaN
            when the log contains no observed conversion.
        """
        log = self.conversion_log
        N_cum_conversion = np.sum(log["conversion_flag"])
        if N_cum_conversion == 0:
            return np.nan, np.nan, N_cum_conversion

        e_lambda = N_cum_conversion / np.dot(log["elapsed_time"], log["weight"])
        # Hours each click has been observable, converted or not.
        observed_hours = (
            log["last_update_timestamp"] - log["click_timestamp"]
        ).dt.total_seconds() / 3600
        e_theta = N_cum_conversion / np.sum(1 - np.exp(-e_lambda * observed_hours))
        return e_theta, e_lambda, N_cum_conversion

    def get_estimate(self, n_cycle=20):
        """
        Pull the latest log, run up to ``n_cycle`` estimation cycles and update
        ``last_theta``, ``last_lambda``, ``alpha``, ``beta`` and ``naive_cvr``.

        When an estimate comes out NaN (e.g. no conversion observed yet) the
        previous value is kept.

        @param n_cycle int: maximum number of estimation cycles
        """
        self.get_latest_conversion_log_for_arm()
        # Counted up front so the updates below also work when n_cycle == 0.
        N_cum_conversion = np.sum(self.conversion_log["conversion_flag"])
        e_theta, e_lambda = self.last_theta, self.last_lambda
        for _ in range(n_cycle):
            self.prepare_weight(e_theta, e_lambda)
            e_theta, e_lambda, N_cum_conversion = self.get_estimate_one_cycle()
            if N_cum_conversion == 0:
                # Without conversions every further cycle only yields NaN again.
                break
        if np.isnan(e_theta):
            e_theta = self.last_theta
        if np.isnan(e_lambda):
            e_lambda = self.last_lambda
        self.last_theta = e_theta
        self.last_lambda = e_lambda
        # With these parameters the Beta mode (alpha - 1) / (alpha + beta - 2)
        # equals e_theta, unless beta is clipped to 1.
        self.alpha = 1 + N_cum_conversion
        self.beta = max(1, 1 - N_cum_conversion + N_cum_conversion / e_theta)
        self.naive_cvr = N_cum_conversion / self.conversion_log.click_id.count()

    def draw_beta_sample(self, size):
        """Draw ``size`` samples from Beta(self.alpha, self.beta)."""
        return np.random.beta(self.alpha, self.beta, size)
import numpy as np
import pandas as pd
import datetime
from collections import Counter
class ConversionLogDataGenerator(object):
    """
    Simulate a click log with delayed conversions.

    Each call to ``get_conversion_log`` adds one simulated day of clicks,
    moves the clock forward by a day and returns the accumulated log as it
    would be observed at that moment.
    """

    def __init__(self, real_theta=0.05, real_lambda=50, seed=123):
        """
        @param real_theta float: probability that a click eventually converts
        @param real_lambda float: rate of the exponential conversion delay;
            the mean delay is ``1 / real_lambda`` hours
        @param seed int: seed of the random number generator
        """
        self.real_theta = real_theta
        self.real_lambda = real_lambda
        self.conversion_log = None
        self.current_timestamp = datetime.datetime(2022, 1, 1)
        self.rng = np.random.default_rng(seed)

    def gen_new_logs(self, size=200):
        """
        Generate fake log data.

        Clicks are spread uniformly over the day starting at
        ``self.current_timestamp``. The returned DataFrame holds the latent
        truth: ``conversion_flag_latent`` and ``conversion_timestamp_latent``
        (NaT for clicks that never convert).

        @param size int: number of clicks to generate
        @return pd.DataFrame: one row per click
        """
        # Continue numbering after the clicks already accumulated so that ids
        # stay unique across the batches appended by get_conversion_log.
        first_id = 0 if self.conversion_log is None else len(self.conversion_log)
        click_id = np.arange(first_id, first_id + size)

        # Random draws are made in a fixed order: click offsets, conversion
        # flags, conversion delays.
        day_start = pd.Timestamp(self.current_timestamp)
        click_timestamp = day_start + pd.to_timedelta(self.rng.random(size), unit="D")
        conversion_flag_latent = self.rng.binomial(1, self.real_theta, size)
        delay = pd.to_timedelta(
            self.rng.exponential(1 / self.real_lambda, size), unit="h"
        )
        # Series.where keeps the datetime dtype and fills NaT, so the column
        # is datetime-typed even when no click in the batch converts.
        conversion_timestamp_latent = pd.Series(click_timestamp + delay).where(
            conversion_flag_latent.astype(bool)
        )
        return pd.DataFrame(
            {
                "click_id": click_id,
                "click_timestamp": click_timestamp,
                "conversion_flag_latent": conversion_flag_latent,
                "conversion_timestamp_latent": conversion_timestamp_latent,
            }
        )

    def get_conversion_log(self):
        """
        Return all the conversion log cumulated to present.
        And only show conversions that can be observed at the update time.
        The returned output is a DataFrame with the following columns:
        click_id,
        click_timestamp,
        conversion_flag,
        conversion_timestamp,
        last_update_timestamp

        Each call generates a new batch of clicks and advances the clock by
        one day before computing what is observable.
        """
        new_logs = self.gen_new_logs()
        if self.conversion_log is None:
            self.conversion_log = new_logs
        else:
            # ignore_index avoids repeated row labels across batches.
            self.conversion_log = pd.concat(
                [new_logs, self.conversion_log], sort=False, ignore_index=True
            )
        self.current_timestamp = self.current_timestamp + datetime.timedelta(days=1)
        self.conversion_log["last_update_timestamp"] = self.current_timestamp

        out_df = self.conversion_log.copy()
        # Comparisons against NaT are False, so never-converting clicks stay 0.
        observed = out_df["conversion_timestamp_latent"] < out_df["last_update_timestamp"]
        out_df["conversion_flag"] = observed.astype(int)
        out_df["conversion_timestamp"] = out_df["conversion_timestamp_latent"].where(
            observed
        )
        return out_df.drop(
            columns=["conversion_flag_latent", "conversion_timestamp_latent"]
        )
import numpy as np
import pandas as pd
import datetime
from collections import Counter
def compute_assignment_probability(arms, size=1000):
    """
    Estimate, by sampling, how likely each arm is to be the best one.

    Every arm contributes @size draws; for each draw position the arm with
    the largest sampled value wins, and an arm's assignment probability is
    its share of wins.
    @param arms list[Arm]: list of Arm objects
    @param size int: number of draws per arm
    @return assignment_probability np.ndarray: an array of assignment probability for each arm
    """
    n_arms = len(arms)
    # One row per arm, one column per draw.
    draws = np.array([arm.draw_beta_sample(size) for arm in arms])
    winners = np.argmax(draws, axis=0)
    # minlength keeps a zero entry for arms that never win.
    wins_per_arm = np.bincount(winners, minlength=n_arms)
    return wins_per_arm / size
import numpy as np
import pandas as pd
import datetime
from collections import Counter
class ConversionLogDataGenerator(object):
    """
    Simulate a click log with delayed conversions.

    Each call to ``get_conversion_log`` adds one simulated day of clicks,
    moves the clock forward by a day and returns the accumulated log as it
    would be observed at that moment.
    """

    def __init__(self, real_theta=0.05, real_lambda=50, seed=123):
        """
        @param real_theta float: probability that a click eventually converts
        @param real_lambda float: rate of the exponential conversion delay;
            the mean delay is ``1 / real_lambda`` hours
        @param seed int: seed of the random number generator
        """
        self.real_theta = real_theta
        self.real_lambda = real_lambda
        self.conversion_log = None
        self.current_timestamp = datetime.datetime(2022, 1, 1)
        self.rng = np.random.default_rng(seed)

    def gen_new_logs(self, size=200):
        """
        Generate fake log data.

        Clicks are spread uniformly over the day starting at
        ``self.current_timestamp``. The returned DataFrame holds the latent
        truth: ``conversion_flag_latent`` and ``conversion_timestamp_latent``
        (NaT for clicks that never convert).

        @param size int: number of clicks to generate
        @return pd.DataFrame: one row per click
        """
        # Continue numbering after the clicks already accumulated so that ids
        # stay unique across the batches appended by get_conversion_log.
        first_id = 0 if self.conversion_log is None else len(self.conversion_log)
        click_id = np.arange(first_id, first_id + size)

        # Random draws are made in a fixed order: click offsets, conversion
        # flags, conversion delays.
        day_start = pd.Timestamp(self.current_timestamp)
        click_timestamp = day_start + pd.to_timedelta(self.rng.random(size), unit="D")
        conversion_flag_latent = self.rng.binomial(1, self.real_theta, size)
        delay = pd.to_timedelta(
            self.rng.exponential(1 / self.real_lambda, size), unit="h"
        )
        # Series.where keeps the datetime dtype and fills NaT, so the column
        # is datetime-typed even when no click in the batch converts.
        conversion_timestamp_latent = pd.Series(click_timestamp + delay).where(
            conversion_flag_latent.astype(bool)
        )
        return pd.DataFrame(
            {
                "click_id": click_id,
                "click_timestamp": click_timestamp,
                "conversion_flag_latent": conversion_flag_latent,
                "conversion_timestamp_latent": conversion_timestamp_latent,
            }
        )

    def get_conversion_log(self):
        """
        Return all the conversion log cumulated to present.
        And only show conversions that can be observed at the update time.
        The returned output is a DataFrame with the following columns:
        click_id,
        click_timestamp,
        conversion_flag,
        conversion_timestamp,
        last_update_timestamp

        Each call generates a new batch of clicks and advances the clock by
        one day before computing what is observable.
        """
        new_logs = self.gen_new_logs()
        if self.conversion_log is None:
            self.conversion_log = new_logs
        else:
            # ignore_index avoids repeated row labels across batches.
            self.conversion_log = pd.concat(
                [new_logs, self.conversion_log], sort=False, ignore_index=True
            )
        self.current_timestamp = self.current_timestamp + datetime.timedelta(days=1)
        self.conversion_log["last_update_timestamp"] = self.current_timestamp

        out_df = self.conversion_log.copy()
        # Comparisons against NaT are False, so never-converting clicks stay 0.
        observed = out_df["conversion_timestamp_latent"] < out_df["last_update_timestamp"]
        out_df["conversion_flag"] = observed.astype(int)
        out_df["conversion_timestamp"] = out_df["conversion_timestamp_latent"].where(
            observed
        )
        return out_df.drop(
            columns=["conversion_flag_latent", "conversion_timestamp_latent"]
        )
class Arm(object):
    """
    Estimate the CVR (theta) and delay distribution parameter (lambda) from the conversion logs.

    The estimate is refined iteratively: ``prepare_weight`` assigns every
    click a weight given the current (theta, lambda), and
    ``get_estimate_one_cycle`` turns those weights into new estimates.
    ``get_estimate`` repeats that cycle and then refreshes the Beta(alpha,
    beta) parameters used by ``draw_beta_sample``.

    The conversion log is expected to be a DataFrame with the columns
    ``click_id``, ``click_timestamp``, ``conversion_flag``,
    ``conversion_timestamp`` and ``last_update_timestamp``.
    """

    def __init__(
        self,
        data_generator,
        theta_init=0.1,
        lambda_init=1 / 100,
        alpha_init=1,
        beta_init=1,
    ):
        """
        @param data_generator: object exposing ``get_conversion_log()``
        @param theta_init float: starting value for the CVR estimate
        @param lambda_init float: starting value for the delay rate (per hour)
        @param alpha_init float: initial alpha of the Beta distribution
        @param beta_init float: initial beta of the Beta distribution
        """
        self.last_theta = theta_init
        self.last_lambda = lambda_init
        self.alpha = alpha_init
        self.beta = beta_init
        self.data_generator = data_generator

    def get_latest_conversion_log_for_arm(self):
        """Fetch the current conversion log and store it on ``self.conversion_log``."""
        self.conversion_log = self.data_generator.get_conversion_log()

    def prepare_weight(self, e_theta, e_lambda):
        """
        Add ``elapsed_time`` (hours) and ``weight`` columns to the conversion log.

        ``elapsed_time`` is click-to-conversion for converted clicks and
        click-to-last-update otherwise. ``weight`` is 1 for converted clicks;
        for the others it is
        ``theta * exp(-lambda * t) / (1 - theta + theta * exp(-lambda * t))``,
        i.e. the probability that a click still unconverted after ``t`` hours
        converts eventually, under an exponential delay with rate ``lambda``.

        @param e_theta float: current CVR estimate
        @param e_lambda float: current delay-rate estimate (per hour)
        """
        log = self.conversion_log
        # A log without any observed conversion may carry an all-NaN column
        # that is not datetime-typed; coerce so the subtraction stays valid.
        conversion_time = pd.to_datetime(log["conversion_timestamp"])
        log["elapsed_time"] = np.where(
            log["conversion_flag"],
            conversion_time - log["click_timestamp"],
            log["last_update_timestamp"] - log["click_timestamp"],
        )
        log["elapsed_time"] = log["elapsed_time"].dt.total_seconds() / 3600

        pending = e_theta * np.exp(-e_lambda * log["elapsed_time"])
        log["weight"] = np.where(
            log["conversion_flag"],
            1,
            pending / (1 - e_theta + pending),
        )

    def get_estimate_one_cycle(self):
        """
        Compute new estimates from the weights set by ``prepare_weight``.

        @return (e_theta, e_lambda, N_cum_conversion): both estimates are NaN
            when the log contains no observed conversion.
        """
        log = self.conversion_log
        N_cum_conversion = np.sum(log["conversion_flag"])
        if N_cum_conversion == 0:
            return np.nan, np.nan, N_cum_conversion

        e_lambda = N_cum_conversion / np.dot(log["elapsed_time"], log["weight"])
        # Hours each click has been observable, converted or not.
        observed_hours = (
            log["last_update_timestamp"] - log["click_timestamp"]
        ).dt.total_seconds() / 3600
        e_theta = N_cum_conversion / np.sum(1 - np.exp(-e_lambda * observed_hours))
        return e_theta, e_lambda, N_cum_conversion

    def get_estimate(self, n_cycle=20):
        """
        Pull the latest log, run up to ``n_cycle`` estimation cycles and update
        ``last_theta``, ``last_lambda``, ``alpha``, ``beta`` and ``naive_cvr``.

        When an estimate comes out NaN (e.g. no conversion observed yet) the
        previous value is kept.

        @param n_cycle int: maximum number of estimation cycles
        """
        self.get_latest_conversion_log_for_arm()
        # Counted up front so the updates below also work when n_cycle == 0.
        N_cum_conversion = np.sum(self.conversion_log["conversion_flag"])
        e_theta, e_lambda = self.last_theta, self.last_lambda
        for _ in range(n_cycle):
            self.prepare_weight(e_theta, e_lambda)
            e_theta, e_lambda, N_cum_conversion = self.get_estimate_one_cycle()
            if N_cum_conversion == 0:
                # Without conversions every further cycle only yields NaN again.
                break
        if np.isnan(e_theta):
            e_theta = self.last_theta
        if np.isnan(e_lambda):
            e_lambda = self.last_lambda
        self.last_theta = e_theta
        self.last_lambda = e_lambda
        # With these parameters the Beta mode (alpha - 1) / (alpha + beta - 2)
        # equals e_theta, unless beta is clipped to 1.
        self.alpha = 1 + N_cum_conversion
        self.beta = max(1, 1 - N_cum_conversion + N_cum_conversion / e_theta)
        self.naive_cvr = N_cum_conversion / self.conversion_log.click_id.count()

    def draw_beta_sample(self, size):
        """Draw ``size`` samples from Beta(self.alpha, self.beta)."""
        return np.random.beta(self.alpha, self.beta, size)
def compute_assignment_probability(arms, size=1000):
    """
    Estimate, by sampling, how likely each arm is to be the best one.

    Every arm contributes @size draws; for each draw position the arm with
    the largest sampled value wins, and an arm's assignment probability is
    its share of wins.
    @param arms list[Arm]: list of Arm objects
    @param size int: number of draws per arm
    @return assignment_probability np.ndarray: an array of assignment probability for each arm
    """
    n_arms = len(arms)
    # One row per arm, one column per draw.
    draws = np.array([arm.draw_beta_sample(size) for arm in arms])
    winners = np.argmax(draws, axis=0)
    # minlength keeps a zero entry for arms that never win.
    wins_per_arm = np.bincount(winners, minlength=n_arms)
    return wins_per_arm / size
def pretty_print(arms):
    """
    Print a table with one row per arm: index, naive CVR, corrected CVR
    (``last_theta``), mean delay (``1 / last_lambda``) and assignment
    probability.

    @param arms list[Arm]: arms whose estimates have already been computed
    """
    assign_prob = compute_assignment_probability(arms)

    separator = ("-" * 15 + "+") * 4 + ("-" * 15)
    header = "{0:^15}|{1:^15}|{2:^15}|{3:^15}|{4:^15}".format(
        "Arm", "Naive CVR", "Corr'd. CVR", "Mean delay", "Assign Prob"
    )
    row_template = "{0:^15}|{1:^15.5f}|{2:^15.5f}|{3:^15.3f}|{4:^15.4f}"

    print(header)
    print(separator)
    for position, arm in enumerate(arms):
        mean_delay = 1 / arm.last_lambda
        row = row_template.format(
            position, arm.naive_cvr, arm.last_theta, mean_delay, assign_prob[position]
        )
        print(row)
    # Closing rule followed by an empty line to separate consecutive tables.
    print(separator + "\n")
if __name__ == "__main__":
    # Three simulated arms: a higher true CVR comes with a slower conversion
    # rate (real_lambda), i.e. a longer mean delay.
    arm0 = Arm(ConversionLogDataGenerator(real_theta=0.035, real_lambda=1 / 100))
    arm1 = Arm(ConversionLogDataGenerator(real_theta=0.04, real_lambda=1 / 150))
    arm2 = Arm(ConversionLogDataGenerator(real_theta=0.045, real_lambda=1 / 200))
    arms = [arm0, arm1, arm2]

    t = 0
    while t < 30:
        print(f"Iteration {t}")
        # Each get_estimate call pulls a fresh log from the arm's generator.
        for arm in arms:
            arm.get_estimate()
        pretty_print(arms)
        t += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment