Skip to content

Instantly share code, notes, and snippets.

@nddipiazza
Created April 11, 2017 15:43
Show Gist options
  • Save nddipiazza/9b9c007e06965291dab68fe1e630d71e to your computer and use it in GitHub Desktop.
Save nddipiazza/9b9c007e06965291dab68fe1e630d71e to your computer and use it in GitHub Desktop.
Spark JDBC Job - Joining Child Tables into String-Array Fields
// SQL Server connection string. SERVER_IP, USERNAME and PASSWORD are placeholders to fill in.
// NOTE(review): credentials embedded in the URL can surface in logs and the Spark UI; consider
// supplying them as separate "user"/"password" JDBC options sourced from a secret store.
val jdbcUrl = "jdbc:sqlserver://SERVER_IP;user=USERNAME;password=PASSWORD"

// Look up the Person_Number range so Spark can split the Person scan into key-range partitions.
// Uses the DataFrameReader JDBC source: sqlContext.jdbc(...) is deprecated since Spark 1.4 and
// was removed in Spark 2.0.
// MIN/MAX are NULL on an empty table, so fall back to "0" instead of throwing a NullPointerException.
// The bounds only shape the partition stride; rows outside them are still read by the edge partitions.
val personRange = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> jdbcUrl,
    "dbtable" -> "(select min(Person_Number) as minId, max(Person_Number) as maxId from [Export].[dbo].[Person]) as tmp"
  ))
  .load()
  .first()
val minId = Option(personRange.get(0)).map(_.toString).getOrElse("0")
val maxId = Option(personRange.get(1)).map(_.toString).getOrElse(minId)

// Partitioned JDBC read of the Person table: 4 parallel queries over the Person_Number range.
val dbOpts = Map(
  "url" -> jdbcUrl,
  "dbtable" -> "[Export].[dbo].[Person]",
  "partitionColumn" -> "Person_Number",
  "numPartitions" -> "4",
  "lowerBound" -> minId,
  "upperBound" -> maxId,
  "fetchSize" -> "1000"
)
val jdbcDF = sqlContext.read.format("jdbc").options(dbOpts).load()
// Exposed as a temp table so it can also be queried with SQL from the shell.
jdbcDF.registerTempTable("Person")
// Look up the Person_Number range of Person_Telephone to drive its partitioned JDBC read.
// Uses the DataFrameReader JDBC source instead of the deprecated (removed in Spark 2.0) sqlContext.jdbc.
// MIN/MAX are NULL on an empty table, so fall back to "0" instead of throwing a NullPointerException.
val telephoneRange = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> jdbcUrl,
    "dbtable" -> "(select min(Person_Number) as minId, max(Person_Number) as maxId from [Export].[dbo].[Person_Telephone]) as tmp"
  ))
  .load()
  .first()
val minId2 = Option(telephoneRange.get(0)).map(_.toString).getOrElse("0")
val maxId2 = Option(telephoneRange.get(1)).map(_.toString).getOrElse(minId2)

// Partitioned JDBC read of Person_Telephone: 4 parallel queries over the Person_Number range.
val dbOpts2 = Map(
  "url" -> jdbcUrl,
  "dbtable" -> "[Export].[dbo].[Person_Telephone]",
  "partitionColumn" -> "Person_Number",
  "numPartitions" -> "4",
  "lowerBound" -> minId2,
  "upperBound" -> maxId2,
  "fetchSize" -> "1000"
)
val jdbcDF2 = sqlContext.read.format("jdbc").options(dbOpts2).load()
// Look up the Person_Number range of Person_Resume to drive its partitioned JDBC read.
// Uses the DataFrameReader JDBC source instead of the deprecated (removed in Spark 2.0) sqlContext.jdbc.
// MIN/MAX are NULL on an empty table, so fall back to "0" instead of throwing a NullPointerException.
val resumeRange = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> jdbcUrl,
    "dbtable" -> "(select min(Person_Number) as minId, max(Person_Number) as maxId from [Export].[dbo].[Person_Resume]) as tmp"
  ))
  .load()
  .first()
val minId3 = Option(resumeRange.get(0)).map(_.toString).getOrElse("0")
val maxId3 = Option(resumeRange.get(1)).map(_.toString).getOrElse(minId3)

// Partitioned JDBC read of Person_Resume: more partitions and a smaller fetch size than the other
// tables (presumably because resume rows are large text — TODO confirm the intended sizing).
val dbOpts3 = Map(
  "url" -> jdbcUrl,
  "dbtable" -> "[Export].[dbo].[Person_Resume]",
  "partitionColumn" -> "Person_Number",
  "numPartitions" -> "10",
  "lowerBound" -> minId3,
  "upperBound" -> maxId3,
  "fetchSize" -> "100"
)
val jdbcDF3 = sqlContext.read.format("jdbc").options(dbOpts3).load()
// Build one Solr document per Person_Number: the person's distinct telephone numbers and resumes
// collected into array fields.
//
// Each child table is aggregated against the Person keys on its own. Joining both children onto the
// same frame would produce a phones x resumes cross product per person (duplicating large resume
// text before collect_set dedups it), and a shared un-cached join consumed by two groupBys re-runs
// every JDBC scan twice.
// Only the key column is taken from Person, so the JDBC scan can prune the rest.
val personIds = jdbcDF.select("Person_Number")

// Distinct telephone numbers per person. collect_set skips NULLs, so people with no phones get [].
// Explicit aliases avoid relying on Spark's auto-generated aggregate column names.
val joinDFPhone = personIds
  .join(jdbcDF2.select("Person_Number", "Telephone_Number"), Seq("Person_Number"), "left_outer")
  .groupBy("Person_Number")
  .agg(collect_set("Telephone_Number").as("Telephone_Number_ss"))

// Distinct resumes per person; people with no resumes get [].
val joinDFResume = personIds
  .join(jdbcDF3.select("Person_Number", "Resume"), Seq("Person_Number"), "left_outer")
  .groupBy("Person_Number")
  .agg(collect_set("Resume").as("Resume_txt"))

val res = joinDFPhone.join(joinDFResume, Seq("Person_Number"), "left_outer")

// Solr uses "id" as the document key; the _ss / _txt suffixes presumably map to dynamic-field
// types in the target collection's schema — verify against the "default" collection config.
res.withColumnRenamed("Person_Number", "id")
  .write
  .format("solr")
  .option("collection", "default")
  .save()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment