@hadoopsters
Last active February 4, 2021 11:44
A simple way to join static and streaming datasets using the transform() function of a DStream.
// create a case class to represent a Transaction (from streaming)
case class Transaction(
  ts: Int,
  customer_id: Int,
  transaction_id: String,
  amount: Double
)
// create a case class to represent a TransactionDetail (from static)
case class TransactionDetail(
  transaction_id: String,
  country: String,
  state: String,
  zip: Long,
  RepeatCustomer: Boolean
)
// Pull in a static dataset by whatever means you prefer.
// For this example, it is a Dataset[(String, TransactionDetail)], i.e. pairs of (transaction_id, rest_of_the_data).
val transaction_details = .. // some static data that returns in this form: Dataset[(String, TransactionDetail)]
// The String is the transaction_id; the second element is the data in whatever form you prefer (String, case class, etc.).
// If your data is not in this shape yet, use a map like the one applied to the stream further down.
// The only thing that matters is that the column you wish to join on is the first element of each pair, because the join implicitly uses it as the key.
// Pull in a DStream of some streaming data
val live_transactions = ... // some streaming data from Kafka, Flink, etc
// Create a new DStream of key/value pairs: first the column you want to join on (transaction_id), second the whole data object (Transaction).
// This gives it the type DStream[(String, Transaction)].
// transaction_details needs the matching pair format: (transaction_id, TransactionDetail)
val transactions = live_transactions.map(x => (x.transaction_id, Transaction(
  x.ts,
  x.customer_id,
  x.transaction_id,
  x.amount
)))
// Convert the static dataset to an RDD (skip this if it is already an RDD rather than a Dataset or DataFrame)
val transaction_details_rdd = transaction_details.rdd
// Take the new DStream and use transform() to join the static RDD to each micro-batch (keyed on transaction_id)
val complete_transaction_data = transactions.transform(live_transaction =>
  live_transaction.join(transaction_details_rdd) // <-- leftOuterJoin, rightOuterJoin and fullOuterJoin work here as well
)
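
// ---------------------------------------------------------------------------
// A minimal, self-contained sketch of the steps above, in case it helps to see
// them run end to end. It is not the original author's code. Everything named
// here is an assumption made for illustration: the object name
// StaticStreamJoinSketch, the Txn/TxnDetail case classes (local copies of the
// ones above), the sample values, the local[2] master, the 1-second batch
// interval, and queueStream() standing in for a real source such as Kafka.
// ---------------------------------------------------------------------------
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object StaticStreamJoinSketch {
  // Local copies of the case classes so the sketch compiles on its own
  case class Txn(ts: Int, customer_id: Int, transaction_id: String, amount: Double)
  case class TxnDetail(transaction_id: String, country: String, state: String,
                       zip: Long, RepeatCustomer: Boolean)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("static-stream-join-sketch")
      .getOrCreate()
    import spark.implicits._
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

    // Static side: key the Dataset by transaction_id, then drop down to an RDD.
    // cache() avoids recomputing the static data for every micro-batch.
    val details = Seq(
      TxnDetail("t-1", "US", "MN", 55401L, RepeatCustomer = true),
      TxnDetail("t-2", "US", "WI", 53703L, RepeatCustomer = false)
    ).toDS()
    val detailsRdd: RDD[(String, TxnDetail)] =
      details.map(d => (d.transaction_id, d)).rdd.cache()

    // Streaming side: a single queued batch stands in for live data (assumption)
    val batches = mutable.Queue[RDD[Txn]](
      spark.sparkContext.parallelize(Seq(
        Txn(1, 100, "t-1", 9.99),
        Txn(2, 101, "t-3", 25.0) // no matching TxnDetail in the static data
      ))
    )
    val keyed = ssc.queueStream(batches).map(t => (t.transaction_id, t))

    // Inner join: only keys present on both sides survive (here, t-1)
    val joined = keyed.transform(batch => batch.join(detailsRdd))
    // Left outer join: every streamed record is kept; the detail is an Option
    val leftJoined = keyed.transform(batch => batch.leftOuterJoin(detailsRdd))

    joined.print()
    leftJoined.print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // run briefly, then shut down
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
// Design note: transform() hands you each micro-batch as a plain RDD, so any
// pair-RDD join can be applied against the static RDD inside it.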