Interpolate missing values in a timeseries
https://stackoverflow.com/questions/78304441/how-can-i-interpolate-missing-values-based-on-the-sum-of-the-gap-using-pyspark/
This was a fun problem to solve.
In PySpark, you can fill a column over a window specification with the first non-null value or the last non-null value.
We can then identify each run of consecutive nulls as a group
and rank the rows within it.
Once we have those two values, calculating the interpolated values is
just a matter of arithmetic on the filled values and the rank.
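Before the PySpark version, the same arithmetic can be sketched in plain Python for a single user's time-ordered rows. This is only an illustration under assumptions: the function name `interpolate_gaps` is hypothetical, the series is assumed to start and end with a known row (as in the sample data), and the rank here starts at 1 on the first missing row, whereas the PySpark code ranks from the known row and subtracts 1.

```python
def interpolate_gaps(miles, totals):
    """Fill runs of None in miles/totals for one user's time-ordered rows.

    Assumption: the first and last rows are known, as in the sample data.
    """
    n = len(totals)
    miles_out, totals_out = list(miles), list(totals)
    i = 0
    while i < n:
        if totals[i] is not None:
            i += 1
            continue
        # Rows i .. j-1 form one run of consecutive missing values.
        j = i
        while j < n and totals[j] is None:
            j += 1
        last_known = totals[i - 1]          # last non-null total before the gap
        before_next = totals[j] - miles[j]  # total just before the next known row
        step = (before_next - last_known) / (j - i)
        for rank, k in enumerate(range(i, j), start=1):
            miles_out[k] = step
            totals_out[k] = last_known + step * rank
        i = j
    return miles_out, totals_out


miles = [3, None, None, None, 4, None, None, None, 4]
totals = [10, None, None, None, 20, None, None, None, 30]
filled_miles, filled_totals = interpolate_gaps(miles, totals)
```

For the first gap, the last known total is 10 and the total just before the next known row is 20 - 4 = 16, so each of the three missing rows gets (16 - 10) / 3 = 2 miles.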
Here's a working example.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
# Explicit imports instead of a wildcard, which would shadow builtins such as sum
from pyspark.sql.functions import col, first, last, lead, rank, when
from pyspark.sql.window import Window
# Sample data
data = [
("1", "2023-01-01 00:00", 3, 10),
("1", "2023-01-01 01:00", None, None),
("1", "2023-01-01 02:00", None, None),
("1", "2023-01-01 03:00", None, None),
("1", "2023-01-01 04:00", 4, 20),
("1", "2023-01-01 05:00", None, None),
("1", "2023-01-01 06:00", None, None),
("1", "2023-01-01 07:00", None, None),
("1", "2023-01-01 08:00", 4, 30)
]
schema = ["user_id", "timestamp", "miles", "total_mileage"]
spark = SparkSession.builder.appName("InterpolateNulls").getOrCreate()
df = spark.createDataFrame(data, schema=schema)
df = df.withColumn("was_missing", when(col("miles").isNull(), 1).otherwise(0))
df.show()
windowSpecLast = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
windowSpecFirst = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(0, Window.unboundedFollowing)
windowSpecNormal = Window.partitionBy("user_id").orderBy("timestamp")
df = df.withColumn("diff_mile", col("total_mileage") - col("miles"))
# Forward-fill the last known total_mileage and backward-fill the next known diff_mile
# (the total just before the next known row); lead() peeks at the next row's total_mileage
df = df.withColumn("last_nonnull_mileage", last("total_mileage", ignorenulls=True).over(windowSpecLast))
df = df.withColumn("first_nonnull_mileage", first("diff_mile", ignorenulls=True).over(windowSpecFirst))
df = df.withColumn("first_mileage", lead("total_mileage").over(windowSpecNormal))
df.show()
df = df.withColumn("start_col", F.when(
((F.col("total_mileage").isNotNull()) & (F.col("first_mileage").isNull())), 1).otherwise(0))
windowSpec = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
df = df.withColumn("groupId", F.sum("start_col").over(windowSpec))
df_nulls_grouped = df.groupby("groupId", "user_id").agg(F.count(F.when(F.col("miles").isNull(), 1)).alias("nulls_in_miles"))
# Display the result
df_nulls_grouped.show()
df_new = df.join(df_nulls_grouped, on=["user_id", "groupId"], how="inner")
df_new.orderBy("timestamp").show()
windowSpecRankOverNulls = Window.partitionBy("user_id", "groupId").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
df_ranked = df_new.withColumn("ranked", rank().over(windowSpecRankOverNulls))
df_ranked.show()
df_ranked = df_ranked.withColumn("miles_inter", (col("first_nonnull_mileage") - col("last_nonnull_mileage")) / col("nulls_in_miles"))
df_ranked = df_ranked.withColumn("total_mileage_inter", col("last_nonnull_mileage") + (col("miles_inter") * ( col("ranked") - 1)))
df_ranked.show()
df_final = df_ranked.withColumn("miles_final", when(col("miles").isNotNull(), col("miles")).otherwise(col("miles_inter")))
df_final = df_final.withColumn("total_mileage_final", when(col("total_mileage").isNotNull(), col("total_mileage")).otherwise(col("total_mileage_inter")))
df_final.show()
df_final = df_final.select("user_id", "timestamp", "miles_final", "total_mileage_final", "was_missing")
df_final.show()
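The `start_col` / `groupId` step in the script above is a running-sum grouping trick: a row is flagged when it has a known value and the next row does not, and the cumulative sum of those flags labels each known row together with the gap that follows it. A minimal plain-Python sketch of that idea, with the hypothetical helper name `group_ids`, might look like this:

```python
from itertools import accumulate


def group_ids(totals):
    """Return (start flags, group ids) for one user's time-ordered totals.

    A row is a start when it is known and the next row is missing
    (or there is no next row), mirroring lead() returning NULL.
    """
    next_values = totals[1:] + [None]
    start = [
        1 if cur is not None and nxt is None else 0
        for cur, nxt in zip(totals, next_values)
    ]
    # Running sum of the flags, like F.sum("start_col") over the window.
    return start, list(accumulate(start))


totals = [10, None, None, None, 20, None, None, None, 30]
start_col, group_id = group_ids(totals)
```

On the sample data this gives three groups: the first two each hold one known row plus three missing rows, and the last holds only the final known row.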
Final output:
+-------+----------------+-----------+-------------------+-----------+
|user_id|       timestamp|miles_final|total_mileage_final|was_missing|
+-------+----------------+-----------+-------------------+-----------+
|      1|2023-01-01 00:00|        3.0|               10.0|          0|
|      1|2023-01-01 01:00|        2.0|               12.0|          1|
|      1|2023-01-01 02:00|        2.0|               14.0|          1|
|      1|2023-01-01 03:00|        2.0|               16.0|          1|
|      1|2023-01-01 04:00|        4.0|               20.0|          0|
|      1|2023-01-01 05:00|        2.0|               22.0|          1|
|      1|2023-01-01 06:00|        2.0|               24.0|          1|
|      1|2023-01-01 07:00|        2.0|               26.0|          1|
|      1|2023-01-01 08:00|        4.0|               30.0|          0|
+-------+----------------+-----------+-------------------+-----------+
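One way to sanity-check the final table is to confirm that every row's running total equals the previous total plus that row's miles. The sketch below copies the values from the table by hand; the helper name `is_consistent` and the tolerance are assumptions made for illustration.

```python
def is_consistent(miles, totals, tol=1e-9):
    """True if each total equals the previous total plus that row's miles."""
    return all(
        abs(totals[i] - (totals[i - 1] + miles[i])) <= tol
        for i in range(1, len(totals))
    )


# Values copied from the miles_final and total_mileage_final columns.
miles_final = [3.0, 2.0, 2.0, 2.0, 4.0, 2.0, 2.0, 2.0, 4.0]
total_mileage_final = [10.0, 12.0, 14.0, 16.0, 20.0, 22.0, 24.0, 26.0, 30.0]
check = is_consistent(miles_final, total_mileage_final)
```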
Full output of every df.show() call, in order:
+-------+----------------+-----+-------------+-----------+
|user_id| timestamp|miles|total_mileage|was_missing|
+-------+----------------+-----+-------------+-----------+
| 1|2023-01-01 00:00| 3| 10| 0|
| 1|2023-01-01 01:00| NULL| NULL| 1|
| 1|2023-01-01 02:00| NULL| NULL| 1|
| 1|2023-01-01 03:00| NULL| NULL| 1|
| 1|2023-01-01 04:00| 4| 20| 0|
| 1|2023-01-01 05:00| NULL| NULL| 1|
| 1|2023-01-01 06:00| NULL| NULL| 1|
| 1|2023-01-01 07:00| NULL| NULL| 1|
| 1|2023-01-01 08:00| 4| 30| 0|
+-------+----------------+-----+-------------+-----------+
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
|user_id| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL|
| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL|
| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL|
| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20|
| 1|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL|
| 1|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL|
| 1|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL|
| 1|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30|
| 1|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL|
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
+-------+-------+--------------+
|groupId|user_id|nulls_in_miles|
+-------+-------+--------------+
| 1| 1| 3|
| 2| 1| 3|
| 3| 1| 0|
+-------+-------+--------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| miles_inter|total_mileage_inter|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| -1.0| 10.0|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| 2.0| 12.0|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| 2.0| 14.0|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| 2.0| 16.0|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|-1.3333333333333333| 20.0|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| 2.0| 22.0|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| 2.0| 24.0|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| 2.0| 26.0|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| NULL| NULL|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| miles_inter|total_mileage_inter|miles_final|total_mileage_final|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| -1.0| 10.0| 3.0| 10.0|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| 2.0| 12.0| 2.0| 12.0|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| 2.0| 14.0| 2.0| 14.0|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| 2.0| 16.0| 2.0| 16.0|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|-1.3333333333333333| 20.0| 4.0| 20.0|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| 2.0| 22.0| 2.0| 22.0|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| 2.0| 24.0| 2.0| 24.0|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| 2.0| 26.0| 2.0| 26.0|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| NULL| NULL| 4.0| 30.0|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
+-------+----------------+-----------+-------------------+-----------+
|user_id| timestamp|miles_final|total_mileage_final|was_missing|
+-------+----------------+-----------+-------------------+-----------+
| 1|2023-01-01 00:00| 3.0| 10.0| 0|
| 1|2023-01-01 01:00| 2.0| 12.0| 1|
| 1|2023-01-01 02:00| 2.0| 14.0| 1|
| 1|2023-01-01 03:00| 2.0| 16.0| 1|
| 1|2023-01-01 04:00| 4.0| 20.0| 0|
| 1|2023-01-01 05:00| 2.0| 22.0| 1|
| 1|2023-01-01 06:00| 2.0| 24.0| 1|
| 1|2023-01-01 07:00| 2.0| 26.0| 1|
| 1|2023-01-01 08:00| 4.0| 30.0| 0|
+-------+----------------+-----------+-------------------+-----------+