@zouzias
Last active October 28, 2021 14:25
Spark DataFrame zipWithIndex
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}
// Intended for spark-shell, where `sc`, `spark` and `spark.implicits._` (needed for toDF) are already in scope
val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
// Append "rowid" column of type Long
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex
// Convert back to DataFrame
val dfZippedWithId = spark.createDataFrame(
  rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) },
  newSchema
)
// Show results
dfZippedWithId.show
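
The steps above can be wrapped in a small reusable function. The following is only a sketch: the names `withRowId`, `colName` and `session` are hypothetical and not part of the original snippet, and it assumes Spark 2.x or later running in local mode. It builds its own input with `createDataFrame`, so it does not depend on `spark.implicits._`.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper (not from the original gist): appends a non-nullable
// Long column holding the index assigned by RDD.zipWithIndex.
def withRowId(df: DataFrame, colName: String = "rowid"): DataFrame = {
  // Extend the existing schema with the new index column
  val schema = StructType(df.schema.fields :+ StructField(colName, LongType, nullable = false))
  // Zip at the RDD level, then flatten each (Row, Long) pair back into a single Row
  val rows = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(rows, schema)
}

// Assumption: a local session; in spark-shell, getOrCreate() returns the existing one
val session = SparkSession.builder().master("local[*]").appName("zipWithIndex-sketch").getOrCreate()

val input = session.createDataFrame(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0))).toDF("x", "y")
val indexed = withRowId(input)
indexed.show()
```

Note that `zipWithIndex` numbers rows by partition order and then by position within each partition, and it runs an extra Spark job when the RDD has more than one partition.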