Rank-aware evaluation metrics
from collections import Counter
import logging
from typing import Any, List, Union

import numpy as np
from scipy.stats import kendalltau

LOG_FORMAT = ("%(asctime)s - %(levelname)s - "
              "%(filename)s:%(lineno)s - %(funcName)s"
              " - %(message)s")
LOG_DATE_FORMAT = "%d-%m-%Y %H:%M:%S"
LOG_LEVEL = logging.INFO
logging.basicConfig(level=LOG_LEVEL,
                    format=LOG_FORMAT,
                    datefmt=LOG_DATE_FORMAT)
logger = logging.getLogger(__name__)

def ensure_array(data: Union[List[Any], np.ndarray]) -> np.ndarray:
    """
    Ensure that the input data is an instance of `np.ndarray`.

    Parameters
    ----------
    data : Union[List[Any], np.ndarray]
        Input data.

    Returns
    -------
    np.ndarray
        Converted data.
    """
    if not isinstance(data, np.ndarray):
        return np.asarray(data)
    return data

def precision_at_k(y: Union[List[Any], np.ndarray],
                   k: int) -> float:
    """
    Precision at k (p@k).

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores.
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        Precision at k.
    """
    assert k >= 1
    a = (ensure_array(y)[:k] != 0)
    return a.mean()

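# Worked example: with y = [0, 0, 1] (the first dataset in __main__ below),
# p@1 = 0/1, p@2 = 0/2 and p@3 = 1/3, i.e. the share of relevant (non-zero)
# items among the top-k predictions.
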
def average_precision_at_k(y: Union[List[Any], np.ndarray],
                           k: int) -> float:
    """
    Average precision at k (ap@k).

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores.
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        Average precision at k.
    """
    assert k >= 1
    y = (ensure_array(y) != 0)[:k]
    # Calculate p@k only for relevant items.
    rez = [precision_at_k(y, i + 1) for i in range(y.size) if y[i]] or [0.0]
    return np.mean(rez)

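# Worked example (illustrative data, not from the script below): for
# y = [1, 0, 1], precision is averaged only over the relevant ranks 1 and 3,
# so ap@3 = (p@1 + p@3) / 2 = (1.0 + 2/3) / 2 ≈ 0.83.
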
def mean_average_precision_at_k(ys: List[Union[List[Any], np.ndarray]],
                                k: int = None) -> float:
    """
    Mean average precision at k (map@k).

    Parameters
    ----------
    ys : List[Union[List[Any], np.ndarray]]
        List of true relevance scores (list or numpy) in rank order
        (first element is the first item).
    k : int, optional
        Number of top predicted items to consider.

    Returns
    -------
    float
        Mean average precision at k.
    """
    return np.mean([average_precision_at_k(y, k or len(y)) for y in ys])

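# When `k` is None, each query is scored with k = len(y), i.e. over its full
# ranked list; this is how the __main__ block below calls it.
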
def dcg_at_k(y: Union[List[Any], np.ndarray],
             k: int) -> float:
    """
    Discounted cumulative gain at k (dcg@k).

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores.
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        Discounted cumulative gain at k (dcg@k).
    """
    assert k >= 1
    y = ensure_array(y)[:k]
    if y.size:
        return np.sum((2 ** y - 1) / np.log2(np.arange(y.size) + 2))
    return 0.0

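# This is the "exponential gain" form of DCG:
#   dcg@k = sum_{i=1..k} (2 ** rel_i - 1) / log2(i + 1), with 1-based ranks i.
# Small illustrative check (data not from the script below): for y = [3, 2],
# dcg@2 = (2**3 - 1) / log2(2) + (2**2 - 1) / log2(3) ≈ 7.00 + 1.89 ≈ 8.89.
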
def ndcg_at_k(y: Union[List[Any], np.ndarray],
              k: int) -> float:
    """
    Normalized discounted cumulative gain (ndcg@k).

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores.
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        Normalized discounted cumulative gain (ndcg@k).
    """
    assert k >= 1
    dcg = dcg_at_k(y, k)
    dcg_max = dcg_at_k(sorted(y, reverse=True), k)
    if not dcg_max:
        return 0.0
    return dcg / dcg_max

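# ndcg@k divides dcg@k by the dcg@k of the ideal (descending) ordering of y,
# so a perfectly ranked list scores 1.0, which is exactly what the assert in
# __main__ below checks.
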
def reciprocal_rank_at_k(y: Union[List[Any], np.ndarray],
                         k: int) -> float:
    """
    Reciprocal rank at k (rr@k).

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores.
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        Reciprocal rank at k (rr@k).
    """
    assert k >= 1
    y = ensure_array(y)[:k]
    relevant = np.nonzero(y)[0]
    if relevant.size:
        # Reciprocal of the (1-based) rank of the first relevant item.
        return 1 / (relevant[0] + 1)
    return 0.0

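# Worked example: for y = [0, 0, 1] the first relevant item sits at rank 3,
# so rr@3 = 1/3 ≈ 0.33.
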
def mean_reciprocal_rank_at_k(ys: List[Union[List[Any], np.ndarray]],
                              k: int = None) -> float:
    """
    Mean reciprocal rank at k (mrr@k).

    Parameters
    ----------
    ys : List[Union[List[Any], np.ndarray]]
        List of true relevance scores (list or numpy) in rank order
        (first element is the first item).
    k : int, optional
        Number of top predicted items to consider.

    Returns
    -------
    float
        Mean reciprocal rank at k (mrr@k).
    """
    return np.mean([reciprocal_rank_at_k(y, k or len(y)) for y in ys])

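# Worked example with the data from __main__ below:
# rs = [[0, 0, 1], [0, 1, 0], [1, 0, 0]] gives
# mrr = (1/3 + 1/2 + 1/1) / 3 ≈ 0.61.
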
def sign(a: float) -> int:
    """Return the sign of `a`: 1 if positive, -1 if negative, 0 otherwise."""
    return bool(a > 0) - bool(a < 0)

def get_ties(data: Union[List[Any], np.ndarray]) -> int:
    """
    Calculate the total number of tied pairs across all groups of equal values.

    Parameters
    ----------
    data : Union[List[Any], np.ndarray]
        Input data.

    Returns
    -------
    int
        Total number of tied pairs.
    """
    total, counter = 0, Counter(data)
    for v in counter.values():
        # A group of v equal values contributes v * (v - 1) / 2 tied pairs.
        total += v * (v - 1) // 2
    return total

def kendalltau_b(x: Union[List[Any], np.ndarray],
                 y: Union[List[Any], np.ndarray]) -> float:
    """
    Kendall rank correlation coefficient (Kendall's tau), tau-b variant.

    Reference: https://en.wikipedia.org/wiki/Kendall_rank_correlation_coefficient#Tau-b.

    Parameters
    ----------
    x : Union[List[Any], np.ndarray]
        Array of rankings X.
    y : Union[List[Any], np.ndarray]
        Array of rankings Y.

    Returns
    -------
    float
        Kendall's tau-b coefficient.
    """
    assert len(x) == len(y)
    numerator, n = 0, len(x)
    for i in range(n):
        for j in range(i + 1, n):
            numerator += sign(x[i] - x[j]) * sign(y[i] - y[j])
    # Handle ties: remove tied pairs from each factor of the denominator.
    n0 = n * (n - 1) // 2
    n1, n2 = map(get_ties, [x, y])
    return numerator / np.sqrt((n0 - n1) * (n0 - n2))

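# In the notation of the Wikipedia reference above:
#   tau_b = (n_c - n_d) / sqrt((n0 - n1) * (n0 - n2)),
# where n_c - n_d is the `numerator` from the double loop, n0 = n(n-1)/2, and
# n1, n2 count tied pairs in x and y. Hand check with the __main__ data
# x = [1, -1, 1, 2], y = [2, -2, 4, 4]: n0 = 6, n1 = n2 = 1, n_c - n_d = 4,
# so tau_b = 4 / sqrt(5 * 5) = 0.8.
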
if __name__ == "__main__":
    r = [0, 0, 1]
    logger.info(f"Data: {r}")
    for k in range(1, len(r) + 1):
        p_at_k = precision_at_k(r, k)
        logger.info(f"P@{k}: {p_at_k:.2f}")

    r = [1, 1, 0, 1, 0, 1, 0, 0, 0, 1]
    logger.info(f"Data: {r}")
    avg_p_at_k = average_precision_at_k(r, len(r))
    logger.info(f"AP: {avg_p_at_k:.2f}")

    logger.info(f"Data: {[r, [0]]}")
    map_at_k = mean_average_precision_at_k([r, [0]])
    logger.info(f"MAP: {map_at_k:.2f}")
    assert mean_average_precision_at_k([r]) == average_precision_at_k(r, len(r))
    assert avg_p_at_k / 2 == map_at_k

    r = [3, 2, 3, 0, 0, 1, 2, 2, 3, 0]
    logger.info(f"Data: {r}")
    for k in [1, 3, 5, 7, 10]:
        logger.info(f"DCG@{k}: {dcg_at_k(r, k):.2f}")
        logger.info(f"NDCG@{k}: {ndcg_at_k(r, k):.2f}")
    assert ndcg_at_k(sorted(r, reverse=True), len(r)) == 1.0

    rs = [[0, 0, 1], [0, 1, 0], [1, 0, 0]]
    logger.info(f"Data: {rs}")
    logger.info(f"MRR: {mean_reciprocal_rank_at_k(rs):.2f}")

    # Kendall's tau
    x = [1, -1, 1, 2]
    y = [2, -2, 4, 4]
    logger.info(f"Data: {x}, {y}")
    logger.info(f"Kendall's tau: {kendalltau_b(x, y)}")
    stat = kendalltau(x, y)
    logger.info(f"SciPy Kendall's tau: {stat.correlation:.3f}")
    assert kendalltau_b(x, x) == kendalltau_b(y, y) == 1.0