Skip to content

Instantly share code, notes, and snippets.

@amuramatsu
Last active June 6, 2023 18:56
Show Gist options
  • Save amuramatsu/1e56a7dec3d80bbacfb6c8bf3b51e3fa to your computer and use it in GitHub Desktop.
Save amuramatsu/1e56a7dec3d80bbacfb6c8bf3b51e3fa to your computer and use it in GitHub Desktop.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2023 MURAMATSU Atsushi <amura1977@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import itertools
import functools
import operator
from typing import Any, Union, Optional, Iterable, Sequence, Dict
import numpy as np
import pandas as pd
from scipy import interpolate #type: ignore
tqdm = None
try:
from tqdm import tqdm #type: ignore
except ImportError:
pass
def pandas_addnews(
        df: pd.DataFrame,
        x_column: str,
        x_data: Iterable[Union[float, int]],
        fill_na: Optional[Iterable[str]] = None) -> pd.DataFrame:
    """Append placeholder rows for new values of ``x_column``.

    The rows of ``df`` whose ``x_column`` equals the first value found in
    that column are used as a template.  For every value in ``x_data`` a
    copy of the template is appended with ``x_column`` set to that value
    and the ``fill_na`` columns set to NaN; every other column keeps the
    template's values.

    Args:
        df: Source DataFrame.  It is never modified.
        x_column: Name of the column that receives the new values.
        x_data: Values for which new rows are added.
        fill_na: Names of the columns that are set to NaN in the new rows.
            ``None`` leaves all template values untouched.

    Returns:
        A new DataFrame holding the original rows followed by the added
        rows, in the order of ``x_data``.  The original index labels are
        kept, so the index of the result contains duplicates.
    """
    if len(df) == 0:
        # No row can serve as a template, so there is nothing to add.
        return df.copy()

    # Positional access: the index does not have to contain the label 0.
    first_x = df[x_column].iloc[0]
    template = df[df[x_column] == first_x].copy()
    if fill_na is not None:
        for name in fill_na:
            template[name] = np.nan

    # Collect every piece first and concatenate once; concatenating inside
    # the loop would copy the growing frame on every iteration.
    frames = [df]
    for value in x_data:
        frame = template.copy()
        frame[x_column] = value
        frames.append(frame)
    # pd.concat always builds a new object, so the caller's frame is never
    # handed back, even when x_data is empty.
    return pd.concat(frames)
def pandas_interpolate(
        df: pd.DataFrame,
        x_column: str,
        x_data: Iterable[Union[float, int]],
        fill_columns: Iterable[str],
        kind: str = "polybest",
        progress: Union[None, bool, Dict[str, Any]] = None) -> pd.DataFrame:
    """Add rows with interpolated data.

    New rows are created for every value in ``x_data`` (see
    ``pandas_addnews``).  The rows are then grouped by all columns that are
    neither ``x_column`` nor one of ``fill_columns``, and within each group
    the missing values of every fill column are computed with
    ``scipy.interpolate.interp1d`` as a function of ``x_column``.  Values
    outside the known range are extrapolated.

    Args:
        df: Source DataFrame.  It is never modified.
        x_column: Name of the independent-variable column.
        x_data: Values of ``x_column`` for which rows are added.
        fill_columns: Names of the columns to interpolate.
        kind: Interpolation kind accepted by ``scipy.interpolate.interp1d``,
            or ``"polybest"`` to choose by the number of known points in
            the group: ``"slinear"`` for 2, ``"quadratic"`` for 3 and
            ``"cubic"`` for 4 or more.
        progress: Truthy to show a tqdm progress bar over the groups (only
            when tqdm is installed).  A dict is passed to ``tqdm`` as
            keyword arguments.

    Returns:
        A new DataFrame with the original and the interpolated rows,
        sorted by ``x_column`` and with a fresh integer index.  Rows whose
        grouping columns contain NaN are not part of the result.

    Raises:
        KeyError: If ``x_column`` or one of ``fill_columns`` is missing.
        ValueError: If there is no group to process, or if ``"polybest"``
            finds fewer than two known points for a column in a group.
    """
    # Materialise once: the names are walked several times below, which
    # would silently exhaust a generator after the first pass.
    fill_names = list(fill_columns)
    absent = [c for c in [x_column] + fill_names if c not in df.columns]
    if absent:
        raise KeyError(f"columns not found in DataFrame: {absent}")

    df_tmp = pandas_addnews(df, x_column, x_data, fill_na=fill_names)
    group_names = [
        c for c in df.columns if c != x_column and c not in fill_names
    ]

    if group_names:
        # groupby visits only the combinations that actually occur; the
        # Cartesian product of the unique values would also produce empty
        # selections that cannot be interpolated.
        grouped = df_tmp.groupby(group_names, sort=False)
        total = grouped.ngroups
        groups = (frame for _, frame in grouped)
    else:
        # Without grouping columns the whole table is a single curve.
        total = 1
        groups = iter([df_tmp])

    if tqdm is not None and progress:
        options = progress if isinstance(progress, dict) else {}
        groups = tqdm(groups, total=total, **options)

    pieces = []
    for group in groups:
        group = group.copy()
        x_all = group[x_column].to_numpy()
        for name in fill_names:
            # Work on a private float copy: the array backing a column may
            # be read-only (e.g. with pandas Copy-on-Write enabled).
            y_all = group[name].to_numpy(dtype=float, copy=True)
            missing = np.isnan(y_all)
            if not missing.any():
                continue  # nothing to fill; keep the column as it is
            x_known = x_all[~missing]
            if kind == "polybest":
                n_known = len(x_known)
                if n_known < 2:
                    raise ValueError(
                        f"column {name!r}: at least 2 known points are "
                        f"required for interpolation, got {n_known}")
                chosen = {2: "slinear", 3: "quadratic"}.get(n_known, "cubic")
            else:
                chosen = kind
            interp = interpolate.interp1d(
                x_known, y_all[~missing],
                kind=chosen, fill_value="extrapolate",
            )
            y_all[missing] = interp(x_all[missing])
            group[name] = y_all
        pieces.append(group)

    if not pieces:
        raise ValueError("no rows to interpolate")
    return pd.concat(pieces).sort_values(x_column, ignore_index=True)
if __name__ == "__main__":
    # Demo: build a small synthetic table on a regular grid, interpolate
    # three extra values of "I" and plot one (A, B) slice of the result.
    import random
    from pprint import pprint
    import matplotlib.pyplot as plt #type: ignore

    grid_i = np.arange(6) * 0.5
    grid_a = [ 1, 2, 3 ]
    grid_b = [ 10, 11 ]
    combos = list(itertools.product(grid_i, grid_a, grid_b))

    # One list per column; V3 carries random noise so it is not exactly
    # reproducible between runs.
    table = {
        "I": [ i for i, _, _ in combos ],
        "A": [ a for _, a, _ in combos ],
        "B": [ b for _, _, b in combos ],
        "V1": [ i * a + b for i, a, b in combos ],
        "V2": [ i**a - b for i, a, b in combos ],
        "V3": [ i * 2 * a + b - random.random() for i, a, b in combos ],
    } #type: ignore
    source = pd.DataFrame(table)
    print(source)

    filled = pandas_interpolate(
        source, "I", [ -1.2, 1.3, 5.0 ],
        [ "V1", "V2", "V3" ],
        progress={ "ascii": True })
    print(filled)

    # Draw the three value columns of one slice on shared axes.
    axes = None
    for column in ("V1", "V2", "V3"):
        curve = filled[(filled.A == 2) & (filled.B == 11)]
        axes = curve.plot(x="I", y=column, ax=axes)
    plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment