@jobliz
Created June 18, 2018 21:55
Pandas/Spark dataframe conversion with retail dataset
"""
CSV Dataset
Download XLSX dataset from http://archive.ics.uci.edu/ml/datasets/online+retail
Convert to CSV with LibreOffice Calc
Spark timestamp and datetime conversion from string:
https://stackoverflow.com/questions/46295879/how-to-read-date-in-custom-format-from-csv-file
https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html
Pandas dataframe to spark
https://stackoverflow.com/questions/37513355/converting-pandas-dataframe-into-spark-dataframe-error
https://github.com/awantik/pyspark-tutorial/wiki/Migrating-from-Pandas-to-Apache-Spark%E2%80%99s-DataFrame
Spark dataframe to Pandas
https://stackoverflow.com/questions/40651003/attributeerror-sparkcontext-object-has-no-attribute-createdataframe-using-s
There is no datetime in pandas,
https://stackoverflow.com/questions/21269399/datetime-dtypes-in-pandas-read-csv
https://stackoverflow.com/questions/17465045/can-pandas-automatically-recognize-dates
"""
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import types as sparktypes
context = SparkContext()
spark = SparkSession(context)
spark.conf.set("spark.sql.shuffle.partitions", "5")
# dateFormat apparently does nothing here: with inferSchema the column comes
# in as a timestamp, so timestampFormat is the option that actually applies
spark_df = spark \
    .read \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("dateFormat", "dd/MM/yyyy H:m") \
    .option("timestampFormat", "dd/MM/yyyy H:m") \
    .csv("retail.csv")
print("SCHEMA")
print(spark_df.printSchema())
print("")
# pandas dataframe
parse_dates = ['InvoiceDate']
# works, but slow; it is better to give the date format explicitly
# pdf = pd.read_csv('retail.csv', parse_dates=parse_dates)
from datetime import datetime  # pd.datetime is deprecated (removed in pandas 2.0)
# dateparse = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
# note: month-first here vs. the day-first pattern used for Spark above;
# use whichever matches your LibreOffice CSV export
dateparse = lambda x: datetime.strptime(x, '%m/%d/%Y %H:%M')
pdf = pd.read_csv('retail.csv', parse_dates=parse_dates, date_parser=dateparse)
print(pdf.dtypes)
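# An alternative sketch, avoiding the per-row lambda: read the column as a
# string and convert it afterwards with pd.to_datetime. The sample CSV text
# below is made up for illustration, not taken from retail.csv:
import io
_buf = io.StringIO("InvoiceDate,Quantity\n12/01/2010 08:26,6\n")
_sample_df = pd.read_csv(_buf)
_sample_df['InvoiceDate'] = pd.to_datetime(_sample_df['InvoiceDate'],
                                           format='%m/%d/%Y %H:%M')
assert _sample_df['InvoiceDate'].dtype.kind == 'M'  # datetime64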
# spark/pandas conversions
print("\nSpark to Pandas:")
print(spark_df.toPandas())  # collects the entire dataframe to the driver
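# On Spark >= 2.3, toPandas can be sped up with Apache Arrow if pyarrow is
# installed (configuration sketch, untested here):
# spark.conf.set("spark.sql.execution.arrow.enabled", "true")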
print("\nPandas to Spark:")
sqlcontext = SQLContext(context)  # on Spark >= 2.0, spark.createDataFrame also works
# https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
schema = sparktypes.StructType([
    sparktypes.StructField('InvoiceNo', sparktypes.StringType(), True),
    sparktypes.StructField('StockCode', sparktypes.StringType(), True),
    sparktypes.StructField('Description', sparktypes.StringType(), True),
    sparktypes.StructField('Quantity', sparktypes.IntegerType(), True),
    sparktypes.StructField('InvoiceDate', sparktypes.DateType(), True),
    sparktypes.StructField('UnitPrice', sparktypes.FloatType(), True),
    # FloatType because pandas promotes the column to float64 when customer
    # IDs are missing (NaN)
    sparktypes.StructField('CustomerID', sparktypes.FloatType(), True),
    sparktypes.StructField('Country', sparktypes.StringType(), True),
])
spark_df = sqlcontext.createDataFrame(pdf, schema)
print(spark_df)  # prints only the schema repr; use spark_df.show() to see rows
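# A quick way to eyeball the converted dataframe (sketch; column name taken
# from the schema above):
# spark_df.groupBy('Country').count().show(5)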