Load CSVs in Spark
"""
Launch pyspark with the flag:
--packages com.databricks:spark-csv_2.10:1.0.3
"""
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StringType, StructType
def create_schema(fields):
""" Create a dataframe schema from a list of field names.
"""
schema = [StructField(str(field), StringType(), True) for field in fields]
return StructType(schema)
def load_csv(sq, path, delimiter=",", fields=None):
""" Create a dataframe from a csv file. If no fields are specified,
it will assume headers are included in the file.
"""
args = {
'source': 'com.databricks.spark.csv',
'path': path,
'header': 'true',
'delimiter': delimiter,
}
if fields:
args['schema'] = create_schema(fields)
args['header'] = "false"
return sq.load(**args)
# load a file with a header included
csv_dataframe = load_csv(sqlContext, path)
# load a file without a header, but apply your own schema
csv_dataframe = load_csv(sqlContext, path, fields=['name','age'])
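The examples above assume they run inside the pyspark shell, where sqlContext is already defined. If this were used from a standalone script submitted with spark-submit (still passing the --packages flag), a SQLContext would have to be built first. A minimal sketch under that assumption; the app name and file path are illustrative placeholders, not part of the original gist:

# Minimal sketch: construct a SQLContext outside the pyspark shell (Spark 1.x API).
# "csv-loader" and "/path/to/data.csv" are assumed placeholders for illustration.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("csv-loader")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

csv_dataframe = load_csv(sqlContext, "/path/to/data.csv", fields=['name', 'age'])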