Skip to content

Instantly share code, notes, and snippets.

@kayibal
Last active March 12, 2024 21:02
Show Gist options
  • Star 11 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save kayibal/16340660d1d85b9ea1872a5d9be0f383 to your computer and use it in GitHub Desktop.
Save kayibal/16340660d1d85b9ea1872a5d9be0f383 to your computer and use it in GitHub Desktop.
"""Recommendation module.
This module deals with using lightFM models in production and includes a
LightFm subclass which provides a predict_online method to use in API or
similar scenarios.
"""
import operator
from logging import getLogger
import numpy as np
import pandas as pd
import sparsity as sp
from scipy import sparse
log = getLogger(__name__)
try:
from lightfm import LightFM
except ImportError:
log.debug('Importing recommend module without LightFm installed. '
'Install with pip install lightfm!')
try:
    from cachetools import cachedmethod, LRUCache
    from cachetools.keys import hashkey
except ImportError:
    # BUG FIX: the two string fragments previously concatenated to
    # "installed.Install" — a separating space was missing.
    log.debug('Importing recommend module without cachetools installed. '
              'Install with pip install cachetools!')
class LFMRecommender(LightFM):
    """Recommender class based on the LightFM Model.

    The LightFM model is more expressive if an identity matrix is appended
    to the feature matrices. It acts like a memory for the model as it
    will create an individual embedding (vector of no_comp) for each user
    seen during training.

    If the user is unknown from training, but user_features are available,
    these can be passed and the model will try to give the best
    recommendations based on the data that is available. There will be an
    embedding for each feature used during training.

    Furthermore baseline recommendations are computed and returned in case
    the user is neither known nor user features are available.

    Finally this class contains many checks on data integrity and is able
    to recover from things like shuffled or additional features.

    Parameters
    ----------
    indicators: 'users', 'items', 'both' or False
        whether to add identity matrices to the respective
        features matrices. Adds a user/item memory to the model.
    kwargs:
        remaining arguments are passed to the lightFM model.
    """

    def __init__(self, indicators='both', **kwargs):
        """Initialize model.

        Parameters
        ----------
        indicators: 'users', 'items', 'both' or False
            whether to add identity matrices to the respective
            features matrices. Adds a user/item memory to the model.
        kwargs:
            remaining arguments are passed to the lightFM model.
        """
        super().__init__(**kwargs)
        # FIX: bare pd.Series([]) is deprecated in modern pandas (implicit
        # float64 plus a DeprecationWarning); keep the old dtype explicitly.
        self.uid_map = pd.Series([], dtype='float64')
        self.iid_map = pd.Series([], dtype='float64')
        if indicators in ['both', 'users', 'items', False]:
            self.indicator_setting = indicators
        elif indicators:
            # Any other truthy value (e.g. True) enables both indicators.
            self.indicator_setting = 'both'
        else:
            raise ValueError("Invalid identity_matrix parameters: {}"
                             .format(indicators))
        self.user_feature_names = pd.Index([])
        self.item_feature_names = pd.Index([])
        self.baseline = pd.Series([], dtype='float64')
        self._user_indicator = None
        self._item_indicator = None
        # NOTE(review): LRUCache comes from a guarded import above; if
        # cachetools is missing this line raises NameError — confirm that
        # is the intended failure mode.
        self._item_cache = LRUCache(maxsize=8)
def fit_partial(self, interactions: sp.SparseFrame,
user_features: sp.SparseFrame = None,
item_features: sp.SparseFrame = None,
sample_weight=None,
epochs=1,
num_threads=1,
verbose=False):
try:
self._check_initialized()
except ValueError:
self.prepare(interactions, item_features, user_features)
interactions = interactions.data
user_features = getattr(user_features, 'data', None)
item_features = getattr(item_features, 'data', None)
user_features, item_features = self.append_indicators(
user_features, item_features
)
super().fit_partial(interactions, user_features, item_features,
sample_weight, epochs, num_threads, verbose)
def prepare(self, interactions, item_features, user_features):
"""Prepare model for fit and prediction.
This method initializes many model attributes like
item and user mappings, as well as features. This is
usually done automatically for the user.
In some rare cases it might be useful though. E.g.
when using append_identity on a untrained model
(used in train_with_early_stopping)
Parameters
----------
interactions: SparseFrame
train interactions
item_features: SparseFrame, None
item metadata features
user_features: SparseFrame, None
user metadata features
Returns
-------
None
"""
self.uid_map = pd.Series(np.arange(interactions.shape[0]),
index=interactions.index)
# TODO fix part where interactions are created with MultiIndex in cols
if isinstance(interactions.columns, pd.MultiIndex):
interactions._columns = interactions.columns.levels[0]
self.iid_map = pd.Series(np.arange(interactions.shape[1]),
index=interactions.columns)
if self.indicator_setting:
self._init_indicators()
if not self.indicator_setting and \
(user_features is None or item_features is None):
raise ValueError("Can't estimate embeddings without indicators. "
"Try setting identity_matrix='both' or pass user "
"and item features to estimate embeddings.")
self.user_feature_names = getattr(user_features, 'columns', None)
self.item_feature_names = getattr(item_features, 'columns', None)
self.baseline = pd.Series(
np.asarray(interactions.mean(axis=0)).flatten(),
index=interactions.columns,
name='score') \
.sort_values(ascending=False)
def append_indicators(self, user_features, item_features):
"""Append indicator like used during training.
Helper function mainly to use with LightFM evaluation functions.
Parameters
----------
user_features: csr_matrix
user features without identity/indicators
item_features: csr_matrix
item_features without identity/indicators
Returns
-------
uf_with_indicator, if_with_inidcator: csr_matrix
"""
if self.indicator_setting in ['users', 'both']:
if user_features is not None:
user_features = sparse.hstack([user_features,
self._user_indicator[:-1, :]])
else:
user_features = self._user_indicator[:-1, :]
if self.indicator_setting in ['items', 'both']:
if item_features is not None:
item_features = sparse.hstack([item_features,
self._item_indicator])
else:
item_features = self._item_indicator
return user_features, item_features
def _init_indicators(self):
"""Initialize indicator matrices."""
if self.indicator_setting in ['both', 'users']:
D = len(self.uid_map)
self._user_indicator = sparse.vstack([
sparse.identity(D, format='csr'),
sparse.csr_matrix((1, D))
])
if self.indicator_setting in ['items', 'both']:
self._item_indicator = sparse.identity(
len(self.iid_map), format='csr')
def append_user_identity_row(self, v, idx):
"""Append single identity row to vector.
Parameters
----------
v: csr_matrix
row_vector
idx:
identity index will determine the position of the positive
entry in the appended identity
Returns
-------
appended: csr_matrix
"""
return sparse.hstack([v, self._user_indicator[idx, :]])
def _check_missing_features(self, item_feat, user_feat):
"""Check for any missing features."""
if user_feat is not None:
user_feat_diff = set(self.user_feature_names) - \
set(user_feat.columns)
if len(user_feat_diff):
raise ValueError('Missing user features: {}'
.format(user_feat_diff))
if item_feat is not None and self.user_feature_names is not None:
item_feat_diff = set(self.item_feature_names) -\
set(item_feat.columns)
if len(item_feat_diff):
raise ValueError('Missing item features: {}'
.format(item_feat_diff))
@cachedmethod(cache=operator.attrgetter('_item_cache'),
key=lambda _, __, item_ids: hashkey(item_ids))
def get_item_data(self, item_features, item_ids):
"""Return item data.
This creates the item feature csr and corresponding item names and
numerical ids. Caches result in case same items are requested again.
"""
item_ids = np.asarray(list(item_ids))
if item_features is not None:
assert item_features.shape[0] >= len(item_ids)
assert set(item_ids).issubset(set(item_features.index))
iid_map = pd.Series(np.arange(len(item_features)),
index=item_features.index)
else:
iid_map = self.iid_map
iid_map = iid_map.reindex(item_ids)
return self._construct_item_features(item_features, item_ids), \
iid_map.values,\
iid_map.index
def predict_online(self, user_id, item_ids, item_features=None,
user_features=None, num_threads=1, use_baseline=False):
"""Helper method to use during API use.
This method reads all available data and gives the best possible
recommendation for a received sample.
It also executes various checks of data integrity.
Parameters
----------
user_id: scalar
user ids as provided during training
item_ids: array like
item ids as provided during training
item_features: SparseFrame
user_features: SparseFrame
num_threads: int
Number of threads to use during prediction
use_baseline: true
in case user is not known and no user features are passed and
use_baseline=True baseline predictions will be returned . If
use_baseline=False a KeyError will be raised.
Returns
-------
predictions: pd.Series
a mapping from item id to score (unsorted)
"""
self._check_missing_features(item_features, user_features)
if item_ids is not None:
if isinstance(item_ids, pd.Index):
item_ids = item_ids.tolist()
item_names = tuple(item_ids)
else:
item_names = tuple(self.iid_map.index.tolist())
item_feat_csr, num_item_ids, item_labels = \
self.get_item_data(item_features, item_names)
try:
user_feat_csr = self._construct_user_features(user_id,
user_features)
except KeyError:
if use_baseline:
return self.baseline
else:
raise
# for single case prediction we always use id 0 as lightFm uses it as
# index into the user feature matrix if the user was known during
# training we append an identity matrix to indicate that the user
# was known.
pred = super().predict(0, num_item_ids,
item_feat_csr, user_feat_csr,
num_threads)
pred = pd.Series(pred, index=item_labels)
return pred
def _construct_item_features(self, item_features, item_ids):
"""Create item features during predict."""
# align feature names
if self.indicator_setting in ['both', 'items']:
item_indicator = sp.SparseFrame(self._item_indicator,
index=self.iid_map.index)
item_indicator = item_indicator.reindex(item_ids).data
else:
item_indicator = None
if self.item_feature_names is None:
return item_indicator
item_feat_csr = item_features\
.loc[:, self.item_feature_names]\
.reindex(item_ids, axis=0)\
.data
if item_indicator is not None:
item_feat_csr = sparse.hstack([item_feat_csr,
item_indicator])
return item_feat_csr
def __setstate__(self, state):
"""Support unpickling older versions of this class."""
if 'identity_matrix' in state:
state['indicator_setting'] = state['identity_matrix']
self.__dict__ = state
    def _construct_user_features(self, user_id, user_features):
        """Create user features for a single user.

        Builds the single-row matrix handed to LightFM's predict,
        combining (depending on the indicator setting) the user's metadata
        features and/or the user's identity/indicator row.

        Parameters
        ----------
        user_id: scalar
            user id as provided during training
        user_features: SparseFrame, None
            single-row user metadata features

        Returns
        -------
        user_feat_csr: csr_matrix or None

        Raises
        ------
        KeyError
            if the user is unknown and no user features are passed.
        ValueError
            if features are passed that the model cannot use, or if
            features required from training are missing.
        """
        # retrieve numerical user ids
        # abort and return baseline recommendations if user is not known
        # and no user features are passed
        user_known = True
        try:
            num_user_id = self.uid_map.loc[user_id]
        except KeyError:
            # Case we have no features nor the user was known we abort.
            if user_features is None:
                raise
            user_known = False
            # Placeholder id; the unknown-user branch below selects the
            # trailing all-zero indicator row (-1) instead of this id.
            num_user_id = 0
        if user_features is not None:
            if self.user_feature_names is None:
                raise ValueError('Model was trained without user features. '
                                 'But received user features for prediction.')
            # Align the passed features to the training feature order.
            user_feat_csr = user_features.loc[:, self.user_feature_names].data
            if user_feat_csr.shape[0] > 1:
                raise ValueError(
                    'Received user feature matrix with more than 1 row.')
        else:
            user_feat_csr = None
            # Without a user indicator there is no per-user embedding to
            # fall back on, so the training-time features are mandatory.
            if self.user_feature_names is not None and \
                    self.indicator_setting in [False, 'users']:
                raise ValueError("Need user features as used "
                                 "during training: {}"
                                 .format(self.user_feature_names))
        if self.indicator_setting in ['users', 'both']:
            # if no user_features were used during training
            # no need to handle further cases just use indicator row.
            if self.user_feature_names is None:
                user_feat_csr = self._user_indicator[num_user_id]
            # Append identity matrix only if user is known from training,
            # features have been passed and the identity_matrix flag is set.
            elif user_feat_csr is not None and user_known:
                user_feat_csr = self.append_user_identity_row(user_feat_csr,
                                                              num_user_id)
            elif user_feat_csr is None and user_known:
                # Known user, no features passed: pad with an all-zero
                # feature block so the row width matches training.
                empty_features = sparse.csr_matrix(
                    (1, len(self.user_feature_names)))
                user_feat_csr = self.append_user_identity_row(empty_features,
                                                              num_user_id)
            elif user_features is not None and not user_known:
                # Unknown user: append the reserved all-zero indicator row.
                user_feat_csr = self.append_user_identity_row(
                    user_feat_csr, -1)
        return user_feat_csr
@kayibal
Copy link
Author

kayibal commented Sep 3, 2018

Install dependencies:

pip install lightfm scipy pandas
pip install git+https://github.com/datarevenue-berlin/sparsity.git

@sunnyjha
Copy link

sunnyjha commented Apr 27, 2019

Thanks. Very informative. Is there a typo in line 228 by any chance? Did you mean self.item_feature_names is not None:?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment