@mjbommar · Created January 31, 2022
pipeline_dp-issue-237 constant_column.py
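This gist is a self-contained reproduction script for pipeline_dp issue 237. It loads the scikit-learn diabetes dataset, attaches a patient ID (the privacy unit), a random state ID (the partition key), and a constant-valued column, then computes differentially private per-state means with pipeline_dp. Because the constant column has zero standard deviation, its clamping bounds collapse to min_value == max_value == 42, which appears to be the edge case the issue exercises.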
# package imports
import numpy.random
import pandas
import sklearn.datasets # scikit-learn==1.0.2
# dp imports
import pipeline_dp
if __name__ == "__main__":
    # setup random state
    rng = numpy.random.RandomState(seed=0)
    # load diabetes data into a nice pd DataFrame
    dataset_dict = sklearn.datasets.load_diabetes()
    df = pandas.DataFrame(dataset_dict['data'],
                          columns=dataset_dict['feature_names'])
    # add explicit patient ID
    df.loc[:, "patient_id"] = range(df.shape[0])
    # add state variable (use the seeded rng, not the global numpy.random state)
    df.loc[:, "state_id"] = rng.randint(0, 50, size=(df.shape[0],))
    # add a constant column (standard deviation is zero by construction)
    df.loc[:, "constant"] = 42
    # add target
    df.loc[:, "progression"] = dataset_dict['target']
    # debug shape
    print(f"loaded original df with shape {df.shape}")
    # column std
    print("column standard deviations:")
    print(df.std())
    # setup dp engine
    dp_backend = pipeline_dp.LocalBackend()
    dp_budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1e3, total_delta=1e-1)
    dp_engine = pipeline_dp.DPEngine(dp_budget_accountant, dp_backend)
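    # note: with the LocalBackend, dp_engine.aggregate() below builds a lazy
    # pipeline; results only materialize after compute_budgets() has been
    # called and the returned iterables are actually consumed (see below)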
    # setup extractors and parameters
    privacy_column = "patient_id"
    partition_column = "state_id"
    # explicit (public) partition list for 50 states
    partition_list = list(range(50))
    # setup column transform specs
    value_column_list = [("age", pipeline_dp.Metrics.MEAN),
                         ("sex", pipeline_dp.Metrics.MEAN),
                         ("constant", pipeline_dp.Metrics.MEAN),
                         ]
    # unit/partition parameters (shared across all columns)
    max_contributions_per_partition = 3
    max_partitions_contributed = 2
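    # these bounds cap each patient's influence: max_partitions_contributed
    # limits how many states a single patient may contribute to, and
    # max_contributions_per_partition limits rows per patient within a state;
    # together they bound the sensitivity used to scale the Laplace noise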
    result_columns = {}
    for value_column_name, value_metric in value_column_list:
        # setup column extractor; bind the loop variable as a default argument
        # so that the lazily-evaluated pipeline does not end up reading the
        # last column name ("constant") for every extractor
        value_extractor = pipeline_dp.DataExtractors(
            partition_extractor=lambda row: row[partition_column],
            privacy_id_extractor=lambda row: row[privacy_column],
            value_extractor=lambda row, col=value_column_name: row[col])
        # handle min/max value for specific metric
        if value_metric == pipeline_dp.Metrics.MEAN:
            # clamp to [min - std, max + std]; note std is 0 for "constant",
            # so its bounds collapse to min_value == max_value == 42
            min_value = df[value_column_name].min() - df[value_column_name].std()
            max_value = df[value_column_name].max() + df[value_column_name].std()
        else:
            raise ValueError("unsupported metric")
        # setup agg param object
        value_params = pipeline_dp.AggregateParams(
            noise_kind=pipeline_dp.NoiseKind.LAPLACE,
            metrics=[value_metric],
            max_contributions_per_partition=max_contributions_per_partition,
            max_partitions_contributed=max_partitions_contributed,
            min_value=min_value,
            max_value=max_value,
            public_partitions=partition_list,
        )
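        # since the 50 state IDs are supplied as public partitions, pipeline_dp
        # skips private partition selection and releases every listed state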
        # track non-materialized aggregate
        result_columns[value_column_name] = dp_engine.aggregate(
            [r for _, r in df.iterrows()],
            value_params,
            value_extractor)
    # finalize budget calculation prior to materializing values
    dp_budget_accountant.compute_budgets()
    # build dataframe with indices aligned
    first_column_name = value_column_list[0][0]
    first_column_index, first_column_values = zip(*result_columns[first_column_name])
    result_df = pandas.DataFrame(first_column_values,
                                 index=first_column_index,
                                 columns=[first_column_name])
    if len(value_column_list) > 1:
        for column_name, _ in value_column_list[1:]:
            column_index, column_values = zip(*result_columns[column_name])
            result_df.loc[column_index, column_name] = column_values
print("original")
print(df.groupby(partition_column)[first_column_name].mean().sort_values(ascending=False).head(5))
print("transformed")
print(result_df[first_column_name].sort_values(ascending=False).head(5))
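
To reproduce, install the dependencies and run the script directly (package names and versions other than the scikit-learn pin noted in the imports are assumptions):

    pip install pipeline-dp pandas "scikit-learn==1.0.2"
    python constant_column.py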