Ranking metrics for recommender systems
@tgsmith61591 · Last active March 21, 2024 06:36
# -*- coding: utf-8 -*-
#
# Author: Taylor G Smith
#
# Recommender system ranking metrics derived from Spark source for use with
# Python-based recommender libraries (e.g., implicit,
# http://github.com/benfred/implicit/). These metrics are derived from the
# original Spark Scala source code for recommender metrics.
# https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala

from __future__ import absolute_import, division

import numpy as np
import warnings

__all__ = [
    'mean_average_precision',
    'ndcg_at',
    'precision_at',
]

try:
    xrange
except NameError:  # python 3 does not have an 'xrange'
    xrange = range


def _require_positive_k(k):
    """Helper function to avoid copy/pasted code for validating K"""
    if k <= 0:
        raise ValueError("ranking position k should be positive")


def _mean_ranking_metric(predictions, labels, metric):
    """Helper function for precision_at_k and mean_average_precision"""
    # do not zip, as this will require an extra pass of O(N). Just assert
    # equal length and index (compute in ONE pass of O(N)).
    # if len(predictions) != len(labels):
    #     raise ValueError("dim mismatch in predictions and labels!")
    # return np.mean([
    #     metric(np.asarray(predictions[i]), np.asarray(labels[i]))
    #     for i in xrange(len(predictions))
    # ])

    # Actually probably want lazy evaluation in case preds is a
    # generator, since preds can be very dense and could blow up
    # memory... but how to assert lengths equal? FIXME
    return np.mean([
        metric(np.asarray(prd), np.asarray(labels[i]))
        for i, prd in enumerate(predictions)  # lazy eval if generator
    ])
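

# One possible answer to the FIXME above (an editorial sketch, not part of the
# original gist; assumes Python 3): pair the two sequences lazily with
# ``itertools.zip_longest`` and a sentinel so a length mismatch is caught in
# the same single pass, without materializing ``predictions``. The helper name
# ``_mean_ranking_metric_checked`` is hypothetical.
def _mean_ranking_metric_checked(predictions, labels, metric):
    from itertools import zip_longest
    sentinel = object()
    vals = []
    for prd, lab in zip_longest(predictions, labels, fillvalue=sentinel):
        # if either side runs out before the other, the sentinel shows up here
        if prd is sentinel or lab is sentinel:
            raise ValueError("dim mismatch in predictions and labels!")
        vals.append(metric(np.asarray(prd), np.asarray(lab)))
    return np.mean(vals)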


def _warn_for_empty_labels():
    """Helper for missing ground truth sets"""
    warnings.warn("Empty ground truth set! Check input data")
    return 0.


def precision_at(predictions, labels, k=10, assume_unique=True):
    """Compute the precision at K.

    Compute the average precision of all the queries, truncated at
    ranking position k. If for a query, the ranking algorithm returns
    n (n is less than k) results, the precision value will be computed
    as #(relevant items retrieved) / k. This formula also applies when
    the size of the ground truth set is less than k.

    If a query has an empty ground truth set, zero will be used as
    precision together with a warning.

    Parameters
    ----------
    predictions : array-like, shape=(n_predictions,)
        The prediction array. The items that were predicted, in descending
        order of relevance.

    labels : array-like, shape=(n_ratings,)
        The labels (positively-rated items).

    k : int, optional (default=10)
        The rank at which to measure the precision.

    assume_unique : bool, optional (default=True)
        Whether to assume the items in the labels and predictions are each
        unique. That is, the same item is not predicted multiple times or
        rated multiple times.

    Examples
    --------
    >>> # predictions for 3 users
    >>> preds = [[1, 6, 2, 7, 8, 3, 9, 10, 4, 5],
    ...          [4, 1, 5, 6, 2, 7, 3, 8, 9, 10],
    ...          [1, 2, 3, 4, 5]]
    >>> # labels for the 3 users
    >>> labels = [[1, 2, 3, 4, 5], [1, 2, 3], []]
    >>> precision_at(preds, labels, 1)
    0.33333333333333331
    >>> precision_at(preds, labels, 5)
    0.26666666666666666
    >>> precision_at(preds, labels, 15)
    0.17777777777777778
    """
    # validate K
    _require_positive_k(k)

    def _inner_pk(pred, lab):
        # need to count how many of the top-k predicted values are present in
        # the labels. We'll use numpy in1d for the vectorized membership test.
        if lab.shape[0] > 0:
            n = min(pred.shape[0], k)
            cnt = np.in1d(pred[:n], lab, assume_unique=assume_unique).sum()
            return float(cnt) / k
        else:
            return _warn_for_empty_labels()

    return _mean_ranking_metric(predictions, labels, _inner_pk)
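

# Worked check of precision_at (an editorial sketch, not part of the original
# gist), using the docstring example above with k=5:
#   user 1: preds[:5] = [1, 6, 2, 7, 8], labels = {1, 2, 3, 4, 5} -> 2 hits -> 2 / 5
#   user 2: preds[:5] = [4, 1, 5, 6, 2], labels = {1, 2, 3}       -> 2 hits -> 2 / 5
#   user 3: empty ground truth set -> 0. (with a warning)
#   mean = (0.4 + 0.4 + 0.0) / 3 = 0.2666..., matching precision_at(preds, labels, 5).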


def mean_average_precision(predictions, labels, assume_unique=True):
    """Compute the mean average precision on predictions and labels.

    Returns the mean average precision (MAP) of all the queries. If a query
    has an empty ground truth set, the average precision will be zero and a
    warning is generated.

    Parameters
    ----------
    predictions : array-like, shape=(n_predictions,)
        The prediction array. The items that were predicted, in descending
        order of relevance.

    labels : array-like, shape=(n_ratings,)
        The labels (positively-rated items).

    assume_unique : bool, optional (default=True)
        Whether to assume the items in the labels and predictions are each
        unique. That is, the same item is not predicted multiple times or
        rated multiple times.

    Examples
    --------
    >>> # predictions for 3 users
    >>> preds = [[1, 6, 2, 7, 8, 3, 9, 10, 4, 5],
    ...          [4, 1, 5, 6, 2, 7, 3, 8, 9, 10],
    ...          [1, 2, 3, 4, 5]]
    >>> # labels for the 3 users
    >>> labels = [[1, 2, 3, 4, 5], [1, 2, 3], []]
    >>> mean_average_precision(preds, labels)
    0.35502645502645497
    """
    def _inner_map(pred, lab):
        if lab.shape[0]:
            # compute the number of elements within the predictions that are
            # present in the actual labels, and get the cumulative sum weighted
            # by the index of the ranking
            n = pred.shape[0]

            # Scala code from Spark source:
            # var i = 0
            # var cnt = 0
            # var precSum = 0.0
            # val n = pred.length
            # while (i < n) {
            #     if (labSet.contains(pred(i))) {
            #         cnt += 1
            #         precSum += cnt.toDouble / (i + 1)
            #     }
            #     i += 1
            # }
            # precSum / labSet.size

            arange = np.arange(n, dtype=np.float32) + 1.  # this is the denom
            present = np.in1d(pred[:n], lab, assume_unique=assume_unique)
            prec_sum = np.ones(present.sum()).cumsum()
            denom = arange[present]
            return (prec_sum / denom).sum() / lab.shape[0]
        else:
            return _warn_for_empty_labels()

    return _mean_ranking_metric(predictions, labels, _inner_map)
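

# Worked check of mean_average_precision (an editorial sketch, not part of the
# original gist), using the docstring example above:
#   user 1: hits at ranks 1, 3, 6, 9, 10 -> (1/1 + 2/3 + 3/6 + 4/9 + 5/10) / 5 = 0.62222...
#   user 2: hits at ranks 2, 5, 7        -> (1/2 + 2/5 + 3/7) / 3              = 0.44285...
#   user 3: empty ground truth set -> 0. (with a warning)
#   mean = (0.62222 + 0.44286 + 0.0) / 3 = 0.35503..., matching the docstring value.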


def ndcg_at(predictions, labels, k=10, assume_unique=True):
    """Compute the normalized discounted cumulative gain at K.

    Compute the average NDCG value of all the queries, truncated at ranking
    position k. The discounted cumulative gain at position k is computed as:

        DCG@k = sum_{i=1}^{k} (2^{rel_i} - 1) / log_2(i + 1)

    where rel_i is the relevance of the item at rank i, and the NDCG is
    obtained by dividing the DCG by the ideal DCG of the ground truth set.
    In the current implementation, the relevance value is binary.

    If a query has an empty ground truth set, zero will be used as
    NDCG together with a warning.

    Parameters
    ----------
    predictions : array-like, shape=(n_predictions,)
        The prediction array. The items that were predicted, in descending
        order of relevance.

    labels : array-like, shape=(n_ratings,)
        The labels (positively-rated items).

    k : int, optional (default=10)
        The rank at which to measure the NDCG.

    assume_unique : bool, optional (default=True)
        Whether to assume the items in the labels and predictions are each
        unique. That is, the same item is not predicted multiple times or
        rated multiple times.

    Examples
    --------
    >>> # predictions for 3 users
    >>> preds = [[1, 6, 2, 7, 8, 3, 9, 10, 4, 5],
    ...          [4, 1, 5, 6, 2, 7, 3, 8, 9, 10],
    ...          [1, 2, 3, 4, 5]]
    >>> # labels for the 3 users
    >>> labels = [[1, 2, 3, 4, 5], [1, 2, 3], []]
    >>> ndcg_at(preds, labels, 3)
    0.3333333432674408
    >>> ndcg_at(preds, labels, 10)
    0.48791273434956867

    References
    ----------
    .. [1] K. Jarvelin and J. Kekalainen, "IR evaluation methods for
           retrieving highly relevant documents."
    """
    # validate K
    _require_positive_k(k)

    def _inner_ndcg(pred, lab):
        if lab.shape[0]:
            # if we do NOT assume uniqueness, the set is a bit different here
            if not assume_unique:
                lab = np.unique(lab)

            n_lab = lab.shape[0]
            n_pred = pred.shape[0]
            n = min(max(n_pred, n_lab), k)  # min(min(p, l), k)?

            # similar to mean_avg_prcsn, we need an arange, but this time +2
            # since python is zero-indexed, and the denom typically needs +1.
            # Also need the log base2...
            arange = np.arange(n, dtype=np.float32)  # length n

            # since we are only interested in the arange up to n_pred, truncate
            # if necessary
            arange = arange[:n_pred]
            denom = np.log2(arange + 2.)  # length n
            gains = 1. / denom  # length n

            # compute the gains where the prediction is present in the labels
            dcg_mask = np.in1d(pred[:n], lab, assume_unique=assume_unique)
            dcg = gains[dcg_mask].sum()

            # the max DCG is sum of gains where the index < the label set size
            max_dcg = gains[arange < n_lab].sum()
            return dcg / max_dcg
        else:
            return _warn_for_empty_labels()

    return _mean_ranking_metric(predictions, labels, _inner_ndcg)
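

# Worked check of ndcg_at (an editorial sketch, not part of the original gist),
# using the docstring example above with k=3 and binary gains 1 / log2(i + 2):
#   discounts for ranks 1..3: [1.0, 0.6309, 0.5]
#   user 1: preds[:3] = [1, 6, 2], hits at ranks 1 and 3 -> DCG = 1.5;    ideal DCG = 2.1309 -> 0.7039
#   user 2: preds[:3] = [4, 1, 5], hit at rank 2         -> DCG = 0.6309; ideal DCG = 2.1309 -> 0.2961
#   user 3: empty ground truth set -> 0. (with a warning)
#   mean = (0.7039 + 0.2961 + 0.0) / 3 = 0.3333..., matching ndcg_at(preds, labels, 3).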

huang-xx commented Feb 2, 2023
Updated mean_average_precision to mean_average_precision_at so that MAP is also truncated at ranking position k (matching Spark's meanAveragePrecisionAt).
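
A quick worked check of the k-truncation (editorial arithmetic, using the preds/labels from the docstring example below): with k=3, user 1's top-3 hits land at ranks 1 and 3, so AP@3 = (1/1 + 2/3) / min(5, 3) ≈ 0.5556; user 2's only top-3 hit is at rank 2, so AP@3 = (1/2) / min(3, 3) ≈ 0.1667; user 3 has an empty ground truth set and contributes 0. The mean ≈ 0.2407, matching mean_average_precision_at(preds, labels, k=3).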

# -*- coding: utf-8 -*-
#
# Author: Taylor G Smith
#
# Recommender system ranking metrics derived from Spark source for use with
# Python-based recommender libraries (e.g., implicit,
# http://github.com/benfred/implicit/). These metrics are derived from the
# original Spark Scala source code for recommender metrics.
# https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala

from __future__ import absolute_import, division

import numpy as np

import warnings

__all__ = [
    "mean_average_precision_at",
    "ndcg_at",
    "precision_at",
]

try:
    xrange
except NameError:  # python 3 does not have an 'xrange'
    xrange = range


def _require_positive_k(k):
    """Helper function to avoid copy/pasted code for validating K"""
    if k <= 0:
        raise ValueError("ranking position k should be positive")


def _mean_ranking_metric(predictions, labels, k, metric):
    """Helper function for precision_at_k and mean_average_precision"""
    # do not zip, as this will require an extra pass of O(N). Just assert
    # equal length and index (compute in ONE pass of O(N)).
    # if len(predictions) != len(labels):
    #     raise ValueError("dim mismatch in predictions and labels!")
    # return np.mean([
    #     metric(np.asarray(predictions[i]), np.asarray(labels[i]))
    #     for i in xrange(len(predictions))
    # ])

    # Actually probably want lazy evaluation in case preds is a
    # generator, since preds can be very dense and could blow up
    # memory... but how to assert lengths equal? FIXME
    return np.mean(
        [
            metric(np.asarray(prd), np.asarray(labels[i]), k)
            for i, prd in enumerate(predictions)  # lazy eval if generator
        ]
    )


def _warn_for_empty_labels():
    """Helper for missing ground truth sets"""
    warnings.warn("Empty ground truth set! Check input data")
    return 0.0


def precision_at(predictions, labels, k=10, assume_unique=True):
    """Compute the precision at K.

    Compute the average precision of all the queries, truncated at
    ranking position k. If for a query, the ranking algorithm returns
    n (n is less than k) results, the precision value will be computed
    as #(relevant items retrieved) / k. This formula also applies when
    the size of the ground truth set is less than k.

    If a query has an empty ground truth set, zero will be used as
    precision together with a warning.

    Parameters
    ----------
    predictions : array-like, shape=(n_predictions,)
        The prediction array. The items that were predicted, in descending
        order of relevance.

    labels : array-like, shape=(n_ratings,)
        The labels (positively-rated items).

    k : int, optional (default=10)
        The rank at which to measure the precision.

    assume_unique : bool, optional (default=True)
        Whether to assume the items in the labels and predictions are each
        unique. That is, the same item is not predicted multiple times or
        rated multiple times.

    Examples
    --------
    >>> # predictions for 3 users
    >>> preds = [[1, 6, 2, 7, 8, 3, 9, 10, 4, 5],
    ...          [4, 1, 5, 6, 2, 7, 3, 8, 9, 10],
    ...          [1, 2, 3, 4, 5]]
    >>> # labels for the 3 users
    >>> labels = [[1, 2, 3, 4, 5], [1, 2, 3], []]
    >>> precision_at(preds, labels, 1)
    0.33333333333333331
    >>> precision_at(preds, labels, 5)
    0.26666666666666666
    >>> precision_at(preds, labels, 15)
    0.17777777777777778
    """
    # validate K
    _require_positive_k(k)

    def _inner_pk(pred, lab, k=10):
        # need to count how many of the top-k predicted values are present in
        # the labels. We'll use numpy in1d for the vectorized membership test.
        if lab.shape[0] > 0:
            n = min(pred.shape[0], k)
            cnt = np.in1d(pred[:n], lab, assume_unique=assume_unique).sum()
            return float(cnt) / k
        else:
            return _warn_for_empty_labels()

    return _mean_ranking_metric(predictions, labels, k, _inner_pk)


def mean_average_precision_at(predictions, labels, k=10, assume_unique=True):
    """Compute the mean average precision on predictions and labels.

    Returns the mean average precision (MAP) of all the queries, truncated at
    ranking position k. If a query has an empty ground truth set, the average
    precision will be zero and a warning is generated.

    Parameters
    ----------
    predictions : array-like, shape=(n_predictions,)
        The prediction array. The items that were predicted, in descending
        order of relevance.

    labels : array-like, shape=(n_ratings,)
        The labels (positively-rated items).

    k : int, optional (default=10)
        The rank at which to truncate the average precision.

    assume_unique : bool, optional (default=True)
        Whether to assume the items in the labels and predictions are each
        unique. That is, the same item is not predicted multiple times or
        rated multiple times.

    Examples
    --------
    >>> # predictions for 3 users
    >>> preds = [[1, 6, 2, 7, 8, 3, 9, 10, 4, 5],
    ...          [4, 1, 5, 6, 2, 7, 3, 8, 9, 10],
    ...          [1, 2, 3, 4, 5]]
    >>> # labels for the 3 users
    >>> labels = [[1, 2, 3, 4, 5], [1, 2, 3], []]
    >>> mean_average_precision_at(preds, labels)
    0.35502645502645497
    >>> mean_average_precision_at(preds, labels, k=10)
    0.35502645502645497
    >>> mean_average_precision_at(preds, labels, k=3)
    0.2407407407407407
    >>> mean_average_precision_at(preds, labels, k=5)
    0.2111111111111111
    """

    def _inner_map(pred, lab, k=10):
        if lab.shape[0]:
            # compute the number of elements within the predictions that are
            # present in the actual labels, and get the cumulative sum weighted
            # by the index of the ranking
            n = min(pred.shape[0], k)

            # var i = 0
            # var cnt = 0
            # var precSum = 0.0
            # val n = math.min(k, pred.length)
            # while (i < n) {
            #     if (lab.contains(pred(i))) {
            #     cnt += 1
            #     precSum += cnt.toDouble / (i + 1)
            #     }
            #     i += 1
            # }
            # precSum / math.min(lab.size, k)

            arange = np.arange(n, dtype=np.float32) + 1.0  # this is the denom
            present = np.in1d(pred[:n], lab, assume_unique=assume_unique)
            prec_sum = np.ones(present.sum()).cumsum()
            denom = arange[present]
            return (prec_sum / denom).sum() / min(lab.shape[0], k)

        else:
            return _warn_for_empty_labels()

    return _mean_ranking_metric(predictions, labels, k, _inner_map)


def ndcg_at(predictions, labels, k=10, assume_unique=True):
    """Compute the normalized discounted cumulative gain at K.

    Compute the average NDCG value of all the queries, truncated at ranking
    position k. The discounted cumulative gain at position k is computed as:

        DCG@k = sum_{i=1}^{k} (2^{rel_i} - 1) / log_2(i + 1)

    where rel_i is the relevance of the item at rank i, and the NDCG is
    obtained by dividing the DCG by the ideal DCG of the ground truth set.
    In the current implementation, the relevance value is binary.

    If a query has an empty ground truth set, zero will be used as
    NDCG together with a warning.

    Parameters
    ----------
    predictions : array-like, shape=(n_predictions,)
        The prediction array. The items that were predicted, in descending
        order of relevance.

    labels : array-like, shape=(n_ratings,)
        The labels (positively-rated items).

    k : int, optional (default=10)
        The rank at which to measure the NDCG.

    assume_unique : bool, optional (default=True)
        Whether to assume the items in the labels and predictions are each
        unique. That is, the same item is not predicted multiple times or
        rated multiple times.

    Examples
    --------
    >>> # predictions for 3 users
    >>> preds = [[1, 6, 2, 7, 8, 3, 9, 10, 4, 5],
    ...          [4, 1, 5, 6, 2, 7, 3, 8, 9, 10],
    ...          [1, 2, 3, 4, 5]]
    >>> # labels for the 3 users
    >>> labels = [[1, 2, 3, 4, 5], [1, 2, 3], []]
    >>> ndcg_at(preds, labels, 3)
    0.3333333432674408
    >>> ndcg_at(preds, labels, 10)
    0.48791273434956867

    References
    ----------
    .. [1] K. Jarvelin and J. Kekalainen, "IR evaluation methods for
           retrieving highly relevant documents."
    """
    # validate K
    _require_positive_k(k)

    def _inner_ndcg(pred, lab, k=10):
        if lab.shape[0]:
            # if we do NOT assume uniqueness, the set is a bit different here
            if not assume_unique:
                lab = np.unique(lab)

            n_lab = lab.shape[0]
            n_pred = pred.shape[0]
            n = min(max(n_pred, n_lab), k)  # min(min(p, l), k)?

            # similar to mean_avg_prcsn, we need an arange, but this time +2
            # since python is zero-indexed, and the denom typically needs +1.
            # Also need the log base2...
            arange = np.arange(n, dtype=np.float32)  # length n

            # since we are only interested in the arange up to n_pred, truncate
            # if necessary
            arange = arange[:n_pred]
            denom = np.log2(arange + 2.0)  # length n
            gains = 1.0 / denom  # length n

            # compute the gains where the prediction is present in the labels
            dcg_mask = np.in1d(pred[:n], lab, assume_unique=assume_unique)
            dcg = gains[dcg_mask].sum()

            # the max DCG is sum of gains where the index < the label set size
            max_dcg = gains[arange < n_lab].sum()
            return dcg / max_dcg

        else:
            return _warn_for_empty_labels()

    return _mean_ranking_metric(predictions, labels, k, _inner_ndcg)


"""
if __name__=="__main__":
    preds = [[1, 6, 2, 7, 8, 3, 9, 10, 4, 5], [4, 1, 5, 6, 2, 7, 3, 8, 9, 10], [1, 2, 3, 4, 5] ]
    labels = [[1, 2, 3, 4, 5], [1, 2, 3], []]

    for k in [1, 5, 10]:
        print(f"Precision@{k}: {precision_at(preds, labels, k)}")
        print(f"MAP@{k}: {mean_average_precision_at(preds, labels, k)}")
        print(f"NGCD@{k}: {ndcg_at(preds, labels, k)}")
        # print(f"NGCD@{k}: {ndcg_at(preds, labels, k)}")
        print('-' * 10)

Precision@1: 0.3333333333333333
MAP@1: 0.3333333333333333
NGCD@1: 0.3333333333333333
----------
Precision@5: 0.26666666666666666
MAP@5: 0.2111111111111111
NGCD@5: 0.32878801226615906
----------
Precision@10: 0.26666666666666666
MAP@10: 0.35502645502645497
NGCD@10: 0.48791273434956867
----------


from pyspark.sql import SparkSession
from pyspark.mllib.evaluation import RankingMetrics

spark = SparkSession\
        .builder\
        .appName("RankingMetrics")\
        .getOrCreate()

predictionAndLabels = spark.sparkContext.parallelize([
    ([1, 6, 2, 7, 8, 3, 9, 10, 4, 5], [1, 2, 3, 4, 5]),
    ([4, 1, 5, 6, 2, 7, 3, 8, 9, 10], [1, 2, 3]),
    ([1, 2, 3, 4, 5], [])])
metrics = RankingMetrics(predictionAndLabels)

for k in [1, 5, 10]:
    print(f"Precision@{k}: {metrics.precisionAt(k)}")
    print(f"MAP@{k}: {metrics.meanAveragePrecisionAt(k)}")
    print(f"NGCD@{k}: {metrics.ndcgAt(k)}")
    print('-' * 10)

spark.stop()


Precision@1: 0.3333333333333333
MAP@1: 0.3333333333333333
NDCG@1: 0.3333333333333333
----------
Precision@5: 0.26666666666666666
MAP@5: 0.2111111111111111
NDCG@5: 0.3287880038045473
----------
Precision@10: 0.26666666666666666
MAP@10: 0.35502645502645497
NDCG@10: 0.48791274569166343
----------
"""
