Skip to content

Instantly share code, notes, and snippets.

@d3d9
Last active October 25, 2020 15:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save d3d9/c2c796b44220eaf70eeaf876b2f6e100 to your computer and use it in GitHub Desktop.
Save d3d9/c2c796b44220eaf70eeaf876b2f6e100 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from requests import get
from typing import Set, List, Dict, Callable, Union, Optional, Any, Tuple, Iterable
import xml.etree.ElementTree as ET
@dataclass
class Meldung:
symbol: str
text: str
color: Optional[str] = None
efa: bool = False
# blocking: bool = False
def __post_init__(self):
if self.text is None:
self.text = ""
class MOT(Enum):
TRAIN = 1
HISPEED = 2
TRAM = 3
BUS = 4
HANGING = 5
trainTMOTefa = {0, 1, 13, 14, 15, 16, 18}
trainMOT = {MOT.TRAIN, MOT.HISPEED}
@dataclass
class Departure:
linenum: str
direction: str
direction_planned: str
deptime: datetime
deptime_planned: datetime
realtime: bool
delay: int = 0 # minutes
messages: Union[List[str], List[Meldung]] = field(default_factory=list) # str als Zwischenschritt
coursesummary: Optional[str] = None
mot: Optional[MOT] = None
# accessibility?
# operator?
platformno: Optional[str] = None
platformno_planned: Optional[str] = None
platformtype: Optional[str] = None
stopname: Optional[str] = None
stopid: Optional[str] = None
place: Optional[str] = None
cancelled: Optional[bool] = None
earlytermination: Optional[bool] = None
headsign: Optional[str] = None
color: Optional[str] = None
arrtime: Optional[datetime] = None
arrtime_planned: Optional[datetime] = None
disp_countdown: Optional[int] = None # minutes
disp_linenum: Optional[str] = None
disp_direction: Optional[str] = None
type_getpayload = Dict[str, Union[str, int, Iterable[Union[str, int]]]]
type_data = Dict[str, Any]
type_depmsgdata = Tuple[List[Departure], List[Meldung], type_data]
type_depfnlist = List[Tuple[Callable[..., type_depmsgdata],
List[Dict[str, Any]]]]
type_depfns = Dict[Tuple[str, bool], type_depfnlist]
def readefaxml(root: ET.Element, tz: timezone,
ignore_infoTypes: Optional[Set] = None, ignore_infoIDs: Optional[Set] = None,
content_for_short_titles: bool = True) -> type_depmsgdata:
deps: List[Departure] = []
stop_messages: List[Meldung] = []
# (itdStopInfoList bei Abfahrten bzw. infoLink bei itdOdvName)
# treten (alle?) auch bei einzelnen Abfahrten auf.. erstmal keine daten hierbei
# evtl. auslesen und schauen, was wirklich haltbezogen ist und nicht anderswo dabei ist
_itdOdv = root.find('itdDepartureMonitorRequest').find('itdOdv')
_itdOdvPlace = _itdOdv.find('itdOdvPlace')
if _itdOdvPlace.get('state') != "identified":
return deps, stop_messages, {}
place = _itdOdvPlace.findtext('odvPlaceElem')
_itdOdvName = _itdOdv.find('itdOdvName')
if _itdOdvName.get('state') != "identified":
return deps, stop_messages, {}
_itdOdvNameElem = _itdOdvName.find('odvNameElem')
stopname = _itdOdvNameElem.text or _itdOdvNameElem[0].tail or next((t.tail for t in _itdOdvNameElem if t is not None), None)
for dep in root.iter('itdDeparture'):
servingline = dep.find('itdServingLine')
_itdNoTrain = servingline.find('itdNoTrain')
_itdNoTrainName = _itdNoTrain.get('name', '')
linenum = servingline.attrib['number']
_trainName = servingline.attrib.get('trainName', '')
if linenum.endswith(" "+_itdNoTrainName):
linenum = linenum.replace(" "+_itdNoTrainName, "")
elif linenum.endswith(" "+_trainName):
linenum = linenum.replace(" "+_trainName, "")
countdown = int(dep.attrib['countdown'])
isrealtime = bool(int(servingline.attrib['realtime']))
if isrealtime:
delay = int(_itdNoTrain.attrib['delay'])
else:
delay = 0
cancelled = delay == -9999
messages: List[str] = []
genAttrList = dep.find('genAttrList')
direction_planned = servingline.get('direction')
direction_actual = direction_planned
earlytermination = False
_earlytermv = genAttrList.findtext("./genAttrElem[name='EarlyTermination']/value") if genAttrList else None
if (not cancelled) and _earlytermv:
direction_actual = _earlytermv
earlytermination = True
# Beobachtungen bzgl. Steigänderung:
# genAttrElem mit name platformChange und value changed
# platform bei itdDeparture entspricht originaler, platformName der neuen..
# haben aber eigentlich unterschiedliche Bedeutungen
# (bei Bussen steht dann da z. B. "Bstg. 1" in platformName
# Auseinanderhalten eigentlich sinnvoll, bei platformChange muss aber wohl ne Ausnahme gemacht werden
# weiter beobachten, wie sowas in weiteren Fällen aussieht..
# Sowas wie "Aachen, Hbf,Aachen" verbessern
_ds = direction_actual.split(",")
if len(_ds) > 1 and direction_actual.startswith(_ds[-1].strip()):
disp_direction = ",".join(_ds[:-1])
else:
disp_direction = direction_actual
itddatetime = dep.find('itdDateTime')
itddatea = itddatetime.find('itdDate').attrib
itdtimea = itddatetime.find('itdTime').attrib
deptime_planned = datetime(int(itddatea['year']), int(itddatea['month']), int(itddatea['day']), int(itdtimea['hour']), int(itdtimea['minute']), tzinfo=tz)
deptime = deptime_planned
if isrealtime and not cancelled:
itdrtdatetime = dep.find('itdRTDateTime')
itdrtdatea = itdrtdatetime.find('itdDate').attrib
itdrttimea = itdrtdatetime.find('itdTime').attrib
deptime = datetime(int(itdrtdatea['year']), int(itdrtdatea['month']), int(itdrtdatea['day']), int(itdrttimea['hour']), int(itdrttimea['minute']), tzinfo=tz)
for _infoLink in dep.iter('infoLink'):
if ((ignore_infoTypes and _infoLink.findtext("./paramList/param[name='infoType']/value") in ignore_infoTypes)
or (ignore_infoIDs and _infoLink.findtext("./paramList/param[name='infoID']/value") in ignore_infoIDs)):
continue
_iLTtext = _infoLink.findtext('infoLinkText')
if _iLTtext:
# kurze, inhaltslose (DB-)Meldungstitel
if content_for_short_titles and _iLTtext in {"Störung.", "Bauarbeiten.", "Information."}:
_infoLink_infoText = _infoLink.find('infoText')
if _infoLink_infoText is None: continue
_iLiTcontent = _infoLink_infoText.findtext('content')
if _iLiTcontent:
messages.append(f"{_iLTtext[:-1]}: {_iLiTcontent}")
continue
# else: weiter, nächste Zeile
messages.append(_iLTtext)
else:
_infoLink_infoText = _infoLink.find('infoText')
if _infoLink_infoText is None: continue
_iLiTsubject = _infoLink_infoText.findtext('subject')
_iLiTsubtitle = _infoLink_infoText.findtext('subtitle')
_msg = ""
if _iLiTsubject: _msg += (_iLiTsubject + (" " if _iLiTsubject.endswith(":") else ": "))
if _iLiTsubtitle: _msg += _iLiTsubtitle
if _msg: messages.append(_msg)
itdNoTrainText = servingline.findtext('itdNoTrain')
if itdNoTrainText:
messages.append(f"{linenum}: {itdNoTrainText}")
mot = None
motType = int(servingline.get('motType'))
if motType in {5, 6, 7, 10, 17, 19}:
mot = MOT.BUS
elif motType in {0, 1, 13, 14, 15, 16, 18}:
if motType in {15, 16} or (genAttrList and any(s in {"HIGHSPEEDTRAIN", "LONG_DISTANCE_TRAINS"} for s in (x.findtext('value') for x in genAttrList.findall('genAttrElem')))):
mot = MOT.HISPEED
else:
mot = MOT.TRAIN
elif motType in {2, 3, 4, 8}:
mot = MOT.TRAM
elif motType == 11:
mot = MOT.HANGING
deps.append(Departure(linenum=linenum,
direction=direction_actual,
direction_planned=direction_planned,
deptime=deptime,
deptime_planned=deptime_planned,
realtime=isrealtime,
delay=delay,
messages=messages,
coursesummary=servingline.findtext('itdRouteDescText'),
mot=mot,
platformno=dep.get('platform'),
platformtype=dep.get('pointType', ""),
stopname=(dep.get('nameWO') or stopname),
stopid=dep.get('gid'),
place=place,
cancelled=cancelled,
earlytermination=earlytermination,
disp_countdown=countdown,
disp_direction=disp_direction))
return deps, stop_messages, {}
def getefadeps(serverurl: str, timeout: Union[int, float], ifopt: str, limit: int, tz: timezone,
# added:
getdeps_placelist = ["Hagen ", "HA-"],
getdeps_mincountdown = 0,
#
userealtime: bool = True, exclMOT: Optional[Set[int]] = None, inclMOT: Optional[Set[int]] = None,
ignore_infoTypes: Optional[Set] = None, ignore_infoIDs: Optional[Set] = None, content_for_short_titles: bool = True) -> type_depmsgdata:
payload: type_getpayload = {'name_dm': ifopt, 'type_dm': 'any', 'mode': 'direct', 'useRealtime': int(userealtime), 'limit': str(limit)}
if inclMOT:
payload['includedMeans'] = inclMOT
elif exclMOT:
payload['excludedMeans'] = exclMOT
payload['useProxFootSearch'] = 0
payload['deleteAssignedStops_dm'] = 1
r = get(serverurl, timeout=timeout, params=payload)
r.raise_for_status()
try:
root = ET.fromstring(r.content)
deps, messages, data = readefaxml(root, tz, ignore_infoTypes, ignore_infoIDs, content_for_short_titles)
nowtime = datetime.now(tz)
for dep in deps:
# ggf. anders runden?
# dep.disp_countdown = dep.disp_countdown if dep.disp_countdown is not None else int(round((dep.deptime-nowtime).total_seconds()/60))
dep.disp_countdown = dep.disp_countdown if dep.disp_countdown is not None else int((dep.deptime-nowtime.replace(second=0, microsecond=0)).total_seconds()/60)
dep.disp_linenum = (dep.disp_linenum or dep.linenum)
if not dep.disp_direction:
if dep.headsign:
dep.disp_direction = dep.headsign.replace("\n", "/")
else:
dep.disp_direction = dep.direction
if getdeps_placelist:
for place in getdeps_placelist: # auslagern, feiner machen, +abk.verz.?
dep.disp_direction = dep.disp_direction.replace(place, "")
if dep.mot is None:
dep.mot = MOT.BUS
if dep.delay is None:
dep.delay = 0
sorteddeps = sorted([dep for dep in deps if (dep.disp_countdown or 0) >= getdeps_mincountdown],
key=lambda dep: (dep.disp_countdown, not dep.cancelled, -dep.delay, not dep.earlytermination, dep.disp_linenum))
return sorteddeps, messages, data
except Exception as e:
# logger.debug(f"request data:\n{r.content}")
print("error. request data: \n{r.content}")
print(e)
raise e
#!/usr/bin/env python3.7
from flask import Flask, request, abort, make_response
from flask_cors import CORS
from datetime import datetime
from dataclasses import asdict
from depdata import getefadeps
from enum import Enum
from json import dumps, JSONEncoder
from sys import stderr
from time import sleep
from traceback import print_exc
app = Flask(__name__)
CORS(app)
#efaserver = 'https://openservice.vrr.de/vrr/XML_DM_REQUEST'
efaserver = 'https://efa.vrr.de/vrr/XML_DM_REQUEST'
servertimeout = 6
removekeys = ("direction", "platformno_planned", "stopname", "headsign",
"color", "arrtime", "arrtime_planned", "disp_linenum")
replacements = {
"Stadtmitte/Volme Galerie": "Stadtmitte",
"Hbf": "Hagen Hbf",
"Hauptbahnhof": "Hagen Hbf",
"Ennepetal Busbahnhof": "Ennepetal Busbf",
"Breckerfeld Busbahnhof": "Breckerfeld Busbf",
"Breckerf. Wengeberg Penninckw.": "Breckerfeld-Wengeberg",
"Schwerte, Bahnhof": "Schwerte Bf",
"Essen Hauptbahnhof": "Essen Hbf",
"MG Hbf /Europaplatz": "Mönchengladbach Hbf",
"Kierspe, Feuerwehrgerätehaus": "Kierspe Feuerwehrgeräteh."
}
manydeps = {"de:05914:2003", "de:05914:2007", "de:05914:2008", "de:05914:2006",
"de:05914:2059", "de:05914:2060", "de:05914:2194", "de:05914:2020",
"de:05914:2016", "de:05914:2280", "de:05914:2083"}
limit_normal = 20
limit_high = 30
class CustomEncoder(JSONEncoder):
def default(self, o):
if isinstance(o, datetime):
return o.isoformat()
if isinstance(o, Enum):
return o.name
return JSONEncoder.default(self, o)
def cleandir(_d):
if _d.startswith("Hagen "): # fuer direction_planned
_d = _d[6:]
if _d in replacements:
_d = replacements[_d]
if "f." in _d:
_d = _d.replace("Hbf.", "Hbf").replace("Bf.", "Bf")
return _d
def cleandict(depd):
for k in removekeys:
depd.pop(k, None)
depd["disp_direction"] = cleandir(depd["disp_direction"])
if depd["earlytermination"]:
depd["direction_planned"] = cleandir(depd["direction_planned"])
else:
depd.pop("direction_planned", None)
return depd
@app.route("/")
def deps():
if not "stopid" in request.args:
abort(500, description="no get parameter stopid")
ifopt = request.args.get("stopid")
_last_e = None
for _try in range(1,4):
try:
efa = getefadeps(efaserver, servertimeout, ifopt, limit_high if ifopt in manydeps else limit_normal, datetime.utcnow().astimezone().tzinfo)
break
except Exception as e:
sleep(0.3 * _try)
print(f"\n! {e.__class__.__name__}, try {_try}", file=stderr)
print_exc()
_last_e = e
else:
abort(500, description=f"EFA request: {_last_e.__class__.__name__}\n{_last_e}")
d = [cleandict(asdict(_dep)) for di, _dep in enumerate(efa[0]) if (di < 10 or _dep.disp_countdown <= 180)]
o = dumps(d, cls=CustomEncoder)
r = make_response(o)
r.mimetype = 'application/json'
return r
if __name__ == "__main__":
app.run(host='0.0.0.0', ssl_context=('fullchain.pem', 'privkey.pem'))
[Unit]
Description=depflask
[Service]
ExecStart=/bin/sh -c 'gunicorn -w8 --access-logfile - --certfile fullchain.pem --keyfile=privkey.pem --bind 0.0.0.0:8007 depflask:app'
Restart=always
User=root
Group=root
WorkingDirectory=/var/depflask
Environment="PYTHONUNBUFFERED=TRUE"
[Install]
WantedBy=multi-user.target
document.querySelectorAll("*").forEach(obj => {
if (!obj.style) return;
if (!!obj.style.opacity && obj.style.opacity == "1") obj.style.opacity = null;
if (!!obj.style.fill && obj.style.fill == "rgb(0, 0, 0)") obj.style.fill = null;
if (!!obj.style.fill && obj.style.fill == "none") obj.style.fillRule = null;
if (!!obj.style.fillRule && obj.style.fillRule == "nonzero") obj.style.fillRule = null;
if (!!obj.style.fillOpacity && obj.style.fillOpacity == "1") obj.style.fillOpacity = null;
if (!!obj.style.stroke && obj.style.stroke == "none") obj.style.stroke = null;
if (!!obj.style.strokeWidth && parseFloat(obj.style.strokeWidth) == 1) obj.style.strokeWidth = null;
if (!!obj.style.strokeOpacity && obj.style.strokeOpacity == "1") obj.style.strokeOpacity = null;
if (!!obj.style.strokeDashoffset && parseFloat(obj.style.strokeDashoffset) == 0) obj.style.strokeDashoffset = null;
if (!!obj.style.strokeDasharray && obj.style.strokeDasharray == "none") obj.style.strokeDasharray = null;
if (!!obj.style.strokeMiterlimit && obj.style.strokeMiterlimit == 4) obj.style.strokeMiterlimit = null;
if (!!obj.style.strokeLinecap && obj.style.strokeLinecap == "butt") obj.style.strokeLinecap = null;
if (!!obj.style.strokeLinejoin && obj.style.strokeLinejoin == "miter") obj.style.strokeLinejoin = null;
if (!!obj.style.display && obj.style.display == "inline") obj.style.display = null;
if (!!obj.attributes.style && obj.attributes.style.value.includes("-inkscape-font-specification"))
obj.attributes.style.value =
obj.attributes.style.value
.replace("-inkscape-font-specification:'Fira Sans';", "")
.replace("-inkscape-font-specification:'Fira Sans, Normal';", "")
.replace("-inkscape-font-specification:'Fira Sans Normal';", "")
.replace("-inkscape-font-specification:'Fira Sans Medium';", "")
.replace("-inkscape-font-specification:'Fira Sans Italic';", "")
.replace("-inkscape-font-specification:'Fira Sans Light Italic';", "")
.replace("-inkscape-font-specification:'Fira Sans Semi-Bold';", "")
.replace("-inkscape-font-specification:'Fira Sans Semi-Bold Italic';", "")
.replace("-inkscape-font-specification:'Fira Sans, Bold';", "")
.replace("-inkscape-font-specification:'Fira Sans Bold';", "")
.replace("-inkscape-font-specification:'Fira Sans'", "")
.replace("-inkscape-font-specification:'Fira Sans, Normal'", "")
.replace("-inkscape-font-specification:'Fira Sans Normal'", "")
.replace("-inkscape-font-specification:'Fira Sans Medium'", "")
.replace("-inkscape-font-specification:'Fira Sans Italic'", "")
.replace("-inkscape-font-specification:'Fira Sans Light Italic'", "")
.replace("-inkscape-font-specification:'Fira Sans Semi-Bold'", "")
.replace("-inkscape-font-specification:'Fira Sans Semi-Bold Italic'", "")
.replace("-inkscape-font-specification:'Fira Sans, Bold'", "")
.replace("-inkscape-font-specification:'Fira Sans Bold'", "");
Array.from(obj.attributes).forEach(attr => {
if (attr.namespaceURI == "http://www.inkscape.org/namespaces/inkscape" || attr.namespaceURI == "http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd") {
obj.removeAttribute(attr.nodeName);
}
});
if (obj.tagName == "tspan") return;
if (!!obj.style.fontStyle && obj.style.fontStyle == "normal") obj.style.fontStyle = null;
if (!!obj.style.fontVariant && obj.style.fontVariant == "normal") obj.style.fontVariant = null;
if (!!obj.style.fontWeight && obj.style.fontWeight == "normal") obj.style.fontWeight = null;
if (!!obj.style.fontStretch && obj.style.fontStretch == "normal") obj.style.fontStretch = null;
if (!!obj.style.writingMode && obj.style.writingMode == "horizontal-tb") obj.style.writingMode = null;
if (!!obj.style.textAnchor && obj.style.textAnchor == "start") obj.style.textAnchor = null;
});
stylestrings = {}
document.querySelectorAll("*").forEach(obj => {
if (!obj.style || !obj.style.cssText || (obj.parentElement && obj.parentElement.classList.contains("bficon"))) return;
if (obj.style.cssText in stylestrings) stylestrings[obj.style.cssText].push(obj);
else stylestrings[obj.style.cssText] = [obj];
});
var svg = document.getElementById("svg2");
var style = document.createElementNS(svg.namespaceURI, 'style');
style.setAttribute("type", "text/css");
svg.prepend(style);
i = 0;
csscontent = "";
for (stylestring in stylestrings) {
elems = stylestrings[stylestring];
if (elems.length < 3) continue;
classname = "_gen" + i;
csscontent += "."+classname+" {\n"+stylestring+"\n}\n\n";
elems.forEach(obj => {
obj.classList.add(classname);
obj.removeAttribute("style");
});
i++;
}
var CDATASectionNode = document.createCDATASection(csscontent);
style.appendChild(CDATASectionNode);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment