Skip to content

Instantly share code, notes, and snippets.

@janpipek
Created August 16, 2022 16:59
Show Gist options
  • Save janpipek/329d42bd2f12fd49549a01a8591b0cd6 to your computer and use it in GitHub Desktop.
Save janpipek/329d42bd2f12fd49549a01a8591b0cd6 to your computer and use it in GitHub Desktop.
Normalize series
import numbers
from typing import Literal, Optional, Union
import numpy as np
import pandas as pd
def normalize_series(
    series: pd.Series,
    *,
    new_sum: Union[numbers.Real, Literal["count"]] = "count",
    weights: Optional[pd.Series] = None,
) -> pd.Series:
    """Normalize a series to a new target sum.

    Args:
        series: The series to rescale.
        weights: Optional weights for the sum (scale-independent). They must
            share the index of `series` and be defined wherever `series`
            is non-NA.
        new_sum:
            "count" (default) => scale to a (weighted) average of 1.0
            number => the total (weighted) sum will be equal to this
            (1.0 to get fractions)

    Returns:
        A new series with non-na values replaced, whose (potentially
        weighted) sum is equal to `new_sum`.

    Raises:
        ValueError: If the weights have a different index, are missing for a
            non-NA item, or do not have a finite non-zero sum; if `new_sum`
            is not finite; or if the (weighted) sum of the series is zero or
            not finite.

    Example:
        >>> normalize_series(pd.Series([1, 2, 3])).tolist()
        [0.5, 1.0, 1.5]
        >>> normalize_series(pd.Series([1, np.nan, 2]), new_sum=1.0).tolist()
        [0.3333333333333333, nan, 0.6666666666666666]
        >>> normalize_series(
        ...     pd.Series([4, 2, 1]), weights=pd.Series([1, 0, 96])
        ... ).round(2).tolist()
        [3.88, 1.94, 0.97]

    Note that it is possible to normalize empty or all-NaN series only
    if the new_sum is not explicitly specified ("count").
    """
    present = series.notna()

    if weights is not None:
        if not weights.index.equals(series.index):
            raise ValueError(
                f"Different indices for the series: {series.index} and the weights: {weights.index}"
            )
        if weights[present].isna().any():
            raise ValueError(
                "Weights must be defined for all non-na items of the series."
            )
        # Only use weights where applicable
        weights = weights.where(present, np.nan)
        weight_count = weights.count()
        weight_total = weights.sum()
        # Without this check a degenerate weight total turns the scaled
        # weights into NaN/inf and the failure is later misreported as a
        # problem with the sum of the series itself. Skipped when no weight
        # applies so that empty / all-NaN input keeps its trivial handling.
        if weight_count > 0 and (
            weight_total == 0.0 or not np.isfinite(weight_total)
        ):
            raise ValueError(
                f"The weights must have a finite, non-zero sum: {weight_total}"
            )
        weights = weights * weight_count / weight_total  # => mean=1.0
        current_sum = (series * weights).sum()
    else:
        current_sum = series.sum()

    if new_sum == "count":
        new_sum = series.count()
        # Trivial with all NaNs or empty series
        if new_sum == 0.0:
            return series.copy()

    if not np.isfinite(new_sum):
        raise ValueError(f"The target weight must be finite: {new_sum}")
    if current_sum == 0.0:
        raise ValueError(f"Cannot normalize a series with zero sum: {series}")
    if not np.isfinite(current_sum):
        raise ValueError(f"Cannot normalize a series with infinite sum: {series}")
    return series * new_sum / current_sum
import numpy as np
import pandas as pd
import pytest
class TestNormalizeSeries:
    """Tests for `normalize_series`.

    The pandas comparison helpers are reached through `pd.testing` because
    this file imports neither `assert_series_equal` nor `assert_index_equal`
    by name.
    """

    @pytest.mark.parametrize(
        "values,expected",
        [
            pytest.param([], [], id="empty"),
            pytest.param([1, 2, 3], [0.5, 1.0, 1.5], id="integers"),
            pytest.param([1], [1.0], id="single"),
            pytest.param([2, 0.5, np.nan], [1.6, 0.4, np.nan], id="with-nan"),
            pytest.param([np.nan], [np.nan], id="all-nan"),
        ],
    )
    def test_for_count(self, values, expected):
        """The default target makes the mean of the non-NA values 1.0."""
        values = pd.Series(values)
        result = normalize_series(values)
        pd.testing.assert_series_equal(result, pd.Series(expected))

    @pytest.mark.parametrize(
        "values",
        [
            pytest.param([0.0], id="single-zero"),
            pytest.param([0.0, np.nan], id="zero-and-nan"),
            pytest.param([-1.0, 1.0], id="cancelling"),
        ],
    )
    def test_with_zero_mean(self, values):
        """A series summing to zero cannot be rescaled."""
        # Build the input outside the context manager so that only the call
        # under test can satisfy the expected exception.
        values = pd.Series(values)
        with pytest.raises(ValueError, match="Cannot normalize a series with zero sum"):
            normalize_series(values)

    @pytest.mark.parametrize(
        "values",
        [
            pytest.param([np.inf], id="inf"),
            pytest.param([0.0, np.inf], id="zero-and-inf"),
            pytest.param([np.inf, -np.inf], id="inf-and-minus-inf"),
        ],
    )
    def test_with_infinite_sum(self, values):
        """A series whose sum is not finite cannot be rescaled."""
        values = pd.Series(values)
        with pytest.raises(
            ValueError, match="Cannot normalize a series with infinite sum"
        ):
            normalize_series(values)

    @pytest.mark.parametrize(
        "values,new_sum,expected",
        [
            pytest.param([1, 2, 5], 1.0, [0.125, 0.25, 0.625], id="fractions"),
            pytest.param([1], 1.33, [1.33], id="single"),
            pytest.param([2, 0.5, np.nan], 4, [3.2, 0.8, np.nan], id="with-nan"),
        ],
    )
    def test_with_valid_target_weight(self, values, new_sum, expected):
        """An explicit numeric target becomes the sum of the result."""
        values = pd.Series(values)
        result = normalize_series(values, new_sum=new_sum)
        pd.testing.assert_series_equal(result, pd.Series(expected))

    @pytest.mark.parametrize("new_sum", [np.inf, np.nan])
    def test_with_invalid_target_weight(self, new_sum):
        """Non-finite targets are rejected."""
        with pytest.raises(ValueError, match="The target weight must be finite"):
            normalize_series(pd.Series([1, 2]), new_sum=new_sum)

    def test_keeps_index(self):
        """The result carries the index of the input."""
        x = pd.Series({"a": 1, "b": 2})
        result = normalize_series(x)
        expected_index = pd.Index(["a", "b"])
        pd.testing.assert_index_equal(result.index, expected_index)

    @pytest.mark.parametrize(
        "weights,expected",
        [
            pytest.param([1, 1], [0.5, 1.5], id="identity"),
            pytest.param([2, 2], [0.5, 1.5], id="identity-scaled"),
            pytest.param([0.01, 1], [0.3355, 1.006], id="one-very-small"),
            pytest.param([0, 10], [0.3333, 1], id="one-ignored"),
        ],
    )
    def test_with_weights(self, weights, expected):
        """Weights change the scaling; their overall scale does not matter."""
        series = pd.Series([1, 3])
        weights = pd.Series(weights)
        expected = pd.Series(expected)
        result = normalize_series(series, weights=weights)
        # Expected values are rounded, hence the loose tolerance.
        pd.testing.assert_series_equal(result, expected, rtol=1e-3)

    @pytest.mark.parametrize(
        "weights_index",
        [
            pytest.param([0, 1], id="short"),
            pytest.param([0, 1, 2, 3], id="long"),
            pytest.param([1, 0, 3], id="different"),
        ],
    )
    def test_weights_with_an_invalid_index(self, weights_index):
        """Weights whose index differs from the series' index are rejected."""
        series = pd.Series([1, 2, 3])
        weights = pd.Series(1, index=weights_index)
        with pytest.raises(ValueError, match="Different indices"):
            normalize_series(series, weights=weights)
# TODO: Add some hypothesis tests to verify the weighted sum invariant
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment