@jitsejan
Created February 26, 2019 12:38
Interacting with Blob storage (no Spark writing)
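The script expects a blobconfig.ini next to it. A minimal sketch of that file, with the [blob-store] section and key names taken from the config lookups in the code (the values are placeholders):

    [blob-store]
    blob_account_name = <storage-account-name>
    blob_account_key = <storage-account-key>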
from configparser import RawConfigParser
from io import BytesIO

import pandas as pd
import pyarrow.parquet as pq
from azure.storage.blob import BlockBlobService
from pyspark import SparkConf, SparkContext, SQLContext

CONTAINER_NAME = "userjj"
BLOB_NAME = "characters.parquet"

def setup_spark(config):
    """ Set up Spark to connect to Azure Blob Storage """
    jars = [
        "spark-2.4.0-bin-hadoop2.7/jars/hadoop-azure-2.7.3.jar",
        "spark-2.4.0-bin-hadoop2.7/jars/azure-storage-6.1.0.jar",
    ]
    conf = (
        SparkConf()
        .setAppName("Spark Blob Test")
        # Make the Hadoop Azure and Azure Storage jars available to the driver.
        .set("spark.driver.extraClassPath", ":".join(jars))
        # Route wasb(s):// URLs to the Hadoop Azure filesystem implementation.
        .set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .set(
            f"fs.azure.account.key.{config['blob-store']['blob_account_name']}.blob.core.windows.net",
            config["blob-store"]["blob_account_key"],
        )
    )
    sc = SparkContext.getOrCreate(conf=conf)
    return SQLContext(sc)

def write_pandas_dataframe_to_blob(blob_service, df, container_name, blob_name):
    """ Write Pandas dataframe to blob storage """
    buffer = BytesIO()
    df.to_parquet(buffer)
    blob_service.create_blob_from_bytes(
        container_name=container_name, blob_name=blob_name, blob=buffer.getvalue()
    )

def get_pandas_dataframe_from_parquet_on_blob(blob_service, container_name, blob_name):
    """ Get a dataframe from Parquet file on blob storage """
    byte_stream = BytesIO()
    try:
        blob_service.get_blob_to_stream(
            container_name=container_name, blob_name=blob_name, stream=byte_stream
        )
        df = pq.read_table(source=byte_stream).to_pandas()
    finally:
        byte_stream.close()
    return df

def get_pyspark_dataframe_from_parquet_on_blob(
    config, sql_context, container_name, blob_name
):
    """ Get a dataframe from Parquet file on blob storage using PySpark """
    # wasbs:// addresses the container through the Hadoop Azure connector
    # configured in setup_spark.
    path = f"wasbs://{container_name}@{config['blob-store']['blob_account_name']}.blob.core.windows.net/{blob_name}"
    return sql_context.read.parquet(path)

def main():
    # Read the configuration
    config = RawConfigParser()
    config.read("blobconfig.ini")
    # Create the blob service
    blob_service = BlockBlobService(
        account_name=config["blob-store"]["blob_account_name"],
        account_key=config["blob-store"]["blob_account_key"],
    )
    # Create the Spark context
    sql_context = setup_spark(config)
    # Create a sample dataframe
    df = pd.DataFrame(
        [("Mario", "Red"), ("Luigi", "Green"), ("Princess", "Pink")],
        columns=["name", "color"],
    )
    print(df.head())
    # Write to blob using pyarrow
    write_pandas_dataframe_to_blob(blob_service, df, CONTAINER_NAME, BLOB_NAME)
    # Read from blob using pyarrow
    rdf = get_pandas_dataframe_from_parquet_on_blob(
        blob_service, CONTAINER_NAME, BLOB_NAME
    )
    print(rdf.head())
    # Read from blob using PySpark
    sdf = get_pyspark_dataframe_from_parquet_on_blob(
        config, sql_context, CONTAINER_NAME, BLOB_NAME
    )
    sdf.show()  # show() prints the dataframe itself and returns None

if __name__ == "__main__":
    main()
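To run this as a plain Python script, the two jars referenced in setup_spark must be present under spark-2.4.0-bin-hadoop2.7/jars/ relative to the working directory. Note that BlockBlobService, create_blob_from_bytes, and get_blob_to_stream belong to the legacy azure-storage-blob SDK (version 2.x and earlier); the 12.x SDK replaced them with BlobServiceClient, so pin the older package to run this gist unchanged, for example:

    pip install "azure-storage-blob<3"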