Big data analysis using Spark

Wind data and crime correlation

Part A

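# start the PySpark shell with the spark-csv package so CSV files can be read into DataFrames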
$ bin/pyspark --packages com.databricks:spark-csv_2.11:1.2.0

# load data from csv into a data frame
>>> df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema='true').load('/home/oxclo/datafiles/wind2014/*.csv')

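# convert the DataFrame to an RDD of Row objects for the RDD-based processing below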
>>> winds = df1.rdd

# View a sample of the loaded rdd to check if all is ok so far
>>> winds.first()

# register the DataFrame as a temp table so it can be queried with SQL
>>> df1.registerTempTable('wind')

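# total number of rows loaded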
>>> sqlContext.sql("select count(*) from wind").show()
+------+
|   _c0|
+------+
|816487|
+------+

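# peek at rows where both wind velocity and temperature are non-zero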
>>> sqlContext.sql("select * from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0 limit 10").show()

>>> sqlContext.sql("select count(*) from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0 and Interval_End_Time is not null").show()
+------+
|   _c0|
+------+
|596960|
+------+

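# same count without the null check, confirming Interval_End_Time is never null for these rows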
>>> sqlContext.sql("select count(*) from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0").show()
+------+
|   _c0|
+------+
|596960|
+------+

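# average wind velocity and temperature per station over the whole year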
>>> sqlContext.sql("select Station_ID as stationId, avg(Wind_Velocity_Mtr_Sec) as avgWind, avg(Ambient_Temperature_Deg_C) as avgTemperature from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0 and Interval_End_Time is not null group by Station_ID").show()
+---------+------------------+------------------+
|stationId|           avgWind|    avgTemperature|
+---------+------------------+------------------+
|     SF34|6.3740521103512915|14.494108599723695|
|     SF36| 2.442013898181282|15.204654248524948|
|     SF37|2.3502400254803724|15.292148253530083|
|     SF04| 2.227858731246673|15.384222958393572|
|     SF07|1.3990926855761996|13.981300258260502|
|     SF11|2.1650892651019635|11.780572527895345|
|     SF12|0.7872557840616458|10.079832015028632|
|     SF14|0.0129030837004405|12.481982378854621|
|     SF15|1.9965524302746909|18.572942784700963|
|     SF17|0.9781646326033956|18.406523367396776|
|     SF18| 2.622396383983099|15.385942213222885|
|     SF19|1.7161732752693128|12.656468026587195|
+---------+------------------+------------------+

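# the same averages broken down by station and raw interval timestamp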
>>> sqlContext.sql("select Station_ID as stationId, Interval_End_Time as time, avg(Wind_Velocity_Mtr_Sec) as avgWind, avg(Ambient_Temperature_Deg_C) as avgTemperature from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0 and Interval_End_Time is not null group by Station_ID, Interval_End_Time").show()
+---------+----------------+-------------------+------------------+
|stationId|            time|            avgWind|    avgTemperature|
+---------+----------------+-------------------+------------------+
|     SF17|2014-01-6? 14:00|             0.9454|            18.928|
|     SF17|2014-01-2? 16:20|            0.12625|17.387500000000003|
|     SF17|2014-01-5? 23:05|              2.093|             10.06|
|     SF17|2014-01-6? 18:15|              1.623|             10.09|
|     SF17|2014-02-7? 17:30|             0.9445|           12.9175|
|     SF17|2014-02-3? 13:05|              0.844|16.226666666666667|
|     SF17|2014-02-7? 01:55|              1.656|             12.78|
|     SF17|2014-02-7? 06:05|              0.675|             12.18|
|     SF17|2014-02-1? 20:10|              1.855|           10.4755|
|     SF17|2014-02-4? 08:15|              0.051|             12.71|
|     SF17|2014-02-5? 13:10|              1.713|17.865000000000002|
|     SF17|2014-02-5? 17:25|             0.1835|             15.61|
|     SF17|2014-02-4? 04:00|              2.184|             12.96|
|     SF17|2014-02-4? 19:40|              1.496|             13.84|
|     SF17|2014-02-6? 08:20|              2.888|             13.44|
|     SF17|2014-03-7? 00:55|0.40900000000000003|             11.02|
|     SF17|2014-03-7? 05:05|             1.1505|11.670000000000002|
|     SF17|2014-03-1? 14:30|              1.584|            19.694|
|     SF17|2014-03-4? 21:15|             1.2215|            14.235|
|     SF17|2014-03-6? 21:20|0.37566666666666665| 9.696666666666667|
+---------+----------------+-------------------+------------------+
only showing top 20 rows

>>> from dateutil.parser import parse
>>> from datetime import datetime
>>> def date_and_hour(s):
...   dt = parse(s.replace('?', ' '))
...   hour = dt.hour
...   return (dt.strftime("%Y-%m-%d"), hour)
... 

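# quick check of the helper on one of the timestamps seen above (the stray '?' is replaced by a space before parsing)
>>> date_and_hour('2014-01-6? 14:00')
('2014-01-06', 14)

# map each row to (stationId, (date, hour), windVelocity, temperature)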
>>> windsMapped = winds.map(lambda s: (s.Station_ID, date_and_hour(s.Interval_End_Time), s.Wind_Velocity_Mtr_Sec, s.Ambient_Temperature_Deg_C))

# drop records with a zero or missing wind velocity...
>>> windsFilter1 = windsMapped.filter(lambda l: (l[2] != 0 and l[2] != None))

# ...and with a zero or missing temperature
>>> windsFilter2 = windsFilter1.filter(lambda l: l[3] != 0 and l[3] != None)

>>> windsFilter2.first()
(u'SF17', ('2014-01-04', 8), 0.019, 1.403)

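# re-key as ((stationId, date, hour), (windVelocity, temperature, 1)) so values can be summed and counted per station-hour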
>>> windsMapped2 = windsFilter2.map(lambda l: ((l[0], l[1][0], l[1][1]), (l[2], l[3], 1)))

>>> windsMapped2.first()
((u'SF17', '2014-01-04', 8), (0.019, 1.403, 1))

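# sum the wind velocities, temperatures and record counts for each (stationId, date, hour) key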
>>> windsReduce1 = windsMapped2.reduceByKey(lambda (a,b,n), (c,d,m):(a+c,b+d,n+m))

>>> windsReduce1.first()
((u'SF15', '2014-07-04', 13), (165.48799999999994, 1946.18, 60))

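# divide the sums by the record count to get hourly averages per station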
>>> windsMapped3 = windsReduce1.map(lambda ((station, date, hour), (sumOfWind, sumOfTemp, count)): ((station, date, hour), (sumOfWind/count, sumOfTemp/count)))

>>> windsMapped3.first()
((u'SF15', '2014-07-04', 13), (2.7581333333333324, 32.43633333333334))

# take a ~10% random sample (without replacement) so the collected result stays small
>>> smallerResult = windsMapped3.sample(False, 0.1)

>>> smallerResult.collect()

# register the hourly averages as a temp table (assumption: column names are illustrative)
>>> mapped = sqlContext.createDataFrame(windsMapped3.map(lambda (k, v): (k[0], k[1], k[2], v[0], v[1])), \
...   ['stationId', 'date', 'hour', 'avgWind', 'avgTemperature'])
>>> mapped.registerTempTable('mapped')

>>> mapped.first()

Part B

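# load the crime incident CSVs into a second DataFrame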
>>> df2 = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema='true').load('/home/oxclo/datafiles/incidents/*.csv')

>>> incidents = df2.rdd

>>> incidents.first()
Row(IncidntNum=150944099, Category=u'NON-CRIMINAL', Descript=u'AIDED CASE, MENTAL DISTURBED', DayOfWeek=u'Wednesday', Date=u'10/28/2015', Time=u'23:46', PdDistrict=u'NORTHERN', Resolution=u'NONE', Address=u'JACKSON ST / VANNESS AV', X=-122.423049294192, Y=37.7939338236045, Location=u'(37.7939338236045, -122.423049294192)', PdId=15094409964020)

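# map each incident to (date, year, hour, [latitude, longitude])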
>>> incidMapped1 = incidents.map(lambda i: (parse(i.Date).strftime("%Y-%m-%d"), parse(i.Date).year, i.Time.split(":",1)[0], [i.Y, i.X]))

>>> incidMapped1.first()
(u'2015-10-28', 2015, u'23', [37.7939338236045, -122.423049294192])

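# keep only incidents from 2014 to match the wind data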
>>> incidMapped2 = incidMapped1.filter(lambda l: l[1] == 2014)

>>> incidMapped2.first()
('2014-12-31', 2014, u'23', [37.7799444052046, -122.414317857881])

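# drop the year field now that everything is from 2014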
>>> incidRemapped = incidMapped2.map(lambda (a,b,c,d): (a,c,d))

>>> incidRemapped.first()
('2014-12-31', u'23', [37.7799444052046, -122.414317857881])

>>> from scipy import spatial
>>> from numpy import array

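# return the station id whose coordinates are nearest to point l; KDTree.query returns (distance, index of nearest point)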
>>> def locate(l,index,locations):
...   distance,i = index.query(l)
...   return locations[i]
... 

>>> located = incidRemapped.map(lambda (d, h, l): (locate(l, \
... spatial.KDTree(array( \
... [[37.7816834,-122.3887657],\
... [37.7469112,-122.4821759],\
... [37.7411022,-120.804151],\
... [37.4834543,-122.3187302],\
... [37.7576436,-122.3916382],\
... [37.7970013,-122.4140409],\
... [37.748496,-122.4567461],\
... [37.7288155,-122.4210133],\
... [37.5839487,-121.9499339],\
... [37.7157156,-122.4145311],\
... [37.7329613,-122.5051491],\
... [37.7575891,-122.3923824],\
... [37.7521169,-122.4497687]])),
... ["SF18", "SF04", "SF15", "SF17", "SF36", "SF37",\
... "SF07", "SF11", "SF12", "SF14", "SF16", "SF19", "SF34"] ),d,h))

>>> located.first()
('SF37', '2014-12-31', u'23')

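The KDTree above is rebuilt inside the lambda for every record. An equivalent sketch (not from the original session; same coordinates and station ids) builds the tree once on the driver and lets the closure capture it, producing the same RDD:

>>> stationIds = ["SF18", "SF04", "SF15", "SF17", "SF36", "SF37",\
... "SF07", "SF11", "SF12", "SF14", "SF16", "SF19", "SF34"]
>>> stationIndex = spatial.KDTree(array( \
... [[37.7816834,-122.3887657],\
... [37.7469112,-122.4821759],\
... [37.7411022,-120.804151],\
... [37.4834543,-122.3187302],\
... [37.7576436,-122.3916382],\
... [37.7970013,-122.4140409],\
... [37.748496,-122.4567461],\
... [37.7288155,-122.4210133],\
... [37.5839487,-121.9499339],\
... [37.7157156,-122.4145311],\
... [37.7329613,-122.5051491],\
... [37.7575891,-122.3923824],\
... [37.7521169,-122.4497687]]))
>>> located = incidRemapped.map(lambda (d, h, l): (locate(l, stationIndex, stationIds), d, h))
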
>>> incidMapped3 = located.map(lambda l: ((l[0], l[1], l[2]), 1))
>>> incidMapped3.first()
(('SF37', '2014-12-31', u'23'), 1)

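# count the incidents per (stationId, date, hour)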
>>> incidReduced1 = incidMapped3.reduceByKey(lambda a, b: a+b)
>>> incidReduced1.first()
(('SF37', '2014-02-14', u'06'), 2)
