Instantly share code, notes, and snippets.

# neelsomani/elo.py

Created February 3, 2022 06:15
Show Gist options
• Save neelsomani/1e9dd03240278f754e3e79bd2739e3b2 to your computer and use it in GitHub Desktop.
Comparison of different ranking algorithms: https://neelsomani.com/blog/improving-elo-ranking-system.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
 import numpy as np import random import math import logging N_PLAYERS = 5 LOGGER = logging.getLogger('elo') class Ranker: def __init__(self): pass @staticmethod def rank(games): raise NotImplementedError class Elo(Ranker): K_FACTOR = 50 START_SCORE = 1200 @staticmethod def _pr_win(rating_a, rating_b): return 1 / (1 + 10 ** (rating_b - rating_a)) @staticmethod def rank(games): initial_scores = [Elo.START_SCORE for _ in range(N_PLAYERS)] for ((i, j), o) in games: rating_i, rating_j = initial_scores[i], initial_scores[j] rating_i, rating_j = ( rating_i + Elo.K_FACTOR * (o - Elo._pr_win(rating_i, rating_j)), rating_j + Elo.K_FACTOR * ((1 - o) - Elo._pr_win(rating_j, rating_i)) ) initial_scores[i], initial_scores[j] = rating_i, rating_j return np.array(initial_scores) class Glicko(Ranker): RD = 350 Q = .0057565 G = 1 / (1 + 3 * .0057565 ** 2 * 350 ** 2 / 3.14 ** 2) ** .5 @staticmethod def _expected_score(r, r_j): return 1 / (1 + 10 ** (-Glicko.G * (r - r_j) / 400)) @staticmethod def rank(games): initial_scores = [Elo.START_SCORE for _ in range(N_PLAYERS)] final_scores = [] for i in range(N_PLAYERS): # Calculate d_sq d_sq = 0 for j in range(N_PLAYERS): for ((p_1, p_2), o) in games: if p_1 == i and p_2 == j or p_1 == j and p_2 == i: ex_s = Glicko._expected_score( initial_scores[i], initial_scores[j] ) d_sq += Glicko.G**2 * ex_s * (1 - ex_s) d_sq = (Glicko.Q**2 * d_sq)**-1 # Compute the sum in the numerator for the score update inner_sum = 0 for j in range(N_PLAYERS): for ((p_1, p_2), o) in games: if p_1 == i and p_2 == j: pass elif p_1 == j and p_2 == i: o = 1 - o else: continue ex_score = Glicko._expected_score( initial_scores[i], initial_scores[j] ) inner_sum += Glicko.G * (o - ex_score) score_update = Glicko.Q / (1 / Glicko.RD**2 + 1 / d_sq) * inner_sum final_scores.append(initial_scores[i] + score_update) return final_scores class SampleElo(Ranker): N_RANDOM_SAMPLES = 100 @staticmethod def _permute(lst): return [t[1] for t in sorted([(random.random(), v) for v in lst])] @staticmethod def rank(games): return np.mean(np.array([ Elo.rank(SampleElo._permute(games)) for _ in range(SampleElo.N_RANDOM_SAMPLES) ]), axis=0) class SysEqElo(Ranker): @staticmethod def _get_games_won(games, i, j): return sum(a == i and b == j and o == 1 for ((a, b), o) in games) @staticmethod def _get_total_games(games, i, j): return sum(a == i and b == j for ((a, b), o) in games) @staticmethod def _get_sys_eq(games): X = [] b = [] for i in range(0, N_PLAYERS - 1): for j in range(i + 1, N_PLAYERS): total = SysEqElo._get_total_games(games, i, j) if total == 0: continue i_won = SysEqElo._get_games_won(games, i, j) j_won = total - i_won # [log_10(j_won) - log_10(i_won)] + r_i / 400 - r_j / 400 = 0 eq = np.zeros(N_PLAYERS) eq[i] = 1 / 400 eq[j] = -1 / 400 X.append(eq) # FIXME: A math domain error occurs here when a player wins no # games b.append(math.log10(i_won) - math.log10(j_won)) return X, b @staticmethod def _append_constraint(X, b): new_constraint = np.zeros(N_PLAYERS) new_constraint[0] = 1 # We only care about relative ranks, so we anchor to 1200 return np.vstack([X, new_constraint]), np.append(b, 1200) @staticmethod def _solve_system(X, b): return np.linalg.lstsq(X, b, rcond=None)[0] @staticmethod def rank(games): X, b = SysEqElo._get_sys_eq(games) X, b = SysEqElo._append_constraint(np.array(X), np.array(b)) return SysEqElo._solve_system(X, b) class PageRank(Ranker): @staticmethod def _create_win_loss_matrix(games): outcomes = np.zeros((N_PLAYERS, N_PLAYERS)) for p in range(N_PLAYERS): for p_2 in range(N_PLAYERS): games_p_lost_to_p_2 = sum( p == i and p_2 == j and o == 0 or p == j and p_2 == i and o == 1 for ((i, j), o) in games ) outcomes[p_2][p] = games_p_lost_to_p_2 for p in range(N_PLAYERS): if (outcomes[:, p] == 0).all(): outcomes[:, p] = 1 return outcomes / outcomes.sum(axis=0) @staticmethod def rank(games): mtx = PageRank._create_win_loss_matrix(games) eigenvalues, eigenvectors = np.linalg.eig(mtx) for eig, vec in zip(eigenvalues, eigenvectors.T): if abs(eig - 1) < 10**-5: if not (vec >= 0).all() and not (vec <= 0).all(): raise RuntimeError(f'Violation of Perron-Frobenius: {vec}') return np.abs(vec) raise RuntimeError('Did not find eigenvalue=1') def get_ranking_algorithms(): return { 'standard_elo': Elo, # 'sample_elo': SampleElo, 'system_of_equations': SysEqElo, 'page_rank': PageRank, 'glicko': Glicko } def get_games(players_strength, n): outcomes = [0, 1] players = list(range(N_PLAYERS)) games = [] for _ in range(n): i, j = np.random.choice(players, size=2, replace=False) i, j = min(i, j), max(i, j) denominator = players_strength[i] + players_strength[j] # 0 if player j wins, 1 if player i wins outcome = np.random.choice( outcomes, p=[ players_strength[j] / denominator, players_strength[i] / denominator ] ) games.append(((i, j), outcome)) return games def get_rankings(lst): return [i[0] for i in sorted(enumerate(lst), key=lambda x: x[1])] def get_corr_coefs(): players_strength = [int(random.random() * 1000) for _ in range(N_PLAYERS)] actual_ranks = get_rankings(players_strength) LOGGER.debug(f'Player strengths: {actual_ranks}') games = get_games(players_strength, 500) coefs = [] for name, alg in get_ranking_algorithms().items(): rankings = get_rankings(alg.rank(games)) coefs.append(np.corrcoef(actual_ranks, rankings)[0, 1]) LOGGER.debug(f'{name}: {coefs[-1]}') return tuple(coefs) def avg_corr_coefs(n=400): # Compare all ranking algorithms scores = {name: 0 for name in get_ranking_algorithms().keys()} total_iterations = 0 for i in range(n): LOGGER.info(f'Iteration: {i}') try: for k, v in zip(scores, get_corr_coefs()): scores[k] += v except ValueError as ex: LOGGER.info(ex) total_iterations += 1 return {k: v / total_iterations for k, v in scores.items()} if __name__ == '__main__': logging.basicConfig(level=logging.INFO)