Skip to content

Instantly share code, notes, and snippets.

Last active September 29, 2021 21:45
Show Gist options
  • Save yifeihuang/45f2b6b4e504d4b20261695a6d8d00c4 to your computer and use it in GitHub Desktop.
Save yifeihuang/45f2b6b4e504d4b20261695a6d8d00c4 to your computer and use it in GitHub Desktop.
Interpolate time series data with pyspark
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import Window as w
from pyspark.sql.functions import pandas_udf, PandasUDFType
import numpy as np
import datetime
sequence = [1, 2, 4, 7, 11, 16, 22]
sequence2 = [3, 5, 6, 8, 10, 18, 20]
# create two example time series with different ticks and inconsistent spacing
df = spark.createDataFrame(
['series 1'] * len(sequence),
[datetime.datetime(2021,1,1,0,i) for i in sequence],
[i**2*np.random.uniform(0,1) for i in sequence]
+ list(zip(
['series 2'] * len(sequence2),
[datetime.datetime(2021,1,1,0,i) for i in sequence2],
[i**2*np.random.uniform(0,1) for i in sequence2]
t.StructField('id', t.StringType()),
t.StructField('time', t.TimestampType()),
t.StructField('value', t.FloatType())
# up sample the time series to regular tickspacing
regular_ticks = (
.withColumn('time', f.explode(f.sequence('min_time', 'max_time', f.lit('1 minute').cast('Interval'))))
.drop('max_time', 'min_time')
.join(df, ['id', 'time'], 'left')
# fill method 1: take last known value
fill_with_last_known = (
f.last('value', ignorenulls=True)
# fill method 2: linear interpolation
def interpolate(timestamp_col, method={'method': 'fill_zero'}, **kwargs):
Utility function to interpolate missing values in a timeseries on grouped object, assume group by key is unique id of time series
also assumes that the timestamp col is regularly spaced already
schema: output schema of the dataframe
timestamp_col: column name of timestamp
method: <map> Supported methods: fill_zero, pad (fill forward), linear, nearest, and other scipy methods
example usage
month_series.groupBy(series_id).apply(interpolate(month_series.schema, "month", {'method': 'fill_zero'}))
series_id here refers to the key/keys of the time series WITHOUT the time column
def _(pdf):
pdf.set_index(timestamp_col, inplace=True)
pdf.sort_index(axis=0, inplace=True)
if method['method'] == 'fill_zero':
pdf.fillna(0, inplace=True)
pdf.interpolate(**method, inplace=True)
pdf.reset_index(drop=False, inplace=True)
return pdf
return _
linear_interpolate = (
interpolate('time', {'method': 'linear'}),
#fill method 3: quadratic interpolation
quadratic_interpolate = (
interpolate('time', {'method': 'quadratic'}),
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment