@rampage644
Last active June 19, 2019 11:54
spark etl sample, attempt #1
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import com.databricks.spark.csv.CsvSchemaRDD
import org.apache.spark.sql.functions._

object JDBCTester {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("JDBC Tester")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    import sqlCtx.implicits._

    // Load the headerless volumes extract and name its columns.
    val volumes = sqlCtx.load("com.databricks.spark.csv",
      Map("path" -> "/home/ramp/tmp/volumes.data", "header" -> "false")).toDF(
      "CREATED_AT", "UPDATED_AT", "DELETED_AT", "DELETED", "ID", "EC2_ID",
      "USER_ID", "PROJECT_ID", "HOST", "SIZE", "AVAILABILITY_ZONE",
      "INSTANCE_UUID", "MOUNTPOINT", "ATTACH_TIME", "STATUS", "ATTACH_STATUS",
      "SCHEDULED_AT", "LAUNCHED_AT", "TERMINATED_AT", "DISPLAY_NAME",
      "DISPLAY_DESCRIPTION", "PROVIDER_LOCATION", "PROVIDER_AUTH", "SNAPSHOT_ID",
      "VOLUME_TYPE_ID", "SOURCE_SYSTEM_NAME", "SOURCE_VOLID", "bootable",
      "attached_host", "provider_geometry", "_name_id", "encryption_key_id",
      "migration_status").as('V)

    val volume_type = sqlCtx.load("com.databricks.spark.csv",
      Map("path" -> "/home/ramp/tmp/volume_types.data", "header" -> "false")).toDF(
      "CREATED_AT", "UPDATED_AT", "DELETED_AT", "DELETED", "ID", "NAME",
      "SOURCE_SYSTEM_NAME", "DC").as('VT)

    // UDFs supplying derived or constant dimension attributes.
    val description_ = udf((source: String, name: String) => s"$name in $source datacenter")
    val instanceType = udf(() => "cbs")
    val keyGenerator = udf(() => 0) // constant stub: every row gets key 0
    val getdate = udf(() => new SimpleDateFormat("dd/MM/yy").format(new Date())) // MM = month; mm would be minutes
    val instance = udf(() => "m_Dim_Instance")
    val source_system = udf(() => "cbs_instance")
    val current_record = udf(() => 1)
    val extended_table = udf(() => "Dim_Instance_Extended_CBS")

    // Join volumes to their volume type and project the dimension columns.
    volumes.join(volume_type, $"VT.ID" === $"V.VOLUME_TYPE_ID", "inner")
      .select(
        keyGenerator() as 'Instance_Key,
        $"V.ID" as 'Instance_NK,
        instanceType() as 'Instance_Type,
        $"V.ID" as 'Assigned_Instance_Number,
        'PROJECT_ID as 'Assigned_Account_Number,
        'DISPLAY_NAME as 'Instance_Name,
        description_($"V.SOURCE_SYSTEM_NAME", 'NAME) as 'Instance_Description,
        'STATUS as 'Instance_Status,
        $"V.SOURCE_SYSTEM_NAME" as 'Instance_Datacenter,
        getdate() as 'Rec_Created_Date,
        getdate() as 'Rec_Updated_Date,
        $"V.CREATED_AT" as 'Instance_Creation_Date,
        $"V.UPDATED_AT" as 'Instance_Update_Date,
        instance() as 'Instance_Created_by,
        instance() as 'Instance_Updated_by,
        $"V.CREATED_AT" as 'Effective_Start_Datetime,
        $"V.DELETED_AT" as 'Effective_End_Datetime,
        source_system() as 'Source_System_Name,
        current_record() as 'Current_Record,
        extended_table() as 'Extended_Table_Name
      ).save("com.databricks.spark.csv", SaveMode.Overwrite, Map("path" -> "/home/ramp/tmp/dim"))
  }
}
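
The keyGenerator() stub above always returns 0, so every row is written with the same Instance_Key. A minimal follow-up sketch (an assumption, not part of the gist) for handing out distinct surrogate keys afterwards with zipWithIndex; `dim` is assumed to hold the joined/selected DataFrame from the snippet, with Instance_Key as its first column:

    // Hypothetical sketch, not from the gist: give each dimension row a distinct
    // surrogate key instead of the constant 0 produced by keyGenerator().
    val keyed = sqlCtx.createDataFrame(
      dim.rdd.zipWithIndex().map { case (row, idx) =>
        Row.fromSeq(idx.toInt +: row.toSeq.tail) // toInt matches the IntegerType of the udf-built key column
      },
      dim.schema)
    keyed.save("com.databricks.spark.csv", SaveMode.Overwrite, Map("path" -> "/home/ramp/tmp/dim"))
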
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import com.databricks.spark.csv.CsvSchemaRDD
import org.apache.spark.sql.functions._

object JDBCTester {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("JDBC Tester")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    import sqlCtx.implicits._

    // Incoming dimension snapshot and the previously loaded dimension.
    val new_dim = sqlCtx.load("com.databricks.spark.csv", Map("path" -> "/home/ramp/tmp/new_dim.csv", "header" -> "true")).as('ndim)
    val ext_dim = sqlCtx.load("com.databricks.spark.csv", Map("path" -> "/home/ramp/tmp/ext_dim.csv", "header" -> "true")).as('pdim)

    // Join on the natural key so each row carries the new and previous versions side by side.
    val res = new_dim.join(ext_dim, new_dim("nk") === ext_dim("nk"), "inner")
    val now = new SimpleDateFormat().format(new Date())

    // SCD type 2 changes: emit the new version and close out the previous one.
    val scd2 = res.filter($"ndim.t2" !== $"pdim.t2").flatMap { row =>
      // The joined row is the new_dim columns followed by the ext_dim columns.
      val (newRowSeq, prevRowSeq) = row.toSeq.splitAt(row.length / 2)
      val (newRow, prevRow) = (
        Row.fromSeq(newRowSeq),
        // Close out the previous version: index 6 gets the load time, index 7 gets 0.
        Row.fromSeq(prevRowSeq.updated(6, now).updated(7, 0))
      )
      Array(prevRow, newRow)
    }

    // SCD type 1 changes: just take the new version of the row.
    val scd1 = res.filter($"ndim.t1" !== $"pdim.t1").select($"ndim.*")

    sqlCtx.createDataFrame(scd2, scd1.schema).unionAll(scd1).saveAsCsvFile("/home/ramp/tmp/dim.csv")
  }
}
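
A note on the snippet above: nk reads as the natural key the two snapshots are joined on, a difference in t2 is treated as a type-2 change (expire the old row and append the new one), and a difference in t1 as a type-1 overwrite. The bare indices 6 and 7 are where the expiring row's effective-end timestamp and current-record flag appear to sit; new_dim.csv / ext_dim.csv are not included in the gist, so the names in this small sketch are hypothetical:

    // Sketch only, hypothetical names: the two positional updates spelled out.
    val EFFECTIVE_END_IDX = 6   // column overwritten with the load timestamp ("now")
    val CURRENT_RECORD_IDX = 7  // flag cleared to 0 on the row being expired

    def closeOut(prev: Seq[Any], loadTime: String): Row =
      Row.fromSeq(prev.updated(EFFECTIVE_END_IDX, loadTime).updated(CURRENT_RECORD_IDX, 0))
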
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import com.databricks.spark.csv.CsvSchemaRDD
import org.apache.spark.sql.functions._

object JDBCTester {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("JDBC Tester")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)

    // Same extract as above, but driven through Spark SQL instead of the DataFrame API.
    val volumes = sqlCtx.load("com.databricks.spark.csv",
      Map("path" -> "/home/ramp/tmp/volumes.data", "header" -> "false")).toDF(
      "CREATED_AT", "UPDATED_AT", "DELETED_AT", "DELETED", "ID", "EC2_ID",
      "USER_ID", "PROJECT_ID", "HOST", "SIZE", "AVAILABILITY_ZONE",
      "INSTANCE_UUID", "MOUNTPOINT", "ATTACH_TIME", "STATUS", "ATTACH_STATUS",
      "SCHEDULED_AT", "LAUNCHED_AT", "TERMINATED_AT", "DISPLAY_NAME",
      "DISPLAY_DESCRIPTION", "PROVIDER_LOCATION", "PROVIDER_AUTH", "SNAPSHOT_ID",
      "VOLUME_TYPE_ID", "SOURCE_SYSTEM_NAME", "SOURCE_VOLID", "bootable",
      "attached_host", "provider_geometry", "_name_id", "encryption_key_id",
      "migration_status")

    val volume_type = sqlCtx.load("com.databricks.spark.csv",
      Map("path" -> "/home/ramp/tmp/volume_types.data", "header" -> "false")).toDF(
      "CREATED_AT", "UPDATED_AT", "DELETED_AT", "DELETED", "ID", "NAME",
      "SOURCE_SYSTEM_NAME", "DC")

    // Register the same helper functions for use from SQL.
    sqlCtx.udf.register("description_", (source: String, name: String) => s"$name in $source datacenter")
    sqlCtx.udf.register("instanceType", () => "cbs")
    sqlCtx.udf.register("keyGenerator", () => 0)
    sqlCtx.udf.register("getdate", () => new SimpleDateFormat("dd/MM/yy").format(new Date()))
    sqlCtx.udf.register("instance", () => "m_Dim_Instance")
    sqlCtx.udf.register("source_system", () => "cbs_instance")
    sqlCtx.udf.register("current_record", () => 1)
    sqlCtx.udf.register("extended_table", () => "Dim_Instance_Extended_CBS")

    volume_type.registerTempTable("VT")
    volumes.registerTempTable("V")

    val stmt =
      """
        |SELECT
        |  keyGenerator() as Instance_Key,
        |  V.ID as Instance_NK,
        |  instanceType() as Instance_Type,
        |  V.ID as Assigned_Instance_Number,
        |  V.PROJECT_ID as Assigned_Account_Number,
        |  V.DISPLAY_NAME as Instance_Name,
        |  description_(V.SOURCE_SYSTEM_NAME, VT.NAME) as Instance_Description,
        |  V.STATUS as Instance_Status,
        |  V.SOURCE_SYSTEM_NAME as Instance_Datacenter,
        |  getdate() as Rec_Created_Date,
        |  getdate() as Rec_Updated_Date,
        |  V.CREATED_AT as Instance_Creation_Date,
        |  V.UPDATED_AT as Instance_Update_Date,
        |  instance() as Instance_Created_by,
        |  instance() as Instance_Updated_by,
        |  V.CREATED_AT as Effective_Start_Datetime,
        |  V.DELETED_AT as Effective_End_Datetime,
        |  source_system() as Source_System_Name,
        |  current_record() as Current_Record,
        |  extended_table() as Extended_Table_Name
        |FROM
        |  V INNER JOIN VT ON
        |  V.VOLUME_TYPE_ID = VT.ID
      """.stripMargin

    sqlCtx.sql(stmt).save("com.databricks.spark.csv", SaveMode.Overwrite, Map("path" -> "/home/ramp/tmp/dim"))
  }
}
pkit commented Sep 15, 2015

A lot of hardcoded numbers, need at least some comments or better self-describing variable names...

@gsrikanth99
Can you please share the data/input file? /home/ramp/tmp/new_dim.csv

@rambabu03
sir, great work

@GrigorievNick
Hi, can you explain what the filter on the "nk" column does?
What is the nk column?

Same for 'ndim.t2?
