Last active
March 23, 2023 16:38
-
-
Save znwang25/490545dbe762dcfdae5329b2e2fa120d to your computer and use it in GitHub Desktop.
BanditCVR
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import datetime | |
from collections import Counter | |
class Arm(object):
    """
    Estimate the CVR (theta) and delay distribution parameter (lambda) from the conversion logs.

    The model assumed by the formulas below is: each click converts with
    probability theta, and a converting click does so after an exponentially
    distributed delay with rate lambda (per hour).  Clicks that have not
    converted yet are therefore ambiguous, and the estimates are refined by
    alternating between weighting those clicks and re-estimating the
    parameters.

    Attributes set by this class:
        last_theta, last_lambda: most recent parameter estimates.
        alpha, beta: parameters of the Beta distribution sampled by
            ``draw_beta_sample``.
        conversion_log: DataFrame returned by the data generator (set by
            ``get_latest_conversion_log_for_arm``).
        naive_cvr: observed conversions / clicks (set by ``get_estimate``).
    """

    def __init__(
        self,
        data_generator,
        theta_init=0.1,
        lambda_init=1 / 100,
        alpha_init=1,
        beta_init=1,
    ):
        """
        @param data_generator: object exposing ``get_conversion_log()`` which
            returns a DataFrame with the columns click_id, click_timestamp,
            conversion_flag, conversion_timestamp and last_update_timestamp
        @param theta_init float: starting value for the CVR estimate
        @param lambda_init float: starting value for the delay rate estimate
        @param alpha_init float: initial Beta alpha parameter
        @param beta_init float: initial Beta beta parameter
        """
        self.last_theta = theta_init
        self.last_lambda = lambda_init
        self.alpha = alpha_init
        self.beta = beta_init
        self.data_generator = data_generator

    def get_latest_conversion_log_for_arm(self):
        """Fetch the current conversion log from the data generator and store it."""
        self.conversion_log = self.data_generator.get_conversion_log()

    def prepare_weight(self, e_theta, e_lambda):
        """
        Add the ``elapsed_time`` (hours) and ``weight`` columns to the stored log.

        ``elapsed_time`` is the click-to-conversion delay for converted clicks
        and the click-to-last-update time for the others.  ``weight`` is 1 for
        converted clicks; for the others it is
        theta * exp(-lambda * t) / (1 - theta + theta * exp(-lambda * t)).

        @param e_theta float: current CVR estimate
        @param e_lambda float: current delay rate estimate (per hour)
        """
        log = self.conversion_log
        converted = np.asarray(log["conversion_flag"]).astype(bool)
        click_time = log["click_timestamp"]
        hours_to_conversion = (
            log["conversion_timestamp"] - click_time
        ).dt.total_seconds() / 3600
        hours_to_update = (
            log["last_update_timestamp"] - click_time
        ).dt.total_seconds() / 3600
        log["elapsed_time"] = np.where(converted, hours_to_conversion, hours_to_update)

        # Evaluate the exponential term once; it appears in both the numerator
        # and the denominator of the weight.
        pending = e_theta * np.exp(-e_lambda * log["elapsed_time"])
        log["weight"] = np.where(converted, 1, pending / (1 - e_theta + pending))

    def get_estimate_one_cycle(self):
        """
        Re-estimate theta and lambda from the weights written by ``prepare_weight``.

        @return (e_theta, e_lambda, N_cum_conversion): the new estimates and the
            number of observed conversions; both estimates are NaN when no
            conversion has been observed.
        """
        log = self.conversion_log
        N_cum_conversion = np.sum(log["conversion_flag"])
        if N_cum_conversion == 0:
            return np.nan, np.nan, N_cum_conversion

        e_lambda = N_cum_conversion / np.dot(log["elapsed_time"], log["weight"])
        hours_since_click = (
            log["last_update_timestamp"] - log["click_timestamp"]
        ).dt.total_seconds() / 3600
        e_theta = N_cum_conversion / np.sum(1 - np.exp(-e_lambda * hours_since_click))
        return e_theta, e_lambda, N_cum_conversion

    def get_estimate(self, n_cycle=20):
        """
        Pull the latest log, run ``n_cycle`` re-estimation cycles and update
        ``last_theta``, ``last_lambda``, ``alpha``, ``beta`` and ``naive_cvr``.

        @param n_cycle int: number of weight/estimate cycles; 0 keeps the
            previous theta and lambda and only refreshes the counts.
        """
        self.get_latest_conversion_log_for_arm()
        # Count conversions up front so the updates below are defined even when
        # the loop body never runs (n_cycle == 0).
        N_cum_conversion = np.sum(self.conversion_log["conversion_flag"])
        e_theta, e_lambda = self.last_theta, self.last_lambda
        for _ in range(n_cycle):
            self.prepare_weight(e_theta, e_lambda)
            e_theta, e_lambda, N_cum_conversion = self.get_estimate_one_cycle()
            # NaN means "no conversions yet": fall back to the previous values.
            if np.isnan(e_theta):
                e_theta = self.last_theta
            if np.isnan(e_lambda):
                e_lambda = self.last_lambda
        self.last_theta = e_theta
        self.last_lambda = e_lambda
        self.alpha = 1 + N_cum_conversion
        # N / theta acts as the delay-corrected number of trials; beta is
        # floored at 1.
        self.beta = max(1, 1 - N_cum_conversion + N_cum_conversion / e_theta)
        self.naive_cvr = N_cum_conversion / self.conversion_log.click_id.count()

    def draw_beta_sample(self, size):
        """
        @param size int: number of draws
        @return np.ndarray: ``size`` samples from Beta(alpha, beta)
        """
        return np.random.beta(self.alpha, self.beta, size)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import datetime | |
from collections import Counter | |
class ConversionLogDataGenerator(object):
    """
    Simulate a click log in which each click converts with probability
    ``real_theta`` after an exponentially distributed delay (rate
    ``real_lambda`` per hour, i.e. mean delay ``1 / real_lambda`` hours).

    Every call to ``get_conversion_log`` adds one simulated day of clicks and
    moves the internal clock forward by one day.
    """

    def __init__(self, real_theta=0.05, real_lambda=50, seed=123):
        """
        @param real_theta float: true conversion probability per click
        @param real_lambda float: true rate of the exponential delay (per hour)
        @param seed int: seed for the generator's private random stream
        """
        self.real_theta = real_theta
        self.real_lambda = real_lambda
        self.conversion_log = None
        self.current_timestamp = datetime.datetime(2022, 1, 1)
        self.rng = np.random.default_rng(seed)

    def gen_new_logs(self, size=200):
        """
        Generate fake log data.

        Clicks are spread uniformly over the day starting at
        ``current_timestamp``.  Click ids continue after the rows already
        accumulated, so ids stay unique across daily batches.

        @param size int: number of clicks to generate
        @return pd.DataFrame: columns click_id, click_timestamp,
            conversion_flag_latent, conversion_timestamp_latent (NaT for
            clicks that never convert)
        """
        first_id = 0 if self.conversion_log is None else len(self.conversion_log)
        click_id = np.arange(first_id, first_id + size)

        # Keep the draw order uniform -> binomial -> exponential so a given
        # seed consumes the random stream in a fixed sequence.
        click_offset_days = self.rng.random(size)
        conversion_flag_latent = self.rng.binomial(1, self.real_theta, size)
        delay_hours = self.rng.exponential(1 / self.real_lambda, size)

        # Build real datetime64 columns directly rather than object arrays of
        # datetime / NaN that rely on dtype inference.
        click_timestamp = pd.Series(
            pd.Timestamp(self.current_timestamp)
            + pd.to_timedelta(click_offset_days, unit="D")
        )
        conversion_delay = pd.Series(pd.to_timedelta(delay_hours, unit="h"))
        conversion_timestamp_latent = (click_timestamp + conversion_delay).where(
            conversion_flag_latent.astype(bool)
        )

        df = pd.DataFrame(
            {
                "click_id": click_id,
                "click_timestamp": click_timestamp,
                "conversion_flag_latent": conversion_flag_latent,
                "conversion_timestamp_latent": conversion_timestamp_latent,
            }
        )
        return df

    def get_conversion_log(self):
        """
        Return all the conversion logs accumulated to the present,
        showing only conversions that can be observed at the update time.
        The returned output is a DataFrame with the following columns:
        click_id,
        click_timestamp,
        conversion_flag,
        conversion_timestamp,
        last_update_timestamp
        """
        new_logs = self.gen_new_logs()
        if self.conversion_log is None:
            self.conversion_log = new_logs
        else:
            # Newest batch first; ignore_index keeps row labels unique.
            self.conversion_log = pd.concat(
                [new_logs, self.conversion_log], sort=False, ignore_index=True
            )
        self.current_timestamp = self.current_timestamp + datetime.timedelta(days=1)
        self.conversion_log["last_update_timestamp"] = self.current_timestamp

        out_df = self.conversion_log.copy()
        # A conversion is visible only if it happened before the update time;
        # NaT (never converts) compares as False.
        observed = out_df["conversion_timestamp_latent"] < out_df["last_update_timestamp"]
        out_df["conversion_flag"] = np.where(observed, 1, 0)
        out_df["conversion_timestamp"] = out_df["conversion_timestamp_latent"].where(
            observed
        )
        out_df = out_df.drop(
            columns=["conversion_flag_latent", "conversion_timestamp_latent"]
        )
        return out_df
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import datetime | |
from collections import Counter | |
def compute_assignment_probability(arms, size=1000):
    """
    Monte Carlo estimate of how often each arm is the best one.

    Every arm contributes @size draws; for each draw position the arm with the
    largest sampled value wins, and an arm's assignment probability is its
    share of wins.

    @param arms list[Arm]: list of Arm objects (anything with draw_beta_sample(size))
    @param size int: number of draws per arm
    @return assignment_probability np.ndarray: an array of assignment probability for each arm
    """
    n_arms = len(arms)
    # Rows are arms, columns are draw positions.
    draws = np.array([arm.draw_beta_sample(size) for arm in arms])
    winners = np.argmax(draws, axis=0)
    # minlength keeps a slot (with count 0) for arms that never win.
    wins_per_arm = np.bincount(winners, minlength=n_arms)
    return wins_per_arm / size
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import datetime | |
from collections import Counter | |
class ConversionLogDataGenerator(object):
    """
    Simulate a click log in which each click converts with probability
    ``real_theta`` after an exponentially distributed delay (rate
    ``real_lambda`` per hour, i.e. mean delay ``1 / real_lambda`` hours).

    Every call to ``get_conversion_log`` adds one simulated day of clicks and
    moves the internal clock forward by one day.
    """

    def __init__(self, real_theta=0.05, real_lambda=50, seed=123):
        """
        @param real_theta float: true conversion probability per click
        @param real_lambda float: true rate of the exponential delay (per hour)
        @param seed int: seed for the generator's private random stream
        """
        self.real_theta = real_theta
        self.real_lambda = real_lambda
        self.conversion_log = None
        self.current_timestamp = datetime.datetime(2022, 1, 1)
        self.rng = np.random.default_rng(seed)

    def gen_new_logs(self, size=200):
        """
        Generate fake log data.

        Clicks are spread uniformly over the day starting at
        ``current_timestamp``.  Click ids continue after the rows already
        accumulated, so ids stay unique across daily batches.

        @param size int: number of clicks to generate
        @return pd.DataFrame: columns click_id, click_timestamp,
            conversion_flag_latent, conversion_timestamp_latent (NaT for
            clicks that never convert)
        """
        first_id = 0 if self.conversion_log is None else len(self.conversion_log)
        click_id = np.arange(first_id, first_id + size)

        # Keep the draw order uniform -> binomial -> exponential so a given
        # seed consumes the random stream in a fixed sequence.
        click_offset_days = self.rng.random(size)
        conversion_flag_latent = self.rng.binomial(1, self.real_theta, size)
        delay_hours = self.rng.exponential(1 / self.real_lambda, size)

        # Build real datetime64 columns directly rather than object arrays of
        # datetime / NaN that rely on dtype inference.
        click_timestamp = pd.Series(
            pd.Timestamp(self.current_timestamp)
            + pd.to_timedelta(click_offset_days, unit="D")
        )
        conversion_delay = pd.Series(pd.to_timedelta(delay_hours, unit="h"))
        conversion_timestamp_latent = (click_timestamp + conversion_delay).where(
            conversion_flag_latent.astype(bool)
        )

        df = pd.DataFrame(
            {
                "click_id": click_id,
                "click_timestamp": click_timestamp,
                "conversion_flag_latent": conversion_flag_latent,
                "conversion_timestamp_latent": conversion_timestamp_latent,
            }
        )
        return df

    def get_conversion_log(self):
        """
        Return all the conversion logs accumulated to the present,
        showing only conversions that can be observed at the update time.
        The returned output is a DataFrame with the following columns:
        click_id,
        click_timestamp,
        conversion_flag,
        conversion_timestamp,
        last_update_timestamp
        """
        new_logs = self.gen_new_logs()
        if self.conversion_log is None:
            self.conversion_log = new_logs
        else:
            # Newest batch first; ignore_index keeps row labels unique.
            self.conversion_log = pd.concat(
                [new_logs, self.conversion_log], sort=False, ignore_index=True
            )
        self.current_timestamp = self.current_timestamp + datetime.timedelta(days=1)
        self.conversion_log["last_update_timestamp"] = self.current_timestamp

        out_df = self.conversion_log.copy()
        # A conversion is visible only if it happened before the update time;
        # NaT (never converts) compares as False.
        observed = out_df["conversion_timestamp_latent"] < out_df["last_update_timestamp"]
        out_df["conversion_flag"] = np.where(observed, 1, 0)
        out_df["conversion_timestamp"] = out_df["conversion_timestamp_latent"].where(
            observed
        )
        out_df = out_df.drop(
            columns=["conversion_flag_latent", "conversion_timestamp_latent"]
        )
        return out_df
class Arm(object):
    """
    Estimate the CVR (theta) and delay distribution parameter (lambda) from the conversion logs.

    The model assumed by the formulas below is: each click converts with
    probability theta, and a converting click does so after an exponentially
    distributed delay with rate lambda (per hour).  Clicks that have not
    converted yet are therefore ambiguous, and the estimates are refined by
    alternating between weighting those clicks and re-estimating the
    parameters.

    Attributes set by this class:
        last_theta, last_lambda: most recent parameter estimates.
        alpha, beta: parameters of the Beta distribution sampled by
            ``draw_beta_sample``.
        conversion_log: DataFrame returned by the data generator (set by
            ``get_latest_conversion_log_for_arm``).
        naive_cvr: observed conversions / clicks (set by ``get_estimate``).
    """

    def __init__(
        self,
        data_generator,
        theta_init=0.1,
        lambda_init=1 / 100,
        alpha_init=1,
        beta_init=1,
    ):
        """
        @param data_generator: object exposing ``get_conversion_log()`` which
            returns a DataFrame with the columns click_id, click_timestamp,
            conversion_flag, conversion_timestamp and last_update_timestamp
        @param theta_init float: starting value for the CVR estimate
        @param lambda_init float: starting value for the delay rate estimate
        @param alpha_init float: initial Beta alpha parameter
        @param beta_init float: initial Beta beta parameter
        """
        self.last_theta = theta_init
        self.last_lambda = lambda_init
        self.alpha = alpha_init
        self.beta = beta_init
        self.data_generator = data_generator

    def get_latest_conversion_log_for_arm(self):
        """Fetch the current conversion log from the data generator and store it."""
        self.conversion_log = self.data_generator.get_conversion_log()

    def prepare_weight(self, e_theta, e_lambda):
        """
        Add the ``elapsed_time`` (hours) and ``weight`` columns to the stored log.

        ``elapsed_time`` is the click-to-conversion delay for converted clicks
        and the click-to-last-update time for the others.  ``weight`` is 1 for
        converted clicks; for the others it is
        theta * exp(-lambda * t) / (1 - theta + theta * exp(-lambda * t)).

        @param e_theta float: current CVR estimate
        @param e_lambda float: current delay rate estimate (per hour)
        """
        log = self.conversion_log
        converted = np.asarray(log["conversion_flag"]).astype(bool)
        click_time = log["click_timestamp"]
        hours_to_conversion = (
            log["conversion_timestamp"] - click_time
        ).dt.total_seconds() / 3600
        hours_to_update = (
            log["last_update_timestamp"] - click_time
        ).dt.total_seconds() / 3600
        log["elapsed_time"] = np.where(converted, hours_to_conversion, hours_to_update)

        # Evaluate the exponential term once; it appears in both the numerator
        # and the denominator of the weight.
        pending = e_theta * np.exp(-e_lambda * log["elapsed_time"])
        log["weight"] = np.where(converted, 1, pending / (1 - e_theta + pending))

    def get_estimate_one_cycle(self):
        """
        Re-estimate theta and lambda from the weights written by ``prepare_weight``.

        @return (e_theta, e_lambda, N_cum_conversion): the new estimates and the
            number of observed conversions; both estimates are NaN when no
            conversion has been observed.
        """
        log = self.conversion_log
        N_cum_conversion = np.sum(log["conversion_flag"])
        if N_cum_conversion == 0:
            return np.nan, np.nan, N_cum_conversion

        e_lambda = N_cum_conversion / np.dot(log["elapsed_time"], log["weight"])
        hours_since_click = (
            log["last_update_timestamp"] - log["click_timestamp"]
        ).dt.total_seconds() / 3600
        e_theta = N_cum_conversion / np.sum(1 - np.exp(-e_lambda * hours_since_click))
        return e_theta, e_lambda, N_cum_conversion

    def get_estimate(self, n_cycle=20):
        """
        Pull the latest log, run ``n_cycle`` re-estimation cycles and update
        ``last_theta``, ``last_lambda``, ``alpha``, ``beta`` and ``naive_cvr``.

        @param n_cycle int: number of weight/estimate cycles; 0 keeps the
            previous theta and lambda and only refreshes the counts.
        """
        self.get_latest_conversion_log_for_arm()
        # Count conversions up front so the updates below are defined even when
        # the loop body never runs (n_cycle == 0).
        N_cum_conversion = np.sum(self.conversion_log["conversion_flag"])
        e_theta, e_lambda = self.last_theta, self.last_lambda
        for _ in range(n_cycle):
            self.prepare_weight(e_theta, e_lambda)
            e_theta, e_lambda, N_cum_conversion = self.get_estimate_one_cycle()
            # NaN means "no conversions yet": fall back to the previous values.
            if np.isnan(e_theta):
                e_theta = self.last_theta
            if np.isnan(e_lambda):
                e_lambda = self.last_lambda
        self.last_theta = e_theta
        self.last_lambda = e_lambda
        self.alpha = 1 + N_cum_conversion
        # N / theta acts as the delay-corrected number of trials; beta is
        # floored at 1.
        self.beta = max(1, 1 - N_cum_conversion + N_cum_conversion / e_theta)
        self.naive_cvr = N_cum_conversion / self.conversion_log.click_id.count()

    def draw_beta_sample(self, size):
        """
        @param size int: number of draws
        @return np.ndarray: ``size`` samples from Beta(alpha, beta)
        """
        return np.random.beta(self.alpha, self.beta, size)
def compute_assignment_probability(arms, size=1000):
    """
    Monte Carlo estimate of how often each arm is the best one.

    Every arm contributes @size draws; for each draw position the arm with the
    largest sampled value wins, and an arm's assignment probability is its
    share of wins.

    @param arms list[Arm]: list of Arm objects (anything with draw_beta_sample(size))
    @param size int: number of draws per arm
    @return assignment_probability np.ndarray: an array of assignment probability for each arm
    """
    n_arms = len(arms)
    # Rows are arms, columns are draw positions.
    draws = np.array([arm.draw_beta_sample(size) for arm in arms])
    winners = np.argmax(draws, axis=0)
    # minlength keeps a slot (with count 0) for arms that never win.
    wins_per_arm = np.bincount(winners, minlength=n_arms)
    return wins_per_arm / size
def pretty_print(arms):
    """
    Print a fixed-width table with one row per arm: arm index, naive CVR,
    corrected CVR (last_theta), mean delay (1 / last_lambda) and the
    assignment probability from compute_assignment_probability.

    @param arms list[Arm]: arms whose estimates have already been computed
    """
    probabilities = compute_assignment_probability(arms)

    # Horizontal rule: five 15-character columns joined by "+".
    rule = ("-" * 15 + "+") * 4 + ("-" * 15)
    header_template = "{0:^15}|{1:^15}|{2:^15}|{3:^15}|{4:^15}"
    row_template = "{0:^15}|{1:^15.5f}|{2:^15.5f}|{3:^15.3f}|{4:^15.4f}"

    print(
        header_template.format(
            "Arm", "Naive CVR", "Corr'd. CVR", "Mean delay", "Assign Prob"
        )
    )
    print(rule)
    for index, arm in enumerate(arms):
        mean_delay = 1 / arm.last_lambda
        row = row_template.format(
            index, arm.naive_cvr, arm.last_theta, mean_delay, probabilities[index]
        )
        print(row)
    print(rule + "\n")
if __name__ == "__main__":
    # One (true CVR, true mean delay in hours) pair per simulated arm.
    arm_settings = ((0.035, 100), (0.04, 150), (0.045, 200))
    arms = [
        Arm(ConversionLogDataGenerator(real_theta=theta, real_lambda=1 / mean_delay))
        for theta, mean_delay in arm_settings
    ]
    # Each iteration pulls a new log for every arm, re-estimates, then reports.
    for t in range(30):
        print(f"Iteration {t}")
        for arm in arms:
            arm.get_estimate()
        pretty_print(arms)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment