Wind data and crime correlation
$ bin/pyspark --packages com.databricks:spark-csv_2.11:1.2.0
# load data from csv into a data frame
>>> df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema='true').load('/home/oxclo/datafiles/wind2014/*.csv')
>>> winds = df1.rdd
# View a sample of the loaded rdd to check if all is ok so far
>>> winds.first()
>>> df1.registerTempTable('wind')
>>> sqlContext.sql("select count(*) from wind").show()
+------+
| _c0|
+------+
|816487|
+------+
>>> sqlContext.sql("select * from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0 limit 10").show()
>>> sqlContext.sql("select count(*) from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0 and Interval_End_Time is not null").show()
+------+
| _c0|
+------+
|596960|
+------+
>>> sqlContext.sql("select count(*) from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0").show()
+------+
| _c0|
+------+
|596960|
+------+
>>> sqlContext.sql("select Station_ID as stationId, avg(Wind_Velocity_Mtr_Sec) as avgWind, avg(Ambient_Temperature_Deg_C) as avgTemperature from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0 and Interval_End_Time is not null group by Station_ID").show()
+---------+------------------+------------------+
|stationId| avgWind| avgTemperature|
+---------+------------------+------------------+
| SF34|6.3740521103512915|14.494108599723695|
| SF36| 2.442013898181282|15.204654248524948|
| SF37|2.3502400254803724|15.292148253530083|
| SF04| 2.227858731246673|15.384222958393572|
| SF07|1.3990926855761996|13.981300258260502|
| SF11|2.1650892651019635|11.780572527895345|
| SF12|0.7872557840616458|10.079832015028632|
| SF14|0.0129030837004405|12.481982378854621|
| SF15|1.9965524302746909|18.572942784700963|
| SF17|0.9781646326033956|18.406523367396776|
| SF18| 2.622396383983099|15.385942213222885|
| SF19|1.7161732752693128|12.656468026587195|
+---------+------------------+------------------+
>>> sqlContext.sql("select Station_ID as stationId, Interval_End_Time as time, avg(Wind_Velocity_Mtr_Sec) as avgWind, avg(Ambient_Temperature_Deg_C) as avgTemperature from wind where Wind_Velocity_Mtr_Sec!=0.0 and Ambient_Temperature_Deg_C!=0.0 and Interval_End_Time is not null group by Station_ID, Interval_End_Time").show()
+---------+----------------+-------------------+------------------+
|stationId| time| avgWind| avgTemperature|
+---------+----------------+-------------------+------------------+
| SF17|2014-01-6? 14:00| 0.9454| 18.928|
| SF17|2014-01-2? 16:20| 0.12625|17.387500000000003|
| SF17|2014-01-5? 23:05| 2.093| 10.06|
| SF17|2014-01-6? 18:15| 1.623| 10.09|
| SF17|2014-02-7? 17:30| 0.9445| 12.9175|
| SF17|2014-02-3? 13:05| 0.844|16.226666666666667|
| SF17|2014-02-7? 01:55| 1.656| 12.78|
| SF17|2014-02-7? 06:05| 0.675| 12.18|
| SF17|2014-02-1? 20:10| 1.855| 10.4755|
| SF17|2014-02-4? 08:15| 0.051| 12.71|
| SF17|2014-02-5? 13:10| 1.713|17.865000000000002|
| SF17|2014-02-5? 17:25| 0.1835| 15.61|
| SF17|2014-02-4? 04:00| 2.184| 12.96|
| SF17|2014-02-4? 19:40| 1.496| 13.84|
| SF17|2014-02-6? 08:20| 2.888| 13.44|
| SF17|2014-03-7? 00:55|0.40900000000000003| 11.02|
| SF17|2014-03-7? 05:05| 1.1505|11.670000000000002|
| SF17|2014-03-1? 14:30| 1.584| 19.694|
| SF17|2014-03-4? 21:15| 1.2215| 14.235|
| SF17|2014-03-6? 21:20|0.37566666666666665| 9.696666666666667|
+---------+----------------+-------------------+------------------+
only showing top 20 rows
>>> from dateutil.parser import parse
>>> from datetime import datetime
>>> # Convert a raw Interval_End_Time string (e.g. '2014-01-6? 14:00') into a
>>> # ('YYYY-MM-DD', hour) pair. The source data contains stray '?' characters
>>> # inside timestamps; they are treated as spaces before parsing.
>>> # NOTE(review): parse() would raise on a None/null timestamp — assumes nulls
>>> # are absent or filtered before this is applied; TODO confirm.
>>> def date_and_hour(s):
... dt = parse(s.replace('?', ' '))
... hour = dt.hour
... return (dt.strftime("%Y-%m-%d"), hour)
...
>>> windsMapped = winds.map(lambda s: (s.Station_ID, date_and_hour(s.Interval_End_Time), s.Wind_Velocity_Mtr_Sec, s.Ambient_Temperature_Deg_C))
>>> windsFilter1 = windsMapped.filter(lambda l: (l[2] != 0 and l[2] != None))
>>> windsFilter2 = windsFilter1.filter(lambda l: l[3] != 0 and l[3] != None)
>>> windsFilter2.first()
(u'SF17', ('2014-01-04', 8), 0.019, 1.403)
>>> windsMapped2 = windsFilter2.map(lambda l: ((l[0], l[1][0], l[1][1]), (l[2], l[3], 1)))
>>> windsMapped2.first()
((u'SF17', '2014-01-04', 8), (0.019, 1.403, 1))
>>> windsReduce1 = windsMapped2.reduceByKey(lambda (a,b,n), (c,d,m):(a+c,b+d,n+m))
>>> windsReduce1.first()
((u'SF15', '2014-07-04', 13), (165.48799999999994, 1946.18, 60))
>>> windsMapped3 = windsReduce1.map(lambda ((station,date, hour), (sumOfWind, sumOfTemp, count)): ((station,date,hour), (sumOfWind/count, sumOfTemp/count)))
>>> windsMapped3.first()
((u'SF15', '2014-07-04', 13), (2.7581333333333324, 32.43633333333334))
>>> smallerResult = windsMapped3.sample(False, 0.1)
>>> smallerResult.collect()
>>> df3 = sqlContext.createDataFrame(windsMapped3)
>>> df3.registerTempTable('mapped')
>>> df3.first()
>>> df2 = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema='true').load('/home/oxclo/datafiles/incidents/*.csv')
>>> incidents = df2.rdd
>>> incidents.first()
Row(IncidntNum=150944099, Category=u'NON-CRIMINAL', Descript=u'AIDED CASE, MENTAL DISTURBED', DayOfWeek=u'Wednesday', Date=u'10/28/2015', Time=u'23:46', PdDistrict=u'NORTHERN', Resolution=u'NONE', Address=u'JACKSON ST / VANNESS AV', X=-122.423049294192, Y=37.7939338236045, Location=u'(37.7939338236045, -122.423049294192)', PdId=15094409964020)
>>> incidMapped1 = incidents.map(lambda i: (parse(i.Date).strftime("%Y-%m-%d"), parse(i.Date).year, i.Time.split(":",1)[0], [i.Y, i.X]))
>>> incidMapped1.first()
(u'2015-10-28', 2015, u'23', [37.7939338236045, -122.423049294192])
>>> incidMapped2 = incidMapped1.filter(lambda l: l[1] == 2014)
>>> incidMapped2.first()
('2014-12-31', 2014, u'23', [37.7799444052046, -122.414317857881])
>>> incidRemapped = incidMapped2.map(lambda (a,b,c,d): (a,c,d))
>>> incidRemapped.first()
('2014-12-31', u'23', [37.7799444052046, -122.414317857881])
>>> from scipy import spatial
>>> from numpy import array
>>> # Map a [lat, lon] coordinate pair to the id of its nearest weather station.
>>> # l: coordinate pair to look up; index: scipy KDTree built over the station
>>> # coordinates; locations: station ids in the same order as the tree's points,
>>> # so the index returned by query() selects the matching id.
>>> def locate(l,index,locations):
... distance,i = index.query(l)
... return locations[i]
...
>>> # Build the station list and KDTree once, outside the map closure, so the
>>> # tree is not reconstructed for every incident record.
>>> stationCoords = array( \
...     [[37.7816834,-122.3887657],\
...     [37.7469112,-122.4821759],\
...     [37.7411022,-120.804151],\
...     [37.4834543,-122.3187302],\
...     [37.7576436,-122.3916382],\
...     [37.7970013,-122.4140409],\
...     [37.748496,-122.4567461],\
...     [37.7288155,-122.4210133],\
...     [37.5839487,-121.9499339],\
...     [37.7157156,-122.4145311],\
...     [37.7329613,-122.5051491],\
...     [37.7575891,-122.3923824],\
...     [37.7521169,-122.4497687]])
>>> stationIndex = spatial.KDTree(stationCoords)
>>> stationIds = ["SF18", "SF04", "SF15", "SF17", "SF36", "SF37",\
...     "SF07", "SF11", "SF12", "SF14", "SF16", "SF19", "SF34"]
>>> located = incidRemapped.map(lambda (d, h, l): (locate(l, stationIndex, stationIds), d, h))
>>> located.first()
('SF37', '2014-12-31', u'23')
>>> incidMapped3 = located.map(lambda l: ((l[0], l[1], l[2]), 1))
>>> incidMapped3.first()
(('SF37', '2014-12-31', u'23'), 1)
>>> incidReduced1 = incidMapped3.reduceByKey(lambda a, b: a+b)
>>> incidReduced1.first()
(('SF37', '2014-02-14', u'06'), 2)