Machine learning model partitioning for multi-tenant data in Python. The full write-up can be found here: https://zankavtaskin.medium.com/machine-learning-model-partitioning-for-multi-tenant-data-in-python-7297dd7f6ba1
from sklearn.linear_model import LinearRegression
import pickle
import numpy as np
"""
Sample data preparation for the partition.
--------
This example uses the linear function y = m*x + b, where m is a tenant / company
specific bias (the slope) and b is random error. LinearRegression is then used
to fit parameters to the generated data.
"""
samples = 1000
x = np.random.randint(100000, size=(samples, 1))
bias = np.random.randint(1000, size=3)
error = np.random.normal(loc=0, scale=2, size=(samples, 1))

# Each tenant gets its own bias; the first column holds the partition key (tenant id).
Xy_a = np.hstack([np.full((samples, 1), 0), x, bias[0] * x + error])
Xy_b = np.hstack([np.full((samples, 1), 1), x, bias[1] * x + error])
Xy_c = np.hstack([np.full((samples, 1), 2), x, bias[2] * x + error])
Xy = np.concatenate((Xy_a, Xy_b, Xy_c), axis=0)

# Shuffle rows with NumPy; random.shuffle can corrupt rows of a 2-D ndarray.
np.random.shuffle(Xy)
X = Xy[:, 0:2]
y = Xy[:, 2]
"""
Example of how this model can be used.
--------
Part 1 shows how to set up, train and use the Partition class with your underlying model.
Part 2 shows that model state can be preserved and reloaded for future predictions, which
means the model should work seamlessly with MLflow.
"""
# Part 1: set up, train and use the partitioned model.
idx_to_test = np.random.randint(low=0, high=len(X), size=5).tolist()

# Note: the `normalize` parameter was removed from LinearRegression in scikit-learn 1.2;
# drop it (or standardise the features separately) on newer versions.
partitioned_linear_regression = Partition(LinearRegression, normalize=True, copy_X=True)
partitioned_linear_regression.fit(X, y)

score_per_partition = partitioned_linear_regression.score(X, y)
print("Coefficient of determination per partition:", score_per_partition)

y_hat = partitioned_linear_regression.predict(X[idx_to_test])
print("y actual:", y[idx_to_test])
print("ŷ predicted:", y_hat)
# Part 2: persist the trained model and reload it for later predictions.
filename = 'partition.pkl'
with open(filename, 'wb') as f:
    pickle.dump(partitioned_linear_regression, f)

with open(filename, 'rb') as f:
    partitioned_linear_regression_from_pkl = pickle.load(f)

y_hat_from_pkl = partitioned_linear_regression_from_pkl.predict(X[idx_to_test])
print("y actual:", y[idx_to_test])
print("ŷ predicted (using pkl):", y_hat_from_pkl)

import numpy as np


class Partition():
    """
    GNU General Public License v3.0

    Partition Model creates submodels by partition key based on the X input.
    Each submodel is then trained individually with the same hyperparameters.
    Partition can be used in situations where data has a specific bias / context
    and trained parameters need to be preserved for future predictions.
    E.g. in multi-tenancy scenarios tenants' data might be 1) generic, and therefore
    benefit from training on shared data, or 2) specific, adding bias and reducing
    prediction performance. In scenario 2 the partition model helps through stratification.

    Parameters
    ----------
    model : class
        Provide the class type of the model that needs to be trained, not an
        instance or a string, e.g. LinearRegression.
    model_params : **model_params
        These hyperparameters are passed to the model when it is created,
        e.g. Partition(LinearRegression, normalize=True, copy_X=True)
        is treated as LinearRegression(normalize=True, copy_X=True).

    Examples
    --------
    >>> from sklearn.linear_model import LinearRegression
    >>> partitioned_linear_regression = Partition(LinearRegression, normalize=True, copy_X=True)
    >>> partitioned_linear_regression.fit(X, y)
    >>> partitioned_linear_regression.score(X, y)
    [[0.0, 0.9999999999999857], [1.0, 0.9999999999999869], [2.0, 0.9999999999999901]]
    >>> partitioned_linear_regression.predict(X_test)
    [23099992.00036588, 54803672.0157626, 26120047.9873289, 50752503.8809955, 56659703.855495304]
    """

    def __init__(self, model, **model_params):
        self._model = model
        self._model_params = model_params
        self._models = {}

    def fit(self, X, y):
        """
        Fit the underlying partitioned models.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Training data. The first feature X[:, 0] must contain a partition key
            that is not None or NaN. Samples do not need to be in partition key order.
        y : array-like of shape (n_samples,) or (n_samples, n_targets)
            Target values. No partition key is required.

        Returns
        -------
        self : object
            Fitted partition model.
        """
        self.__validate_partition_key(X)
        keys_with_idx = self.__get_partition_keys_with_row_idx(X, True)
        for partition_key in keys_with_idx:
            partition_key_row_idx = keys_with_idx.get(partition_key)
            # Fit each submodel on its own partition, dropping the key column.
            self._models[partition_key].fit(X[partition_key_row_idx, 1:], y[partition_key_row_idx])
        return self

    def predict(self, X):
        """
        Predict using the underlying partitioned models.

        Parameters
        ----------
        X : array-like or sparse matrix, shape (n_samples, n_features)
            The first feature X[:, 0] must contain a partition key that is not
            None or NaN. Samples do not need to be in partition key order.

        Returns
        -------
        predictions : array, shape (n_samples,)
        """
        self.__validate_partition_key(X)
        predictions = []
        for x in X:
            # `is NaN` does not detect NaN values reliably; use np.isnan instead.
            if x[0] is None or (isinstance(x[0], (float, np.floating)) and np.isnan(x[0])):
                raise Exception("Partition key cannot be None or NaN type")
            predictions.append(self._models[x[0]].predict([x[1:]])[0])
        return predictions

    def score(self, X, y):
        """
        Return the score of the underlying model per partition. The score could be
        the coefficient of determination, mean accuracy or another type of score,
        depending on the underlying model.

        Parameters
        ----------
        X : array-like or sparse matrix, shape (n_samples, n_features)
            The first feature X[:, 0] must contain a partition key that is not
            None or NaN. Samples do not need to be in partition key order.
        y : array-like of shape (n_samples,) or (n_samples, n_targets)
            Target values. No partition key is required.

        Returns
        -------
        scores : array, shape (n_partitions, 2)
        """
        self.__validate_partition_key(X)
        keys_with_idx = self.__get_partition_keys_with_row_idx(X)
        scores = []
        for partition_key in keys_with_idx:
            partition_key_row_idx = keys_with_idx.get(partition_key)
            score = self._models[partition_key].score(X[partition_key_row_idx, 1:], y[partition_key_row_idx])
            scores.append([partition_key, score])
        return scores

    def __validate_partition_key(self, X):
        if len(X[0]) <= 1:
            raise Exception("X must contain more than 1 column where the first column is used as the partition key")

    def __get_partition_keys_with_row_idx(self, X, init_model=False):
        keys_with_idx = {}
        for i, x in enumerate(X):
            # `is NaN` does not detect NaN values reliably; use np.isnan instead.
            if x[0] is None or (isinstance(x[0], (float, np.floating)) and np.isnan(x[0])):
                raise Exception("Partition key cannot be None or NaN type")
            if x[0] not in keys_with_idx:
                if init_model:
                    # Keyword hyperparameters must be unpacked with **, not *.
                    self._models[x[0]] = self._model(**self._model_params)
                keys_with_idx[x[0]] = [i]
            else:
                keys_with_idx[x[0]].append(i)
        return keys_with_idx