This document collects lessons learned from Databricks programming, along with some best practices.
# Mount an ADLS Gen2 container via OAuth, using a service principal stored in Key Vault
blobname = "miraw"
storageaccount = "rdmidlgen2"
mountname = "/rdmi"

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="databricks-akv", key="service-principal-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="databricks-akv", key="service-principal-secret"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/63982aff-fb6c-4c22-973b-70e4acfb63e6/oauth2/token"
}

source = "abfss://" + blobname + "@" + storageaccount + ".dfs.core.windows.net/"
mount = "/mnt" + mountname

dbutils.fs.mount(source=source, mount_point=mount, extra_configs=configs)
Secrets are stored in an Azure Key Vault. The Key Vault must be attached (made known) to the Databricks workspace before you can access it. Below is a typical command to read a secret from the Key Vault.
dbutils.secrets.get(scope = "databricks-akv", key = "service-principal-id")
Instructions for creating a Key Vault secret scope are listed here.
DBFS is the Databricks File System; it lets you create and store tables in formats such as Parquet, Delta, or Hive.
# Create an empty dataframe with columns id and text_en
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("text_en", StringType(), True)
])
df = spark.createDataFrame([], schema)
# write it to a DBFS table
df.write.mode("overwrite").saveAsTable("tmpTbl")
# Create a temporary dataframe of the data to be written and append it to the Spark DBFS table
from pyspark.sql import DataFrameWriter

t_df = spark.createDataFrame([{"Id": row["id"], "Text_en": response}])
df_writer = DataFrameWriter(t_df)
df_writer.insertInto("tmpTbl")
The following code shows how to merge data from one dataframe into another by matching the Id columns of both tables.
# Merge the original and the new dataframes to add good product names and lmp_ids to the VCRM data
from pyspark.sql.functions import coalesce
# Create a Spark dataframe of the original data
sqlContext.refreshTable("insights_text")
df_all = spark.sql("select * from insights_text order by Id")
# Merge dataframes
df_new = df_all.join(df, df_all.Id == df.Id, "full_outer") \
    .select(df_all.Id,
            df_all.Text,
            coalesce(df.Text_en, df_all.Text_en).alias("Text_en"),
            coalesce(df.LangCode, df_all.LangCode).alias("LangCode"),
            df_all.Sentiment,
            df_all.Pos,
            df_all.Neu,
            df_all.Neg)
print("{} rows updated out of a total of {}".format(df.count(), df_all.count()))
df_new.printSchema()
On occasion, you may need to overwrite a table that is being used by a dataframe. The code below stages the data in a temporary table, creates a new dataframe from that temporary table, and then overwrites the target table.
df_new.write.mode("overwrite").saveAsTable("temp_table")
df = sqlContext.table("temp_table")
df.write.mode("overwrite").saveAsTable("insights_text")
spark.sql("drop table temp_table")
sqlContext.refreshTable("insights_text")
Sometimes it is necessary to refresh a table before you can read from it, for example when the table was just overwritten in another notebook or session.
sqlContext.refreshTable("vcrm_mvoc_raw")
import json
# Serialize a Python dict to a JSON string
v_json = json.dumps({"status": "OK", "table": "myTable"})
Note: a JSON array is wrapped in square brackets. json.loads expects a string, and the parsed result is a list, so an element must be indexed before its keys can be accessed.
import json
v_json = json.loads('[{"status": "OK", "table": "myTable"}]')
status = v_json[0]["status"]
table = v_json[0]["table"]
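To make the relationship between the two calls concrete, this minimal sketch (plain Python, no Databricks required) round-trips a list of objects: json.dumps produces the bracketed string, json.loads parses it back.

```python
import json

# Serialize a list of dicts to a JSON string (note the square brackets)
payload = [{"status": "OK", "table": "myTable"}]
s = json.dumps(payload)

# Parse it back; the result is a list, so index an element before its keys
parsed = json.loads(s)
status = parsed[0]["status"]
table = parsed[0]["table"]
```

Because loads is the inverse of dumps, parsed compares equal to the original payload.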
Often an API has a rate limit, which prevents you from posting too many calls per second. The code below shows one way to deal with the limit: retry with exponential backoff.
import requests, time, random

max_attempts = 10
attempts = 0
while attempts < max_attempts:
    # Make a request to the Clover REST API
    response = requests.post(translate_api_url + lang, headers=headers, json=text)
    # If not rate limited, break out of the while loop and continue with the rest of the code
    if response.status_code != 429:
        break
    # If rate limited, wait (exponential backoff with jitter) and try again
    time.sleep((2 ** attempts) + random.random())
    attempts += 1
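The same pattern can be wrapped in a reusable helper. This is a sketch of my own (the function and parameter names are not from the notebook): it retries any callable that returns an object with a status_code attribute, sleeping with exponential backoff plus jitter between attempts.

```python
import time
import random


def retry_on_rate_limit(call, max_attempts=10, retry_status=429, sleep=time.sleep):
    """Invoke call() until its status_code differs from retry_status,
    backing off exponentially (plus jitter) between attempts."""
    response = None
    for attempt in range(max_attempts):
        response = call()
        # Anything other than the rate-limit status ends the retry loop
        if response.status_code != retry_status:
            break
        sleep((2 ** attempt) + random.random())
    return response
```

In the notebook above, call would be something like lambda: requests.post(translate_api_url + lang, headers=headers, json=text). Injecting the sleep function keeps the helper easy to test without real waits.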