Skip to content

Instantly share code, notes, and snippets.

@lyonzy
Last active August 12, 2018 13:28
Show Gist options
  • Save lyonzy/078ce262705618a864790e15963b6e3e to your computer and use it in GitHub Desktop.
Save lyonzy/078ce262705618a864790e15963b6e3e to your computer and use it in GitHub Desktop.
PTV Station Screen with a Wemos D1 Mini

PTV Station Screen with a Wemos D1 Mini

Intro

This is an implementation of a basic PTV departures screen using the PTV API, a Wemos D1 Mini, MicroPython and an OLED shield.

image text

Getting Started

If you want to use it, there are a few things you'll need to do:

  • Connect the D1 mini to your WiFi network (instructions in the documentation)
  • Set your API key and devId (more info here)
  • Set the stopId you're interested in, which you can find by playing around with the API here - exact procedure is left as an exercise for the reader ;)

To-do (maybe one day)

  • Show the last updated date as local time. This is a bit challenging because it requires knowing the timezone, which requires a more involved date library.
import utime, ure
# Month abbreviations as they appear in an HTTP Date header, mapped to 1-12.
months = {"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6, "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12}

# e.g. Fri, 10 Aug 2018 02:26:24 GMT
# Raw string: \w and \d are regex escapes, not string escapes.
current_date_re = ure.compile(r'^\w+, (\d+) (\w+) (\d+) (\d+):(\d+):(\d+) GMT$')


def parse_current_date(date_str):
    """Convert an HTTP ``Date`` header value into a ``utime.mktime`` timestamp.

    date_str: header value such as ``Fri, 10 Aug 2018 02:26:24 GMT``, given
        either as bytes (as read from a socket) or as str.

    Returns whatever ``utime.mktime`` returns for the parsed fields.
    Raises ValueError when the value does not have the expected layout
    (for instance an empty string because no Date header was received),
    and KeyError when the month abbreviation is not in ``months``.
    """
    dateparts = current_date_re.match(date_str)
    if dateparts is None:
        raise ValueError("unrecognised Date header: %r" % (date_str,))

    # The month name is a dict key, so it must be a str; a bytes subject
    # yields bytes groups which need decoding first.
    month = dateparts.group(2)
    if not isinstance(month, str):
        month = month.decode()

    return utime.mktime((
        int(dateparts.group(3)),  # year
        months[month],            # month
        int(dateparts.group(1)),  # day
        int(dateparts.group(4)),  # hour
        int(dateparts.group(5)),  # min
        int(dateparts.group(6)),  # sec
        None,                     # weekday slot, left unset as before
        None,                     # yearday slot, left unset as before
    ))
# e.g. 2018-08-10T03:54:00Z
# Raw string: \d is a regex escape, not a string escape.
dep_date_re = ure.compile(r'^(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)Z$')


def parse_dep_date(date_str):
    """Convert a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp into a ``utime.mktime`` value.

    date_str: timestamp text such as ``2018-08-10T03:54:00Z``.

    Returns whatever ``utime.mktime`` returns for the parsed fields.
    Raises ValueError when the text does not have the expected layout.
    """
    dateparts = dep_date_re.match(date_str)
    if dateparts is None:
        raise ValueError("unrecognised departure date: %r" % (date_str,))

    return utime.mktime((
        int(dateparts.group(1)),  # year
        int(dateparts.group(2)),  # month
        int(dateparts.group(3)),  # day
        int(dateparts.group(4)),  # hour
        int(dateparts.group(5)),  # min
        int(dateparts.group(6)),  # sec
        None,                     # weekday slot, left unset as before
        None,                     # yearday slot, left unset as before
    ))
"""HMAC (Keyed-Hashing for Message Authentication) Python module.
Implements the HMAC algorithm as described by RFC 2104.
This is modified from the original MicroPython HMAC library to
remove references to the copy function (seems that isn't supported).
"""
#from _operator import _compare_digest as compare_digest
import uhashlib as _hashlib
# NOTE(review): these two names are rebound to None at module level;
# presumably stand-ins left over from the upstream hmac module - confirm
# before removing.
PendingDeprecationWarning = None
RuntimeWarning = None

# 256-entry lookup tables: entry i holds i XOR the pad byte (0x5C / 0x36).
trans_5C = bytes([value ^ 0x5C for value in range(256)])
trans_36 = bytes([value ^ 0x36 for value in range(256)])


def translate(d, t):
    """Return a bytes object with each byte of ``d`` looked up in table ``t``."""
    mapped = [t[value] for value in d]
    return bytes(mapped)


# The size of the digests returned by HMAC depends on the underlying
# hashing module used. Use digest_size from the instance of HMAC instead.
digest_size = None
class HMAC:
    """RFC 2104 HMAC class. Also complies with RFC 4231.

    This follows the API for Cryptographic Hash Functions (PEP 247) except
    that there is no copy(): the outer hash is rebuilt from the stored key
    pad each time a digest is computed, and the result is cached until the
    next update().
    """

    blocksize = 64  # 512-bit HMAC; can be changed in subclasses.

    def __init__(self, key, msg=None, digestmod=None):
        """Create a new HMAC object.

        key:       key for the keyed hash object.
        msg:       Initial input for the hash, if provided.
        digestmod: A hashlib constructor returning a new hash object. *OR*
                   A hash name suitable for hashlib.new(). *OR*
                   A module supporting PEP 247 (anything with ``new()``).
                   Defaults to hashlib.md5.

        Note: key and msg must be bytes or bytearray objects.
        Raises TypeError when key is not bytes or bytearray.
        """
        if not isinstance(key, (bytes, bytearray)):
            raise TypeError("key: expected bytes or bytearray, but got %r" % type(key).__name__)

        if digestmod is None:
            digestmod = _hashlib.md5

        if callable(digestmod):
            self.digest_cons = digestmod
        elif isinstance(digestmod, str):
            self.digest_cons = lambda d=b'': _hashlib.new(digestmod, d)
        else:
            self.digest_cons = lambda d=b'': digestmod.new(d)

        self.inner = self.digest_cons()

        # Use the hash object's own digest size when it exposes one; fall
        # back to 20 (the previously hard-coded value) when it does not.
        self.digest_size = getattr(self.inner, 'digest_size', 20)

        # self.blocksize is the default blocksize. self.block_size is
        # effective block size as well as the public API attribute.
        blocksize = getattr(self.inner, 'block_size', self.blocksize)
        if blocksize < 16:
            blocksize = self.blocksize
        self.block_size = blocksize

        # Keys longer than one block are hashed first, then the key is
        # zero-padded to exactly one block.
        if len(key) > blocksize:
            key = self.digest_cons(key).digest()
        key = bytes(key) + bytes(blocksize - len(key))

        # The outer pad is kept as bytes so a fresh outer hash can be keyed
        # for every digest computation without needing copy().
        self._opad = bytes(b ^ 0x5C for b in key)
        self.inner.update(bytes(b ^ 0x36 for b in key))

        self._digest = None  # cached result of the last digest()
        if msg is not None:
            self.update(msg)

    @property
    def name(self):
        """Return ``"hmac-"`` followed by the inner hash object's ``name``."""
        return "hmac-" + self.inner.name

    def update(self, msg):
        """Feed ``msg`` into the hash and invalidate any cached digest."""
        self.inner.update(msg)
        self._digest = None

    def _current(self):
        """Return a freshly keyed outer hash fed with the inner digest.

        To be used only internally with digest().
        """
        h = self.digest_cons()
        h.update(self._opad)
        h.update(self.inner.digest())
        return h

    def digest(self):
        """Return the HMAC value of the data fed in so far, as bytes.

        Calling this repeatedly returns the same value; the result is
        cached so the underlying hashes are only finalised once per run
        of update() calls.
        NOTE(review): whether update() may still be called afterwards
        depends on the underlying hash object - confirm for uhashlib.
        """
        if self._digest is None:
            self._digest = self._current().digest()
        return self._digest
def new(key, msg=None, digestmod=None):
    """Build and return an HMAC object.

    key: the key for the keyed hash.
    msg: optional initial data, hashed into the object straight away.
    digestmod: passed through unchanged to the HMAC constructor.

    More data can be supplied later through the object's update() method,
    and the resulting value is read with its digest() method.
    """
    hmac_obj = HMAC(key, msg, digestmod)
    return hmac_obj
import machine, ssd1306, urequests2 as requests, ptv_api, dates, utime
# I2C bus built with id -1 on pins 5 and 4.
# NOTE(review): presumably GPIO5 = SCL and GPIO4 = SDA, matching the D1 mini
# OLED shield wiring - confirm against the board.
i2c = machine.I2C(-1, machine.Pin(5), machine.Pin(4))
# Display driver on that bus; 64 and 48 are presumably the panel width and
# height in pixels - confirm against the ssd1306 driver.
oled = ssd1306.SSD1306_I2C(64, 48, i2c)
def update():
    """Fetch the next departures from the PTV API and redraw the OLED.

    For every departure returned, one extra API call looks up the run so
    the destination and express status can be shown. Each departure takes
    two text rows followed by a separator line; the bottom row shows the
    time taken from the API response's Date header.
    """
    # Blank the panel straight away so stale data is not left on screen
    # while the network requests are in flight.
    oled.fill(0)
    oled.show()

    stop_id = "123"  # TODO: change to the stopId you're interested in
    (current_date, data) = ptv_api.get(
        "/v3/departures/route_type/0/stop/{stopId}/route/4?direction_id=1&max_results=2".format(stopId=stop_id)
    )

    y_offset = 0
    for departure in data['departures']:
        # The real-time estimate can be missing/null; fall back to the
        # timetabled time rather than failing to parse None.
        dep_utc = departure.get('estimated_departure_utc') or departure['scheduled_departure_utc']
        dep_date = dates.parse_dep_date(dep_utc)

        (_, run_details) = ptv_api.get('/v3/runs/' + str(departure['run_id']))
        run = run_details['runs'][0]
        dest = 'Flinders' if 'Flinders' in run['destination_name'] else 'CityLoop'
        express = run['express_stop_count'] > 0

        mins_away = str(round((dep_date - current_date) / 60))
        print(dest + " in " + mins_away + ", express=" + str(express))

        oled.text(dest, 0, y_offset)
        if mins_away == "0":
            dist = "now"
        else:
            dist = mins_away + "min" + ("" if mins_away == "1" else "s")
        oled.text(dist, 0, y_offset + 8)
        if express:
            oled.text('E', 55, y_offset + 8)

        y_offset += 18
        # Separator under this departure's two text rows.
        oled.hline(0, y_offset - 2, 999, 0xffff)

    # Only hour and minute are displayed. Per the README to-do this is not
    # converted to the local timezone.
    hour, minute = utime.localtime(current_date)[3:5]
    oled.text("Upd{hour:02}:{minute:02}".format(hour=hour, minute=minute), 0, 40)
    oled.show()
# Periodic timer that calls update() every 60000 ms; the timer argument
# passed to the callback is ignored.
tim = machine.Timer(-1)
tim.init(period=60000, mode=machine.Timer.PERIODIC, callback=lambda t:update())
# Draw once immediately instead of waiting for the first timer tick.
update()
import urequests2 as requests, hmac, ubinascii, dates
from uhashlib import sha1
# TODO: get these from PTV
# https://static.ptv.vic.gov.au/PTV/PTV%20docs/API/1475462320/PTV-Timetable-API-key-and-signature-document.RTF
# Placeholder credentials. get() uses api_key as the HMAC key for the request
# signature and appends devid to every request as the `devid` query parameter.
api_key = b"api_key"
devid = 123
def get(base_url):
    """Perform a signed GET against the PTV timetable API.

    base_url: request path, optionally with a query string, e.g.
        ``/v3/runs/123``. The ``devid`` parameter and the HMAC-SHA1
        ``signature`` parameter are appended here.

    Returns a ``(current_date, data)`` tuple: the timestamp parsed from the
    response's Date header and the decoded JSON body.
    Raises RuntimeError when the server answers with a status other than
    200, so an error payload is not mistaken for data.
    """
    separator = '&' if '?' in base_url else '?'
    raw = base_url + separator + 'devid={devid}'.format(devid=devid)

    # HMAC documents its message as bytes/bytearray, so encode explicitly.
    hashed = hmac.new(api_key, raw.encode(), sha1)
    signature = ubinascii.hexlify(hashed.digest()).decode()

    response = requests.get('http://timetableapi.ptv.vic.gov.au' + raw + '&signature=' + signature)
    if response.status_code != 200:
        status = response.status_code
        response.close()  # release the socket before bailing out
        raise RuntimeError("PTV API request failed with HTTP status %d" % status)

    # json() reads the whole body and closes the underlying socket.
    data = response.json()
    current_date = dates.parse_current_date(response.date)
    return (current_date, data)
# This is a modified version of the MicroPython requests
# library which returns the Date header from the response
# if it was present. This is my clever trick for knowing
# the time on a device without a RTC ;)
import usocket
class Response:
    """HTTP response wrapper around the stream the body is read from."""

    def __init__(self, f):
        self.raw = f               # stream positioned at the start of the body
        self.encoding = "utf-8"    # used by .text to decode the body
        self._cached = None        # body bytes once .content has been read

    def close(self):
        """Close the underlying stream, if still open, and drop the cached body."""
        stream = self.raw
        if stream:
            stream.close()
            self.raw = None
        self._cached = None

    @property
    def content(self):
        """Body as returned by the stream's read(); read once, then cached.

        The stream is closed and released after the read, whether or not
        the read succeeded.
        """
        if self._cached is not None:
            return self._cached
        try:
            self._cached = self.raw.read()
        finally:
            self.raw.close()
            self.raw = None
        return self._cached

    @property
    def text(self):
        """Body decoded to str using ``self.encoding``."""
        body = self.content
        return str(body, self.encoding)

    def json(self):
        """Body parsed with ``ujson.loads``."""
        import ujson
        body = self.content
        return ujson.loads(body)
def request(method, url, data=None, json=None, headers={}, stream=None):
    """Send an HTTP/1.0 request and return a Response for reading the body.

    method:  HTTP verb, e.g. "GET".
    url:     "http://host[:port]/path" or "https://host[:port]/path".
    data:    optional request body.
    json:    optional object serialised with ujson and sent as the body;
             must not be combined with ``data``.
    headers: optional mapping of extra request headers. It is only read,
             never modified, so the shared default dict is safe.
    stream:  accepted but not used.

    The returned Response carries ``status_code``, ``reason`` and ``date``
    (the value of the Date header, or "" when the server sent none).

    Raises ValueError for an unsupported protocol or a chunked response and
    NotImplementedError for redirects. On any failure after the socket has
    been created, the socket is closed before the exception propagates.
    """
    try:
        proto, dummy, host, path = url.split("/", 3)
    except ValueError:
        # No path component after the host.
        proto, dummy, host = url.split("/", 2)
        path = ""
    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl
        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM)
    ai = ai[0]

    s = usocket.socket(ai[0], ai[1], ai[2])
    try:
        s.connect(ai[-1])
        if proto == "https:":
            s = ussl.wrap_socket(s, server_hostname=host)
        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if not "Host" in headers:
            s.write(b"Host: %s\r\n" % host)
        # Iterate over keys to avoid tuple alloc
        for k in headers:
            s.write(k)
            s.write(b": ")
            s.write(headers[k])
            s.write(b"\r\n")
        if json is not None:
            assert data is None
            import ujson
            data = ujson.dumps(json)
            s.write(b"Content-Type: application/json\r\n")
        if data:
            s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"\r\n")
        if data:
            s.write(data)

        # Status line: "<version> <status> [<reason>]".
        line = s.readline()
        parts = line.split(None, 2)
        status = int(parts[1])
        reason = ""
        date = ""
        if len(parts) > 2:
            reason = parts[2].rstrip()

        # Consume headers up to the blank line that precedes the body.
        while True:
            line = s.readline()
            if not line or line == b"\r\n":
                break
            if line.startswith(b"Transfer-Encoding:"):
                if b"chunked" in line:
                    # The header line is bytes; decode it before building
                    # the message (str + bytes would raise TypeError).
                    raise ValueError("Unsupported " + str(line, "utf-8"))
            elif line.startswith(b"Location:") and not 200 <= status <= 299:
                raise NotImplementedError("Redirects not yet supported")
            elif line.startswith(b"Date:"):
                # Strip instead of fixed slicing so the value is right
                # whatever whitespace or line ending surrounds it.
                date = line[5:].strip()
    except Exception:
        # Close on every failure, not just OSError, so the socket is not
        # leaked when parsing or an unsupported feature raises.
        s.close()
        raise

    resp = Response(s)
    resp.status_code = status
    resp.reason = reason
    resp.date = date
    return resp
def head(url, **options):
    """Send a HEAD request to ``url``; keyword arguments go to request()."""
    return request("HEAD", url, **options)


def get(url, **options):
    """Send a GET request to ``url``; keyword arguments go to request()."""
    return request("GET", url, **options)


def post(url, **options):
    """Send a POST request to ``url``; keyword arguments go to request()."""
    return request("POST", url, **options)


def put(url, **options):
    """Send a PUT request to ``url``; keyword arguments go to request()."""
    return request("PUT", url, **options)


def patch(url, **options):
    """Send a PATCH request to ``url``; keyword arguments go to request()."""
    return request("PATCH", url, **options)


def delete(url, **options):
    """Send a DELETE request to ``url``; keyword arguments go to request()."""
    return request("DELETE", url, **options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment