Skip to content

Instantly share code, notes, and snippets.

@rjurney
Created April 2, 2016 16:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rjurney/af27f70c76dc6c6ae05c465271331ade to your computer and use it in GitHub Desktop.
Save rjurney/af27f70c76dc6c6ae05c465271331ade to your computer and use it in GitHub Desktop.
PySpark sorted reduce question: is reduce/sorted() the best way to prepare tuples that hold a sorted list?
# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')
# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x:
(x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
)
# Group flights by tail number, sorted by date, then flight number, then origin/dest
flights_per_airplane = flights\
.map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
.reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4])))
# Do same in a map step, more efficient or does pySpark know how to optimize the above?
flights_per_airplane = flights\
.map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
.reduceByKey(lambda a, b: a + b)\
.map(lambda tuple:
(
tuple[0], sorted(tuple[1], key=lambda x: (x[1],x[2],x[3],x[4])))
)
flights_per_airplane.first()
(u'N163US',
[(u'AA', u'2015-07-01', 425, u'SEA', u'PHX', u'N163US'),
(u'AA', u'2015-07-01', 466, u'BOS', u'CLT', u'N163US'),
(u'AA', u'2015-07-01', 466, u'CLT', u'DEN', u'N163US'),
(u'AA', u'2015-07-01', 521, u'DEN', u'PHX', u'N163US'),
(u'AA', u'2015-07-01', 521, u'PHX', u'SEA', u'N163US'),
(u'AA', u'2015-07-02', 462, u'PHX', u'DFW', u'N163US'),
(u'AA', u'2015-07-02', 550, u'LAX', u'PHX', u'N163US'),
(u'AA', u'2015-07-02', 550, u'PHX', u'BOS', u'N163US'),
(u'AA', u'2015-07-02', 721, u'CLT', u'LAX', u'N163US'),
(u'AA', u'2015-07-02', 1807, u'LGA', u'CLT', u'N163US'),
(u'AA', u'2015-07-02', 2064, u'CLT', u'LGA', u'N163US'),
(u'AA', u'2015-07-02', 2064, u'DFW', u'CLT', u'N163US'),
(u'AA', u'2015-07-03', 450, u'BOS', u'PHX', u'N163US'),
(u'AA', u'2015-07-03', 450, u'PHX', u'SLC', u'N163US'),
(u'AA', u'2015-07-03', 494, u'PHX', u'DEN', u'N163US'),
(u'AA', u'2015-07-03', 494, u'SLC', u'PHX', u'N163US'),
(u'AA', u'2015-07-04', 1783, u'CLT', u'BOS', u'N163US'),
(u'AA', u'2015-07-04', 1861, u'LGA', u'CLT', u'N163US'),
(u'AA', u'2015-07-04', 2068, u'CLT', u'LGA', u'N163US'),
(u'AA', u'2015-07-04', 2068, u'DEN', u'CLT', u'N163US'),
(u'AA', u'2015-07-05', 752, u'BOS', u'CLT', u'N163US'),
(u'US', u'2015-04-18', 622, u'PHX', u'JFK', u'N163US'),
(u'US', u'2015-04-19', 1820, u'CLT', u'RSW', u'N163US'),
(u'US', u'2015-04-19', 2057, u'JFK', u'CLT', u'N163US'),
...])
@rjurney
Copy link
Author

rjurney commented Apr 3, 2016

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment