Schema definition (CQL) — two user-defined types and the table:
-- User-defined type describing one physician. The payments table stores values of
-- this type in its `physicians` column as set<frozen<physician>>. Every field is text.
CREATE TYPE physician(
physician_first_name text,
physician_last_name text,
-- Five fixed slots for license state codes (the type has no collection field for them).
physician_license_state_code1 text,
physician_license_state_code2 text,
physician_license_state_code3 text,
physician_license_state_code4 text,
physician_license_state_code5 text,
physician_middle_name text,
physician_name_suffix text,
physician_ownership_indicator text,
physician_primary_type text,
physician_profile_id text,
physician_specialty text
);
-- User-defined type that appears to hold a payment recipient's location/address
-- fields; every field is text.
-- NOTE(review): the payments table defined below has no column of this type — confirm
-- whether another table uses it or whether it can be dropped.
CREATE TYPE recipient(
recipient_city text,
recipient_country text,
recipient_postal_code text,
-- Two free-form street address lines.
recipient_primary_business_street_address_line1 text,
recipient_primary_business_street_address_line2 text,
recipient_province text,
recipient_state text,
recipient_zip_code text
);
-- One row per payment event.
-- Partition key: (branch, timebucket). Within a partition, rows are clustered by
-- create_ts and then eventid, both ascending (the CQL default, spelled out below).
CREATE TABLE payments (
  -- Primary key columns.
  branch text,
  timebucket text,
  create_ts timestamp,
  eventid text,

  -- Payment attributes.
  applicable_manufacturer_or_applicable_gpo_making_payment_country text,
  applicable_manufacturer_or_applicable_gpo_making_payment_id text,
  applicable_manufacturer_or_applicable_gpo_making_payment_name text,
  applicable_manufacturer_or_applicable_gpo_making_payment_state text,
  charity_indicator text,
  city_of_travel text,
  contextual_information text,
  country_of_travel text,
  covered_recipient_type text,
  date_of_payment timestamp,
  delay_in_publication_indicator text,
  dispute_status_for_publication text,
  form_of_payment_or_transfer_of_value text,
  name_of_associated_covered_device_or_medical_supply1 text,
  name_of_associated_covered_device_or_medical_supply2 text,
  name_of_associated_covered_device_or_medical_supply3 text,
  name_of_associated_covered_device_or_medical_supply4 text,
  name_of_associated_covered_device_or_medical_supply5 text,
  name_of_associated_covered_drug_or_biological1 text,
  name_of_associated_covered_drug_or_biological2 text,
  name_of_associated_covered_drug_or_biological3 text,
  name_of_associated_covered_drug_or_biological4 text,
  name_of_associated_covered_drug_or_biological5 text,
  name_of_third_party_entity_receiving_payment_or_transfer_of_value text,
  nature_of_payment_or_transfer_of_value text,
  ndc_of_associated_covered_drug_or_biological1 text,
  ndc_of_associated_covered_drug_or_biological2 text,
  ndc_of_associated_covered_drug_or_biological3 text,
  ndc_of_associated_covered_drug_or_biological4 text,
  ndc_of_associated_covered_drug_or_biological5 text,
  number_of_payments_included_in_total_amount double,
  payment_publication_date timestamp,
  -- Zero or more physicians, each a frozen value of the physician UDT.
  physicians set<frozen<physician>>,
  product_indicator text,
  program_year text,
  record_id text,
  -- NOTE(review): looks like the DSE Search solr_query pseudo-column — confirm before
  -- writing application data to it.
  solr_query text,
  state_of_travel text,
  submitting_applicable_manufacturer_or_applicable_gpo_name text,
  teaching_hospital_id text,
  teaching_hospital_name text,
  third_party_equals_covered_recipient_indicator text,
  third_party_payment_recipient_indicator text,
  total_amount_of_payment_usdollars double,

  PRIMARY KEY ((branch, timebucket), create_ts, eventid)
) WITH CLUSTERING ORDER BY (create_ts ASC, eventid ASC);
Because the table is so wide, we won't model it with case classes. Instead, we write only the columns we need, as tuples, and build the UDT values directly with the UDTValue.fromMap API.
First, create the UDT values. Here we make two physicians; every field of the `physician` type is given a value, and unused fields are set to empty strings:
// Every field of the `physician` UDT, in the order declared by CREATE TYPE physician.
// Each UDT value built below carries the complete field set.
val physicianFieldNames = Seq(
  "physician_first_name",
  "physician_last_name",
  "physician_license_state_code1",
  "physician_license_state_code2",
  "physician_license_state_code3",
  "physician_license_state_code4",
  "physician_license_state_code5",
  "physician_middle_name",
  "physician_name_suffix",
  "physician_ownership_indicator",
  "physician_primary_type",
  "physician_profile_id",
  "physician_specialty"
)

/** Builds a `physician` UDT value.
  *
  * Every field starts as "" and the given `fields` override individual entries, so
  * callers list only the values they care about instead of repeating all 13 fields.
  *
  * @param fields field-name -> value pairs to set
  * @return a UDTValue containing every physician field
  * @throws IllegalArgumentException if a field name is not part of the physician type
  *         (a misspelled name fails fast here instead of silently adding a bogus field)
  */
def physicianValue(fields: (String, String)*): UDTValue = {
  val unknown = fields.map(_._1).filterNot(physicianFieldNames.toSet)
  require(unknown.isEmpty, s"Unknown physician field(s): ${unknown.mkString(", ")}")
  UDTValue.fromMap(physicianFieldNames.map(_ -> "").toMap ++ fields)
}

val physicianUDT = physicianValue(
  "physician_first_name" -> "Sebastian",
  "physician_last_name" -> "Estevez",
  "physician_specialty" -> "spark-cassandra",
  "physician_license_state_code1" -> "code1"
)

val physicianUDT2 = physicianValue(
  "physician_first_name" -> "Caroline",
  "physician_last_name" -> "George",
  "physician_specialty" -> "spark-cassandra"
)
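Next, create the rows themselves, putting one or more of the UDTs from the previous step into a List. Each tuple is one row, and its fields must be in the same order as the columns listed in `SomeColumns` in the save step. The `physicians` column is a Cassandra `set`; the connector accepts a Scala `List` for it when writing.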
// An RDD with two rows. Each tuple is (branch, timebucket, create_ts, eventid, physicians),
// and its fields are matched to columns by position, so this order must line up with the
// column list passed to saveToCassandra. create_ts is given as a "yyyy-MM-dd" string for
// a timestamp column; the connector is relied on to convert it when writing.
val myRow = {
  val newYorkRow =
    ("New York", "2016-01-01:00:01", "2016-01-01", "A", List(physicianUDT, physicianUDT2))
  val canadaRow =
    ("Canada", "2015-01-01:00:01", "2015-01-01", "A", List(physicianUDT))
  sc.parallelize(Seq(newYorkRow, canadaRow))
}
Finally, save the rows to the `payments` table in the `payments` keyspace, writing only the listed columns:
// The subset of payments columns being written. Tuple fields in myRow map onto these
// positionally: branch, timebucket, create_ts, eventid, physicians.
val paymentColumns = SomeColumns("branch", "timebucket", "create_ts", "eventid", "physicians")

// Keyspace "payments", table "payments".
myRow.saveToCassandra("payments", "payments", paymentColumns)