Last active February 4, 2021 11:44
A simple way to join static and streaming datasets using the transform() function of a DStream.
// create a case class to represent a Transaction (from streaming)
case class Transaction(
  ts: Int,
  customer_id: Int,
  transaction_id: String,
  amount: Double
)
// create a case class to represent a TransactionDetail (from static)
case class TransactionDetail(
  transaction_id: String,
  country: String,
  state: String,
  zip: Long,
  RepeatCustomer: Boolean
)
// Pull in a static dataset by whatever means you prefer.
// For this example, it is a Dataset[(String, TransactionDetail)], i.e. (transaction_id, rest_of_the_data).
val transaction_details = ... // some static data in this form: Dataset[(String, TransactionDetail)]
// The String is the transaction_id; the second element is the data in whatever form you prefer (string, row, case class, etc.).
// Use a map like the one applied to the stream below to put your data into this structure.
// What matters is that the column you want to join on is the first element of the tuple, because the join matches on that key.
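// A minimal sketch of that keying step, assuming the static data starts out as a
// Dataset[TransactionDetail]. The helper name keyByTransactionId and its parameters
// are hypothetical and are not part of the original example.
import org.apache.spark.sql.{Dataset, SparkSession}

def keyByTransactionId(details: Dataset[TransactionDetail])
                      (implicit spark: SparkSession): Dataset[(String, TransactionDetail)] = {
  import spark.implicits._ // brings the encoders for tuples and case classes into scope
  details.map(d => (d.transaction_id, d)) // join key first, full record second
}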
// Pull in a DStream of some streaming data
val live_transactions = ... // some streaming data from Kafka, Kinesis, etc.
// Create a new DStream of two-element tuples: first, the column you want to join on (transaction_id); second, the whole data object (Transaction).
// This gives it the structure DStream[(String, Transaction)], i.e. (transaction_id, Transaction).
// transaction_details needs the matching format: (transaction_id, TransactionDetail)
val transactions = live_transactions.map(x => (x.transaction_id, Transaction(
  x.ts,
  x.customer_id,
  x.transaction_id,
  x.amount)
))
// Create an RDD from your static dataset (if it is a Dataset or DataFrame rather than an RDD already)
val transaction_details_rdd = transaction_details.rdd
// Use transform() on the new DStream to join the static RDD to the RDD of each batch (on transaction_id)
// Result: DStream[(String, (Transaction, TransactionDetail))]
val complete_transaction_data = transactions.transform(live_transaction =>
  live_transaction.join(transaction_details_rdd) // <-- leftOuterJoin, rightOuterJoin and fullOuterJoin work here as well
)
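// A hedged sketch of the outer-join variant mentioned above: leftOuterJoin keeps every
// streamed transaction and wraps the static side in an Option, so a transaction with no
// matching TransactionDetail comes through with None instead of being dropped.
// The name transactions_with_optional_details is hypothetical.
val transactions_with_optional_details = transactions.transform(live_transaction =>
  live_transaction.leftOuterJoin(transaction_details_rdd) // DStream[(String, (Transaction, Option[TransactionDetail]))]
)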