Skip to content

Instantly share code, notes, and snippets.

@alexandru-dinu
Last active October 14, 2021 09:16
Show Gist options
  • Save alexandru-dinu/d973b212cc33e69a6b9b950ebd8b8fc3 to your computer and use it in GitHub Desktop.
Save alexandru-dinu/d973b212cc33e69a6b9b950ebd8b8fc3 to your computer and use it in GitHub Desktop.
Maximal Marginal Relevance (Carbonell and Goldstein, 1998)
import numpy as np
import torch as T
def maximal_marginal_relevance(
    score_di_q: T.Tensor,
    score_di_dj: T.Tensor,
    *,
    lam: float,
    num_iters: int,
    score_type: str,
) -> np.ndarray:
    """Order candidates by Maximal Marginal Relevance (Carbonell and Goldstein, 1998).

    The first pick is the candidate with the best score against the query.
    Every following pick is the not-yet-selected candidate ``i`` that
    optimizes::

        lam * score_di_q[i] - (1 - lam) * redundancy[i]

    where ``redundancy[i]`` is the largest similarity (or the smallest
    distance) between ``i`` and any already selected candidate. The
    expression is maximized for similarities and minimized for distances.

    Note:
        All scoring is done with torch on whatever device the inputs live on;
        only the chosen index is read back per step. The inputs are detached
        first, so tensors that require grad are accepted as well.

    Args:
        score_di_q (T.Tensor): Scores between candidates and query, shape: (N,).
        score_di_dj (T.Tensor): Pairwise scores between candidates, shape: (N, N).
            Row ``s`` is read as the scores between selected candidate ``s``
            and every other candidate.
        lam (float): Lambda parameter: 0 = maximal diversity, 1 = greedy.
        num_iters (int): Number of results to return.
        score_type (str): One of ["similarity", "distance"].

    Returns:
        np.ndarray: int64 array of length ``num_iters`` holding the candidate
        indices in the order they were selected.

    Raises:
        ValueError: If ``score_type`` is not accepted, ``num_iters`` is not in
            ``(0, N]`` or ``lam`` is not in ``[0, 1]``.
    """
    _accepted_score_types = ["similarity", "distance"]
    if score_type not in _accepted_score_types:
        raise ValueError(f"score_type must be one of {_accepted_score_types}.")

    num_candidates = len(score_di_q)
    if not (0 < num_iters <= num_candidates):
        raise ValueError(f"Number of iterations must be in (0, {num_candidates}].")
    if not (0 <= lam <= 1):
        raise ValueError("lambda must be in [0, 1].")

    # Only an ordering is produced, so keep autograd out of the selection loop.
    score_di_q = score_di_q.detach()
    score_di_dj = score_di_dj.detach()

    # Similarities are maximized, distances are minimized; the same loop
    # serves both once the reduction operators are chosen.
    if score_type == "similarity":
        pick, tighten = T.argmax, T.maximum
    else:
        pick, tighten = T.argmin, T.minimum

    first = int(pick(score_di_q))
    selected = [first]

    # Boolean mask of candidates that are still available (R \ S).
    is_free = np.ones(num_candidates, dtype=bool)
    is_free[first] = False

    # redundancy[j] = max similarity / min distance between candidate j and the
    # selected set. It is updated incrementally with the row of each new pick
    # instead of being recomputed over the whole selected set every iteration.
    redundancy = score_di_dj[first, :num_candidates]

    for _ in range(num_iters - 1):
        free = np.flatnonzero(is_free)  # ascending indices of R \ S
        mmr = lam * score_di_q[free] - (1 - lam) * redundancy[free]
        # Use torch's own argmax/argmin so no implicit tensor -> NumPy
        # conversion is needed (that conversion fails for CUDA tensors).
        chosen = int(free[int(pick(mmr))])
        selected.append(chosen)
        is_free[chosen] = False
        redundancy = tighten(redundancy, score_di_dj[chosen, :num_candidates])

    return np.array(selected, dtype=np.int64)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment