@rampage644
Created September 15, 2015 18:02
Spark ETL resume

Introduction

This document describes a sample process of implementing part of an existing Dim_Instance ETL.

I took only the Cloud Block Storage source to simplify and speed up the process. I also ignored the creation of extended tables (specific to this particular ETL process). Below are the code and some final thoughts about possible Spark usage as a primary ETL tool.

TL;DR

Implementation

Basic ETL implementation is really straightforward. The only real problem (and it is a real problem) is finding a correct and comprehensive mapping document (a description of which source fields go where).

ETL with Spark DSL:

    import java.text.SimpleDateFormat
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.udf
    import sqlCtx.implicits._ // enables $"..." and 'symbol column syntax

    // Constant/stub columns are expressed as UDFs
    val description_ = udf((source: String, name: String) => s"$name in $source datacenter")
    val instanceType = udf(() => "cbs")
    val keyGenerator = udf(() => 0) // stub surrogate key (see comments below)
    val getdate = udf(() => new SimpleDateFormat("dd/MM/yy").format(new java.util.Date())) // MM = month (mm would be minutes)
    val instance = udf(() => "m_Dim_Instance")
    val source_system = udf(() => "cbs_instance")
    val current_record = udf(() => 1)
    val extended_table = udf(() => "Dim_Instance_Extended_CBS")

    volumes.join(volume_type, $"VT.ID" === $"V.VOLUME_TYPE_ID", "inner")
      .select(
        keyGenerator() as 'Instance_Key,
        $"V.ID" as 'Instance_NK,
        instanceType() as 'Instance_Type,
        $"V.ID" as 'Assigned_Instance_Number,
        'PROJECT_ID as 'Assigned_Account_Number,
        'DISPLAY_NAME as 'Instance_Name,
        description_($"V.SOURCE_SYSTEM_NAME", 'NAME) as 'Instance_Description,
        'STATUS as 'Instance_Status,
        $"V.SOURCE_SYSTEM_NAME" as 'Instance_Datacenter,
        getdate() as 'Rec_Created_Date,
        getdate() as 'Rec_Updated_Date,
        $"V.CREATED_AT" as 'Instance_Creation_Date,
        $"V.UPDATED_AT" as 'Instance_Update_Date,
        instance() as 'Instance_Created_by,
        instance() as 'Instance_Updated_by,
        $"V.CREATED_AT" as 'Effective_Start_Datetime,
        $"V.DELETED_AT" as 'Effective_End_Datetime,
        source_system() as 'Source_System_Name,
        current_record() as 'Current_Record,
        extended_table() as 'Extended_Table_Name
      ).save("com.databricks.spark.csv", SaveMode.Overwrite, Map("path" -> "/home/ramp/tmp/dim"))

Spark "SQL-style":

    sqlCtx.udf.register("description_" , (source: String, name:String) => s"$name in $source datacenter")
    sqlCtx.udf.register("instanceType" ,  () => "cbs")
    sqlCtx.udf.register("keyGenerator" ,  () => 0)
    sqlCtx.udf.register("getdate" ,  () => new SimpleDateFormat("dd/mm/yy").format(new java.util.Date()))
    sqlCtx.udf.register("instance" ,  () => "m_Dim_Instance")
    sqlCtx.udf.register("source_system" ,  () => "cbs_instance")
    sqlCtx.udf.register("current_record" ,  () => 1)
    sqlCtx.udf.register("extended_table" ,  () => "Dim_Instance_Extended_CBS")

    volume_type.registerTempTable("VT")
    volumes.registerTempTable("V")

    val stmt =
      """
        |SELECT
        | keyGenerator() as Instance_Key,
        | V.ID as Instance_NK,
        | instanceType() as Instance_Type,
        | V.ID as Assigned_Instance_Number,
        | V.PROJECT_ID as Assigned_Account_Number,
        | V.DISPLAY_NAME as Instance_Name,
        | description_(V.SOURCE_SYSTEM_NAME, VT.NAME) as Instance_Description,
        | V.STATUS as Instance_Status,
        | V.SOURCE_SYSTEM_NAME as Instance_Datacenter,
        | getdate() as Rec_Created_Date,
        | getdate() as Rec_Updated_Date,
        | V.CREATED_AT as Instance_Creation_Date,
        | V.UPDATED_AT as Instance_Update_Date,
        | instance() as Instance_Created_by,
        | instance() as Instance_Updated_by,
        | V.CREATED_AT as Effective_Start_Datetime,
        | V.DELETED_AT as Effective_End_Datetime,
        | source_system() as Source_System_Name,
        | current_record() as Current_Record,
        | extended_table() as Extended_Table_Name
        |FROM
        | V INNER JOIN VT ON
        |  V.VOLUME_TYPE_ID = VT.ID
      """.stripMargin

    sqlCtx.sql(stmt).save("com.databricks.spark.csv", SaveMode.Overwrite, Map("path" -> "/home/ramp/tmp/dim"))

Essentially, this ETL is just SQL ETL and could be implemented with any SQL engine (Hive, Spark, an RDBMS, ...).

Slowly Changing dimensions type 1/2

When implementing SCD type 2, Spark really helps compared to plain SQL. Here I used stub dimensions:

    import java.text.SimpleDateFormat
    import java.util.Date
    import org.apache.spark.sql.Row
    import com.databricks.spark.csv._ // provides saveAsCsvFile

    // Incoming data ('ndim) and existing dimension ('pdim), both loaded from stub CSVs
    val new_dim = sqlCtx.load("com.databricks.spark.csv", Map("path" -> "/home/ramp/tmp/new_dim.csv", "header" -> "true")).as('ndim)
    val ext_dim = sqlCtx.load("com.databricks.spark.csv", Map("path" -> "/home/ramp/tmp/ext_dim.csv", "header" -> "true")).as('pdim)

    // Join incoming rows to existing rows by natural key
    val res = new_dim.join(ext_dim, new_dim("nk") === ext_dim("nk"), "inner")
    val now = new SimpleDateFormat().format(new Date())

    // SCD type 2 changes: a changed type 2 attribute yields two output rows,
    // the closed-out previous version plus the new current version
    val scd2 = res.filter($"ndim.t2" !== $"pdim.t2").flatMap { row =>
      // first half of the joined row is the new record, second half the previous one
      val (newRowSeq, prevRowSeq) = row.toSeq.splitAt(row.length / 2)
      val (newRow, prevRow) = (
        Row.fromSeq(newRowSeq),
        // close out the previous version: columns 6 and 7 are (presumably)
        // the effective-end date and the current-record flag
        Row.fromSeq(prevRowSeq.updated(6, now).updated(7, 0))
      )
      Array(prevRow, newRow)
    }
    // SCD type 1 changes: overwrite in place, so just keep the new version of the row
    val scd1 = res.filter($"ndim.t1" !== $"pdim.t1").select($"ndim.*")

    sqlCtx.createDataFrame(scd2, scd1.schema).unionAll(scd1).saveAsCsvFile("/home/ramp/tmp/dim.csv")
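
The .updated(6, now).updated(7, 0) calls rely on the column order of the existing dimension. A slightly more robust variant would look the positions up by name; the column names effective_end and current below are hypothetical, since the stub CSV headers are not shown here:

    // Hypothetical column names for the previous-version columns being closed out
    val endIdx = ext_dim.columns.indexOf("effective_end")
    val curIdx = ext_dim.columns.indexOf("current")
    // ...and inside the flatMap:
    //   Row.fromSeq(prevRowSeq.updated(endIdx, now).updated(curIdx, 0))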

One possible problem is the join operation, which is really fast once the data fits into memory. The biggest dimension table I found in production (Dim_Device) has 4 billion records, but the join only affects 700K records since we need only "actual" records. Spark joins two equal-sized 1M-record tables in about 10s on a regular dev laptop.
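
Since only "actual" records matter, the existing dimension can be filtered down before the join. A sketch, assuming the dimension carries the Current_Record flag produced by the ETL above:

    // Keep only current dimension rows, so the join touches ~700K records
    // instead of the full history
    val current_dim = ext_dim.filter(ext_dim("Current_Record") === 1)
    val res = new_dim.join(current_dim, new_dim("nk") === current_dim("nk"), "inner")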

See all sources here: https://gist.github.com/rampage644/f0fdb964b7f437beba62

Conclusions

  1. The main problem is getting proper documentation (a correct mapping document).
  2. Spark fits well, given proper Spark/Scala acquaintance (I still need more experience with it to speed up development).
  3. Any SQL engine is good, too.
  4. Most of the problems are described (and solved) here: http://dbtr.cs.aau.dk/DBPublications/DBTR-31.pdf
@pkit

pkit commented Sep 15, 2015

Hmm, val keyGenerator = udf( () => 0) does it mean that it will always re-key the dimension?

@rampage644
Author

It's a stub. I personally don't want to deal with PKs in the ETL (though such a problem could be solved; see the paper in the 4th conclusion item).

@pkit

pkit commented Sep 15, 2015

Yeah, I have skimmed through the paper and it looks legit.
We will probably need to implement everything they say there, so, let's start. :)

@abhineet13

Is there a way to get NewRow, PrevRow without flat mapping and using the Seq, just by using Dataframe functions
