Skip to content

Instantly share code, notes, and snippets.

@timfpark
Created November 3, 2015 04:33
Show Gist options
  • Save timfpark/e1b672fcdc921f017837 to your computer and use it in GitHub Desktop.
from pyspark import SparkConf, SparkContext
def location_deserializer(line):
    """Parse one CSV line into a single-element list of a location dict.

    Expected columns: id, latitude, longitude, timestamp. Returns an empty
    list (so it composes with flatMap) when the line has too few columns or
    any numeric field fails to parse.
    """
    fields = line.split(",")
    if len(fields) < 4:
        return []
    try:
        record = {
            'id': fields[0],
            'latitude': float(fields[1]),
            'longitude': float(fields[2]),
            'timestamp': float(fields[3]),
        }
    except ValueError:
        # Malformed numeric field: drop the row rather than fail the job.
        return []
    return [record]
def bounding_box_filter(latitude_north, longitude_east, latitude_south, longitude_west):
    """Return a predicate that is True for locations inside the bounding box.

    Boundaries are inclusive. The returned function expects a dict with
    'latitude' and 'longitude' keys, as produced by location_deserializer.
    """
    # Renamed from `filter` to avoid shadowing the builtin; chained
    # comparisons replace the original double-negated `outside` test.
    def in_box(location):
        return (latitude_south <= location['latitude'] <= latitude_north
                and longitude_west <= location['longitude'] <= longitude_east)
    return in_box
def location_serializer(location):
    """Serialize a location dict back to a CSV line: id,latitude,longitude,timestamp.

    Numeric fields are rendered with str(), matching Python's default float
    formatting (inverse of location_deserializer for round numbers).
    """
    # f-string replaces the original chain of `+ "," + str(...)` concatenations.
    return f"{location['id']},{location['latitude']},{location['longitude']},{location['timestamp']}"
import time
def main(sc):
    """Run the filtering job on the given SparkContext.

    Reads 'locationsAct.csv', keeps only rows inside a hard-coded bounding
    box (coordinates around the Strait of Gibraltar — presumably the target
    region; TODO confirm), and writes the surviving rows as CSV text to the
    'spain' output directory.
    """
    in_region = bounding_box_filter(36.370726, -5.235771, 35.998590, -6.203182)
    parsed = sc.textFile('locationsAct.csv').flatMap(location_deserializer)
    kept = parsed.filter(in_region)
    serialized = kept.map(location_serializer)
    serialized.saveAsTextFile("spain")
if __name__ == "__main__":
conf = SparkConf()
sc = SparkContext(conf=conf)
main(sc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment