Comparison of different ranking algorithms: https://neelsomani.com/blog/improving-elo-ranking-system.php
 import numpy as np import random import math import logging N_PLAYERS = 5 LOGGER = logging.getLogger('elo') class Ranker: def __init__(self): pass @staticmethod def rank(games): raise NotImplementedError class Elo(Ranker): K_FACTOR = 50 START_SCORE = 1200 @staticmethod def _pr_win(rating_a, rating_b): return 1 / (1 + 10 ** (rating_b - rating_a)) @staticmethod def rank(games): initial_scores = [Elo.START_SCORE for _ in range(N_PLAYERS)] for ((i, j), o) in games: rating_i, rating_j = initial_scores[i], initial_scores[j] rating_i, rating_j = ( rating_i + Elo.K_FACTOR * (o - Elo._pr_win(rating_i, rating_j)), rating_j + Elo.K_FACTOR * ((1 - o) - Elo._pr_win(rating_j, rating_i)) ) initial_scores[i], initial_scores[j] = rating_i, rating_j return np.array(initial_scores) class Glicko(Ranker): RD = 350 Q = .0057565 G = 1 / (1 + 3 * .0057565 ** 2 * 350 ** 2 / 3.14 ** 2) ** .5 @staticmethod def _expected_score(r, r_j): return 1 / (1 + 10 ** (-Glicko.G * (r - r_j) / 400)) @staticmethod def rank(games): initial_scores = [Elo.START_SCORE for _ in range(N_PLAYERS)] final_scores = [] for i in range(N_PLAYERS): # Calculate d_sq d_sq = 0 for j in range(N_PLAYERS): for ((p_1, p_2), o) in games: if p_1 == i and p_2 == j or p_1 == j and p_2 == i: ex_s = Glicko._expected_score( initial_scores[i], initial_scores[j] ) d_sq += Glicko.G**2 * ex_s * (1 - ex_s) d_sq = (Glicko.Q**2 * d_sq)**-1 # Compute the sum in the numerator for the score update inner_sum = 0 for j in range(N_PLAYERS): for ((p_1, p_2), o) in games: if p_1 == i and p_2 == j: pass elif p_1 == j and p_2 == i: o = 1 - o else: continue ex_score = Glicko._expected_score( initial_scores[i], initial_scores[j] ) inner_sum += Glicko.G * (o - ex_score) score_update = Glicko.Q / (1 / Glicko.RD**2 + 1 / d_sq) * inner_sum final_scores.append(initial_scores[i] + score_update) return final_scores class SampleElo(Ranker): N_RANDOM_SAMPLES = 100 @staticmethod def _permute(lst): return [t[1] for t in sorted([(random.random(), v) for v in lst])] @staticmethod def rank(games): return np.mean(np.array([ Elo.rank(SampleElo._permute(games)) for _ in range(SampleElo.N_RANDOM_SAMPLES) ]), axis=0) class SysEqElo(Ranker): @staticmethod def _get_games_won(games, i, j): return sum(a == i and b == j and o == 1 for ((a, b), o) in games) @staticmethod def _get_total_games(games, i, j): return sum(a == i and b == j for ((a, b), o) in games) @staticmethod def _get_sys_eq(games): X = [] b = [] for i in range(0, N_PLAYERS - 1): for j in range(i + 1, N_PLAYERS): total = SysEqElo._get_total_games(games, i, j) if total == 0: continue i_won = SysEqElo._get_games_won(games, i, j) j_won = total - i_won # [log_10(j_won) - log_10(i_won)] + r_i / 400 - r_j / 400 = 0 eq = np.zeros(N_PLAYERS) eq[i] = 1 / 400 eq[j] = -1 / 400 X.append(eq) # FIXME: A math domain error occurs here when a player wins no # games b.append(math.log10(i_won) - math.log10(j_won)) return X, b @staticmethod def _append_constraint(X, b): new_constraint = np.zeros(N_PLAYERS) new_constraint[0] = 1 # We only care about relative ranks, so we anchor to 1200 return np.vstack([X, new_constraint]), np.append(b, 1200) @staticmethod def _solve_system(X, b): return np.linalg.lstsq(X, b, rcond=None)[0] @staticmethod def rank(games): X, b = SysEqElo._get_sys_eq(games) X, b = SysEqElo._append_constraint(np.array(X), np.array(b)) return SysEqElo._solve_system(X, b) class PageRank(Ranker): @staticmethod def _create_win_loss_matrix(games): outcomes = np.zeros((N_PLAYERS, N_PLAYERS)) for p in range(N_PLAYERS): for p_2 in range(N_PLAYERS): games_p_lost_to_p_2 = sum( p == i and p_2 == j and o == 0 or p == j and p_2 == i and o == 1 for ((i, j), o) in games ) outcomes[p_2][p] = games_p_lost_to_p_2 for p in range(N_PLAYERS): if (outcomes[:, p] == 0).all(): outcomes[:, p] = 1 return outcomes / outcomes.sum(axis=0) @staticmethod def rank(games): mtx = PageRank._create_win_loss_matrix(games) eigenvalues, eigenvectors = np.linalg.eig(mtx) for eig, vec in zip(eigenvalues, eigenvectors.T): if abs(eig - 1) < 10**-5: if not (vec >= 0).all() and not (vec <= 0).all(): raise RuntimeError(f'Violation of Perron-Frobenius: {vec}') return np.abs(vec) raise RuntimeError('Did not find eigenvalue=1') def get_ranking_algorithms(): return { 'standard_elo': Elo, # 'sample_elo': SampleElo, 'system_of_equations': SysEqElo, 'page_rank': PageRank, 'glicko': Glicko } def get_games(players_strength, n): outcomes = [0, 1] players = list(range(N_PLAYERS)) games = [] for _ in range(n): i, j = np.random.choice(players, size=2, replace=False) i, j = min(i, j), max(i, j) denominator = players_strength[i] + players_strength[j] # 0 if player j wins, 1 if player i wins outcome = np.random.choice( outcomes, p=[ players_strength[j] / denominator, players_strength[i] / denominator ] ) games.append(((i, j), outcome)) return games def get_rankings(lst): return [i[0] for i in sorted(enumerate(lst), key=lambda x: x[1])] def get_corr_coefs(): players_strength = [int(random.random() * 1000) for _ in range(N_PLAYERS)] actual_ranks = get_rankings(players_strength) LOGGER.debug(f'Player strengths: {actual_ranks}') games = get_games(players_strength, 500) coefs = [] for name, alg in get_ranking_algorithms().items(): rankings = get_rankings(alg.rank(games)) coefs.append(np.corrcoef(actual_ranks, rankings)[0, 1]) LOGGER.debug(f'{name}: {coefs[-1]}') return tuple(coefs) def avg_corr_coefs(n=400): # Compare all ranking algorithms scores = {name: 0 for name in get_ranking_algorithms().keys()} total_iterations = 0 for i in range(n): LOGGER.info(f'Iteration: {i}') try: for k, v in zip(scores, get_corr_coefs()): scores[k] += v except ValueError as ex: LOGGER.info(ex) total_iterations += 1 return {k: v / total_iterations for k, v in scores.items()} if __name__ == '__main__': logging.basicConfig(level=logging.INFO)