@markmo
Created March 12, 2016 01:26
Transformation DSL Example
/* WIP - NOT READY
"DSL notation" should "execute a pipeline" in {
  import diamond.transformation._
  // import helper functions such as fieldLocator
  import RowTransformation._
  // Row and IntegerType are assumed to be the Spark SQL types (rawDF is a DataFrame)
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.IntegerType

  // create a new context object that can pass state between transformations
  val ctx = new TransformationContext

  // set the schema into the context
  ctx(RowTransformation.SCHEMA_KEY, rawDF.schema)

  given(rawDF, ctx) {
    // Note: a brace block cannot hold comma-separated expressions in Scala;
    // this call would need parentheses, rows(...), to compile.
    rows {
      // "Hello" looks fields up by name and prefixes eventType
      transform(name = "Hello") { (row, ctx) =>
        val f = fieldLocator(row, ctx)
        Row(
          f("entityIdType"),
          f("entityId"),
          s"Hello ${f("eventType")}",
          f("ts"),
          f("value"),
          f("properties"),
          f("processTime")
        )
      } /> {
        // "World" is chained after "Hello" and reads fields by position
        transform(name = "World") { (row, ctx) =>
          Row(
            row.getString(0),
            row.getString(1),
            "World " + row.getString(2),
            row.getString(3),
            row.getString(4),
            row.getString(5),
            row.getString(6)
          )
        }
      },
      // "AddFive" appends an integer column; "Fifty" is chained after it
      append(
        name = "AddFive",
        columnName = "column_8",
        dataType = IntegerType
      ) { (row, ctx) =>
        55
      } /> {
        append(
          name = "Fifty",
          columnName = "column_7",
          dataType = IntegerType
        ) { (row, ctx) =>
          50
        }
      }
    }
  }
}*/