Skip to content

Instantly share code, notes, and snippets.

@hedgertronic
Last active February 27, 2023 20:01
Show Gist options
  • Save hedgertronic/92eb84500ace3efb83007aa6a9f65ccb to your computer and use it in GitHub Desktop.
Save hedgertronic/92eb84500ace3efb83007aa6a9f65ccb to your computer and use it in GitHub Desktop.
Add induced movement, spin axis, and other spin columns to Statcast data using Alan Nathan's formulas.
"""Add spin coluns to Statcast data.
The formulas used to add these columns comes from Alan Nathan's research. The original
paper can be found at http://baseball.physics.illinois.edu/trackman/SpinAxis.pdf and
the spreadsheet template that was the basis for this code can be found at
http://baseball.physics.illinois.edu/trackman/MovementSpinEfficiencyTemplate-v2.xlsx.
"""
from typing import cast
import numpy as np
import polars as pl
#######################################################################################
# SPIN CONSTANTS
#######################################################################################
# Environmental constant
K = 0.005383
# Standard gravity in ft/s^2
G = 32.174
# Distance of the pitcher's mound in feet
MOUND_DISTANCE = 60.5
# Distance of the initial Statcast measurement in feet
STATCAST_INITIAL_MEASUREMENT = 50.0
# Number of inches in a foot
INCHES_PER_FOOT = 12.0
# Length of the plate in feet
PLATE_LENGTH = 17.0 / INCHES_PER_FOOT
# Theoretical height of a parallel release point
ECKERSLEY_LINE = 2.85
COMPUTATION_COLUMNS = [
"release_speed",
"release_spin_rate",
"release_extension",
"release_pos_x",
"release_pos_z",
"vx0",
"vy0",
"vz0",
"ax",
"ay",
"az",
"plate_x",
"plate_z",
]
#######################################################################################
# PUBLIC API
#######################################################################################
def spin_columns(data: pl.DataFrame) -> pl.DataFrame:
if data.is_empty():
return data
spin = data[COMPUTATION_COLUMNS]
if len([s.name for s in spin if s.null_count() != spin.height]) != len(
spin.columns
):
return data
spin = _compute_release_angle(spin)
spin = _compute_bauer_units(spin)
data = data.with_columns([spin["release_angle"], spin["bauer_units"]])
return _nathan_fields(data)
def _nathan_fields(data: pl.DataFrame) -> pl.DataFrame:
nathan = data[COMPUTATION_COLUMNS]
nathan = _compute_release_point_y(nathan)
nathan = _compute_release_time(nathan)
nathan = _compute_velocity_components_at_release(nathan)
nathan = _compute_flight_time(nathan)
nathan = _compute_induced_movement(nathan)
nathan = _compute_average_velocity_components(nathan)
nathan = _compute_average_velocity(nathan)
nathan = _compute_average_drag(nathan)
nathan = _compute_magnus_acceleration_components(nathan)
nathan = _compute_magnus_acceleration(nathan)
nathan = _compute_magnus_movement(nathan)
nathan = _compute_drag_coefficient(nathan)
nathan = _compute_lift_coefficient(nathan)
nathan = _compute_spin_factor(nathan)
nathan = _compute_transverse_spin(nathan)
nathan = _compute_transverse_spin_components(nathan)
nathan = _compute_phi(nathan)
nathan = _compute_tilt_and_spin_direction(nathan)
nathan = _compute_spin_efficiency(nathan)
nathan = _compute_gyro_angle(nathan)
data = data.with_columns(
[
nathan["x_mvt"].alias("induced_horz_break"),
nathan["z_mvt"].alias("induced_vert_break"),
nathan["decimal_tilt"],
nathan["tilt"],
nathan["spin_direction"],
nathan["spin_efficiency"],
nathan["theta"].alias("gyro_angle"),
]
)
return data.fill_nan(None)
#######################################################################################
# COMPUTATION FUNCTIONS
#######################################################################################
def _compute_release_angle(data: pl.DataFrame) -> pl.DataFrame:
release_pos_x = data["release_pos_x"]
release_pos_z = data["release_pos_z"]
release_angle = np.degrees(
np.arctan2((release_pos_z + -ECKERSLEY_LINE), abs(release_pos_x))
)
return data.with_column(release_angle.alias("release_angle"))
def _compute_bauer_units(data: pl.DataFrame) -> pl.DataFrame:
release_spin_rate = data["release_spin_rate"]
release_speed = data["release_speed"]
bauer_units = release_spin_rate / release_speed
return data.with_column(bauer_units.alias("bauer_units"))
def _compute_release_point_y(data: pl.DataFrame) -> pl.DataFrame:
release_extension = data["release_extension"]
release_pos_y = MOUND_DISTANCE - release_extension
return data.with_column(release_pos_y.alias("release_pos_y"))
def _compute_release_time(data: pl.DataFrame) -> pl.DataFrame:
vy0 = data["vy0"]
ay = data["ay"]
release_pos_y = data["release_pos_y"]
tR = _time_in_air(vy0, ay, release_pos_y, STATCAST_INITIAL_MEASUREMENT, False)
return data.with_column(tR.alias("tR"))
def _compute_velocity_components_at_release(data: pl.DataFrame) -> pl.DataFrame:
vx0 = data["vx0"]
vy0 = data["vy0"]
vz0 = data["vz0"]
ax = data["ax"]
ay = data["ay"]
az = data["az"]
tR = data["tR"]
vxR = vx0 + (ax * tR)
vyR = vy0 + (ay * tR)
vzR = vz0 + (az * tR)
return data.with_columns([vxR.alias("vxR"), vyR.alias("vyR"), vzR.alias("vzR")])
def _compute_flight_time(data: pl.DataFrame) -> pl.DataFrame:
vyR = data["vyR"]
ay = data["ay"]
release_pos_y = data["release_pos_y"]
tf = _time_in_air(vyR, ay, release_pos_y, PLATE_LENGTH, True)
return data.with_column(tf.alias("tf"))
def _compute_induced_movement(data: pl.DataFrame) -> pl.DataFrame:
plate_x = data["plate_x"]
plate_z = data["plate_z"]
vxR = data["vxR"]
vyR = data["vyR"]
vzR = data["vzR"]
release_pos_x = data["release_pos_x"]
release_pos_y = data["release_pos_y"]
release_pos_z = data["release_pos_z"]
tf = data["tf"]
x_mvt = -(
(plate_x - release_pos_x - (vxR / vyR) * (PLATE_LENGTH - release_pos_y))
* INCHES_PER_FOOT
)
z_mvt = (
plate_z
- release_pos_z
- (vzR / vyR) * (PLATE_LENGTH - release_pos_y)
+ (0.5 * G * (tf**2))
) * INCHES_PER_FOOT
return data.with_columns([x_mvt.alias("x_mvt"), z_mvt.alias("z_mvt")])
def _compute_average_velocity_components(data: pl.DataFrame) -> pl.DataFrame:
vxR = data["vxR"]
vyR = data["vyR"]
vzR = data["vzR"]
ax = data["ax"]
ay = data["ay"]
az = data["az"]
tf = data["tf"]
vxbar = ((2 * vxR) + (ax * tf)) / 2
vybar = ((2 * vyR) + (ay * tf)) / 2
vzbar = ((2 * vzR) + (az * tf)) / 2
return data.with_columns(
[vxbar.alias("vxbar"), vybar.alias("vybar"), vzbar.alias("vzbar")]
)
def _compute_average_velocity(data: pl.DataFrame) -> pl.DataFrame:
vxbar = data["vxbar"]
vybar = data["vybar"]
vzbar = data["vzbar"]
vbar = _n_component_mean(vxbar, vybar, vzbar)
return data.with_column(vbar.alias("vbar"))
def _compute_average_drag(data: pl.DataFrame) -> pl.DataFrame:
ax = data["ax"]
ay = data["ay"]
az = data["az"]
vxbar = data["vxbar"]
vybar = data["vybar"]
vzbar = data["vzbar"]
vbar = data["vbar"]
adrag = -((ax * vxbar) + (ay * vybar) + ((az + G) * vzbar)) / vbar
return data.with_column(adrag.alias("adrag"))
def _compute_magnus_acceleration_components(data: pl.DataFrame) -> pl.DataFrame:
ax = data["ax"]
ay = data["ay"]
az = data["az"]
vxbar = data["vxbar"]
vybar = data["vybar"]
vzbar = data["vzbar"]
adrag = data["adrag"]
vbar = data["vbar"]
amagx = ax + (adrag * vxbar / vbar)
amagy = ay + (adrag * vybar / vbar)
amagz = az + (adrag * vzbar / vbar) + G
return data.with_columns(
[amagx.alias("amagx"), amagy.alias("amagy"), amagz.alias("amagz")]
)
def _compute_magnus_acceleration(data: pl.DataFrame) -> pl.DataFrame:
amagx = data["amagx"]
amagy = data["amagy"]
amagz = data["amagz"]
amag = _n_component_mean(amagx, amagy, amagz)
return data.with_column(amag.alias("amag"))
def _compute_magnus_movement(data: pl.DataFrame) -> pl.DataFrame:
amagx = data["amagx"]
amagz = data["amagz"]
tf = data["tf"]
Mx = 0.5 * amagx * (tf**2) * INCHES_PER_FOOT
Mz = 0.5 * amagz * (tf**2) * INCHES_PER_FOOT
return data.with_columns([Mx.alias("Mx"), Mz.alias("Mz")])
def _compute_drag_coefficient(data: pl.DataFrame) -> pl.DataFrame:
adrag = data["adrag"]
vbar = data["vbar"]
Cd = adrag / ((vbar**2) * K)
return data.with_column(Cd.alias("Cd"))
def _compute_lift_coefficient(data: pl.DataFrame) -> pl.DataFrame:
amag = data["amag"]
vbar = data["vbar"]
Cl = amag / ((vbar**2) * K)
return data.with_column(Cl.alias("Cl"))
def _compute_spin_factor(data: pl.DataFrame) -> pl.DataFrame:
Cl = data["Cl"]
Cl_adj = (
pl.when(cast(pl.Expr, Cl >= 0.336)).then(np.nan).otherwise(cast(pl.Expr, Cl))
)
S = 0.1666 * np.log(0.336 / (0.336 - Cl_adj))
return data.with_column(S.alias("S"))
def _compute_transverse_spin(data: pl.DataFrame) -> pl.DataFrame:
S = data["S"]
vbar = data["vbar"]
spinT = 78.92 * S * vbar
return data.with_column(spinT.alias("spinT"))
def _compute_transverse_spin_components(data: pl.DataFrame) -> pl.DataFrame:
spinT = data["spinT"]
vxbar = data["vxbar"]
vybar = data["vybar"]
vzbar = data["vzbar"]
amagx = data["amagx"]
amagy = data["amagy"]
amagz = data["amagz"]
vbar = data["vbar"]
amag = data["amag"]
spinTx = spinT * ((vybar * amagz) - (vzbar * amagy)) / (amag * vbar)
spinTy = spinT * ((vzbar * amagx) - (vxbar * amagz)) / (amag * vbar)
spinTz = spinT * ((vxbar * amagy) - (vybar * amagx)) / (amag * vbar)
return data.with_columns(
[spinTx.alias("spinTx"), spinTy.alias("spinTy"), spinTz.alias("spinTz")]
)
def _compute_phi(data: pl.DataFrame) -> pl.DataFrame:
amagz = data["amagz"]
amagx = data["amagx"]
arctan = np.arctan2(amagz, -amagx)
phi = (
pl.when(cast(pl.Expr, amagz > 0))
.then(cast(pl.Expr, arctan))
.otherwise(cast(pl.Expr, 360 + arctan * 180 / np.pi))
)
return data.with_column(phi.alias("phi"))
def _compute_tilt_and_spin_direction(data: pl.DataFrame) -> pl.DataFrame:
phi = data["phi"]
decimal_tilt = 3 - (1 / 30) * phi
decimal_tilt_adj = (
pl.when(cast(pl.Expr, decimal_tilt <= 0))
.then(decimal_tilt + 12)
.otherwise(cast(pl.Expr, decimal_tilt))
)
data = data.with_column(decimal_tilt_adj.alias("decimal_tilt"))
tilt = _decimal_tilt_to_tilt(data["decimal_tilt"])
data = data.with_column(tilt.alias("tilt"))
spin_direction = _tilt_to_spin_direction(data["tilt"])
data = data.with_column(spin_direction.alias("spin_direction"))
return data
# Maybe adjust spin efficiency to match distribution found on "active spin"
# leaderboards?
def _compute_spin_efficiency(data: pl.DataFrame) -> pl.DataFrame:
spinT = data["spinT"]
release_spin_rate = data["release_spin_rate"]
spin_efficiency = spinT / release_spin_rate
return data.with_column(spin_efficiency.alias("spin_efficiency"))
def _compute_gyro_angle(data: pl.DataFrame) -> pl.DataFrame:
spin_efficiency = data["spin_efficiency"]
spin_efficiency_adj = (
pl.when(cast(pl.Expr, ((1.0 >= spin_efficiency) & (spin_efficiency >= -1.0))))
.then(spin_efficiency)
.otherwise(-1.0)
)
theta = (
pl.when(spin_efficiency_adj > 0)
.then(spin_efficiency_adj.arccos() * 180 / np.pi)
.otherwise(np.nan)
)
return data.with_column(theta.alias("theta"))
#######################################################################################
# HELPER FUNCTIONS
#######################################################################################
def _time_in_air(
velocity, acceleration, position, adjustment: float, positive: bool = True
) -> pl.Series:
if positive:
direction_factor = 1
else:
direction_factor = -1
return (
-velocity
- np.sqrt(
(velocity**2)
- (2 * acceleration * (direction_factor * (position - adjustment)))
)
) / acceleration
def _n_component_mean(*args) -> pl.Series:
return np.sqrt(sum(component**2 for component in args))
def _decimal_tilt_to_tilt(decimal_tilt: pl.Series, minutes_round: int = 5) -> pl.Expr:
hours = decimal_tilt.floor().cast(int)
minutes = minutes_round * ((decimal_tilt * 60 % 60) / minutes_round).cast(int)
hours_adj = (
pl.when(cast(pl.Expr, minutes == 60))
.then((hours + 1) % 12)
.otherwise(cast(pl.Expr, hours))
)
minutes_adj = (
pl.when(cast(pl.Expr, minutes == 60)).then(0).otherwise(cast(pl.Expr, minutes))
)
minutes_formatted = minutes_adj.apply("{:0>2}".format)
return hours_adj.cast(str) + ":" + minutes_formatted.cast(str)
def _tilt_to_spin_direction(tilt: pl.Series) -> pl.Series:
tilt_time = tilt.str.strptime(pl.Time, fmt="%H:%M")
hours = tilt_time.dt.hour()
minutes = tilt_time.dt.minute()
return ((hours * 30) + (minutes / 2) + 180) % 360
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment