|
import argparse
import asyncio
import datetime as dt
import json
import logging
import time
from threading import Thread

import adafruit_rgb_display.st7789 as st7789
import board
import digitalio
import requests
from adafruit_rgb_display.rgb import color565
from PIL import Image, ImageDraw, ImageFont
from systemd import journal

from stats import CovidStats
|
|
|
# Base URL of the coronavirus-tracker-api v2 REST service; endpoint paths such
# as 'latest' or 'locations/<id>' are appended to it.
# Project page: https://github.com/ExpDev07/coronavirus-tracker-api
# Earlier value, left commented out:
#COVID_19_API = "https://covid19api.herokuapp.com/"
COVID_19_API = "https://coronavirus-tracker-api.herokuapp.com/v2/"

# Title text drawn centred at the top of the display.
HEADER = "COVID-19"
|
|
|
class CovidTracker:
    """Fetch COVID-19 totals periodically and draw them on an ST7789 display.

    Constructing an instance configures the display, the backlight and two
    button inputs, then schedules :meth:`update_loop` on the default asyncio
    event loop. The caller is responsible for running that loop.
    """

    # Upper bound, in seconds, on a single HTTP request so an unresponsive
    # server cannot stall refreshes indefinitely.
    REQUEST_TIMEOUT_SEC = 30

    # API paths queried on each refresh, in order: the global totals, then the
    # two locations whose figures are handed to CovidStats as "uk" and "sa".
    _WORLD_ENDPOINT = 'latest'
    _UK_ENDPOINT = 'locations/223'
    _SA_ENDPOINT = 'locations/200'

    def __init__(self, refresh_rate=600, width=240, height=240):
        """Set up logging, display hardware and the periodic refresh task.

        Args:
            refresh_rate: Seconds to sleep between refreshes from the server.
            width: Display width in pixels.
            height: Display height in pixels.
        """
        self.get_stats_interval_sec = refresh_rate
        self.width = width
        self.height = height

        self.logger = logging.getLogger(__name__)
        self.logger.propagate = False
        # The logger is shared per module name; only attach the journald
        # handler once so repeated construction does not duplicate records.
        if not self.logger.handlers:
            self.logger.addHandler(journal.JournaldLogHandler())
        self.logger.setLevel(logging.INFO)

        cs_pin = digitalio.DigitalInOut(board.CE0)
        dc_pin = digitalio.DigitalInOut(board.D25)
        reset_pin = None
        baudrate = 64000000

        self.display = st7789.ST7789(
            board.SPI(),
            cs=cs_pin,
            dc=dc_pin,
            rst=reset_pin,
            baudrate=baudrate,
            width=width,
            height=height,
            y_offset=90,
            rotation=180,
        )

        # Switch the backlight on.
        backlight = digitalio.DigitalInOut(board.D22)
        backlight.switch_to_output()
        backlight.value = True

        self.buttonA = digitalio.DigitalInOut(board.D23)
        self.buttonB = digitalio.DigitalInOut(board.D24)
        self.buttonA.switch_to_input()
        self.buttonB.switch_to_input()
        self._button_hold_time = 2.0
        # Button A bookkeeping used by button_loop().
        self._t_a_pressed = 0.0
        self._t_a_repeat = 0.0
        self._hold_a_fired = False

        # Off-screen image that is drawn into and then pushed to the panel;
        # start with an all-black frame.
        self.image = Image.new("RGB", (width, height))
        self.draw = ImageDraw.Draw(self.image)
        self.draw.rectangle((0, 0, width, height), outline=0, fill=(0, 0, 0))
        self.display.image(self.image)
        self.font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 24)

        self.stats = CovidStats()

        loop = asyncio.get_event_loop()
        loop.create_task(self.update_loop())
        # button_loop() is intentionally not scheduled.

    @staticmethod
    def _text_size(font, text):
        """Return ``(width, height)`` of *text* rendered with *font*.

        Uses ``font.getsize`` when the font still provides it, so results are
        unchanged on older Pillow releases. Falls back to ``font.getbbox`` on
        releases where ``getsize`` no longer exists.
        """
        legacy_getsize = getattr(font, "getsize", None)
        if legacy_getsize is not None:
            size = legacy_getsize(text)
            return size[0], size[1]
        left, _top, right, bottom = font.getbbox(text)
        # Height is taken from the drawing origin down to the bottom edge;
        # this is intended to match what the legacy getsize() reported.
        return right - left, bottom

    @staticmethod
    def _latest_totals(latest):
        """Return ``(confirmed, deaths)`` as ints from a ``latest`` mapping.

        Raises:
            KeyError: If either key is missing.
            ValueError: If a value cannot be converted to ``int``.
        """
        return int(latest["confirmed"]), int(latest["deaths"])

    def update_display(self):
        """Redraw the screen from ``self.stats`` if it reports new data."""
        if not self.stats.new_data:
            return

        self.logger.info('New data at %s', self.stats.last_updated.strftime('%d-%b-%Y (%H:%M)'))
        self.draw.rectangle((0, 0, self.width, self.height), outline=0, fill=0)

        cases = self.stats.confirmed
        deaths = self.stats.deaths

        padding = 10
        x = 0
        y = padding

        # Centre the header against the configured width rather than a fixed 240.
        header_width, header_height = self._text_size(self.font, HEADER)
        self.draw.text(((self.width - header_width) / 2, y), HEADER, font=self.font, fill="#FFFFFF")
        y += header_height + padding

        self.draw.text((x, y), cases, font=self.font, fill="#FF0000")
        # Advance by three measured heights; presumably the confirmed text
        # spans three lines -- confirm against CovidStats.
        y += self._text_size(self.font, cases)[1] * 3 + padding
        self.draw.text((x, y), deaths, font=self.font, fill="#FF0000")

        self.display.image(self.image)

    def _fetch_json(self, endpoint):
        """Blocking GET of ``COVID_19_API + endpoint``.

        Returns:
            The decoded JSON body, or ``None`` when the status is not 200.

        Raises:
            requests.RequestException: On connection failure or timeout.
            ValueError: If the body is not valid JSON.
        """
        response = requests.get(COVID_19_API + endpoint, timeout=self.REQUEST_TIMEOUT_SEC)
        if response.status_code != 200:
            return None
        return json.loads(response.content)

    async def get_stats_from_server(self) -> None:
        """Fetch world, UK and SA totals, update ``self.stats`` and redraw.

        The three requests run one after another in the default executor so
        the event loop is not blocked while waiting on the network. If any
        request fails or a response lacks the expected fields, the error is
        logged and the stats and display are left untouched.
        """
        self.logger.debug("Fetching update")
        loop = asyncio.get_event_loop()

        try:
            payloads = []
            for endpoint in (self._WORLD_ENDPOINT, self._UK_ENDPOINT, self._SA_ENDPOINT):
                payload = await loop.run_in_executor(None, self._fetch_json, endpoint)
                if payload is None:
                    # Later requests are skipped once one has failed.
                    self.logger.error("Error refreshing data")
                    return
                payloads.append(payload)

            world, uk, sa = payloads
            confirmed_world, deaths_world = self._latest_totals(world["latest"])
            confirmed_uk, deaths_uk = self._latest_totals(uk["location"]["latest"])
            confirmed_sa, deaths_sa = self._latest_totals(sa["location"]["latest"])
            last_update = dt.datetime.strptime(uk["location"]["last_updated"], '%Y-%m-%dT%H:%M:%S.%fZ')
        except requests.RequestException as exc:
            self.logger.error("Error refreshing data: %s", exc)
            return
        except (KeyError, TypeError, ValueError) as exc:
            self.logger.error("Unexpected response while refreshing data: %r", exc)
            return

        self.stats.update(confirmed_world, confirmed_uk, confirmed_sa, deaths_world, deaths_uk, deaths_sa, last_update)
        self.update_display()
        self.logger.debug("Confirmed: %s UK: %s SA %s", confirmed_world, confirmed_uk, confirmed_sa)
        self.logger.debug("Deaths: %s UK: %s SA %s", deaths_world, deaths_uk, deaths_sa)

    async def update_loop(self) -> None:
        """Refresh the stats forever, sleeping ``get_stats_interval_sec`` between runs."""
        while True:
            try:
                await self.get_stats_from_server()
            except asyncio.CancelledError:
                raise
            except Exception:
                # Task boundary: one failed refresh must not end the loop.
                self.logger.exception("Error refreshing data")
            await asyncio.sleep(self.get_stats_interval_sec)

    def _button_press_handler(self):
        """Hook run in a worker thread when button A goes down; only logs by default."""
        self.logger.debug("Button A pressed")

    def _button_release_handler(self, held):
        """Hook run in a worker thread when button A is released.

        Args:
            held: True if the hold handler already fired for this press.
        """
        self.logger.debug("Button A released (held=%s)", held)

    def _button_hold_handler(self):
        """Hook run in a worker thread once button A exceeds the hold time; only logs by default."""
        self.logger.debug("Button A held")

    async def button_loop(self) -> None:
        """Poll button A every 50 ms and dispatch press, release and hold hooks.

        A pin value of False is treated as "pressed". Each hook runs in its
        own thread so it cannot delay the polling.
        """
        was_pressed = False

        while True:
            # Sample the pin once per pass so every check below sees the same state.
            pressed = not self.buttonA.value
            # Monotonic clock: only elapsed time matters here.
            now = time.monotonic()

            if pressed and not was_pressed:
                self._t_a_pressed = now
                self._hold_a_fired = False
                self._t_a_repeat = now
                Thread(target=self._button_press_handler).start()
            elif was_pressed and not pressed:
                Thread(target=self._button_release_handler, args=(self._hold_a_fired,)).start()

            if pressed and not self._hold_a_fired and (now - self._t_a_pressed) > self._button_hold_time:
                Thread(target=self._button_hold_handler).start()
                self._hold_a_fired = True

            was_pressed = pressed

            await asyncio.sleep(0.05)
|
|
|
if __name__ == "__main__":
    # Command line: only the refresh period (in seconds) is configurable.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--update',
        type=float,
        default=600,
        help='Rate at which to update the data',
    )
    options = cli.parse_args()

    try:
        # Constructing the tracker schedules its refresh task on the default
        # event loop; running that loop is what drives the refreshes.
        tracker = CovidTracker(refresh_rate=options.update)
        event_loop = asyncio.get_event_loop()
        event_loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; exit without a traceback.
        pass