Last active
January 11, 2022 13:31
-
-
Save zkavtaskin/acf07855fc4292d7dce4139d1f68d93b to your computer and use it in GitHub Desktop.
Machine learning model partitioning for multi-tenant data in Python. The full write-up can be found here: https://zankavtaskin.medium.com/machine-learning-model-partitioning-for-multi-tenant-data-in-python-7297dd7f6ba1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sklearn.linear_model import LinearRegression | |
import pickle | |
import random | |
import numpy as np | |
""" | |
Sample data preparation for the partition. | |
-------- | |
This example uses linear function y=mx+b where m is the tenant / company | |
specific bias and b is the error. LinearRegression is then used to find | |
parameters to fit the generated data. | |
""" | |
# Number of rows generated per tenant.
samples = 1000

# Shared feature column, one tenant-specific slope per tenant, and shared noise.
x = np.random.randint(100000, size=(samples, 1))
bias = np.random.randint(1000, size=3)
error = np.random.normal(loc=0, scale=2, size=(samples, 1))

# Each row is [tenant id (partition key), x, y] with y = bias[tenant] * x + error.
Xy_a = np.hstack([np.full((samples, 1), 0), x, bias[0] * x + error])
Xy_b = np.hstack([np.full((samples, 1), 1), x, bias[1] * x + error])
Xy_c = np.hstack([np.full((samples, 1), 2), x, bias[2] * x + error])
Xy = np.concatenate((Xy_a, Xy_b, Xy_c), axis=0)

# Shuffle the rows in place so tenants are interleaved.
# random.shuffle must NOT be used on a 2-D ndarray: its element swap operates
# on row views, which silently duplicates some rows and drops others.
np.random.shuffle(Xy)

# First two columns are the model input (partition key + feature), last is the target.
X = Xy[:, 0:2]
y = Xy[:, 2]
""" | |
Example of how this model can be used. | |
-------- | |
Part 1 shows how it is possible to set up, train and use Partition class with your underlying model. | |
Part 2 shows that model state can be preserved and reloaded for future predictions, this | |
means the model should work seamlessly with MLflow. | |
""" | |
# NOTE: the Partition class must already be defined or imported at this point.

# Pick five random rows to compare actual and predicted targets.
idx_to_test = np.random.randint(low=0, high=len(X), size=5).tolist()

# Part 1: set up, train and use the partitioned model.
# `normalize` is not accepted by LinearRegression in scikit-learn >= 1.2, so
# only hyperparameters that are valid on every version are passed here.
partitioned_linear_regression = Partition(LinearRegression, fit_intercept=True, copy_X=True)
partitioned_linear_regression.fit(X, y)
score_per_partition = partitioned_linear_regression.score(X, y)
print("Coefficient of determination per partition:", score_per_partition)
y_hat = partitioned_linear_regression.predict(X[idx_to_test])
print("y actual:", y[idx_to_test])
print("ŷ predicted:", y_hat)

# Part 2: persist the fitted model and reload it for later predictions.
# Context managers guarantee both file handles are closed.
# Only unpickle files that come from a trusted source.
filename = 'partition.pkl'
with open(filename, 'wb') as pkl_file:
    pickle.dump(partitioned_linear_regression, pkl_file)
with open(filename, 'rb') as pkl_file:
    partitioned_linear_regression_from_pkl = pickle.load(pkl_file)
y_hat_from_pkl = partitioned_linear_regression_from_pkl.predict(X[idx_to_test])
print("y actual:", y[idx_to_test])
print("ŷ predicted (using pkl):", y_hat_from_pkl)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from numpy.core.numeric import NaN | |
class Partition():
    """
    GNU General Public License v3.0

    Partition Model creates submodels by partition key based on X input.
    Then each submodel is trained individually based on the same hyperparameters.

    Partition can be used in situations where data has a specific bias / context
    and trained parameters need to be preserved for the future predictions.
    E.g. in multi-tenancy scenarios tenants data might be 1) Generic and therefore
    would benefit from shared data training or 2) Specific, adding bias and reducing
    the prediction performance. In scenario 2 the partition model helps through stratification.

    Parameters
    ----------
    model : class
        Provide the class type, not the instance or the string of the model that needs to be
        trained e.g. LinearRegression.
    model_params : **model_params
        These hyperparameters are passed as keyword arguments to the model when each
        submodel is created,
        e.g. Partition(LinearRegression, fit_intercept=True, copy_X=True)
        creates LinearRegression(fit_intercept=True, copy_X=True) for every partition.

    Examples
    --------
    >>> from sklearn.linear_model import LinearRegression
    >>> partitioned_linear_regression = Partition(LinearRegression, fit_intercept=True, copy_X=True)
    >>> partitioned_linear_regression.fit(X, y)
    >>> partitioned_linear_regression.score(X, y)
    [[0.0, 0.9999999999999857], [1.0, 0.9999999999999869], [2.0, 0.9999999999999901]]
    >>> partitioned_linear_regression.predict(X_test)
    [23099992.00036588, 54803672.0157626, 26120047.9873289, 50752503.8809955, 56659703.855495304]
    """

    def __init__(self, model, **model_params):
        self._model = model
        self._model_params = model_params
        # Fitted submodels, keyed by partition key.
        self._models = {}

    def fit(self, X, y):
        """
        Fit underlying partitioned models.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features) containing training data
            First feature X[:,0] must contain a partition key that is not None or NaN.
            Samples do not need to be in partition key order.
        y : array-like of shape (n_samples,) or (n_samples, n_targets) target values.
            No partition key is required.

        Returns
        -------
        self : object
            Fitted partition model.

        Raises
        ------
        ValueError
            If X does not have at least two columns or a partition key is None / NaN.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        self.__validate_partition_key(X)
        keys_with_idx = self.__get_partition_keys_with_row_idx(X, True)
        for partition_key, row_idx in keys_with_idx.items():
            # The partition key column is dropped before training the submodel.
            self._models[partition_key].fit(X[row_idx, 1:], y[row_idx])
        return self

    def predict(self, X):
        """
        Predict using the underlying partitioned models.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            First feature X[:,0] must contain a partition key that is not None or NaN.
            Samples do not need to be in partition key order.

        Returns
        -------
        predictions : list of length n_samples
            One prediction per row of X, in the same order as the rows of X.

        Raises
        ------
        ValueError
            If X does not have at least two columns or a partition key is None / NaN.
        KeyError
            If a partition key has no fitted submodel.
        """
        X = np.asarray(X)
        self.__validate_partition_key(X)
        keys_with_idx = self.__get_partition_keys_with_row_idx(X)
        predictions = [None] * len(X)
        for partition_key, row_idx in keys_with_idx.items():
            # One batched call per partition instead of one call per row.
            partition_predictions = self._get_model(partition_key).predict(X[row_idx, 1:])
            # Scatter the results back to the original row positions.
            for i, prediction in zip(row_idx, partition_predictions):
                predictions[i] = prediction
        return predictions

    def score(self, X, y):
        """
        Return the score of the underlying model per partition. Score could be
        coefficient of determination, mean accuracy or other type of score.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            First feature X[:,0] must contain a partition key that is not None or NaN.
            Samples do not need to be in partition key order.
        y : array-like of shape (n_samples,) or (n_samples, n_targets) target values.
            No partition key is required.

        Returns
        -------
        scores : list of [partition_key, score] pairs, one per partition found in X,
            in order of first appearance.

        Raises
        ------
        ValueError
            If X does not have at least two columns or a partition key is None / NaN.
        KeyError
            If a partition key has no fitted submodel.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        self.__validate_partition_key(X)
        keys_with_idx = self.__get_partition_keys_with_row_idx(X)
        return [
            [partition_key, self._get_model(partition_key).score(X[row_idx, 1:], y[row_idx])]
            for partition_key, row_idx in keys_with_idx.items()
        ]

    @staticmethod
    def _is_missing_key(partition_key):
        """Return True when the partition key is None or NaN."""
        if partition_key is None:
            return True
        try:
            # NaN is the only value that is not equal to itself. An identity
            # test against a NaN constant does not work for values read from
            # an ndarray, because those are distinct objects.
            return bool(partition_key != partition_key)
        except (TypeError, ValueError):
            return False

    def _get_model(self, partition_key):
        """Return the fitted submodel for partition_key or raise KeyError."""
        try:
            return self._models[partition_key]
        except KeyError:
            raise KeyError(
                "No model has been fitted for partition key {!r}".format(partition_key)
            )

    def __validate_partition_key(self, X):
        """Ensure X is two-dimensional with a key column plus at least one feature."""
        if X.ndim != 2 or X.shape[1] <= 1:
            raise ValueError("X must contain more than 1 column where first column is used as partition key")

    def __get_partition_keys_with_row_idx(self, X, init_model=False):
        """
        Group row indices of X by partition key (first column).

        When init_model is True a fresh, unfitted submodel is created for every
        key found. Models are only created after all keys have been validated,
        so invalid input does not leave the stored models partially replaced.
        """
        keys_with_idx = {}
        for i, partition_key in enumerate(X[:, 0]):
            if self._is_missing_key(partition_key):
                raise ValueError("Partition key cannot be None or NaN type")
            if partition_key not in keys_with_idx:
                keys_with_idx[partition_key] = [i]
            else:
                keys_with_idx[partition_key].append(i)
        if init_model:
            for partition_key in keys_with_idx:
                # Hyperparameters must be passed as keyword arguments.
                self._models[partition_key] = self._model(**self._model_params)
        return keys_with_idx
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment