Table definition:

CREATE TYPE physician(
    physician_first_name text,
    physician_last_name text,
    physician_license_state_code1 text,
    physician_license_state_code2 text,
    physician_license_state_code3 text,
    physician_license_state_code4 text,
    physician_license_state_code5 text,
    physician_middle_name text,
    physician_name_suffix text,
    physician_ownership_indicator text,
    physician_primary_type text,
    physician_profile_id text,
    physician_specialty text
);

CREATE TYPE recipient(
    recipient_city text,
    recipient_country text,
    recipient_postal_code text,
    recipient_primary_business_street_address_line1 text,
    recipient_primary_business_street_address_line2 text,
    recipient_province text,
    recipient_state text,
    recipient_zip_code text
);

CREATE TABLE payments (
    branch text,
    timebucket text,
    create_ts timestamp,
    eventid text,
    applicable_manufacturer_or_applicable_gpo_making_payment_country text,
    applicable_manufacturer_or_applicable_gpo_making_payment_id text,
    applicable_manufacturer_or_applicable_gpo_making_payment_name text,
    applicable_manufacturer_or_applicable_gpo_making_payment_state text,
    charity_indicator text,
    city_of_travel text,
    contextual_information text,
    country_of_travel text,
    covered_recipient_type text,
    date_of_payment timestamp,
    delay_in_publication_indicator text,
    dispute_status_for_publication text,
    form_of_payment_or_transfer_of_value text,
    name_of_associated_covered_device_or_medical_supply1 text,
    name_of_associated_covered_device_or_medical_supply2 text,
    name_of_associated_covered_device_or_medical_supply3 text,
    name_of_associated_covered_device_or_medical_supply4 text,
    name_of_associated_covered_device_or_medical_supply5 text,
    name_of_associated_covered_drug_or_biological1 text,
    name_of_associated_covered_drug_or_biological2 text,
    name_of_associated_covered_drug_or_biological3 text,
    name_of_associated_covered_drug_or_biological4 text,
    name_of_associated_covered_drug_or_biological5 text,
    name_of_third_party_entity_receiving_payment_or_transfer_of_value text,
    nature_of_payment_or_transfer_of_value text,
    ndc_of_associated_covered_drug_or_biological1 text,
    ndc_of_associated_covered_drug_or_biological2 text,
    ndc_of_associated_covered_drug_or_biological3 text,
    ndc_of_associated_covered_drug_or_biological4 text,
    ndc_of_associated_covered_drug_or_biological5 text,
    number_of_payments_included_in_total_amount double,
    payment_publication_date timestamp,
    physicians set<frozen<physician>>,
    product_indicator text,
    program_year text,
    record_id text,
    solr_query text,
    state_of_travel text,
    submitting_applicable_manufacturer_or_applicable_gpo_name text,
    teaching_hospital_id text,
    teaching_hospital_name text,
    third_party_equals_covered_recipient_indicator text,
    third_party_payment_recipient_indicator text,
    total_amount_of_payment_usdollars double,
    PRIMARY KEY ((branch, timebucket), create_ts, eventid)
);

Because the table has so many columns, we will avoid case classes and build the UDT values directly with the UDTValue.fromMap API.

First, let's create the UDT values. In this case I'm making two:

// Brings UDTValue, SomeColumns and saveToCassandra into scope
import com.datastax.spark.connector._

val physicianUDT = UDTValue.fromMap(
    Map(
        "physician_first_name" -> "Sebastian",
        "physician_last_name" -> "Estevez",
        "physician_specialty" -> "spark-cassandra",
        "physician_license_state_code1" -> "code1",
        "physician_license_state_code2" -> "",
        "physician_license_state_code3" -> "",
        "physician_license_state_code4" -> "",
        "physician_license_state_code5" -> "",
        "physician_middle_name" -> "",
        "physician_name_suffix" -> "",
        "physician_ownership_indicator" -> "",
        "physician_primary_type" -> "",
        "physician_profile_id" -> ""
    )
)

val physicianUDT2 = UDTValue.fromMap(
    Map(
        "physician_first_name" -> "Caroline",
        "physician_last_name" -> "George",
        "physician_specialty" -> "spark-cassandra",
        "physician_license_state_code1" -> "",
        "physician_license_state_code2" -> "",
        "physician_license_state_code3" -> "",
        "physician_license_state_code4" -> "",
        "physician_license_state_code5" -> "",
        "physician_middle_name" -> "",
        "physician_name_suffix" -> "",
        "physician_ownership_indicator" -> "",
        "physician_primary_type" -> "",
        "physician_profile_id" -> ""
    )
)
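
Both maps above list every field of the physician type and set most of them to the empty string. One way to cut that repetition is a small helper that fills in the blanks. The following is only a sketch in plain Scala: PhysicianFields and sebastianFields are hypothetical names introduced here, not part of the connector, and the field list is copied by hand from the CREATE TYPE statement.

```scala
// Hypothetical helper (not part of the connector): builds the field map for
// one physician, filling every field that is not supplied with "".
object PhysicianFields {
  // Field names copied from the CREATE TYPE physician definition.
  val names: Seq[String] = Seq(
    "physician_first_name",
    "physician_last_name",
    "physician_license_state_code1",
    "physician_license_state_code2",
    "physician_license_state_code3",
    "physician_license_state_code4",
    "physician_license_state_code5",
    "physician_middle_name",
    "physician_name_suffix",
    "physician_ownership_indicator",
    "physician_primary_type",
    "physician_profile_id",
    "physician_specialty"
  )

  def apply(supplied: (String, String)*): Map[String, String] = {
    val values = supplied.toMap
    // Reject misspelled field names up front.
    val unknown = values.keySet -- names
    require(unknown.isEmpty, s"Unknown physician fields: ${unknown.mkString(", ")}")
    // Every declared field gets a value: the supplied one, or "" by default.
    names.map(name => name -> values.getOrElse(name, "")).toMap
  }
}

// Only the non-empty fields need to be spelled out.
val sebastianFields = PhysicianFields(
  "physician_first_name" -> "Sebastian",
  "physician_last_name" -> "Estevez",
  "physician_specialty" -> "spark-cassandra",
  "physician_license_state_code1" -> "code1"
)
```

With the connector on the classpath, passing sebastianFields to UDTValue.fromMap should give a value equivalent to physicianUDT, since the resulting map holds the same 13 keys and values.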

Then we'll actually create the rows and include one or more of the UDT values from the previous step in a List. A Scala List can be written to a Cassandra set column.

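// Each tuple is (branch, timebucket, create_ts, eventid, physicians)
val myRow = sc.parallelize(
    Seq(
        (
            "New York",
            "2016-01-01:00:01",
            "2016-01-01",
            "A",
            List(physicianUDT, physicianUDT2)  // two physicians on this row
        ),
        (
            "Canada",
            "2015-01-01:00:01",
            "2015-01-01",
            "A",
            List(physicianUDT)  // a single physician
        )
    )
)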

Finally, we take the rows and save them to Cassandra. The first argument is the keyspace and the second is the table, both named payments here:

myRow.saveToCassandra(
    "payments",
    "payments",
    SomeColumns(
        "branch",
        "timebucket",
        "create_ts",
        "eventid",
        "physicians"
    )
)