Last active
December 15, 2019 17:28
-
-
Save KarimLulu/29c3846b0a6fd793ae6d426ebfbfc002 to your computer and use it in GitHub Desktop.
Rank-aware evaluation metrics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import Counter | |
import logging | |
import numpy as np | |
from scipy.stats import kendalltau | |
from typing import List, Any, Union | |
# Module-wide logging configuration: timestamped records carrying the source
# location (file, line, function) of each message.
LOG_FORMAT = ("%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s"
              " - %(funcName)s - %(message)s")
LOG_DATE_FORMAT = "%d-%m-%Y %H:%M:%S"
LOG_LEVEL = logging.INFO
logging.basicConfig(format=LOG_FORMAT,
                    datefmt=LOG_DATE_FORMAT,
                    level=LOG_LEVEL)
logger = logging.getLogger(__name__)
def ensure_array(data: Union[List[Any], np.ndarray]) -> np.ndarray:
    """
    Coerce `data` to a numpy array.

    Parameters
    ----------
    data : Union[List[Any], np.ndarray]
        Input data.

    Returns
    -------
    np.ndarray
        `data` itself when it is already an `np.ndarray`,
        otherwise `np.asarray(data)`.
    """
    if isinstance(data, np.ndarray):
        return data
    return np.asarray(data)
def precision_at_k(y: Union[List[Any], np.ndarray],
                   k: int) -> float:
    """
    Precision at k (p@k): fraction of relevant items among the top k.

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores
        (non-zero means relevant).
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        Precision at k; 0.0 for an empty input.
    """
    assert k >= 1
    # np.asarray is a no-op for arrays, so lists and arrays are both accepted.
    top = (np.asarray(y)[:k] != 0)
    # Guard the empty case: mean() of a zero-size array is nan with a warning.
    if top.size == 0:
        return 0.0
    return float(top.mean())
def average_precision_at_k(y: Union[List[Any], np.ndarray],
                           k: int) -> float:
    """
    Average precision at k (ap@k).

    Averages p@i over every rank i <= k that holds a relevant item.

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores.
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        Average precision at k; 0.0 when no relevant item is in the top k.
    """
    assert k >= 1
    relevant = (ensure_array(y) != 0)[:k]
    # p@rank is evaluated only at ranks that hold a relevant item.
    precisions = [precision_at_k(relevant, rank + 1)
                  for rank, hit in enumerate(relevant) if hit]
    return np.mean(precisions or [0.0])
def mean_average_precision_at_k(ys: List[Union[List[Any], np.ndarray]],
                                k: int = None) -> float:
    """
    Mean average precision at k (map@k): ap@k averaged over queries.

    Parameters
    ----------
    ys : List[Union[List[Any], np.ndarray]]
        One list/array of true relevance scores per query, each sorted by
        predicted relevance (first element is the top-ranked item).
    k : int, optional
        Number of top predicted items to consider. Defaults to the full
        length of each individual ranking.

    Returns
    -------
    float
        Mean average precision at k.
    """
    # `k or len(y)` falls back to each ranking's full length when k is None.
    return np.mean([average_precision_at_k(y, k or len(y)) for y in ys])
def dcg_at_k(y: Union[List[Any], np.ndarray],
             k: int) -> float:
    """
    Discounted cumulative gain at k (dcg@k).

    Uses the exponential gain 2**rel - 1 with a log2(rank + 1) discount
    (ranks are 1-based, so the discount sequence is log2(2), log2(3), ...).

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores.
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        Discounted cumulative gain at k; 0.0 for an empty input.
    """
    assert k >= 1
    top = np.asarray(y)[:k]
    if not top.size:
        return 0.0
    gains = 2 ** top - 1
    discounts = np.log2(np.arange(top.size) + 2)
    return np.sum(gains / discounts)
def ndcg_at_k(y: Union[List[Any], np.ndarray],
              k: int) -> float:
    """
    Normalized discounted cumulative gain at k (ndcg@k).

    Normalizes dcg@k by the ideal dcg@k obtained from the same scores
    sorted in decreasing order of relevance.

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores.
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        ndcg@k in [0, 1]; 0.0 when the ideal dcg@k is zero.
    """
    assert k >= 1
    ideal = dcg_at_k(sorted(y, reverse=True), k)
    if not ideal:
        return 0.0
    return dcg_at_k(y, k) / ideal
def reciprocal_rank_at_k(y: Union[List[Any], np.ndarray],
                         k: int) -> float:
    """
    Reciprocal rank at k (rr@k): 1 / rank of the first relevant item.

    Parameters
    ----------
    y : Union[List[Any], np.ndarray]
        True relevance scores sorted by predicted relevance scores
        (non-zero means relevant).
    k : int
        Number of top predicted items to consider.

    Returns
    -------
    float
        1 / (1-based rank of the first non-zero score within the top k),
        or 0.0 when no relevant item appears in the top k (or `y` is empty).
    """
    assert k >= 1
    top = np.asarray(y)[:k]
    # Bug fix: the previous version returned `1 / (np.nonzero(y)[0] + 1)`,
    # an *array* of reciprocal ranks of every relevant item (and an empty
    # array when none were relevant). rr@k is defined by the first relevant
    # item only and must be a scalar.
    hits = np.nonzero(top)[0]
    if hits.size == 0:
        return 0.0
    return float(1.0 / (hits[0] + 1))
def mean_reciprocal_rank_at_k(ys: List[Union[List[Any], np.ndarray]],
                              k: int = None) -> float:
    """
    Mean reciprocal rank at k (mrr@k): rr@k averaged over queries.

    Parameters
    ----------
    ys : List[Union[List[Any], np.ndarray]]
        One list/array of true relevance scores per query, each sorted by
        predicted relevance (first element is the top-ranked item).
    k : int, optional
        Number of top predicted items to consider. Defaults to the full
        length of each individual ranking.

    Returns
    -------
    float
        Mean reciprocal rank at k (mrr@k).
    """
    # `k or len(y)` falls back to each ranking's full length when k is None.
    return np.mean([reciprocal_rank_at_k(y, k or len(y)) for y in ys])
def sign(a: float) -> int:
    """Return the sign of `a`: 1 if positive, -1 if negative, 0 if zero."""
    # Fix: the annotation said `bool`, but the result is -1, 0, or 1.
    # bool subclasses int, so the subtraction yields exactly those values.
    return (a > 0) - (a < 0)
def get_ties(data: Union[List[Any], np.ndarray]) -> int:
    """
    Calculate the total number of tied pairs across all groups of equal values.

    Parameters
    ----------
    data : Union[List[Any], np.ndarray]
        Input data.

    Returns
    -------
    int
        Sum over each group of equal values of C(group_size, 2).
    """
    # Fix: `v * (v - 1) / 2` returned a float despite the `int` annotation.
    # Floor division is exact here because v * (v - 1) is always even.
    return sum(v * (v - 1) // 2 for v in Counter(data).values())
def kendalltau_b(x: Union[List[Any], np.ndarray],
                 y: Union[List[Any], np.ndarray]) -> float:
    """
    Kendall rank correlation coefficient (tau-b), which corrects for ties.

    Reference: https://en.wikipedia.org/wiki/Kendall_rank_correlation_coefficient#Tau-b.

    Parameters
    ----------
    x : Union[List[Any], np.ndarray]
        Array of rankings X.
    y : Union[List[Any], np.ndarray]
        Array of rankings Y.

    Returns
    -------
    float
        Kendall's tau-b coefficient in [-1, 1]; nan when either input is
        entirely tied (zero denominator), matching SciPy's convention.
    """
    assert len(x) == len(y)
    n = len(x)
    # Concordant pairs add +1, discordant -1; pairs tied in either ranking
    # contribute 0 because sign() returns 0 for equal values.
    numerator = 0
    for i in range(n):
        for j in range(i + 1, n):
            numerator += sign(x[i] - x[j]) * sign(y[i] - y[j])
    # Tau-b correction: ties within each ranking shrink the denominator.
    # (Fix: was `map(lambda x: get_ties(x), [x, y])`, which both shadowed
    # the parameter `x` and wrapped get_ties in a redundant lambda.)
    n0 = n * (n - 1) // 2
    n1, n2 = get_ties(x), get_ties(y)
    denominator = np.sqrt((n0 - n1) * (n0 - n2))
    if denominator == 0:
        # Fix: previously this divided by zero, emitting a numpy
        # RuntimeWarning; tau-b is undefined for fully tied input.
        return float("nan")
    return float(numerator / denominator)
if __name__ == "__main__": | |
r = [0, 0, 1] | |
logger.info(f"Data: {r}") | |
for k in range(1, len(r)+1): | |
p_at_k = precision_at_k(r, k) | |
logger.info(f"P@{k}: {p_at_k:.2f}") | |
r = [1, 1, 0, 1, 0, 1, 0, 0, 0, 1] | |
logger.info(f"Data: {r}") | |
avg_p_at_k = average_precision_at_k(r, len(r)) | |
logger.info(f"AP: {avg_p_at_k:.2f}") | |
logger.info(f"Data: {[r, [0]]}") | |
map_at_k = mean_average_precision_at_k([r, [0]]) | |
logger.info(f"MAP: {map_at_k:.2f}") | |
assert mean_average_precision_at_k([r]) == average_precision_at_k(r, len(r)) | |
assert avg_p_at_k / 2 == map_at_k | |
r = [3, 2, 3, 0, 0, 1, 2, 2, 3, 0] | |
logger.info(f"Data: {r}") | |
for k in [1, 3, 5, 7, 10]: | |
logger.info(f"DCG@{k}: {dcg_at_k(r, k):.2f}") | |
logger.info(f"NDCG@{k}: {ndcg_at_k(r, k):.2f}") | |
assert ndcg_at_k(sorted(r, reverse=True), len(r)) == 1.0 | |
rs = [[0, 0, 1], [0, 1, 0], [1, 0, 0]] | |
logger.info(f"Data: {rs}") | |
logger.info(f"MRR: {mean_reciprocal_rank_at_k(rs):.2f}") | |
# Kendall's tau | |
x = [1, -1, 1, 2] | |
y = [2, -2, 4, 4] | |
logger.info(f"Data: {x}, {y}") | |
logger.info(f"Kendall's tau: {kendalltau_b(x, y)}") | |
stat = kendalltau(x, y) | |
logger.info(f"SciPy Kendall's tau: {stat.correlation:.3f}") | |
assert kendalltau_b(x, x) == kendalltau_b(y, y) == 1.0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment