Skip to content

Instantly share code, notes, and snippets.

@mmahmoodictbd
Created February 6, 2023 21:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mmahmoodictbd/915270d49457cabb4bb3a5b57f53c57e to your computer and use it in GitHub Desktop.
Save mmahmoodictbd/915270d49457cabb4bb3a5b57f53c57e to your computer and use it in GitHub Desktop.
AppDaemon app: Expose Bus Trip info as HA Sensor
# https://appdaemon.readthedocs.io/en/latest/
import appdaemon.plugins.hass.hassapi as hass
import requests
from jsonpath_ng.ext import parser
import json
import os
import time
class BusTrips(hass.Hass):
    """AppDaemon app that exposes upcoming bus trips as Home Assistant sensors.

    On every ``fetch_bus_trips_event`` the app queries the route-planner API
    for bus trips from "bushalte-noorderplassen-zuid" to
    "bushalte-station-centrum" and writes the first two trips that are not
    yet in the past to ``sensor.bus_trip_now`` and ``sensor.bus_trip_next``.
    """

    API_URL = "https://keolis-reizen-api.wipkip.com/v1/directions/9292/routes"
    # Without a timeout a stalled connection would block the callback forever.
    REQUEST_TIMEOUT_SECONDS = 10
    # Sensors are filled in order: first upcoming trip, then the one after it.
    SENSOR_ENTITY_IDS = ("sensor.bus_trip_now", "sensor.bus_trip_next")

    def initialize(self):
        """Set the process timezone and subscribe to the fetch event."""
        self.log("BusTrips: You are now ready to run!")
        self.fix_timezone()
        self.listen_event(self.update_sensor_bus_trips, "fetch_bus_trips_event")

    def fix_timezone(self):
        """Switch the process-wide timezone to Europe/Amsterdam.

        This changes the ``TZ`` environment variable and calls
        ``time.tzset()``, so every later ``time.strftime`` call in this
        process (not only in this app) uses the new zone.
        """
        os.environ['TZ'] = 'Europe/Amsterdam'
        time.tzset()

    def update_sensor_bus_trips(self, event_name, data, kwargs):
        """Event callback: refresh both bus-trip sensors.

        A failed or malformed API response is logged and leaves the sensors
        untouched. When fewer than two upcoming trips are available, the
        sensors without a trip are set to ``"unknown"`` instead of raising
        ``IndexError``.

        Args:
            event_name: Name of the event that fired (unused).
            data: Event payload (unused).
            kwargs: Extra callback arguments (unused).
        """
        try:
            trips = self.fetch_api_response()
        except (requests.RequestException, ValueError, LookupError) as err:
            # ValueError covers an undecodable JSON body; LookupError covers
            # a response whose trips lack the fields read below.
            self.log(
                "BusTrips: could not fetch bus trips: {!r}".format(err),
                level="WARNING",
            )
            return

        for index, entity_id in enumerate(self.SENSOR_ENTITY_IDS):
            if index < len(trips):
                trip = trips[index]
                self.set_state(
                    entity_id,
                    state=trip['noorderplassen_zuid_arrival_time'],
                    attributes=trip,
                )
            else:
                # No trip left for this slot (e.g. end of the service day).
                self.set_state(entity_id, state="unknown")

    def fetch_api_response(self):
        """Query the API and return at most two upcoming trips.

        Returns:
            A list of zero, one or two dicts as produced by
            ``extract_relevant_data``, in the order the API returned them,
            with trips considered old by ``is_old_trip`` removed.

        Raises:
            requests.RequestException: On network errors, timeouts or a
                non-2xx HTTP status.
            ValueError: If the response body is not valid JSON.
            KeyError, IndexError: If a trip lacks the expected fields.
        """
        # Take one snapshot so the date and the time cannot straddle midnight.
        now = time.localtime()
        headers = {
            'content-type': 'application/x-www-form-urlencoded',
            # NOTE(review): the key is blank in this source; presumably it
            # was redacted and must be filled in before use -- confirm.
            'x-api-key': '',
        }
        payload = (
            'date={}'
            '&from_location=almere/bushalte-noorderplassen-zuid'
            '&to_location=almere/bushalte-station-centrum'
            '&time_type=Departure'
            '&time={}'
            '&vehicle_types=Bus'
        ).format(time.strftime("%Y-%m-%d", now), time.strftime("%H:%M", now))
        self.log("BusTrips: request payload: {}".format(payload), level="DEBUG")

        response = requests.post(
            self.API_URL,
            headers=headers,
            data=payload,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        # Fail loudly on HTTP errors instead of parsing an error body.
        response.raise_for_status()

        matches = parser.parse("$.included[?(@.type=='trip')]").find(response.json())
        trips = [self.extract_relevant_data(match.value) for match in matches]
        upcoming = [trip for trip in trips if not self.is_old_trip(trip)]

        # Only interested in the first 2 trips
        upcoming = upcoming[:2]
        for trip in upcoming:
            self.log("BusTrips: trip: {}".format(trip), level="DEBUG")
        return upcoming

    def extract_relevant_data(self, trip):
        """Reduce one API trip object to the fields published on the sensor.

        Args:
            trip: A trip object from the API response; its ``attributes``
                mapping must contain ``trip_number``,
                ``first_departure_time`` and ``planned_arrival_times``
                (with at least two entries).

        Returns:
            A flat dict with the trip number, first departure time, the
            second planned arrival time plus its hour and minute parts, and
            the local timestamp at which the data was fetched.
        """
        attributes = trip['attributes']
        # NOTE(review): the slicing below presumes an "HH:MM..." string --
        # confirm against the API.
        arrival_time = attributes['planned_arrival_times'][1]
        return {
            'trip_number': attributes['trip_number'],
            'first_departure_time': attributes['first_departure_time'],
            'noorderplassen_zuid_arrival_time': arrival_time,
            'noorderplassen_zuid_arrival_hour': arrival_time[:2],
            'noorderplassen_zuid_arrival_minute': arrival_time[3:5],
            'last_fetched_at': time.strftime("%Y-%m-%d %H:%M"),
        }

    def is_old_trip(self, trip):
        """Return True when the trip's arrival time sorts before the current time.

        The comparison is a plain string comparison against the current
        local ``HH:MM``.

        NOTE(review): because no date is involved, a trip arriving shortly
        after midnight is treated as old when checked shortly before
        midnight -- confirm whether that matters for this route.
        """
        return trip['noorderplassen_zuid_arrival_time'] < time.strftime("%H:%M")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment