Skip to content

Instantly share code, notes, and snippets.

@jamm1985
Last active August 16, 2021 06:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jamm1985/39783d2e6f20340116d6e14a73749fe6 to your computer and use it in GitHub Desktop.
Save jamm1985/39783d2e6f20340116d6e14a73749fe6 to your computer and use it in GitHub Desktop.
Perform logistic regression for landslide prediction based on rainfall measurements
"""
File: landslide_rainfall_logit.py
Author: Andrey Stepnov
Email: a.stepnov@geophystech.ru, myjamm@gmail.com
Github: https://github.com/jamm1985
Description: Perform logistic regression for landslide prediction
based on rainfall measurements.
Classification tests, plots and metrics!
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.formula.api import logit
from statsmodels.graphics.regressionplots import plot_partregress_grid
from stargazer.stargazer import Stargazer
from sklearn.metrics import classification_report
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import balanced_accuracy_score
# read csv from wr43988.zip with day Temp and Rainfall data
# Vladivostok station
# source http://meteo.ru/it/178-aisori
rainfall_raw = pd.read_csv(
"../DATA/wr43988/wr43988.txt",
sep=";",
header=None,
parse_dates=[[1, 2, 3]],
skipinitialspace=True,
na_values=" ")
# set column names from data description fld43988a0.txt
rainfall_raw.columns =\
['date',
'index_vmo',
'temperature_quality_feature',
'min_day_temp',
'avg_day_temp',
'max_day_temp',
'total_rainfall']
print("count NA for Temp and Rainfall data \n{}".format(
rainfall_raw.isna().sum()))
# drop index_vmp and temperature_quality_feature column
rainfall_raw = rainfall_raw.drop(columns=['index_vmo',
'temperature_quality_feature'])
# read csv from wr43990.zip with day Soil temperature at depth
# Vladivostok station
# source http://meteo.ru/it/178-aisori
soil_temp_depth_raw = pd.read_csv(
"../DATA/wr43990/wr43990.txt",
sep=";",
header=None,
parse_dates=[[1, 2, 3]],
skipinitialspace=True,
na_values="999.9")
# set column names from data description fld43990a0.txt
soil_temp_depth_raw.columns =\
['date',
'index_vmo',
'temp_2_cm',
'temp_5_cm',
'temp_10_cm',
'temp_15_cm',
'temp_20_cm',
'temp_40_cm',
'temp_60_cm',
'temp_80_cm',
'temp_120_cm',
'temp_160_cm',
'temp_240_cm',
'temp_320_cm']
print("count NA for Soil temperature at depth data \n{}".format(
soil_temp_depth_raw.isna().sum()))
# convert date column to datetime64 and mark NaT out-of-bounds dates
soil_temp_depth_raw['date'] =\
pd.to_datetime(soil_temp_depth_raw['date'], errors='coerce')
# drop out-of-bounds dates rows
soil_temp_depth_raw =\
soil_temp_depth_raw[soil_temp_depth_raw['date'].notnull()]
# drop index_vmo column and columns with NA more than 50%
soil_temp_depth_raw =\
soil_temp_depth_raw.drop(
columns=['index_vmo',
'temp_2_cm',
'temp_5_cm',
'temp_10_cm',
'temp_15_cm',
'temp_60_cm',
'temp_320_cm'])
# join two dataframes by date key
DATA = rainfall_raw.join(
soil_temp_depth_raw.set_index('date'), on='date')
print("count NA for accumulated DATA \n{}".format(
DATA.isna().sum()))
# drop rows before 1946-01-01
DATA = DATA[DATA['date'] >= '1946-01-01']
DATA = DATA.reset_index(drop=True)
# landslide cases
landslides = pd.read_excel("../DATA/landslides_cases.xlsx")
# join data
DATA = DATA.join(landslides.set_index('DATE'), on='date')
# assume that all landslide did not occur at any other day
DATA['LANDSLIDE'] = DATA['LANDSLIDE'].fillna(0)
# add cumulative rainfall during YEAR
# I trust there is more elegant and efficient way to do this!
DATA['cumm_year_rainfall'] = 0.0
for i in range(0, len(DATA)):
DATA.cumm_year_rainfall[i] =\
DATA[(DATA['date'] > DATA.iloc[i].date - pd.offsets.YearBegin(1))
& (DATA['date'] < DATA.iloc[i].date)]['total_rainfall'].sum()
# add antecedent rainfall values for each row
# IETD = 1 day, see Hong, M., Kim, J., Jeong, S., 2018. Landslides 15, 523–534.
# and again there is more elegant way to do this!
DATA['antecedent_rainfall'] = 0.0
for i in range(1, len(DATA)):
j = 1
antecedent_rainfall_value = 0
while DATA.iloc[i-j].total_rainfall != 0:
if pd.isna(DATA.total_rainfall.iloc[i-j]):
pass
else:
antecedent_rainfall_value +=\
DATA.iloc[i-j].total_rainfall
j += 1
# print(i, j)
DATA.antecedent_rainfall[i] = antecedent_rainfall_value
# save DATA to xlsx
DATA.to_excel("../DATA/all_variables.xlsx")
# load data if you lost you ipython session
# DATA = pd.read_excel("../DATA/all_variables.xlsx")
# summary table about DATA
summary_columns = ['count', 'mean', 'std', 'min', 'max']
format_table = {'count': '{:.0f}',
'mean': '{:.3f}',
'std': '{:.3f}',
'min': '{:.1f}',
'max': '{:.1f}',
}
with open('../RESULTS/summary_all_data.html', 'w') as f:
print(
DATA.describe().T[summary_columns].style.format(format_table).render(),
file=f)
# see landslides cases over day temp and rainfalls
CASES = DATA[DATA['LANDSLIDE'] == 1][[
'date',
'avg_day_temp',
'total_rainfall',
'cumm_year_rainfall',
'antecedent_rainfall']]
print("All landslide cases\n{}".format(CASES))
print("And some stats about it\n{}".format(CASES.describe()))
# For logit: fight a QUASI-COMPLETE SEPARATION
rainfall_threshold =\
CASES['total_rainfall'].min() - DATA['total_rainfall'].std()
print("Rainfall threshold is {}".format(rainfall_threshold))
# logit!
REGRESSION_SAMPLE = DATA[DATA['total_rainfall'] >= rainfall_threshold]
REGRESSION_SAMPLE = REGRESSION_SAMPLE.reset_index(drop=True)
REGRESSION_SAMPLE = REGRESSION_SAMPLE.astype({'LANDSLIDE': 'int32'})
# summary table about train set
summary_columns = ['count', 'mean', 'std', 'min', 'max']
format_table = {'count': '{:.0f}',
'mean': '{:.3f}',
'std': '{:.3f}',
'min': '{:.1f}',
'max': '{:.1f}',
}
with open('../RESULTS/summary_train_set.html', 'w') as f:
print(
REGRESSION_SAMPLE.describe().T[summary_columns].style.format(
format_table).render(),
file=f)
# R-style logit!
logit_mod = logit(
"LANDSLIDE ~"
" total_rainfall"
" + antecedent_rainfall"
" + cumm_year_rainfall",
REGRESSION_SAMPLE)
logit_res = logit_mod.fit()
print(logit_res.summary())
print("\n")
print(logit_res.get_margeff().summary())
# write results to a table
stargazer = Stargazer([logit_res])
with open('../RESULTS/logit.html', 'w') as f:
print(stargazer.render_html(), file=f)
# optimal threshold
# see
# https://machinelearningmastery.com/threshold-moving-for-imbalanced-classification/
y_true = REGRESSION_SAMPLE['LANDSLIDE'].to_numpy()
# calculate roc curves
precision, recall, thresholds =\
precision_recall_curve(y_true, logit_res.predict())
# convert to f score
fscore = (2 * precision * recall) / (precision + recall)
# locate the index of the largest f score
ix = np.argmax(fscore)
print('Best Threshold=%f, F-Score=%.3f' % (thresholds[ix], fscore[ix]))
# Model performance metrics
y_true = REGRESSION_SAMPLE['LANDSLIDE'].to_numpy()
y_pred = np.where(logit_res.predict() >= thresholds[ix], 1, 0)
# precision, recall, f1, overall accuracy
print(classification_report(y_true,
y_pred,
target_names=['Not a Landslide',
'Landslide']))
# confusion matrix for the best threshold
print("confusion matrix\n{}".format(
logit_res.pred_table(threshold=thresholds[ix])))
# balanced accuracy
print("ballanced accuracy {}".format(
balanced_accuracy_score(y_true, y_pred)))
# PLOTTING SECTION!
# plot best threshold for logit model (Precision-Recall Curve)
plt.clf()
plt.plot(recall, precision, marker='.',
color='gray', label='Logit(P)')
plt.scatter(recall[ix], precision[ix], s=80,
marker='o', color='black', label='Best F1 score')
# axis labels
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.legend()
# show the plot
plt.savefig("../FIGS/best_threshold.png", dpi=600, format='png')
plt.close()
# plot best threshold for logit model (F1 score and Balanced Accuracy Curve)
# calculate balanced accuracy
balanced_accuracy = np.array([])
for i in np.insert(thresholds, 0, 0.0):
y_pred_iteration = np.where(logit_res.predict() >= i, 1, 0)
balanced_accuracy =\
np.append(balanced_accuracy,
balanced_accuracy_score(y_true, y_pred_iteration))
plt.clf()
plt.figure(figsize=(8, 6))
plt.plot(np.insert(thresholds, 0, 0.0), fscore, marker='.',
color='gray', label='F1score')
# plt.plot(thresholds, balanced_accuracy, marker='x',
# color='gray', label='Balanced Accuracy')
plt.scatter(np.insert(thresholds, 0, 0.0)[ix], fscore[ix], s=80,
marker='o', color='black', label='Best F1 score')
# axis labels
plt.xlabel('Threshold')
plt.ylabel('F1 score')
plt.legend()
# show the plot
plt.savefig("../FIGS/F1score_threshold.png", dpi=600, format='png')
plt.close()
# plot threshold and precision/recall
plt.clf()
plt.figure(figsize=(8, 6))
plt.plot(np.insert(thresholds, 0, 0.0), recall, marker='.',
color='gray', label='Recall')
plt.plot(np.insert(thresholds, 0, 0.0), precision, marker='x',
color='gray', label='Precision')
plt.scatter(np.insert(thresholds, 0, 0.0)[ix+1], fscore[ix], s=80,
marker='o', color='black', label='Best F1 score')
# axis labels
plt.xlabel('Threshold')
# plt.ylabel('')
plt.legend()
# show the plot
plt.savefig("../FIGS/precision_recall_threshold.png", dpi=600, format='png')
plt.close()
# correlation matrix all the DATA
correlation_matrix = DATA.corr()
# clear plot object
plt.clf()
plt.figure(figsize=(18, 14))
# rename axis to human-like style
# plot and save fig
ax_1 = sns.heatmap(
correlation_matrix,
annot=True,
square=True,
linewidths=.5)
ax_1.set_title('Correlation matrix')
plt.savefig("../FIGS/correlation_matrix.png", dpi=600, format='png')
# plt.show()
plt.close()
# print only rainfall correlation matrix
# clear plot object
plt.clf()
correlation_matrix_rainfall =\
DATA[['total_rainfall',
'cumm_year_rainfall',
'antecedent_rainfall',
'LANDSLIDE']].corr()
lables = ["Daily\nrainfall",
"Cumulative\nprecipitation",
"Antecedent\nrainfall",
"Landslide"]
correlation_matrix_rainfall.columns = lables
correlation_matrix_rainfall.index = lables
plt.figure(figsize=(10, 8))
# fix bug with text alignment on y-axes
# https://github.com/mwaskom/seaborn/issues/1820
plt.setp(plt.gca().yaxis.get_majorticklabels(), va="center")
ax_1 = sns.heatmap(
correlation_matrix_rainfall,
annot=True,
square=True,
linewidths=.5)
# ax_1.set_title('Correlation matrix')
plt.savefig("../FIGS/correlation_matrix_rainfall.png", dpi=600, format='png')
# plt.show()
plt.close()
# plot Landslide dummy variable on Day and Year cumulative rainfall axes
# Fig 5
# clear plot object
plt.clf()
lm = sns.lmplot(
x='total_rainfall',
y='cumm_year_rainfall',
hue='LANDSLIDE',
# logistic=True,
fit_reg=False,
markers=[".", "+"],
legend_out=True,
palette=["lightgray", "black"],
data=DATA)
lm = lm.set_axis_labels("Daily rainfall, mm", "Cumulative precipitation, mm")
lm._legend.set_title("Landslide")
new_labels = ['No', 'Yes']
for t, l in zip(lm._legend.texts, new_labels): t.set_text(l)
# lm.fig.suptitle('Landslide occurrence depends on rainfall (mm)')
# plot decision boundary P=0.5
X_05 = np.array([84, 128])
# set one variable to median and solve for P=0.5
Y_05 = (logit_res.params['Intercept'] +
logit_res.params['total_rainfall'] * X_05 +
logit_res.params['antecedent_rainfall'] *
REGRESSION_SAMPLE['antecedent_rainfall'].mean()) /\
-logit_res.params['cumm_year_rainfall']
linewidth = 0.6
fontsize = 8
plt.plot(X_05, Y_05, '--', color='black', linewidth=linewidth, label='P=0.5')
plt.text(X_05[0], Y_05[0], '0.50',
ha='right', va='center',
backgroundcolor='white',
fontsize=fontsize)
# Plot decision boundary for P=BEST
X_best = np.array([64, 108])
Y_best = np.log(thresholds[ix]/(1-thresholds[ix])) /\
logit_res.params['cumm_year_rainfall'] +\
(logit_res.params['Intercept'] +
logit_res.params['total_rainfall'] * X_best +
logit_res.params['antecedent_rainfall'] *
REGRESSION_SAMPLE['antecedent_rainfall'].mean()) /\
-logit_res.params['cumm_year_rainfall']
plt.plot(X_best, Y_best, '-', color='black', linewidth=linewidth)
plt.text(X_best[0], Y_best[0], '{:.2f}'.format(thresholds[ix]),
ha='right', va='center',
backgroundcolor='white',
fontsize=fontsize)
plt.savefig('../FIGS/scatter_categorical_plot_1.png', dpi=600, format='png')
plt.close()
# plot Landslide dummy variable on Day and antecedent cumulative rainfall axes
# Fig 6
# clear plot object
plt.clf()
lm = sns.lmplot(
x='total_rainfall',
y='antecedent_rainfall',
hue='LANDSLIDE',
# logistic=True,
fit_reg=False,
markers=[".", "+"],
legend_out=True,
palette=["lightgray", "black"],
data=DATA)
lm = lm.set_axis_labels("Daily rainfall, mm", "Antecedent rainfall, mm")
lm._legend.set_title("Landslide")
new_labels = ['No', 'Yes']
for t, l in zip(lm._legend.texts, new_labels): t.set_text(l)
# lm.fig.suptitle('Landslide occurrence depends on rainfall (mm)')
# plot decision boundary for P=0.5
# set one variable to median and solve for P=0.5
X_05 = np.array([60, 120])
Y_05 = (logit_res.params['Intercept'] +
logit_res.params['total_rainfall'] * X_05 +
logit_res.params['cumm_year_rainfall'] *
REGRESSION_SAMPLE['cumm_year_rainfall'].mean()) /\
-logit_res.params['antecedent_rainfall']
linewidth = 0.6
fontsize = 8
plt.plot(X_05, Y_05, '--', color='black', linewidth=linewidth, label='P=0.5')
plt.text(X_05[0], Y_05[0], '0.50',
ha='right', va='center',
backgroundcolor='white',
fontsize=fontsize)
# Plot decision boundary for P=BEST
X_best = np.array([40, 98])
Y_best = np.log(thresholds[ix]/(1-thresholds[ix])) /\
logit_res.params['antecedent_rainfall'] +\
(logit_res.params['Intercept'] +
logit_res.params['total_rainfall'] * X_best +
logit_res.params['cumm_year_rainfall'] *
REGRESSION_SAMPLE['cumm_year_rainfall'].mean()) /\
-logit_res.params['antecedent_rainfall']
plt.plot(X_best, Y_best, '-', color='black', linewidth=linewidth)
plt.text(X_best[0], Y_best[0]+5, '{:.2f}'.format(thresholds[ix]),
ha='right', va='center',
backgroundcolor='white',
fontsize=fontsize)
plt.savefig('../FIGS/scatter_categorical_plot_2.png', dpi=600, format='png')
plt.close()
# plot cumulative rainfall distributions by months
# Fig 3
RAINFAL_BY_MONTH =\
DATA[['date', 'total_rainfall']].set_index('date').resample('M').sum()
RAINFAL_BY_MONTH['Month'] =\
RAINFAL_BY_MONTH.index.strftime('%b')
plt.clf()
plt.figure(figsize=(8, 6))
ax = sns.boxplot(y="total_rainfall",
x="Month",
palette=["black"],
data=RAINFAL_BY_MONTH)
plt.setp(ax.artists, edgecolor='k', facecolor='w')
plt.setp(ax.lines, color='k')
plt.ylabel('Precipitation, mm')
plt.savefig('../FIGS/rainfall_by_month.png', dpi=600, format='png')
plt.close()
# plot cases by total rainfall and date time
CASES_fig = CASES[['date', 'total_rainfall']]
CASES_fig['month'] = CASES_fig['date'].dt.month_name()
plt.clf()
plt.figure(figsize=(8, 6))
lm = sns.lmplot(
x='date',
y='total_rainfall',
hue='month',
hue_order=['July', 'August', 'September', 'October'],
# logistic=True,
fit_reg=False,
markers=["x", "+", ".", "2"],
legend_out=True,
# palette=["black"],
data=CASES_fig)
lm = lm.set_axis_labels("Year", "Rainfall intensity, mm")
lm._legend.set_title("Landslide")
plt.savefig('../FIGS/scatter_categorical_plot_4.png', dpi=600, format='png')
plt.close()
# Partial regression plot
CORRECT_NAMES = REGRESSION_SAMPLE[['total_rainfall',
'antecedent_rainfall',
'cumm_year_rainfall',
'LANDSLIDE']]
CORRECT_NAMES.columns = ['Daily_rainfall',
'Antecedent_rainfall',
'Cumulative_precipitation',
'Landslide']
logit_mod = logit(
"Landslide ~"
" Daily_rainfall"
" + Antecedent_rainfall"
" + Cumulative_precipitation",
CORRECT_NAMES)
logit_res = logit_mod.fit()
print(logit_res.summary())
print("\n")
print(logit_res.get_margeff().summary())
plt.clf()
fig = plt.figure(figsize=(10, 8))
plot_partregress_grid(logit_res, fig=fig)
plt.savefig('../FIGS/partial_regression_plot.png', dpi=600, format='png')
plt.close()
@jamm1985
Copy link
Author

jamm1985 commented Sep 5, 2020

Historical meteorological data:

Landslide cases:

@jamm1985
Copy link
Author

jamm1985 commented Aug 16, 2021

Citation

Stepnova, Y.A., Stepnov, A.A., Konovalov, A.V. et al. Predictive Model of Rainfall-Induced Landslides in High-Density Urban Areas of the South Primorsky Region (Russia). Pure Appl. Geophys. (2021). https://doi.org/10.1007/s00024-021-02822-y

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment