Skip to content

Instantly share code, notes, and snippets.

@neelsomani
Created Feb 3, 2022
Embed
What would you like to do?
Comparison of different ranking algorithms: https://neelsomani.com/blog/improving-elo-ranking-system.php
import numpy as np
import random
import math
import logging
# Number of players in every simulated tournament; player ids are the
# integers 0 .. N_PLAYERS - 1.
N_PLAYERS = 5
# Module-wide logger used for progress and debug messages.
LOGGER = logging.getLogger('elo')
class Ranker:
    """Common interface shared by the ranking algorithms in this module.

    Concrete rankers override the static ``rank`` method.
    """

    def __init__(self):
        """Nothing to initialise; rankers carry no per-instance state."""

    @staticmethod
    def rank(games):
        """Score the players that took part in ``games``.

        Subclasses must override this; the base implementation always
        raises ``NotImplementedError``.
        """
        raise NotImplementedError()
class Elo(Ranker):
    """Sequential Elo ratings, updated game by game in the order given."""

    # Largest number of points a rating can move after a single game.
    K_FACTOR = 50
    # Rating every player starts from.
    START_SCORE = 1200
    # Rating gap that corresponds to 10:1 odds in the expected-score formula.
    SCALE = 400

    @staticmethod
    def _pr_win(rating_a, rating_b):
        """Return the expected score of a player rated ``rating_a``
        against a player rated ``rating_b``.

        The rating gap is divided by ``SCALE`` (400); without that scaling
        a gap of only a few points already saturates the result to 0 or 1.
        """
        return 1 / (1 + 10 ** ((rating_b - rating_a) / Elo.SCALE))

    @staticmethod
    def rank(games):
        """Replay ``games`` in order and return the final ratings.

        Args:
            games: iterable of ``((i, j), o)`` where ``i`` and ``j`` are
                player ids and ``o`` is player ``i``'s score for the game
                (1 for a win, 0 for a loss).

        Returns:
            ``numpy`` array of length ``N_PLAYERS`` indexed by player id.
        """
        scores = [Elo.START_SCORE for _ in range(N_PLAYERS)]
        for (i, j), o in games:
            rating_i, rating_j = scores[i], scores[j]
            # Both updates are computed from the pre-game ratings before
            # either one is written back.
            new_i = rating_i + Elo.K_FACTOR * (
                o - Elo._pr_win(rating_i, rating_j)
            )
            new_j = rating_j + Elo.K_FACTOR * (
                (1 - o) - Elo._pr_win(rating_j, rating_i)
            )
            scores[i], scores[j] = new_i, new_j
        return np.array(scores)
class Glicko(Ranker):
    """One Glicko rating-period update with every player starting equal.

    All players start at ``Elo.START_SCORE`` with rating deviation ``RD``,
    and all games are treated as one rating period.
    """

    # Rating deviation assumed for every player.
    RD = 350
    # q = ln(10) / 400, rounded.
    Q = .0057565
    # g(RD) = 1 / sqrt(1 + 3 q^2 RD^2 / pi^2), derived from Q and RD above
    # so the three constants cannot drift apart.
    G = 1 / (1 + 3 * Q ** 2 * RD ** 2 / math.pi ** 2) ** .5

    @staticmethod
    def _expected_score(r, r_j):
        """Return the expected score of a player rated ``r`` against an
        opponent rated ``r_j``."""
        return 1 / (1 + 10 ** (-Glicko.G * (r - r_j) / 400))

    @staticmethod
    def rank(games):
        """Return the post-period rating of every player.

        Args:
            games: sequence of ``((p_1, p_2), o)`` where ``o`` is the score
                of ``p_1`` (1 for a win, 0 for a loss). It is iterated once
                per player, so it must be re-iterable.

        Returns:
            List of ``N_PLAYERS`` ratings indexed by player id. A player
            who appears in no game keeps the starting rating.
        """
        initial_scores = [Elo.START_SCORE for _ in range(N_PLAYERS)]
        final_scores = []
        for i in range(N_PLAYERS):
            # Accumulate 1 / d^2 rather than d^2 itself: it is simply zero
            # for a player without games, where d^2 would need a division
            # by zero.
            inv_d_sq = 0
            inner_sum = 0
            for (p_1, p_2), o in games:
                if p_1 == i:
                    opponent, score = p_2, o
                elif p_2 == i:
                    # Outcomes are stored from p_1's point of view.
                    opponent, score = p_1, 1 - o
                else:
                    continue
                if not 0 <= opponent < N_PLAYERS:
                    # Games against ids outside 0..N_PLAYERS-1 are ignored.
                    continue
                ex_s = Glicko._expected_score(
                    initial_scores[i],
                    initial_scores[opponent]
                )
                inv_d_sq += Glicko.G ** 2 * ex_s * (1 - ex_s)
                inner_sum += Glicko.G * (score - ex_s)
            inv_d_sq *= Glicko.Q ** 2
            score_update = (
                Glicko.Q / (1 / Glicko.RD ** 2 + inv_d_sq) * inner_sum
            )
            final_scores.append(initial_scores[i] + score_update)
        return final_scores
class SampleElo(Ranker):
    """Elo scores averaged over many random orderings of the same games."""

    # Number of shuffled replays whose Elo scores are averaged.
    N_RANDOM_SAMPLES = 100

    @staticmethod
    def _permute(lst):
        """Return a new list holding the items of ``lst`` in random order.

        Each item is paired with a random key and the pairs are sorted.
        """
        keyed = [(random.random(), item) for item in lst]
        keyed.sort()
        return [item for _, item in keyed]

    @staticmethod
    def rank(games):
        """Return the element-wise mean of ``Elo.rank`` over
        ``N_RANDOM_SAMPLES`` random permutations of ``games``."""
        runs = []
        for _ in range(SampleElo.N_RANDOM_SAMPLES):
            shuffled = SampleElo._permute(games)
            runs.append(Elo.rank(shuffled))
        return np.mean(np.array(runs), axis=0)
class SysEqElo(Ranker):
    """Ratings from a least-squares fit of pairwise win ratios.

    Every pair of players that met contributes one linear equation,
    ``r_i / 400 - r_j / 400 = log10(wins_i) - log10(wins_j)``. The system,
    plus one anchoring constraint, is solved with ``numpy.linalg.lstsq``.
    """

    @staticmethod
    def _get_games_won(games, i, j):
        """Count the games between ``i`` and ``j`` that ``i`` won.

        Outcomes are stored from the first listed player's point of view,
        so a game listed as ``(j, i)`` counts when its outcome is 0.
        """
        return sum(
            (a == i and b == j and o == 1)
            or (a == j and b == i and o == 0)
            for ((a, b), o) in games
        )

    @staticmethod
    def _get_total_games(games, i, j):
        """Count the games between ``i`` and ``j``, in either listing order."""
        return sum(
            (a == i and b == j) or (a == j and b == i)
            for ((a, b), o) in games
        )

    @staticmethod
    def _get_sys_eq(games):
        """Build the coefficient rows ``X`` and right-hand sides ``b``.

        Pairs that never met contribute no equation.

        Raises:
            ValueError: if one player of a pair that met won none of their
                games, since the log win ratio is then undefined.
        """
        X = []
        b = []
        for i in range(0, N_PLAYERS - 1):
            for j in range(i + 1, N_PLAYERS):
                total = SysEqElo._get_total_games(games, i, j)
                if total == 0:
                    continue
                i_won = SysEqElo._get_games_won(games, i, j)
                j_won = total - i_won
                if i_won == 0 or j_won == 0:
                    # Same exception type math.log10(0) would raise, but
                    # with a message that names the offending pair.
                    raise ValueError(
                        f'Cannot relate players {i} and {j}: one of them '
                        f'won none of their {total} games'
                    )
                # [log_10(j_won) - log_10(i_won)] + r_i / 400 - r_j / 400 = 0
                eq = np.zeros(N_PLAYERS)
                eq[i] = 1 / 400
                eq[j] = -1 / 400
                X.append(eq)
                b.append(math.log10(i_won) - math.log10(j_won))
        return X, b

    @staticmethod
    def _append_constraint(X, b):
        """Append the row ``r_0 = 1200`` to the system.

        The pairwise equations only fix rating differences, so player 0 is
        anchored at 1200.
        """
        new_constraint = np.zeros(N_PLAYERS)
        new_constraint[0] = 1
        return np.vstack([X, new_constraint]), np.append(b, 1200)

    @staticmethod
    def _solve_system(X, b):
        """Return the least-squares solution of ``X r = b``."""
        return np.linalg.lstsq(X, b, rcond=None)[0]

    @staticmethod
    def rank(games):
        """Return an array of ``N_PLAYERS`` ratings fitted to ``games``.

        Args:
            games: sequence of ``((a, b), o)`` where ``o`` is 1 if ``a``
                won and 0 if ``b`` won. It is iterated repeatedly.

        Raises:
            ValueError: see ``_get_sys_eq``.
        """
        X, b = SysEqElo._get_sys_eq(games)
        X, b = SysEqElo._append_constraint(np.array(X), np.array(b))
        return SysEqElo._solve_system(X, b)
class PageRank(Ranker):
    """Scores taken from the eigenvalue-1 eigenvector of a column-normalised
    "who lost to whom" matrix."""

    # np.linalg.eig returns unit-length eigenvectors, so entries below this
    # magnitude are treated as round-off when checking the vector's sign.
    _ZERO_TOL = 1e-9

    @staticmethod
    def _create_win_loss_matrix(games):
        """Return an ``N_PLAYERS x N_PLAYERS`` matrix whose columns sum to 1.

        Entry ``[w][l]`` starts as the number of games ``l`` lost to ``w``.
        The column of a player who never lost is filled with ones, and then
        every column is divided by its sum.
        """
        outcomes = np.zeros((N_PLAYERS, N_PLAYERS))
        for p in range(N_PLAYERS):
            for p_2 in range(N_PLAYERS):
                games_p_lost_to_p_2 = sum(
                    p == i and p_2 == j and o == 0
                    or p == j and p_2 == i and o == 1
                    for ((i, j), o) in games
                )
                outcomes[p_2][p] = games_p_lost_to_p_2
        for p in range(N_PLAYERS):
            if (outcomes[:, p] == 0).all():
                # Avoid a zero column, which could not be normalised.
                outcomes[:, p] = 1
        return outcomes / outcomes.sum(axis=0)

    @staticmethod
    def rank(games):
        """Return the non-negative eigenvector for eigenvalue 1.

        Args:
            games: sequence of ``((i, j), o)`` where ``o`` is 1 if ``i``
                won and 0 if ``j`` won. It is iterated repeatedly.

        Raises:
            RuntimeError: if no eigenvalue is within 1e-5 of 1, or if the
                matching eigenvector has entries of both signs beyond
                round-off.
        """
        mtx = PageRank._create_win_loss_matrix(games)
        eigenvalues, eigenvectors = np.linalg.eig(mtx)
        for eig, vec in zip(eigenvalues, eigenvectors.T):
            if abs(eig - 1) < 10**-5:
                # eig returns complex arrays whenever any eigenvalue is
                # complex; compare signs on the real part only.
                real_vec = np.real(vec)
                # Entries that should be exactly zero may come back as
                # something like -1e-17; they must not fail the sign check.
                cleaned = np.where(
                    np.abs(real_vec) < PageRank._ZERO_TOL, 0.0, real_vec
                )
                if not (cleaned >= 0).all() and not (cleaned <= 0).all():
                    raise RuntimeError(f'Violation of Perron-Frobenius: {vec}')
                return np.abs(real_vec)
        raise RuntimeError('Did not find eigenvalue=1')
def get_ranking_algorithms():
    """Return a dict mapping a display name to each ranker class compared.

    Insertion order matters: ``avg_corr_coefs`` pairs these keys
    positionally with the tuple returned by ``get_corr_coefs``.
    """
    algorithms = {}
    algorithms['standard_elo'] = Elo
    # algorithms['sample_elo'] = SampleElo  (currently disabled)
    algorithms['system_of_equations'] = SysEqElo
    algorithms['page_rank'] = PageRank
    algorithms['glicko'] = Glicko
    return algorithms
def get_games(players_strength, n):
    """Simulate ``n`` games between randomly drawn pairs of players.

    Args:
        players_strength: per-player strengths indexed by player id; a
            player wins with probability proportional to their strength.
        n: number of games to simulate.

    Returns:
        List of ``((i, j), outcome)`` with ``i < j``; ``outcome`` is 1 if
        player ``i`` won and 0 if player ``j`` won.
    """
    player_ids = list(range(N_PLAYERS))
    possible_outcomes = [0, 1]
    simulated = []
    for _ in range(n):
        first, second = np.random.choice(player_ids, size=2, replace=False)
        # The two ids are distinct; store the smaller one first.
        if second < first:
            first, second = second, first
        combined = players_strength[first] + players_strength[second]
        # Probabilities are listed in the order of possible_outcomes:
        # outcome 0 (second player wins), then outcome 1 (first wins).
        win_probs = [
            players_strength[second] / combined,
            players_strength[first] / combined,
        ]
        result = np.random.choice(possible_outcomes, p=win_probs)
        simulated.append(((first, second), result))
    return simulated
def get_rankings(lst):
    """Return the indices of ``lst`` ordered by ascending value.

    The sort is stable, so equal values keep their original relative
    order.
    """
    indexed = list(enumerate(lst))
    indexed.sort(key=lambda pair: pair[1])
    return [position for position, _ in indexed]
def get_corr_coefs():
    """Run one simulated tournament and score every ranking algorithm.

    Random integer strengths are drawn for ``N_PLAYERS`` players and 500
    games are simulated. Each algorithm's ordering is then correlated
    with the ordering by true strength.

    Returns:
        Tuple of correlation coefficients, one per algorithm, in the order
        of ``get_ranking_algorithms()``.

    NOTE(review): the arrays passed to ``np.corrcoef`` are the outputs of
    ``get_rankings`` (player id at each sorted position), not per-player
    ranks -- confirm this is the intended measure.
    """
    players_strength = []
    for _ in range(N_PLAYERS):
        players_strength.append(int(random.random() * 1000))
    actual_ranks = get_rankings(players_strength)
    LOGGER.debug(f'Player strengths: {actual_ranks}')
    games = get_games(players_strength, 500)
    coefs = []
    for name, alg in get_ranking_algorithms().items():
        predicted = get_rankings(alg.rank(games))
        correlation = np.corrcoef(actual_ranks, predicted)
        # Off-diagonal entry of the 2x2 correlation matrix.
        coefs.append(correlation[0, 1])
        LOGGER.debug(f'{name}: {coefs[-1]}')
    return tuple(coefs)
def avg_corr_coefs(n=400):
    """Average each algorithm's correlation coefficient over ``n`` runs.

    A run in which ``get_corr_coefs`` raises ``ValueError`` is logged and
    left out of both the sums and the divisor, so failed runs do not pull
    the averages towards zero.

    Args:
        n: number of simulated tournaments to attempt.

    Returns:
        Dict mapping algorithm name to its mean coefficient over the
        successful runs, or NaN for every algorithm if no run succeeded.
    """
    # Compare all ranking algorithms
    scores = {name: 0 for name in get_ranking_algorithms()}
    successful_runs = 0
    for i in range(n):
        LOGGER.info(f'Iteration: {i}')
        try:
            coefs = get_corr_coefs()
        except ValueError as ex:
            LOGGER.info(ex)
            continue
        # The keys of scores and the coefs tuple share the order of
        # get_ranking_algorithms().
        for k, v in zip(scores, coefs):
            scores[k] += v
        successful_runs += 1
    if successful_runs == 0:
        return {k: float('nan') for k in scores}
    return {k: v / successful_runs for k, v in scores.items()}
if __name__ == '__main__':
    # When run as a script, emit INFO-level log messages.
    logging.basicConfig(level=logging.INFO)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment