A simple way to join static and streaming datasets using the transform() function of a DStream.
// create a case class to represent a Transaction (from streaming)
case class Transaction(
  ts: Int,
  customer_id: Int,
  transaction_id: String,
  amount: Double
)
// create a case class to represent a TransactionDetail (from static)
case class TransactionDetail(
  transaction_id: String,
  country: String,
  state: String,
  zip: Long,
  RepeatCustomer: Boolean
)
// Pull in a static dataset by whatever means you prefer.
// For this example, the object is a Dataset[(String, TransactionDetail)] --> (transaction_id, rest_of_the_data)
val transaction_details = ... // some static data in this form: Dataset[(String, TransactionDetail)]
// where the String is the transaction_id and the second element holds the rest of the data in whatever form you prefer (a string, a case class, etc.)
// Use a map like the one below to shape your data into this (key, value) structure.
// The only thing that matters is that the column you want to join on sits in the first position of each tuple, because the pair-RDD join matches on that key.
// Pull in a DStream of some streaming data
val live_transactions = ... // some streaming data, e.g. from Kafka
// Create a new DStream where each element is a pair: first, the column you want to join on (transaction_id); second, the whole data object (a Transaction)
// which gives it this structure: DStream[(String, Transaction)]
// transaction_details needs the matching shape: (transaction_id, TransactionDetail)
// (this assumes each incoming record x exposes ts, customer_id, transaction_id and amount)
val transactions = live_transactions.map(x => (x.transaction_id, Transaction(x.ts, x.customer_id, x.transaction_id, x.amount)))
// Create an RDD from your static dataset (if it is a Dataset rather than an RDD already; a DataFrame's .rdd yields RDD[Row], which you would first map into (transaction_id, ...) pairs)
val transaction_details_rdd = transaction_details.rdd
// Take the new DStream and use transform() to join the static RDD to each micro-batch RDD (on transaction_id)
val complete_transaction_data = transactions.transform(batch_rdd =>
  batch_rdd.join(transaction_details_rdd) // <-- leftOuterJoin, rightOuterJoin and fullOuterJoin work here as well
)
// complete_transaction_data is a DStream[(String, (Transaction, TransactionDetail))]
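Because `join` pairs records purely by the key in the first tuple position, it can help to see the per-batch semantics without a cluster. The following is a minimal, Spark-free sketch that models what an inner `join` and a `leftOuterJoin` produce for a single micro-batch using local `Seq`s. The helpers `innerJoin` and `leftOuterJoin` and the sample records are hypothetical; they only mirror the results of Spark's pair-RDD joins, not their distributed implementation, and unlike Spark (which makes no ordering guarantee) this model keeps the streaming side's order.

```scala
// Plain-Scala model of the per-batch join: no Spark required.
// innerJoin and leftOuterJoin are hypothetical helpers that mimic the results
// of Spark's pair-RDD join / leftOuterJoin on local Seqs.
object JoinSemantics {
  case class Transaction(ts: Int, customer_id: Int, transaction_id: String, amount: Double)
  case class TransactionDetail(transaction_id: String, country: String, state: String,
                               zip: Long, RepeatCustomer: Boolean)

  // Index the right-hand (static) side by key: key -> all values with that key
  private def index[K, W](right: Seq[(K, W)]): Map[K, Seq[W]] =
    right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }

  // Inner join on the first tuple element: one output per matching (left, right) pair;
  // left records whose key has no match on the right are dropped.
  def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey = index(right)
    left.flatMap { case (k, v) =>
      rightByKey.getOrElse(k, Seq.empty[W]).map(w => (k, (v, w)))
    }
  }

  // Left outer join: every left record survives; keys without a match pair with None.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    val rightByKey = index(right)
    left.flatMap { case (k, v) =>
      val matches: Seq[Option[W]] = rightByKey.get(k) match {
        case Some(ws) => ws.map(Some(_))
        case None     => Seq(None)
      }
      matches.map(ow => (k, (v, ow)))
    }
  }

  def main(args: Array[String]): Unit = {
    // One micro-batch of keyed streaming records: (transaction_id, Transaction)
    val batch = Seq(
      Transaction(1, 10, "t1", 25.0),
      Transaction(2, 11, "t2", 40.0)
    ).map(t => (t.transaction_id, t))

    // Static side keyed the same way: (transaction_id, TransactionDetail)
    val details = Seq(
      TransactionDetail("t1", "US", "CA", 94105L, RepeatCustomer = true)
    ).map(d => (d.transaction_id, d))

    println(innerJoin(batch, details))     // only t1 has a match, so only t1 appears
    println(leftOuterJoin(batch, details)) // t2 also appears, paired with None
  }
}
```

Swapping the inner join for the left outer variant in the Spark code corresponds to calling `batch_rdd.leftOuterJoin(transaction_details_rdd)` inside `transform()`, which keeps streaming transactions that have no static detail yet and gives each one an `Option[TransactionDetail]`.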