@tonyfraser
Forked from zaloogarcia/pandas_to_spark.py
Last active May 11, 2020 17:15
Script for converting a pandas DataFrame to a Spark DataFrame, with support for ArrayType(StringType()) columns.
# https://stackoverflow.com/questions/37513355/converting-pandas-dataframe-into-spark-dataframe-error/56895546#56895546
# Modified from the parent gist by building a dict that contains both
# df.dtypes AND type(pd.columnname) for each column.
#
# Looks something like this:
# { 'stringtypecolumn':  {'dtype': 'object', 'actual': 'str'},
#   'act_num':           {'dtype': 'int32',  'actual': 'numpy.int32'},
#   'text_dat':          {'dtype': 'object', 'actual': 'list'},
#   'scene_description': {'dtype': 'object', 'actual': 'NoneType'},
#   'keywords':          {'dtype': 'object', 'actual': 'list'}}
#
# Updated to be a class on 5 Aug 2019.
from pyspark.shell import sqlContext
from pyspark.sql.types import (StringType, StructField, ArrayType, IntegerType,
                               FloatType, DateType, LongType, StructType)


class DataFrameConverter:
    """Takes a pandas DataFrame and converts it to a Spark DataFrame."""

    dtypeHeader = 'dtype'
    actualHeader = 'actual'

    def debug(self):
        print(f"dataframe type: {type(self.df)}")
        print(f"dtypeHeader: {self.dtypeHeader}")

    def get_pdf_column_meta(self, column_name_set):
        # Build {column: {'dtype': ..., 'actual': ...}} from the pandas dtype
        # and the Python type of the first value in each column.
        column_meta = {}
        for ch in column_name_set:
            column_meta.update({ch: {
                self.dtypeHeader: str(self.df[ch].dtypes),
                self.actualHeader: str(type(self.df[ch].iloc[0])).split("'")[1]
            }})
        return column_meta

    def equivalent_type(self, dtype, actual):
        # Map a pandas dtype (plus the actual value type) to a Spark type.
        # Anything unrecognized falls through to StringType.
        if dtype == 'datetime64[ns]':
            return DateType()
        elif dtype == 'int64':
            return LongType()
        elif dtype == 'int32':
            return IntegerType()
        elif dtype == 'float64':
            return FloatType()
        elif dtype == 'object' and actual == 'list':
            return ArrayType(StringType())
        else:
            return StringType()

    def define_structure(self, column, tpe1, tpe2):
        try:
            typo = self.equivalent_type(tpe1, tpe2)
        except Exception:
            typo = StringType()
            print("no matching type, falling back to StringType")
        return StructField(column, typo)

    def get_spark_df(self, df):
        self.df = df
        meta = self.get_pdf_column_meta(self.df.columns)
        struct_list = []
        for x in meta:
            tpe = [str(meta.get(x).get(self.dtypeHeader)),
                   str(meta.get(x).get(self.actualHeader))]
            struct_list.append(self.define_structure(x, tpe[0], tpe[1]))
        p_schema = StructType(struct_list)
        return sqlContext.createDataFrame(self.df, p_schema)
# To use:
#   load your pandas DataFrame as pandasDF
#
#   from {path}.DataFrameConverter import DataFrameConverter as dfc
#   convertedToSpark = dfc().get_spark_df(pandasDF)
#   convertedToSpark.printSchema()
#
# Check your ArrayType(StringType()) columns.
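For a quick end-to-end check, here is a minimal usage sketch, assuming a running pyspark shell so that sqlContext exists; the column names and values below are made up for illustration:

    import pandas as pd

    # Made-up sample data: a string column, an int column, and a
    # list-of-strings column that should map to ArrayType(StringType()).
    sample_pdf = pd.DataFrame({
        'stringtypecolumn': ['a', 'b'],
        'act_num': [1, 2],
        'keywords': [['x', 'y'], ['z']],
    })

    spark_df = DataFrameConverter().get_spark_df(sample_pdf)
    spark_df.printSchema()
    # Expected, roughly:
    # root
    #  |-- stringtypecolumn: string (nullable = true)
    #  |-- act_num: long (nullable = true)
    #  |-- keywords: array (nullable = true)
    #  |    |-- element: string (containsNull = true)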
@wangzhixuan

Hi, Tony,

This converter looks very helpful to me.

I tried to add a simple bool type to the conditions, but somehow it failed.
Here is my addition inside equivalent_type:

        elif dtype == 'bool':
            return BooleanType()

Here is my test code:

test_df = pd.DataFrame([{'a':True}, {'a':False}])
DataFrameConverter().get_spark_df(test_df).show()

Here is the error message:

Caused by: java.lang.NullPointerException

Do you have any idea how this can be resolved?

Thanks,

@tonyfraser (Author)

Thanks for the note, @wangzhixuan. I'd have to see a lot more of the code, but I will say it got straightforward once I put print lines on pretty much every other line. Man, I had a lot of print lines and type() calls in there when I built this. It also helped to run it in the IntelliJ debugger.

If you do build it out a little more, LMK and I'll collaborate. Maybe we can set up a GitHub repo and kick it off as an open source project for us and others.

tf
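For later readers, a hedged guess at what went wrong here: if BooleanType was never imported, equivalent_type raises a NameError, the except in define_structure catches it and falls back to StringType, and a bool column declared as string can then fail on the JVM side. Whether that explains this particular NullPointerException is unverified, but the import plus a schema check is the first thing to try:

    from pyspark.sql.types import BooleanType  # easy to forget this import

    # Inside DataFrameConverter.equivalent_type, alongside the other
    # dtype branches (as suggested above):
    #     elif dtype == 'bool':
    #         return BooleanType()

    # Confirm the new branch actually fires by inspecting the derived
    # schema before calling show().
    import pandas as pd

    test_df = pd.DataFrame([{'a': True}, {'a': False}])
    spark_df = DataFrameConverter().get_spark_df(test_df)
    spark_df.printSchema()  # should report: a: boolean (nullable = true)
    spark_df.show()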
