|
import time |
|
from datetime import datetime |
|
from textwrap import wrap |
|
import requests |
|
from requests_html import HTMLSession |
|
import paramiko |
|
import schedule |
|
|
|
|
|
def get_traffic(road): |
|
lines = [] |
|
# url = f'http://www.trafficengland.com/api/events/getByRoad?road={road}&events=CONGESTION,INCIDENT,ROADWORKS,MAJOR_ORGANISED_EVENTS,ABNORMAL_LOADS&direction=all&includeUnconfirmedRoadworks=true' |
|
url = f'http://www.trafficengland.com/api/events/getByRoad?road={road}&events=CONGESTION,INCIDENT,MAJOR_ORGANISED_EVENTS,ABNORMAL_LOADS&direction=all&includeUnconfirmedRoadworks=true' |
|
r = requests.get(url) |
|
for event in r.json(): |
|
if 0.01 < event['longitude'] < 2.0: |
|
line = f'{event["teEventType"].capitalize()}: ' + ', '.join(event['gdpFormatted']) |
|
lines.append(wrap(line, 50)) |
|
lines.append('\n') |
|
return lines |
|
|
|
|
|
def get_solar(): |
|
session = HTMLSession() |
|
r = session.get(solarurl) |
|
return r.html.find('table')[1].text.split('\n')[1::2] |
|
|
|
|
|
def get_weather(): |
|
r = requests.get('https://api.openweathermap.org/data/2.5/weather?q=&appid=') |
|
json = r.json() |
|
return json |
|
|
|
|
|
def get_weather_forecast(): |
|
r = requests.get('https://api.openweathermap.org/data/2.5/forecast?q=&appid=') |
|
json = r.json() |
|
return json |
|
|
|
|
|
def forcasts(weather): |
|
for forcast in weather: |
|
if 'rain' in forecast: |
|
rain = forecast["rain"].get('3h', '0.0') |
|
else: |
|
rain = '0.0' |
|
a, b, c = forecast["dt_txt"][-11:-3], f'{forecast["main"]["temp"]-273.15:.1f}', forecast["main"]["humidity"], |
|
d, e = f'{forecast["wind"]["speed"]:.1f}/{int(forecast["wind"]["deg"])}', forecast["clouds"]["all"] |
|
yield f'{a} |{b:^6}|{c:^4}|{d:^10}|{rain:^8}|{e:^7}' |
|
|
|
|
|
def kindleprint(client, line, col, row): |
|
command = f'/usr/sbin/eips {col:2} {row:2} "{line.ljust(50)}"' # chars 50 wide, 40 tall |
|
print("Executing {}".format(command)) |
|
stdin, stdout, stderr = client.exec_command(command) |
|
stdout.read(), stderr.read() # seems that you must read or commands don't work |
|
|
|
|
|
def k_to(k): |
|
return f'{k-273.15:.1f}' |
|
|
|
|
|
def dt_to(dt): |
|
return datetime.fromtimestamp(dt).isoformat()[11:16] |
|
|
|
|
|
def update_kindle(): |
|
client = paramiko.SSHClient() |
|
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
|
client.load_system_host_keys() |
|
client.connect('10.10.10.104', port=22, username='root') |
|
|
|
commands = [ |
|
'/usr/bin/lipc-set-prop -- com.lab126.powerd preventScreenSaver 1', # keeps wifi alive |
|
'/etc/init.d/framework stop', # Stop the main software |
|
# '/usr/sbin/eips -c', # clears the display |
|
#'/usr/sbin/eips 0 0 " Headline"', |
|
] |
|
|
|
for command in commands: |
|
print("Executing {}".format(command)) |
|
stdin, stdout, stderr = client.exec_command(command) |
|
stdout.read(), stderr.read() |
|
|
|
# gather current data |
|
solar = get_solar() |
|
weather = get_weather() |
|
traffic = get_traffic('A1') |
|
cast = forcasts(get_weather_forecast()['list']) |
|
|
|
lines = [ |
|
# '', |
|
f'Updated: {datetime.now().isoformat()[:19]}', |
|
'', |
|
f'Weather:', |
|
f' Temperature:{"Min":^7}|{"Now":^7}|{"Max":^7}', |
|
f' Temperature:{k_to(weather["main"]["temp_min"]):^7}|{k_to(weather["main"]["temp"]):^7}|{k_to(weather["main"]["temp_max"]):^7}', |
|
f' Humidity: {weather["main"]["humidity"]} percent', |
|
f' Pressure: {weather["main"]["pressure"]} hPa', |
|
f' Visibility: {weather["visibility"]}', |
|
f' Wind: {weather["wind"]["speed"]} m/s', # / {json["wind"]["deg"]} deg', |
|
f' Cloud: {weather["clouds"]["all"]} percent', |
|
'', |
|
f'Solar:', |
|
f' Sunrise : {dt_to(weather["sys"]["sunrise"])} | {dt_to(weather["sys"]["sunset"])} : Sunset', |
|
f'{"Now":^8}|{"Today":^11}|{"Week":^11}|{"Total":^11}', |
|
f'{solar[1]:^8}|{solar[2]:^11}|{solar[3]:^11}|{solar[4]:^11}', |
|
f'', |
|
f'{"Time":^9}|{"Temp":^6}|{"Hu":^4}|{"Wind/deg":^10}|{"mm/3hr":^8}|{"Cloud":^7}', |
|
|
|
] |
|
|
|
for i in traffic: |
|
for x in i: |
|
lines.append(x) |
|
|
|
for index, line in enumerate(lines): |
|
kindleprint(client, line, 0, index) |
|
else: # fill unused rows with weather forecast |
|
for line in range(index+1, 40): |
|
kindleprint(client, next(cast), 0, line) |
|
|
|
client.close() |
|
|
|
|
|
if __name__ == '__main__': |
|
try: |
|
update_kindle() |
|
schedule.every(5).minutes.do(update_kindle).tag('kindle') |
|
|
|
while True: |
|
schedule.run_pending() |
|
time.sleep(1) |
|
except paramiko.ssh_exception.NoValidConnectionsError: |
|
print('No Connection') |
|
except KeyboardInterrupt: |
|
schedule.clear('kindle') |
Holly shit! That's brilliant! And working almost out of box :)