Databricks Programming Guidance

This document contains lessons learned with regard to Databricks programming, along with some best practices.

Mounting an Azure Data Lake Storage Gen2 container

blobname = "miraw"  
storageaccount = "rdmidlgen2"  
mountname = "/rdmi"

configs = {"fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope = "databricks-akv", key = "service-principal-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "databricks-akv", key = "service-principal-secret"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/63982aff-fb6c-4c22-973b-70e4acfb63e6/oauth2/token"}

source = "abfss://" + blobname + "@" + storageaccount + ".dfs.core.windows.net/"
mount = "/mnt" + mountname

dbutils.fs.mount(source = source, mount_point = mount, extra_configs = configs)
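
Once mounted, the lake can be browsed like any other DBFS path. The following is a minimal sketch (assuming the mount above succeeded) that lists the workspace mount points, browses the new mount, and shows how to unmount it again when it is no longer needed.

# List all current mount points and verify the new one is present
dbutils.fs.mounts()

# Browse the contents of the mounted lake
display(dbutils.fs.ls(mount))

# Unmount again when the mount is no longer needed
# dbutils.fs.unmount(mount)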

Key-Vault Secrets

Secrets are stored in an Azure Key Vault. The Key Vault needs to be attached (made known) to the Databricks workspace before you are able to access it. Below is a typical command for reading a secret from the Key Vault.

dbutils.secrets.get(scope = "databricks-akv", key = "service-principal-id")

Instructions for creating a Key Vault Secret Scope are listed here.
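
To verify that the Key Vault-backed scope is attached, you can list the available scopes and the secret names within a scope (the secret values themselves are redacted when printed). A minimal sketch using the scope name from above:

# List all secret scopes attached to the workspace
dbutils.secrets.listScopes()

# List the secret names available within the "databricks-akv" scope
dbutils.secrets.list("databricks-akv")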

Working with DBFS Tables and Views

DBFS is the Databricks File System. It allows you to create and store tables as Parquet, Delta, or Hive tables.
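
Tables saved this way are registered in the metastore, and their files typically land under the DBFS warehouse directory. A quick sketch for inspecting both (assuming the default warehouse location):

# List the tables registered in the current database
spark.sql("SHOW TABLES").show()

# Inspect the underlying files in the default DBFS warehouse location
display(dbutils.fs.ls("dbfs:/user/hive/warehouse"))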

Create an Empty Dataframe and Spark Table

# Create empty dataframe of id and text_en
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("text_en", StringType(), True)
])
df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

# write it to a DBFS table
df.write.mode("overwrite").saveAsTable("tmpTbl")

Insert into a Spark Table

# Create a temporary dataframe of the data to be written and insert it into the Spark DBFS table
# (row and response are assumed to come from earlier processing in the notebook)
from pyspark.sql import DataFrameWriter

t_df = spark.createDataFrame([{"Id": row["id"], "Text_en": response}])
df_writer = DataFrameWriter(t_df)
df_writer.insertInto("tmpTbl")

Merging Spark Dataframes

The following code provides an example of merging data from one dataframe into another, joining on the Id column of both tables.

# Merge the original and the new dataframes to add good product names and lmp_ids to the VCRM data
from pyspark.sql.functions import coalesce

# Create a Spark dataframe of the original data
sqlContext.refreshTable("insights_text")
df_all = spark.sql("select * from insights_text order by Id")

# Merge dataframes
df_new = df_all.join(df, df_all.Id == df.Id, "full_outer")\
            .select(df_all.Id,
                    df_all.Text,
                    coalesce(df.Text_en, df_all.Text_en).alias("Text_en"),
                    coalesce(df.LangCode, df_all.LangCode).alias("LangCode"),
                    df_all.Sentiment,
                    df_all.Pos,
                    df_all.Neu,
                    df_all.Neg)

print("Number of rows updated: {} of a total of {}".format(df.count(), df_all.count()))
df_new.printSchema()

Overwrite a Spark table that is being used in a dataframe

On occasion, you may need to overwrite a table that is being used by a dataframe. The code below writes the dataframe to a temporary table, creates a new dataframe from that temporary table, and then uses it to overwrite the original table.

df_new.write.mode("overwrite").saveAsTable("temp_table")
df = sqlContext.table("temp_table")
df.write.mode("overwrite").saveAsTable("insights_text")
spark.sql("drop table temp_table")
sqlContext.refreshTable("insights_text")

Refresh Spark Table

Sometimes it is necessary to refresh a table before you can read from it, for example when the table was just overwritten in another notebook or session.

sqlContext.refreshTable("vcrm_mvoc_raw")

Working with JSON in Python

Create JSON Data

import json
v_json = json.dumps({"status": "OK", "table": "myTable"})

Read JSON Data

Note: json.loads expects a string. Here the JSON data is a list (note the square brackets), so the parsed object must be indexed before accessing its keys.

import json
v_json = json.loads('[{"status": "OK", "table": "myTable"}]')
status = v_json[0]["status"]
table = v_json[0]["table"]

API Call Throttling due to Rate Limits

Many APIs enforce a rate limit, which prevents you from posting too many calls per second. The code below is an example of how to handle the limit using exponential backoff with random jitter.

import requests, time, random

max_attempts = 10
attempts = 0

while attempts < max_attempts:
    # Make a request to the Clover REST API
    # (translate_api_url, lang, headers, and text are defined elsewhere in the notebook)
    response = requests.post(translate_api_url + lang, headers=headers, json=text)

    # If not rate limited, break out of while loop and continue with the rest of the code
    if response.status_code != 429:
        break

    # If rate limited, wait and try again
    time.sleep((2 ** attempts) + random.random())
    attempts = attempts + 1    
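
After the loop it is worth checking whether the call ultimately succeeded, since the retries may have been exhausted. A small sketch of that follow-up check:

# If we are still rate limited after all attempts, fail loudly instead of silently using a bad response
if response.status_code == 429:
    raise RuntimeError("Still rate limited after {} attempts".format(max_attempts))

# Raise for any other HTTP error, otherwise parse the JSON payload
response.raise_for_status()
result = response.json()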