Skip to content

Instantly share code, notes, and snippets.

@stuzero
Last active September 2, 2021 16:12
Show Gist options
  • Save stuzero/6476e96925202c7cc25aae7f9d28badb to your computer and use it in GitHub Desktop.
Melt and merge the daily US County Level Covid-19 Data from USA Facts https://usafacts.org/visualizations/coronavirus-covid-19-spread-map/
// Apache Spark version 3.1.2
// Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
// USA Facts county-level daily cumulative COVID-19 CSV feeds.
// Each file is wide-format: 4 identifier columns followed by one column per date.
val casesUrl = "https://usafactsstatic.blob.core.windows.net/public/data/covid-19/covid_confirmed_usafacts.csv"
val deathsUrl = "https://usafactsstatic.blob.core.windows.net/public/data/covid-19/covid_deaths_usafacts.csv"
/**
 * Downloads a CSV from `dataUrl` and parses it into a DataFrame.
 *
 * Fixes vs. the original:
 *  - the `scala.io.Source` handle is now closed (it was leaked);
 *  - dropped the spurious `.stripMargin`, which is meant for multi-line
 *    string literals and would silently strip a leading `|` (and anything
 *    before it) from any downloaded line, corrupting that row;
 *  - `var` replaced with `val` (values are never reassigned).
 *
 * @param dataUrl HTTP(S) URL of a CSV file whose first line is a header row
 * @return DataFrame with columns taken from the CSV header
 */
def getData (dataUrl:String) : org.apache.spark.sql.DataFrame = {
  // Fetch the whole file, making sure the underlying stream is released.
  val source = scala.io.Source.fromURL(dataUrl)
  val responseList =
    try source.mkString.linesIterator.toList
    finally source.close()
  // Distribute the raw lines and let Spark's CSV reader parse them.
  // (spark and its implicits for toDS() are in scope in spark-shell.)
  import spark.implicits._
  val csvData = spark.sparkContext.parallelize(responseList).toDS()
  spark.read.option("header", true).csv(csvData)
}
/**
 * Unpivots ("melts") a wide USA Facts DataFrame — 4 identifier columns
 * followed by one column per date — into long format with one row per
 * (countyFIPS, stateFIPS, date).
 *
 * Fix vs. the original: the date pattern was "yyyy-mm-dd", but lowercase
 * "mm" means minute-of-hour in Spark's datetime patterns; the month field
 * is uppercase "MM".  Under Spark 3's strict parser the old pattern cannot
 * correctly parse the ISO-style column headers.
 * (Assumes the date column headers are formatted yyyy-MM-dd — TODO confirm
 * against the current feed.)
 *
 * @param df     wide DataFrame from getData; columns 5+ are date columns
 * @param metric name to give the value column, e.g. "cases" or "deaths"
 * @return DataFrame(countyFIPS, stateFIPS, date, <metric>)
 */
def meltData (df:org.apache.spark.sql.DataFrame, metric:String) : org.apache.spark.sql.DataFrame = {
  // Everything after the first four columns (countyFIPS, County Name,
  // State, stateFIPS) is a per-date cumulative count column.
  val arrDateCols = df.columns.drop(4)
  // Pack each date column into a (date, metric) struct, collected in an array.
  val packed = df
    .withColumn(
      "combined",
      array(arrDateCols.map(c =>
        struct(lit(c).alias("date"), col(c).cast("int").alias("metric"))): _*))
    .select("countyFIPS", "stateFIPS", "combined")
  // Explode to one row per county/date and give the value its metric name.
  packed
    .select($"countyFIPS", $"stateFIPS", explode($"combined"))
    .select(
      $"countyFIPS",
      $"stateFIPS",
      to_date($"col.date", "yyyy-MM-dd").alias("date"), // MM = month, not mm = minute
      ($"col.metric").alias(metric))
}
// Melt each feed to long format, then full-outer-join on county/state/date so
// a county-day present in only one feed still appears (nulls in the other).
val meltedDeaths = meltData(getData(deathsUrl), "deaths")
val meltedCases = meltData(getData(casesUrl), "cases")
val covidData = meltedDeaths.join(meltedCases, Seq("countyFIPS", "stateFIPS", "date"), "full")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment