Skip to content

Instantly share code, notes, and snippets.

@Ackuq
Last active June 4, 2022 00:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Ackuq/8bb53e2ab0e24c9c19646f560bdb1d26 to your computer and use it in GitHub Desktop.
Save Ackuq/8bb53e2ab0e24c9c19646f560bdb1d26 to your computer and use it in GitHub Desktop.
Pre-processing for the Yelp dataset
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, to_timestamp
# Pattern used to parse the "date" strings of both input files.
TS_FORMAT = "yyyy-MM-dd HH:mm:ss"

# Placeholders: replace with the locations of the check-in and tip JSON files.
checkins_path = ...
tips_path = ...

spark = SparkSession.builder.master("local").appName("preprocessing").getOrCreate()

# Everything that uses the session runs inside try/finally so the session is
# stopped even when reading, transforming or writing raises.
try:
    checkins = spark.read.json(checkins_path)
    tips = spark.read.json(tips_path)

    # The check-in "date" column is split on ", " and exploded, giving one
    # output row per (business_id, single date string).
    checkins_exploded = checkins.select(
        checkins["business_id"],
        explode(split(checkins["date"], ", ")).alias("date"),
    )

    # Parse each date string with TS_FORMAT, cast it to a long "ts" column
    # (Spark's timestamp-to-long cast yields seconds since the Unix epoch),
    # drop the textual column and sort by business and time.
    # NOTE(review): the strings carry no zone, so the resulting value depends on
    # the session time zone (spark.sql.session.timeZone) — confirm this is intended.
    checkins_ts = (
        checkins_exploded.withColumn(
            "ts", to_timestamp(checkins_exploded["date"], TS_FORMAT).cast("long")
        )
        .drop("date")
        .orderBy("business_id", "ts")
    )

    # Tips hold a single date per row, so only the conversion and sort apply.
    tips_ts = (
        tips.withColumn("ts", to_timestamp(tips["date"], TS_FORMAT).cast("long"))
        .drop("date")
        .orderBy("business_id", "ts")
    )

    # No save mode is given, so Spark's default applies to existing targets.
    tips_ts.write.parquet("target/yelp_tips.parquet")
    checkins_ts.write.parquet("target/yelp_checkins.parquet")
finally:
    spark.stop()
/** Pre-processing job for the Yelp check-in and tip data.
  *
  * Reads two JSON inputs, converts their textual `date` values into a numeric `ts`
  * column, sorts by `business_id` and `ts`, and writes each result as Parquet.
  *
  * `CHECK_IN_PATH` and `TIP_PATH` are not defined in this object; they must be
  * supplied from elsewhere in scope.
  */
object Yelp {

  /** Pattern used to parse the `date` strings of both inputs. */
  final val TS_FORMAT = "yyyy-MM-dd HH:mm:ss"

  /** Runs the whole job on a local Spark session and stops the session afterwards.
    *
    * @param args command-line arguments; not read by this method
    */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("Preprocessing")
      .getOrCreate()

    // try/finally guarantees the session is stopped even when reading,
    // transforming or writing throws.
    try {
      val checkIns = spark.read.json(CHECK_IN_PATH)
      val tips = spark.read.json(TIP_PATH)

      // The check-in `date` column is split on ", " and exploded, giving one
      // output row per (business_id, single date string).
      val checkInsExploded =
        checkIns.select(
          checkIns("business_id"),
          explode(split(checkIns("date"), ", ")).as("date")
        )

      // Parse each date string with TS_FORMAT, cast it to a long `ts` column
      // (Spark's timestamp-to-long cast yields seconds since the Unix epoch),
      // drop the textual column and sort by business and time.
      // NOTE(review): the strings carry no zone, so the value depends on the
      // session time zone (spark.sql.session.timeZone) — confirm this is intended.
      val checkInsTS =
        checkInsExploded
          .withColumn(
            "ts",
            to_timestamp(checkInsExploded("date"), TS_FORMAT).cast("long")
          )
          .drop("date")
          .orderBy(col("business_id"), col("ts"))

      // Tips hold a single date per row, so only the conversion and sort apply.
      val tipsTS =
        tips
          .withColumn("ts", to_timestamp(tips("date"), TS_FORMAT).cast("long"))
          .drop("date")
          .orderBy(col("business_id"), col("ts"))

      // No save mode is given, so Spark's default applies to existing targets.
      tipsTS.write.parquet("target/yelp_tips.parquet")
      checkInsTS.write.parquet("target/yelp_checkins.parquet")
    } finally {
      spark.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment