Skip to content

Instantly share code, notes, and snippets.

@wagnerjgoncalves
Last active August 31, 2018 13:24
Show Gist options
  • Save wagnerjgoncalves/d2622d291a2a63681f38aa60327bf9a8 to your computer and use it in GitHub Desktop.
Pyspark using SparkContext example
# -*- coding: utf-8 -*-
"""
Example of Python RDD with SparkContext
"""
import csv
from pyspark import SparkContext
from pyspark.conf import SparkConf
from collections import OrderedDict
# Application name shown in the Spark UI / logs
app = 'exampleRDDApi'

# Build the Spark configuration step by step: run locally on 8 cores,
# raise the driver result-size cap to 8 GB, and log the effective config.
conf = SparkConf()
conf.setAppName(app)
conf.setMaster("local[8]")
conf.set('spark.driver.maxResultSize', '8g')
conf.set('spark.logConf', 'true')

# Entry point for all RDD operations below
sparkContext = SparkContext(conf=conf)
# Read the census CSV as an RDD of raw text lines
rdd = sparkContext.textFile('censo_escolar_ensino_medio_2017.csv')

# Materialize a full pass over the data to count every record
rdd.count()

# Split each semicolon-delimited line into a list of fields
fields = rdd.map(lambda line: line.split(';'))

# Group records by the school code (field index 3); cache because the
# grouped RDD is collected and iterated below
cached = fields.groupBy(lambda record: record[3]).cache()
# Build a nested mapping: school_id -> year -> step -> [student_ids].
# NOTE(review): the paste had lost all loop indentation; restored here.
# Field layout is inferred from the indexing — presumably record[0] is the
# student id, record[1] the year and record[2] the step/grade; confirm
# against the census CSV layout.
schools = {}
for school_id, students in cached.collect():
    # Hoisted out of the inner loop: the school entry is invariant per group
    school = schools.setdefault(school_id, dict())
    for student in students:
        year = school.setdefault(student[1], dict())
        year.setdefault(student[2], list()).append(student[0])

# Sort schools by their code so the CSV export below is deterministic
schools = OrderedDict(sorted(schools.items(), key=lambda t: t[0]))
# Export the nested schools mapping to a semicolon-delimited CSV file.
# Fixes vs. original: restored the indentation lost in the paste, replaced
# the Python-2-only `print` statement, and opened the file in text mode
# with newline='' — csv.DictWriter raises TypeError on a 'wb' handle under
# Python 3, and newline='' prevents blank rows on Windows.
with open('censo_escolar_ensino_medio_2017_processed.csv', 'w', newline='') as output:
    fieldnames = [u'school_id', u'year', u'step', u'student_ids']
    wr = csv.DictWriter(output, fieldnames=fieldnames, delimiter=';')
    wr.writeheader()
    print('writing csv file')
    # Flatten the school -> year -> step nesting into one row per step,
    # joining the student ids into a single comma-separated column
    for school_id, years in schools.items():
        for year, steps in years.items():
            for step, student_ids in steps.items():
                wr.writerow({
                    'school_id': school_id,
                    'year': year,
                    'step': step,
                    'student_ids': ",".join(student_ids),
                })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment