Skip to content

Instantly share code, notes, and snippets.

@Cediddi
Created January 20, 2021 01:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Cediddi/cfcf84aa3541a46beeee4064e3e33e40 to your computer and use it in GitHub Desktop.
Save Cediddi/cfcf84aa3541a46beeee4064e3e33e40 to your computer and use it in GitHub Desktop.
A simple script that can run as a daemon and notify you when your public transport is arriving within the next hour.
import json
import pathlib
import sys
import time
import datetime
from copy import deepcopy
from typing import List, Dict, Any
from urllib.parse import urljoin
import appdirs
import daemonocle
import requests
from notifypy import Notify
"""
How to use?
1- Run the script with configure action `python allons_y.py configure`
2- Start the daemon as `python allons_y.py start`
3- ???
4- PROFIT
"""
class AllonsDaemon(daemonocle.Daemon):
    """
    The main class Allons-Y uses. It deals with process management as well as the business logic.

    The ``configure`` action interviews the user and stores the answers as JSON in ``config_file``.
    The daemon worker (``main``) loads that file, resolves the stored ids through the API and then
    polls the arrivals, sending a desktop notification when a matching arrival is found.
    """
    user_config_dir = pathlib.Path(appdirs.user_config_dir(appname="allons_y", roaming=True, version="1"))
    config_file = user_config_dir / "config.json"
    # Only the route names ("home", "work") and the default options of this template are used:
    # `questions` replaces every route value with the dict returned by `get_route`, whose keys are
    # "station", "line", "start_time" and "end_time".
    config_template = {
        "options": {
            "notification_interval": 10,  # seconds
        },
        "routes": {
            "home": {
                "station_id": None,  # Station id from the api
                "line_id": None,  # Line id from the api
                "start_time": None,  # When should notifications start
                "end_time": None,  # When should notifications end
            },
            "work": {"station_id": None, "line_id": None, "start_time": None, "end_time": None},
        }
    }
    base_path = "https://v5.vbb.transport.rest/"
    request_timeout = 10  # seconds; without a timeout a stalled connection would block the daemon forever
    poll_interval = 10  # seconds between two arrival checks in the main loop

    def __init__(self, **kwargs):
        """
        Creates the daemon, forcing the worker and the pid file location.

        :param kwargs: Extra keyword arguments forwarded to ``daemonocle.Daemon``
        """
        kwargs["worker"] = self.main
        kwargs["pid_file"] = self.user_config_dir / "pid"
        super().__init__(**kwargs)

    def _get_json(self, path: str, params=None) -> Any:
        """
        Performs a GET request against the API and decodes the JSON body.

        :param path: Path relative to ``base_path``
        :param params: Optional query parameters (dict) or None
        :return: The decoded JSON payload
        :raises requests.RequestException: On connection problems, timeouts or non-2xx answers
        """
        resp = requests.get(urljoin(self.base_path, path), params=params, timeout=self.request_timeout)
        resp.raise_for_status()
        return resp.json()

    def search_station_api(self, name: str) -> Dict[str, Dict]:
        """
        Searches stations based on the name query.

        :param name: A name query
        :return: Mapping of matching stations (callers use its values as the station objects)
        """
        # NOTE(review): requests serialises True as "True"; confirm the API treats that as true.
        return self._get_json("stations", params={"query": name, "limit": 10, "fuzzy": True})

    def get_station_api(self, station_id: str) -> Dict[str, Any]:
        """
        Gets the details of a station.

        :param station_id: ID of the station
        :return: Detailed station object
        """
        return self._get_json(f"stations/{station_id}")

    def get_line_api(self, line_id: str) -> Dict[str, Any]:
        """
        Gets the details of a line.

        :param line_id: ID of the line
        :return: Detailed line object
        """
        return self._get_json(f"lines/{line_id}")

    def get_stop_api(self, station_id: str) -> Dict[str, Any]:
        """
        Gets the details of a stop.
        There's an ongoing issue with this endpoint.
        Please check: https://github.com/derhuerst/vbb-rest/issues/45

        :param station_id: ID of the station
        :return: Details of the stop
        """
        return self._get_json(f"stops/{station_id}")

    def get_stop_arrivals_api(self, station_id: str, line_name: str) -> List[Dict[str, Any]]:
        """
        Gets the arrivals of a stop/station for a single line.
        There's an ongoing issue with this endpoint.
        Please check: https://github.com/derhuerst/vbb-rest/issues/45

        :param station_id: ID of the station
        :param line_name: Name of the line the arrivals are filtered by
        :return: Arrivals of that line, sorted by arrival time (earliest first)
        """
        # NOTE(review): no time window is requested here; confirm the endpoint's default window
        # covers the "next hour" that `check_route` documents.
        arrivals = self._get_json(f"stops/{station_id}/arrivals")
        # Entries without a `when` value (presumably cancelled trips - TODO confirm) can neither be
        # sorted nor announced, so they are dropped here.
        related_data = [
            arrival for arrival in arrivals
            if arrival['line']['name'] == line_name and arrival.get('when')
        ]
        related_data.sort(key=lambda x: datetime.datetime.fromisoformat(x['when']))
        return related_data

    def update_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fetches the station and line information from api.

        :param config: The configuration object, its routes hold station and line ids.
        :return: Copy of the config whose routes hold the full station and line objects
        """
        updated_config = deepcopy(config)
        for key, value in config['routes'].items():
            updated_config['routes'][key]['station'] = self.get_station_api(value['station'])
            updated_config['routes'][key]['line'] = self.get_line_api(value['line'])
        return updated_config

    @staticmethod
    def _parse_time(value) -> datetime.time:
        """Turns a stored ISO-8601 time string (or a time object) into a naive wall-clock time."""
        if not isinstance(value, datetime.time):
            value = datetime.time.fromisoformat(value)
        # Arrival times are compared via `datetime.time()`, which is naive; mixing naive and aware
        # times in an ordering comparison raises TypeError, so any offset is dropped.
        return value.replace(tzinfo=None)

    @staticmethod
    def _in_window(moment: datetime.time, start: datetime.time, end: datetime.time) -> bool:
        """Tells whether `moment` lies in [start, end]; a window with start > end wraps past midnight."""
        if start <= end:
            return start <= moment <= end
        return moment >= start or moment <= end

    def check_route(self, config: Dict[str, Any]) -> List[str]:
        """
        Checks the upcoming arrivals of every configured route. Filters down to the selected lines and
        collects a message for each arrival that falls inside the route's notification period.

        :param config: The configuration object as returned by `update_config`.
        :return: Notification messages, empty when nothing relevant arrives
        """
        notifications = []
        for route in config['routes'].values():
            # The configuration file stores the times as ISO strings, they must be parsed before
            # they can be compared with the arrival time.
            start = self._parse_time(route['start_time'])
            end = self._parse_time(route['end_time'])
            line_name = route['line']['name']
            station_name = route['station']['name']
            for arrival in self.get_stop_arrivals_api(route['station']['id'], line_name):
                when = datetime.datetime.fromisoformat(arrival['when'])
                if not self._in_window(when.time(), start, end):
                    continue
                # Use the arrival's own tzinfo: subtracting a naive "now" from an aware datetime
                # raises TypeError.
                remaining = when - datetime.datetime.now(tz=when.tzinfo)
                if remaining.total_seconds() < 0:
                    continue  # already arrived, nothing left to catch
                remaining -= datetime.timedelta(microseconds=remaining.microseconds)
                notifications.append(f"{line_name} will arrive to {station_name} "
                                     f"in {remaining}. Be Ready!")
        return notifications

    @staticmethod
    def _read_index(count: int):
        """Reads one line from the user and returns it as an index in [0, count), or None if invalid."""
        user_input = input("> ").strip()
        # isdecimal() only accepts characters int() can parse (isdigit() also accepts e.g. "²").
        if user_input.isdecimal() and int(user_input) < count:
            return int(user_input)
        return None

    def get_station(self, stations: List[Dict]) -> Dict[str, Any]:
        """
        Asks the user to choose one of the stations.

        :param stations: Candidate station objects
        :return: Detailed object of the chosen station, fetched from the api
        """
        while True:
            print("Please select the station by it's index.")
            for idx, station in enumerate(stations):
                print(f"[{idx}] - {station['name']}")
            selected_index = self._read_index(len(stations))
            if selected_index is None:
                print("Invalid input.")
                continue
            actual_selected = stations[selected_index]
            print(f"You have selected {actual_selected['name']}")
            return self.get_station_api(actual_selected['id'])

    def filter_station(self, key: str) -> List[Dict[str, Any]]:
        """
        Takes a station name query from the user and searches the matching stations.

        :param key: Name of the route the station is asked for (e.g. "home")
        :return: Non-empty list of matching station objects
        """
        while True:
            print(f"What is the nearest station to your {key}?")
            user_input = input("> ")
            if not user_input:
                print("Invalid Input")
                continue
            stations = self.search_station_api(user_input)
            if not stations:
                print("No station has been found...")
                continue
            return list(stations.values())

    def get_line(self, station: Dict[str, Any]) -> Dict[str, Any]:
        """
        Asks the user to select a line that's related to the selected station.

        :param station: Detailed station object, its "lines" entry lists the candidates
        :return: The chosen entry of ``station["lines"]``
        """
        lines = station["lines"]
        while True:
            print("Please select the line by it's index.")
            # NOTE: showing the first/last stop of each line would need the stops endpoint, which is
            # affected by https://github.com/derhuerst/vbb-rest/issues/45. Until then the name that is
            # already part of the station object is enough, so no per-line API call is made.
            for idx, line in enumerate(lines):
                print(f"[{idx}] - {line['name']}")
            selected_index = self._read_index(len(lines))
            if selected_index is None:
                print("Invalid input.")
                continue
            actual_selected = lines[selected_index]
            print(f"You have selected {actual_selected['name']}")
            return actual_selected

    def ask_time(self, key: str):
        """
        Reads a iso8601 formatted time from user.

        :param key: Which boundary is asked for ("start" or "end"), only used in the prompt
        :return: The parsed ``datetime.time``
        """
        while True:
            print(f"When should notifications {key}? (ISO-8601 formatted)")
            user_input = input("> ")
            if not user_input:
                print("Invalid Input")
                continue
            try:
                return datetime.time.fromisoformat(user_input)
            except ValueError:
                print('Invalid iso time.')

    def get_route(self, route):
        """
        Completes a route to be saved to the config.

        :param route: Name of the route (e.g. "home")
        :return: JSON serialisable dict with the station id, line id and the notification period
        """
        while True:
            station = self.get_station(self.filter_station(route))
            if station.get("lines"):
                break
            # Without any line the line prompt could never be answered.
            print("This station has no lines, please choose another one.")
        line = self.get_line(station)
        start = self.ask_time("start")
        end = self.ask_time("end")
        return {
            "station": station['id'],
            "line": line['id'],
            "start_time": start.isoformat(),
            "end_time": end.isoformat(),
        }

    def ask_options(self):
        """
        Asks user the additional options.

        :return: Options dict; an empty answer keeps the default notification interval
        """
        notification_interval = self.config_template['options']['notification_interval']
        while True:
            print(f"How fast should we notify you? [default={notification_interval}, min=1]")
            user_input = input("> ")
            if not user_input:
                break
            try:
                value = int(user_input)
            except ValueError:
                print('Invalid Input')
                continue
            if value < 1:
                print('Invalid Input')
                continue
            notification_interval = value
            break
        return {"notification_interval": notification_interval}

    def questions(self):
        """
        Runs the whole interview: one route per entry of the template, then the options.

        :return: The complete configuration object
        """
        new_config: Dict[str, Any] = deepcopy(self.config_template)
        for route in list(new_config["routes"]):
            new_config["routes"][route] = self.get_route(route)
        new_config["options"] = self.ask_options()
        return new_config

    @daemonocle.expose_action
    def configure(self):
        """
        The configuration workflow. You must use it before running the main loop.

        An existing configuration is only replaced after the user confirms it; the question is asked
        before the interview so no answers are thrown away.

        :return: Never returns, the process exits with status 0
        """
        if self.config_file.exists():
            print(f"A configuration already exists at {self.config_file}")
            print("Do you want to overwrite it? [y/N]")
            if input("> ").strip().lower() not in ("y", "yes"):
                print("Keeping the existing configuration.")
                sys.exit(0)
        config = self.questions()
        self.user_config_dir.mkdir(exist_ok=True, parents=True)
        with self.config_file.open("w") as f:
            json.dump(config, f)
        print(f"Configuration created at {self.config_file}")
        sys.exit(0)

    def main(self):
        """
        Application loop, checks the route notifications every ``poll_interval`` seconds and notifies
        at most once per configured notification interval.

        :return: None
        """
        if not self.config_file.exists():
            print("No configuration found, please run 'allons_y configure'")
            sys.exit(1)
        with self.config_file.open() as f:
            try:
                config = json.load(f)
            except json.JSONDecodeError:
                print("Corrupt configuration, please run 'allons_y configure'")
                sys.exit(1)
        updated_config = self.update_config(config)
        notification_interval = updated_config['options']['notification_interval']
        # Back-date the last notification so the very first match is announced immediately.
        last_notified = datetime.datetime.now() - datetime.timedelta(seconds=notification_interval)
        while True:
            try:
                notifications = self.check_route(updated_config)
            except requests.RequestException as exc:
                # A temporary network or API failure must not terminate the daemon.
                print(f"Could not fetch the arrivals, will retry: {exc}")
                notifications = []
            now = datetime.datetime.now()
            if notifications and (now - last_notified).total_seconds() > notification_interval:
                notification = Notify()
                notification.title = "Allons-Y Alonso, we have a public transport vehicle to catch!"
                notification.message = "\n".join(notifications)
                notification.send()
                last_notified = now
            time.sleep(self.poll_interval)
if __name__ == "__main__":
    # Script entry point: build the daemon and let its command line interface
    # pick the action to run from the command line arguments.
    daemon = AllonsDaemon()
    daemon.cli()
appdirs
daemonocle
requests
notify-py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment