ykerzhner/output
Last active February 3, 2021 05:21
Code and output demonstrating a bug in the Spark binary logistic regression intercept computation.
Is Data Centered: True
Doing adjustment: False
log(odds): -3.9876303002978997
Just the blr data
Coefficients: [0.3001878865948312,0.20061697535429474]
Intercept: -3.999522225240717
blr data plus one vector that is filled with 1's and .9's
Coefficients: [0.30024701108766194,0.20055441356153272,-0.012393535533061458]
Intercept: -3.987260922443554

Is Data Centered: False
Doing adjustment: False
log(odds): -3.542495168380248
Just the blr data
Coefficients: [0.3009017564778608,0.20029856941859797]
Intercept: -4.00054217613009
blr data plus one vector that is filled with 1's and .9's
Coefficients: [0.2997261304455311,0.18830032771483074,-0.44301560942213103]
Intercept: -3.5428941035683303

Is Data Centered: False
Doing adjustment: True
log(odds): -3.542495168380248
Just the blr data
Coefficients: [0.30089837234760153,0.20029625637312143]
Intercept: -4.000536011464547
blr data plus one vector that is filled with 1's and .9's
Coefficients: [0.3009241865087016,0.2002436621485895,-0.005125361429629902]
Intercept: -3.995378338490539

import logging
import math
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
logger = logging.getLogger(__name__)

def generate_blr_data(theta_1: float, theta_2: float, intercept: float, centered: bool,
                      num_distribution_samplings: int, num_rows_per_sampling: int):
    """
    This function takes two coefficients and an intercept value and generates the two independent variables
    and the target vector of 0's and 1's. To generate the data, it samples two different uniform random
    variables in order to generate the two independent data vectors. Based on these vectors and the
    coefficients and intercept, it computes the associated logistic probability for the event. Based on
    this probability, it creates a target vector of 1's and 0's.

    Args:
        theta_1: first coefficient.
        theta_2: second coefficient.
        intercept: the intercept for the regression data.
        centered: A boolean; if true, the uniform random variables used to generate the independent vectors
            have mean 0.
        num_distribution_samplings: number of samples drawn from each uniform random variable.
        num_rows_per_sampling: The number of rows used to simulate the given probability, e.g. if the probability
            is .7236 and num_rows_per_sampling is 1000, then 724 of them would be 1's and 276 would be 0's.
    """
    if centered:
        uniforms = np.random.uniform(-.5, .5, num_distribution_samplings)
        uniforms2 = np.random.uniform(-1, 1, num_distribution_samplings)
    else:
        uniforms = np.random.uniform(0, 1.0, num_distribution_samplings)
        uniforms2 = np.random.uniform(1, 2, num_distribution_samplings)
    h_theta = intercept + theta_1 * uniforms + theta_2 * uniforms2
    prob = 1 / (1 + np.exp(-h_theta))
    array = np.zeros((num_distribution_samplings, num_rows_per_sampling), dtype=int)
    for i in range(num_distribution_samplings):
        array[i, 0:int(round(num_rows_per_sampling * prob[i]))] = 1
    num_rows = num_distribution_samplings * num_rows_per_sampling
    feature_1 = np.tile(uniforms.reshape(num_distribution_samplings, 1), num_rows_per_sampling).reshape(num_rows)
    feature_2 = np.tile(uniforms2.reshape(num_distribution_samplings, 1), num_rows_per_sampling).reshape(num_rows)
    target = array.reshape(num_rows)
    return feature_1, feature_2, target

def test_blr(spark: SparkSession, centered: bool, adjust_data: bool):
    """
    This function attempts to fit two binary logistic regressions to the test data. The first fit uses the two
    vectors and the corresponding target vector created in generate_blr_data. The second fit is similar to the
    first, except that it adds a third vector that is .9 in the first 10% of the entries, and 1.0 in the last
    90% of the entries. The first fit is always successful and returns the two coefficients and intercept used
    to generate the data. The second fit is successful only when the uniform random variables used to generate
    the data were centered (i.e. came from a distribution with mean 0). In the case that the uniform random
    variables were not centered, the regression correctly predicts the coefficients, but the value of the
    intercept returned by the fit is roughly equal to the log(odds) of the target vector (log(# 1's / # 0's)).
    The log(odds) of the target vector is a good estimate for the intercept if and only if each of the data
    vectors came from a distribution with mean 0. To summarize the bug: when the optimization surface has a
    flat basin where the log loss function has roughly the same values, the minimization favors an intercept
    that is roughly equal to the log(odds) of the target. This heuristic is only correct for centered data,
    and gives the incorrect intercept when the data is not centered.

    Args:
        spark: A spark session.
        centered: a boolean that controls whether the generated data comes from a distribution with mean 0 or not.
        adjust_data: a boolean that centers the data, and then adjusts the intercept accordingly. Enabling this
            gives the correct value for the intercept even when the data is not centered.
    """
    np.random.seed(123456)
    print("Is Data Centered: ", centered)
    print("Doing adjustment: ", adjust_data)
    regParam = 1e-8
    num_distribution_samplings = int(1e3)
    num_rows_per_sampling = 1000
    theta_1 = .3
    theta_2 = .2
    intercept = -4
    feature1, feature2, target = generate_blr_data(theta_1, theta_2, intercept, centered,
                                                   num_distribution_samplings, num_rows_per_sampling)
    num_rows = num_distribution_samplings * num_rows_per_sampling
    const_feature = np.ones(num_rows, dtype=float)
    const_feature[0:int(num_rows / 10)] = .9
    data_dict = {'feature1': feature1,
                 'feature2': feature2,
                 'const_feature': const_feature,
                 'label': target}
    counts = np.unique(data_dict['label'], return_counts=True)
    log_odds = math.log(counts[1][1] / counts[1][0])
    print("log(odds): ", log_odds)
    df = pd.DataFrame.from_dict(data_dict)
    spark_df = spark.createDataFrame(df)
    if adjust_data:
        avg1, avg2 = list(spark_df.groupBy().mean('feature1', 'feature2').collect())[0]
        # Center the data manually
        spark_df = spark_df.withColumn('feature1', spark_df['feature1'] - avg1)
        spark_df = spark_df.withColumn('feature2', spark_df['feature2'] - avg2)
    vec = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol="features")
    spark_df1 = vec.transform(spark_df)
    lr = LogisticRegression(maxIter=100, regParam=regParam, elasticNetParam=0.5, fitIntercept=True)
    lrModel = lr.fit(spark_df1)
    # Print the coefficients and intercept for logistic regression
    print("Just the blr data")
    print("Coefficients: " + str(lrModel.coefficients))
    if adjust_data:
        # Adjust the intercept by the averages used to center the data before the regression.
        print("Intercept: " + str(lrModel.intercept - avg1 * lrModel.coefficients[0] - avg2 * lrModel.coefficients[1]))
    else:
        print("Intercept: " + str(lrModel.intercept))
    vec = VectorAssembler(inputCols=['feature1', 'feature2', 'const_feature'], outputCol="features")
    spark_df2 = vec.transform(spark_df)
    lrModel = lr.fit(spark_df2)
    # Print the coefficients and intercept for logistic regression
    print("blr data plus one vector that is filled with 1's and .9's")
    print("Coefficients: " + str(lrModel.coefficients))
    if adjust_data:
        # Adjust the intercept by the averages used to center the data before the regression.
        print("Intercept: " + str(lrModel.intercept - avg1 * lrModel.coefficients[0] - avg2 * lrModel.coefficients[1]))
    else:
        print("Intercept: " + str(lrModel.intercept))
    print()

def main():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    # Perform the test with centered data
    test_blr(spark, centered=True, adjust_data=False)
    # Perform the test with data that isn't centered. This is where the bug manifests itself.
    test_blr(spark, centered=False, adjust_data=False)
    # One can work around this bug by centering the data before running the binary logistic regression,
    # and then adjusting the intercept accordingly. This is done in the final test.
    test_blr(spark, centered=False, adjust_data=True)


if __name__ == "__main__":
    main()
ykerzhner (Author) commented Feb 1, 2021
The test program runs 3 tests, and each test contains two fits; I will refer to the second fit of the first test as 1.b, and so on. The bug presented in this gist is particularly evident in test 2.b: the fit produces an intercept that is essentially equal to the log(odds) of the target vector. This is incorrect, because the log(odds) is a good approximation for the intercept if and only if the data is centered. If the data is not centered, the correct intercept can be recovered by centering the data before the fit and adjusting the fitted intercept accordingly. This is done in tests 3.a and 3.b, and yields the correct intercept.
I believe that part of the objective function being minimized when fitting the logistic regression has a term that penalizes the intercept as it moves away from the log(odds), and that this produces the bug when the data is not centered. These last two statements are hunches, as I have not been able to find this within the Spark code. Also note that even test 1.b yields an intercept that is almost exactly the log(odds) of the data, which is slightly larger than -4. (log(odds): -3.9876303002978997, Intercept: -3.987260922443554)
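For concreteness, here is a minimal numpy sketch (not part of the gist) of the centering identity that the adjust_data workaround relies on: theta.x + b = theta.(x - mu) + (b + theta.mu). A fit on centered features therefore returns an intercept of roughly b + theta.mu, and the original intercept is recovered by subtracting theta.mu, which is what the lrModel.intercept - avg1 * coefficients[0] - avg2 * coefficients[1] adjustment in test_blr does. The specific numbers below are illustrative, not taken from the tests.

import numpy as np

theta = np.array([0.3, 0.2])  # same coefficients as in the tests above
b = -4.0                      # true intercept
x = np.random.uniform([0, 1], [1, 2], size=(5, 2))  # a few non-centered feature rows
mu = x.mean(axis=0)

# The log-odds of each row is unchanged by centering, provided the intercept absorbs theta.mu.
lhs = x @ theta + b
rhs = (x - mu) @ theta + (b + theta @ mu)
assert np.allclose(lhs, rhs)

# A fit on the centered features would return an intercept near b + theta.mu;
# subtracting theta.mu (i.e. avg1 * coef1 + avg2 * coef2) recovers the true intercept.
print((b + theta @ mu) - theta @ mu)  # -4.0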

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment