Skip to content

Instantly share code, notes, and snippets.

@lspinheiro
Created June 22, 2021 09:00
Show Gist options
  • Save lspinheiro/6df74043b538056740b73d04b8ec5038 to your computer and use it in GitHub Desktop.
Save lspinheiro/6df74043b538056740b73d04b8ec5038 to your computer and use it in GitHub Desktop.
import math
import numpy as np
cdef class GlickoRating:
    """Rating state of a single player together with the results logged for it.

    The constructor takes a display-scale rating and rating deviation and
    stores them on the internal scale as ``mu = (rating - elo_scale) /
    glicko_scale`` and ``phi = rd / glicko_scale``. ``sigma`` (the volatility)
    is stored unchanged. ``fights`` holds opponent names and ``scores`` the
    matching results; ``add_fight`` appends to both at the same position.
    """
    # Internal-scale rating, internal-scale deviation, and volatility.
    cdef public double mu, phi, sigma
    # Offset between the display rating and the internal scale (1500).
    cdef readonly double elo_scale
    # Ratio between the display scale and the internal scale (173.7178).
    cdef readonly double glicko_scale
    # Opponent names logged through add_fight().
    cdef public list fights
    # Scores logged through add_fight(), parallel to ``fights``.
    cdef public list scores

    def __cinit__(self, double rating, double rd, double sigma):
        """Store the scale constants, then convert ``rating``/``rd`` to ``mu``/``phi``."""
        # The two constants must be set first: the conversions below read them.
        self.elo_scale = 1500.
        self.glicko_scale = 173.7178
        self.sigma = sigma
        self.phi = rd / self.glicko_scale
        self.mu = (rating - self.elo_scale) / self.glicko_scale
        self.scores = []
        self.fights = []

    def add_fight(self, str opponent, double score):
        """Log one result: ``opponent`` goes to ``fights`` and ``score`` to ``scores``."""
        self.fights.append(opponent)
        self.scores.append(score)

    cdef double get_rating(self):
        """Return ``mu`` converted back to the display scale."""
        cdef double scaled = self.mu * self.glicko_scale
        return scaled + self.elo_scale

    def update_params(self, mu: float, phi, sigma: float):
        """Overwrite ``mu``, ``phi`` and ``sigma`` with the given values."""
        self.mu = mu
        self.phi = phi
        self.sigma = sigma

    cdef double get_rd(self):
        """Return ``phi`` converted back to the display scale."""
        return self.glicko_scale * self.phi

    def reset_fights(self) -> None:
        """Replace both result logs with fresh empty lists."""
        self.scores = []
        self.fights = []

    @property
    def rating(self):
        """Display-scale rating: ``mu * glicko_scale + elo_scale``."""
        return self.get_rating()

    @property
    def rd(self):
        """Display-scale rating deviation: ``phi * glicko_scale``."""
        return self.get_rd()

    def __repr__(self):
        """Return a short text with the display-scale rating and deviation."""
        display_rating = self.rating
        display_rd = self.rd
        return "rating: {}, RD = {}".format(display_rating, display_rd)
cdef class Glicko2:
    """Glicko-2 rating calculator over a registry of named players.

    Players are stored in ``rating_dict`` (name -> ``GlickoRating``) and can be
    read or replaced with ``system[name]``. The ``update_*`` methods only
    *compute* new ``(mu, phi, sigma)`` values on the internal scale and return
    them; they do not modify the stored rating.
    """
    # System constant used by the volatility update (see update_volatility).
    cdef double tau
    # Registry mapping player name -> GlickoRating.
    cdef dict rating_dict

    def __init__(self, double tau):
        """Create an empty registry that uses ``tau`` for volatility updates."""
        self.tau = tau
        self.rating_dict = {}

    def __getitem__(self, player: str) -> GlickoRating:
        """Return the ``GlickoRating`` stored for ``player``.

        Raises ``KeyError`` if the player has not been registered.
        """
        return self.rating_dict[player]

    def __setitem__(self, fighter: str, fighter_rating: GlickoRating) -> None:
        """Store (or replace) the rating object registered under ``fighter``."""
        self.rating_dict[fighter] = fighter_rating

    def add_player(self, name: str, rating: float = 1500., rd: float = 350., sigma: float = 0.06):
        """Register ``name`` with a new ``GlickoRating(rating, rd, sigma)``.

        An existing entry with the same name is overwritten.
        """
        self.rating_dict[name] = GlickoRating(rating, rd, sigma)

    cpdef double get_g_factor(self, double phi):
        """Return ``g(phi) = 1 / sqrt(1 + 3 * phi**2 / pi**2)``."""
        return 1. / math.sqrt(1. + (3. * phi**2) / (math.pi**2))

    cpdef double get_expect_result(self, double mu_p1, double mu_p2, double g_p2):
        """Return ``E = 1 / (1 + exp(-g_p2 * (mu_p1 - mu_p2)))``."""
        return 1. / (1 + math.exp(-g_p2 * (mu_p1 - mu_p2)))

    def get_win_prob(self, p1_name: str, p2_name: str) -> float:
        """Return the expected result of ``p1_name`` against ``p2_name``.

        The g factor is evaluated at ``sqrt(phi_p1**2 + phi_p2**2)``, i.e. the
        deviations of both players are combined before the expectation is
        computed. Raises ``KeyError`` if either player is not registered.
        """
        cdef GlickoRating p1 = self.rating_dict[p1_name]
        cdef GlickoRating p2 = self.rating_dict[p2_name]
        cdef double combined_phi = math.sqrt(p1.phi**2 + p2.phi**2)
        return self.get_expect_result(p1.mu, p2.mu, self.get_g_factor(combined_phi))

    cdef double _volatility_objective(self, double x, double a, double phi,
                                      double Delta, double v) except *:
        """Evaluate the function whose root ``update_volatility`` searches for.

        ``f(x) = e^x (Delta^2 - phi^2 - v - e^x) / (2 (phi^2 + v + e^x)^2)
        - (x - a) / tau^2``.

        Declared ``except *`` so that an error raised by ``math.exp`` reaches
        the caller instead of being discarded inside this helper.
        """
        cdef double exp_x = math.exp(x)
        cdef double numerator = exp_x * (Delta**2 - phi**2 - v - exp_x)
        cdef double denominator = 2 * (phi**2 + v + exp_x)**2
        return numerator / denominator - (x - a) / self.tau**2

    cdef double update_volatility(self, double vol, double phi, double Delta, double v):
        """
        Applies the Illinois algorithm to find the new volatility value.

        ``vol`` is the current volatility, ``phi`` the current internal-scale
        deviation, ``Delta`` the estimated improvement and ``v`` the estimated
        variance. The search runs on ``x = ln(sigma^2)`` and stops once the
        bracket ``[A, B]`` is narrower than ``1e-6``; the result is
        ``exp(A / 2)``.
        """
        cdef double tol = 1e-6
        cdef double a = math.log(vol**2)
        cdef double A = a
        cdef double B, k, fA, fB, C, fC

        # Pick the second bracket end so that f(A) and f(B) straddle the root.
        if Delta**2 > phi**2 + v:
            B = math.log(Delta**2 - phi**2 - v)
        else:
            # Step left in multiples of tau until the objective is non-negative.
            k = 1.
            while self._volatility_objective(a - k * self.tau, a, phi, Delta, v) < 0:
                k += 1
            B = a - k * self.tau

        fA = self._volatility_objective(A, a, phi, Delta, v)
        fB = self._volatility_objective(B, a, phi, Delta, v)
        while abs(B - A) > tol:
            # Secant step through (A, fA) and (B, fB).
            C = A + (A - B) * (fA / (fB - fA))
            fC = self._volatility_objective(C, a, phi, Delta, v)
            if fC * fB < 0.:
                A, fA = B, fB
            else:
                # Illinois modification: halve the retained endpoint's value
                # so the same endpoint is not kept indefinitely.
                fA = fA / 2.
            B, fB = C, fC
        return math.exp(A / 2.)

    def update_ratings_no_fights(self, p1_name: str) -> tuple:
        """
        Applies only step 6 of the algorithm

        Returns ``(mu, phi_new, sigma)`` where only the deviation changes:
        ``phi_new = sqrt(phi**2 + sigma**2)``.
        """
        cdef GlickoRating p1 = self.rating_dict[p1_name]
        cdef double p1_updated_phi = math.sqrt(p1.phi**2 + p1.sigma**2)
        return (p1.mu, p1_updated_phi, p1.sigma)

    def update_ratings(self, p1_name: str) -> tuple:
        """
        Applies the Glicko2 7 step algorithm to update ratings, RD and volatility.

        Returns ``(mu_new, phi_new, sigma_new)`` on the internal scale, computed
        from the fights and scores currently logged on the player. Nothing is
        written back to the stored rating.

        If the player has no logged fights the variance sum would be empty, so
        the call is forwarded to ``update_ratings_no_fights`` instead of
        dividing by zero.

        Raises ``KeyError`` if the player or one of the logged opponents is not
        registered, and ``ValueError`` if the player's ``fights`` and ``scores``
        lists have different lengths.
        """
        # Step 2
        cdef GlickoRating p1 = self.rating_dict[p1_name]
        cdef list p2_list = p1.fights
        cdef list score_list = p1.scores
        cdef Py_ssize_t num_fights = len(p2_list)
        cdef Py_ssize_t pidx
        cdef GlickoRating p2
        cdef double p2_g, p12_e, score
        cdef double inv_v = 0.
        cdef double outcome_sum = 0.
        cdef double p1_v, p1_Delta, p1_updated_sigma
        cdef double p1_phi_star, p1_updated_phi, p1_updated_mu

        if num_fights == 0:
            return self.update_ratings_no_fights(p1_name)
        if len(score_list) != num_fights:
            raise ValueError(
                "fights and scores have different lengths: {} != {}".format(
                    num_fights, len(score_list)
                )
            )

        # Step 3: accumulate 1/v and, in the same pass, the sum g * (s - E)
        # that both step 4 and step 7 need.
        for pidx in range(num_fights):
            p2 = self.rating_dict[p2_list[pidx]]
            score = score_list[pidx]
            p2_g = self.get_g_factor(p2.phi)
            p12_e = self.get_expect_result(p1.mu, p2.mu, p2_g)
            inv_v += p2_g**2 * p12_e * (1. - p12_e)
            outcome_sum += p2_g * (score - p12_e)
        p1_v = 1. / inv_v
        # Step 4
        p1_Delta = p1_v * outcome_sum
        # Step 5
        p1_updated_sigma = self.update_volatility(p1.sigma, p1.phi, p1_Delta, p1_v)
        # Step 6
        p1_phi_star = math.sqrt(p1.phi**2 + p1_updated_sigma**2)
        # Step 7
        p1_updated_phi = 1. / math.sqrt((1. / p1_phi_star**2) + (1. / p1_v))
        p1_updated_mu = p1.mu + p1_updated_phi**2 * outcome_sum
        return (p1_updated_mu, p1_updated_phi, p1_updated_sigma)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment