@kagesenshi
Last active November 27, 2015 08:18
PySpark CSV to DataFrame
def csvRDD_to_rowRDD(rdd):
    # Expects an RDD of raw CSV lines,
    # e.g.: rdd = sc.textFile('myfile.csv')
    from pyspark.sql import Row

    # Pair each line with its index so the header line (index 0) can be skipped later.
    rdd = rdd.zipWithIndex()
    fail_key = 'X_IMPORT_FAIL'

    def extract_row(keys):
        def extract(x):
            # x is a (line, index) pair; split the line into fields.
            t = x[0].strip().split(',')
            param = {}
            for idx, h in enumerate(keys):
                try:
                    param[h.upper()] = t[idx]
                except IndexError:
                    # The line has fewer fields than the header; record the missing column.
                    param.setdefault(fail_key, [])
                    param[fail_key].append(h.upper())
            failed_import = param.get(fail_key, None)
            if failed_import:
                return {
                    'data': x,
                    'failed_columns': failed_import,
                    fail_key: True,
                }
            return param
        return extract

    # The first element is the header row; use it to derive the column names.
    header = rdd.first()
    keys = header[0].strip().split(',')

    # Drop the header and any blank lines, then parse each remaining line.
    contents = (rdd.filter(lambda x: x[1] != 0)
                   .filter(lambda x: bool(x[0].strip())))
    result = contents.map(extract_row(keys))

    def cast_row(x):
        # Strip the failure marker before building the Row.
        if fail_key in x:
            del x[fail_key]
        return Row(**x)

    # Split into rows that parsed cleanly and rows that were missing columns.
    success = result.filter(lambda x: not x.get(fail_key, False)).map(cast_row)
    failed = result.filter(lambda x: x.get(fail_key, False)).map(cast_row)
    return success, failed
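
Usage might look like the following. This is a minimal sketch, not part of the gist: it assumes an already-running SparkContext named sc and a SQLContext named sqlContext, and reuses the myfile.csv path from the comment above.

raw = sc.textFile('myfile.csv')
good, bad = csvRDD_to_rowRDD(raw)

# Rows that parsed cleanly can be promoted to a DataFrame.
df = sqlContext.createDataFrame(good)
df.show()

# Rows with missing columns come back as Row(data=..., failed_columns=...) for inspection.
for row in bad.take(5):
    print(row)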