Skip to content

Instantly share code, notes, and snippets.

@justyns
Last active December 17, 2015 20:00
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justyns/b1fb76c2e4056ed004c3 to your computer and use it in GitHub Desktop.
Save justyns/b1fb76c2e4056ed004c3 to your computer and use it in GitHub Desktop.
Quick hack to get the fastest route from/to home to/from work, and post it to mqtt so that Home Assistant/Openhab can pick it up. I had issues getting the paho mqtt library working for some reason, so I just resorted to using the mosquitto_pub client.
- platform: mqtt
state_topic: "traffic/worktohome/fastest"
name: "Traffic work to home"
unit_of_measurement: "minutes"
state_format: "json:minutes"
- platform: mqtt
state_topic: "traffic/hometowork/fastest"
name: "Traffic home to work"
unit_of_measurement: "minutes"
state_format: "json:minutes"
import googlemaps
import json
from datetime import datetime
from collections import OrderedDict
API_KEY = 'API KEY from google traffic api'
home = 'home street address'
work = 'work street address'
def getRoute(fromAddr, toAddr):
gmaps = googlemaps.Client(key=API_KEY)
now = datetime.now()
results = OrderedDict()
directions_result = gmaps.directions(fromAddr,
toAddr,
mode="driving",
departure_time=now,
alternatives=True)
results['description'] = "{fromAddr} to {toAddr}".format(fromAddr=fromAddr, toAddr=toAddr)
results['fastest'] = {'time': None, 'text': None, 'name': None}
for route in directions_result:
# print json.dumps(directions_result, indent=4)
# print route['summary']
total_distance = 0.0
total_time = 0.0
results[route['summary']] = OrderedDict(legs=[])
for leg in route['legs']:
# print leg['distance']
total_distance += leg['distance']['value']
total_time += leg['duration']['value']
for step in leg['steps']:
results[route['summary']]['legs'].append("{distance} {duration} {desc}".format(
distance=step['distance']['text'],
duration=step['duration']['text'],
desc=step['html_instructions']
))
# print "\t", step['distance']['text'], step['duration']['text'], step['html_instructions']
results[route['summary']]['total_miles'] = (total_distance * 0.00062137)
results[route['summary']]['total_minutes'] = (total_time / 60)
if not results['fastest']['time'] or results['fastest']['time'] > total_time:
# new fastest route
results['fastest'].update(time=total_time,
name=route['summary'],
text="{minutes:.2f} min via {name}".format(minutes=results[route['summary']]['total_minutes'], name=route['summary']))
# print "Total Distance: %.2f miles" % (total_distance * 0.00062137)
# print "Total Duration: %.2f minutes" % (total_time / 60)
# print ""
return results
res = {
'worktohome': getRoute(work, home),
'hometowork': getRoute(home, work)
}
print json.dumps(res, indent=4)
from subprocess import check_output
msgs = []
for x in res:
res[x]['fastest']['minutes'] = res[x]['fastest']['time'] / 60
print "creating msg %s: %r" % (x, res[x]['fastest'])
print "sending message to mqtt: %s" % json.dumps(res[x]['fastest'])
print check_output(['/usr/bin/mosquitto_pub',
'-d',
'-h', 'mqtt',
'-p', '8884',
'-u', 'USERNAME',
'-P', 'PASSWORD',
'--cafile', '/root/cacert.pem',
'-q', '1',
'-t', 'traffic/%s/fastest' % x,
'-r',
'-m', json.dumps(res[x]['fastest'])
])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment