Skip to content

Instantly share code, notes, and snippets.

@joanteixi
Created October 25, 2022 14:39
Show Gist options
  • Save joanteixi/1a82c1d6b6116ed80d8a96bcbfcf9a98 to your computer and use it in GitHub Desktop.
Save joanteixi/1a82c1d6b6116ed80d8a96bcbfcf9a98 to your computer and use it in GitHub Desktop.
Spark: read a CSV file starting from line N
def read_csv_from_rown_number(path, row_number=1, sep=';', schema=None, ):
    """Read a delimited text file whose header is not on the first line.

    Lines are numbered from 0 in file order. The line at index ``row_number``
    is used as the header. Every line before it is discarded, and every line
    after it is parsed as a data row. With the default ``row_number=1``, the
    first physical line is skipped and the second line is the header.

    All double-quote characters are removed from the header and the data
    lines, and CSV quoting is disabled. A separator inside a quoted value
    therefore still splits the field.

    Relies on a ``spark`` SparkSession being available in the calling scope
    (e.g. a notebook global).

    Args:
        path: Path (or glob / list of paths) accepted by ``spark.read.text``.
        row_number: 0-based index of the header line. Converted with
            ``int()``.
        sep: Field separator. Used both to split the header (literal
            ``str.split``) and as the CSV reader's ``sep`` option.
        schema: Optional Spark schema for the data rows. When given, column
            names come from the schema and the header names are not applied.

    Returns:
        A Spark DataFrame of the data rows. Without ``schema``, columns are
        named after the header fields (quotes removed, whitespace stripped).

    Raises:
        IndexError: if the input has no line at index ``row_number``.
    """
    # The original built a SQL string with %s, so string values such as "3"
    # used to work; keep accepting them.
    row_number = int(row_number)

    # Pair every raw line with its 0-based position. Unlike
    # monotonically_increasing_id(), zipWithIndex() yields 0, 1, 2, ... with
    # no gaps between partitions, so the index really is the line number.
    indexed_lines = (
        spark.read.text(path)
        .rdd.map(lambda row: row.value)
        .zipWithIndex()
    )

    # Extract and clean the header line (exactly the line at row_number).
    header_rows = indexed_lines.filter(lambda pair: pair[1] == row_number).take(1)
    if not header_rows:
        raise IndexError(
            'read_csv_from_rown_number: no header line at index %d in %s'
            % (row_number, path)
        )
    header_list = [
        field.replace('"', '').strip() for field in header_rows[0][0].split(sep)
    ]

    # Data rows are strictly after the header. Remove every double quote,
    # since quoting is disabled in the reader below.
    data_lines = (
        indexed_lines
        .filter(lambda pair: pair[1] > row_number)
        .map(lambda pair: pair[0].replace('"', ''))
    )

    # DataFrameReader.csv accepts an RDD of strings, so the lines are parsed
    # directly: no temporary file is written or left behind.
    reader = (
        spark.read
        .option('sep', sep)
        .option('mode', 'PERMISSIVE')
        .option('quote', '')
    )
    if schema:
        reader = reader.schema(schema)
    df = reader.csv(data_lines)

    # Without a schema, the reader names columns _c0, _c1, ...; map them to
    # the header names. withColumnRenamed is a no-op for absent columns.
    for i, column in enumerate(header_list):
        df = df.withColumnRenamed('_c%d' % i, column)
    return df
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment