Created
April 11, 2024 12:28
-
-
Save dineshdharme/94b2455adc7d120dad505275d68d62a3 to your computer and use it in GitHub Desktop.
Interpolate missing values in a timeseries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
https://stackoverflow.com/questions/78304441/how-can-i-interpolate-missing-values-based-on-the-sum-of-the-gap-using-pyspark/ | |
This was a nice fun problem to solve. | |
In PySpark, you can fill a column over a window specification with the first non-null value or the last non-null value.
We can then identify each run of consecutive nulls as a group
and rank the rows within that group.
Once we have those two things (the filled-in neighbouring values and the rank), calculating the interpolated values is
just a matter of arithmetic.
Here's a working example. | |
from pyspark.sql.functions import * | |
import pyspark.sql.functions as F | |
from pyspark.sql import SparkSession | |
from pyspark.sql.functions import col, sum, when, lag, lead | |
from pyspark.sql.window import Window | |
# Sample data: hourly rows for one user. Only hours 0, 4 and 8 carry a reading
# (miles, total_mileage); every other hour has both values missing.
known_readings = {0: (3, 10), 4: (4, 20), 8: (4, 30)}
data = [
    ("1", f"2023-01-01 {hour:02d}:00", *known_readings.get(hour, (None, None)))
    for hour in range(9)
]
schema = ["user_id", "timestamp", "miles", "total_mileage"]

spark = SparkSession.builder.appName("InterpolateNulls").getOrCreate()
df = spark.createDataFrame(data, schema=schema)

# Flag the rows that were missing in the input, before anything is filled in.
df = df.withColumn("was_missing", F.when(F.col("miles").isNull(), 1).otherwise(0))
df.show()
# Every window works on one user's rows in timestamp order; they differ only
# in the frame they look at.
per_user_by_time = Window.partitionBy("user_id").orderBy("timestamp")
windowSpecNormal = per_user_by_time
# Frame: everything up to and including the current row.
windowSpecLast = per_user_by_time.rowsBetween(Window.unboundedPreceding, Window.currentRow)
# Frame: the current row and everything after it.
windowSpecFirst = per_user_by_time.rowsBetween(Window.currentRow, Window.unboundedFollowing)

df = (
    # total_mileage minus the row's own miles, i.e. the total before this
    # row's miles were added. NULL on rows with missing readings.
    df.withColumn("diff_mile", F.col("total_mileage") - F.col("miles"))
    # Forward fill: most recent non-null total_mileage at or before this row.
    .withColumn(
        "last_nonnull_mileage",
        F.last("total_mileage", ignorenulls=True).over(windowSpecLast),
    )
    # Backward fill: first non-null diff_mile at or after this row.
    .withColumn(
        "first_nonnull_mileage",
        F.first("diff_mile", ignorenulls=True).over(windowSpecFirst),
    )
    # total_mileage of the very next row (NULL if that row is missing or
    # there is no next row).
    .withColumn("first_mileage", F.lead("total_mileage").over(windowSpecNormal))
)
df.show()
# A row starts a new group when it has a total_mileage reading but the next
# row's total_mileage is NULL. That covers the reading just before a gap and
# also the last row of a user, which has no next row.
opens_group = F.col("total_mileage").isNotNull() & F.col("first_mileage").isNull()
df = df.withColumn("start_col", F.when(opens_group, 1).otherwise(0))

# A running sum of the start flags gives every row the id of the group it
# belongs to.
windowSpec = (
    Window.partitionBy("user_id")
    .orderBy("timestamp")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df = df.withColumn("groupId", F.sum("start_col").over(windowSpec))

# Count the rows with missing miles in each (user, group). count() skips the
# NULLs produced by when() for rows that do have a value.
missing_marker = F.when(F.col("miles").isNull(), 1)
df_nulls_grouped = df.groupby("groupId", "user_id").agg(
    F.count(missing_marker).alias("nulls_in_miles")
)
# Display the result
df_nulls_grouped.show()

# Attach the per-group count back onto every row of that group.
df_new = df.join(df_nulls_grouped, on=["user_id", "groupId"], how="inner")
df_new.orderBy("timestamp").show()
# Position of each row inside its group: the row that opens the group gets 1,
# the missing rows that follow it get 2, 3, ...
windowSpecRankOverNulls = (
    Window.partitionBy("user_id", "groupId")
    .orderBy("timestamp")
    .rowsBetween(Window.unboundedPreceding, 0)
)
df_ranked = df_new.withColumn("ranked", rank().over(windowSpecRankOverNulls))
df_ranked.show()

# Mileage to spread over the gap: the back-filled diff_mile minus the
# forward-filled total_mileage.
gap_mileage = col("first_nonnull_mileage") - col("last_nonnull_mileage")

# Share the gap evenly between the missing rows. Groups with no missing rows
# have nulls_in_miles == 0; the guard leaves their miles_inter NULL instead of
# dividing by zero, which raises an error when Spark runs in ANSI mode.
df_ranked = df_ranked.withColumn(
    "miles_inter",
    when(col("nulls_in_miles") > 0, gap_mileage / col("nulls_in_miles")),
)
# Running total for a row: last known total plus one step per position after
# the opening row.
df_ranked = df_ranked.withColumn(
    "total_mileage_inter",
    col("last_nonnull_mileage") + col("miles_inter") * (col("ranked") - 1),
)
df_ranked.show()

# Keep original readings where present; use the interpolated value otherwise.
df_final = df_ranked.withColumn("miles_final", F.coalesce(col("miles"), col("miles_inter")))
df_final = df_final.withColumn(
    "total_mileage_final",
    F.coalesce(col("total_mileage"), col("total_mileage_inter")),
)
df_final.show()

df_final = df_final.select(
    "user_id", "timestamp", "miles_final", "total_mileage_final", "was_missing"
)
df_final.show()
Final output:
+-------+----------------+-----------+-------------------+-----------+ | |
|user_id| timestamp|miles_final|total_mileage_final|was_missing| | |
+-------+----------------+-----------+-------------------+-----------+ | |
| 1|2023-01-01 00:00| 3.0| 10.0| 0| | |
| 1|2023-01-01 01:00| 2.0| 12.0| 1| | |
| 1|2023-01-01 02:00| 2.0| 14.0| 1| | |
| 1|2023-01-01 03:00| 2.0| 16.0| 1| | |
| 1|2023-01-01 04:00| 4.0| 20.0| 0| | |
| 1|2023-01-01 05:00| 2.0| 22.0| 1| | |
| 1|2023-01-01 06:00| 2.0| 24.0| 1| | |
| 1|2023-01-01 07:00| 2.0| 26.0| 1| | |
| 1|2023-01-01 08:00| 4.0| 30.0| 0| | |
+-------+----------------+-----------+-------------------+-----------+ | |
Full output of every `show()` call in the script, in order:
+-------+----------------+-----+-------------+-----------+ | |
|user_id| timestamp|miles|total_mileage|was_missing| | |
+-------+----------------+-----+-------------+-----------+ | |
| 1|2023-01-01 00:00| 3| 10| 0| | |
| 1|2023-01-01 01:00| NULL| NULL| 1| | |
| 1|2023-01-01 02:00| NULL| NULL| 1| | |
| 1|2023-01-01 03:00| NULL| NULL| 1| | |
| 1|2023-01-01 04:00| 4| 20| 0| | |
| 1|2023-01-01 05:00| NULL| NULL| 1| | |
| 1|2023-01-01 06:00| NULL| NULL| 1| | |
| 1|2023-01-01 07:00| NULL| NULL| 1| | |
| 1|2023-01-01 08:00| 4| 30| 0| | |
+-------+----------------+-----+-------------+-----------+ | |
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+ | |
|user_id| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage| | |
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+ | |
| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| | |
| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| | |
| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| | |
| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| | |
| 1|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| | |
| 1|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| | |
| 1|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| | |
| 1|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| | |
| 1|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| | |
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+ | |
+-------+-------+--------------+ | |
|groupId|user_id|nulls_in_miles| | |
+-------+-------+--------------+ | |
| 1| 1| 3| | |
| 2| 1| 3| | |
| 3| 1| 0| | |
+-------+-------+--------------+ | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+ | |
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles| | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+ | |
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| | |
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| | |
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| | |
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| | |
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| | |
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| | |
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| | |
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| | |
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+ | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+ | |
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+ | |
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| | |
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| | |
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| | |
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| | |
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1| | |
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| | |
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| | |
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| | |
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+ | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+ | |
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| miles_inter|total_mileage_inter| | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+ | |
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| -1.0| 10.0| | |
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| 2.0| 12.0| | |
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| 2.0| 14.0| | |
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| 2.0| 16.0| | |
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|-1.3333333333333333| 20.0| | |
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| 2.0| 22.0| | |
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| 2.0| 24.0| | |
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| 2.0| 26.0| | |
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| NULL| NULL| | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+ | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+ | |
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| miles_inter|total_mileage_inter|miles_final|total_mileage_final| | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+ | |
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| -1.0| 10.0| 3.0| 10.0| | |
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| 2.0| 12.0| 2.0| 12.0| | |
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| 2.0| 14.0| 2.0| 14.0| | |
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| 2.0| 16.0| 2.0| 16.0| | |
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|-1.3333333333333333| 20.0| 4.0| 20.0| | |
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| 2.0| 22.0| 2.0| 22.0| | |
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| 2.0| 24.0| 2.0| 24.0| | |
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| 2.0| 26.0| 2.0| 26.0| | |
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| NULL| NULL| 4.0| 30.0| | |
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+ | |
+-------+----------------+-----------+-------------------+-----------+ | |
|user_id| timestamp|miles_final|total_mileage_final|was_missing| | |
+-------+----------------+-----------+-------------------+-----------+ | |
| 1|2023-01-01 00:00| 3.0| 10.0| 0| | |
| 1|2023-01-01 01:00| 2.0| 12.0| 1| | |
| 1|2023-01-01 02:00| 2.0| 14.0| 1| | |
| 1|2023-01-01 03:00| 2.0| 16.0| 1| | |
| 1|2023-01-01 04:00| 4.0| 20.0| 0| | |
| 1|2023-01-01 05:00| 2.0| 22.0| 1| | |
| 1|2023-01-01 06:00| 2.0| 24.0| 1| | |
| 1|2023-01-01 07:00| 2.0| 26.0| 1| | |
| 1|2023-01-01 08:00| 4.0| 30.0| 0| | |
+-------+----------------+-----------+-------------------+-----------+ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment