-
-
Save karlkec/7052c0db8eb98ea1bc597e49efcf74d2 to your computer and use it in GitHub Desktop.
Weatherflow Pi Console mod to display time of max wind gust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Returns the derived weather variables required by the Raspberry Pi Python | |
console for WeatherFlow Tempest and Smart Home Weather stations. | |
Copyright (C) 2018-2023 Peter Davis | |
This program is free software: you can redistribute it and/or modify it under | |
the terms of the GNU General Public License as published by the Free Software | |
Foundation, either version 3 of the License, or (at your option) any later | |
version. | |
This program is distributed in the hope that it will be useful, but WITHOUT | |
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License along with | |
this program. If not, see <http://www.gnu.org/licenses/>. | |
""" | |
# Import required library modules | |
from lib.request_api import weatherflow_api | |
from lib.system import system | |
from lib import derivedVariables as derive | |
# Import required Python modules | |
from kivy.logger import Logger | |
from datetime import datetime | |
import bisect | |
import ephem | |
import math | |
import pytz | |
import time | |
def dewPoint(outTemp, humidity): | |
""" Calculate the dew point from the temperature and relative humidity | |
INPUTS: | |
outTemp Temperature from AIR module [C] | |
humidity Relative humidity from AIR module [%] | |
OUTPUT: | |
DewPoint Dew point [C] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'c'] | |
if outTemp[0] is None: | |
Logger.warning(f'dewPoint: {system().log_time()} - outTemp is None') | |
return errorOutput | |
elif humidity[0] is None: | |
Logger.warning(f'dewPoint: {system().log_time()} - humidity is None') | |
return errorOutput | |
# Calculate dew point | |
if humidity[0] > 0: | |
A = 17.625 | |
B = 243.04 | |
N = B * (math.log(humidity[0] / 100.0) + (A * outTemp[0]) / (B + outTemp[0])) | |
D = A - math.log(humidity[0] / 100.0) - (A * outTemp[0]) / (B + outTemp[0]) | |
dewPoint = N / D | |
else: | |
dewPoint = None | |
# Return Dew Point | |
return [dewPoint, 'c'] | |
def feelsLike(outTemp, humidity, windSpd, config): | |
""" Calculate the Feels Like temperature from the temperature, relative | |
humidity, and wind speed | |
INPUTS: | |
Temp Temperature from AIR module [C] | |
Humidity Relative humidity from AIR module [%] | |
windSpd Wind speed from SKY module [m/s] | |
config Station configuration | |
OUTPUT: | |
FeelsLike Feels Like temperature [C] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'c', '-', '-'] | |
if outTemp[0] is None: | |
Logger.warning(f'feelsLike: {system().log_time()} - outTemp is None') | |
return errorOutput | |
elif humidity[0] is None: | |
Logger.warning(f'feelsLike: {system().log_time()} - humidity is None') | |
return errorOutput | |
elif windSpd[0] is None: | |
Logger.warning(f'feelsLike: {system().log_time()} - windSpd is None') | |
return errorOutput | |
# Convert observation units as required | |
TempF = [outTemp[0] * (9 / 5) + 32, 'f'] | |
WindMPH = [windSpd[0] * 2.2369362920544, 'mph'] | |
WindKPH = [windSpd[0] * 3.6, 'kph'] | |
# If temperature is less than 10 degrees celcius and wind speed is higher | |
# than 3 mph, calculate wind chill using the Joint Action Group for | |
# Temperature Indices formula | |
if outTemp[0] <= 10 and WindMPH[0] > 3: | |
WindChill = (+ 13.12 + 0.6215 * outTemp[0] | |
- 11.37 * (WindKPH[0])**0.16 + 0.3965 * outTemp[0] | |
* (WindKPH[0])**0.16) | |
FeelsLike = [WindChill, 'c'] | |
# If temperature is at or above 80 degress farenheit (26.67 C), and humidity | |
# is at or above 40%, calculate the Heat Index | |
elif TempF[0] >= 80 and humidity[0] >= 40: | |
HeatIndex = (-42.379 + (2.04901523 * TempF[0]) | |
+ (10.1433127 * humidity[0]) | |
- (0.22475541 * TempF[0] * humidity[0]) | |
- (6.83783e-3 * TempF[0]**2) | |
- (5.481717e-2 * humidity[0]**2) | |
+ (1.22874e-3 * TempF[0]**2 * humidity[0]) | |
+ (8.5282e-4 * TempF[0] * humidity[0]**2) | |
- (1.99e-6 * TempF[0]**2 * humidity[0]**2)) | |
FeelsLike = [(HeatIndex - 32) * (5 / 9), 'c'] | |
# Else set Feels Like temperature to observed temperature | |
else: | |
FeelsLike = outTemp | |
# Define 'FeelsLike' temperature cutoffs | |
Cutoffs = [float(item) for item in list(config['FeelsLike'].values())] | |
# Define 'FeelsLike temperature text and icon | |
Description = ['Feeling extremely cold', 'Feeling freezing cold', 'Feeling very cold', | |
'Feeling cold', 'Feeling mild', 'Feeling warm', 'Feeling hot', | |
'Feeling very hot', 'Feeling extremely hot', '-'] | |
Icon = ['ExtremelyCold', 'FreezingCold', 'VeryCold', 'Cold', 'Mild', 'Warm', | |
'Hot', 'VeryHot', 'ExtremelyHot', '-'] | |
if config['Units']['Temp'] == 'f': | |
Ind = bisect.bisect(Cutoffs, FeelsLike[0] * (9 / 5) + 32) | |
else: | |
Ind = bisect.bisect(Cutoffs, FeelsLike[0]) | |
# Return 'Feels Like' temperature | |
return [FeelsLike[0], FeelsLike[1], Description[Ind], Icon[Ind]] | |
def SLP(pressure, device, config): | |
""" Calculate the sea level pressure from the station pressure | |
INPUTS: | |
pressure Station pressure from AIR/TEMPEST module [mb] | |
device Device ID that observation originates from | |
config Station configuration | |
OUTPUT: | |
SLP Sea level pressure [mb] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'mb', None] | |
if pressure[0] is None: | |
Logger.warning(f'SLP: {system().log_time()} - pressure is None') | |
return errorOutput | |
# Extract required configuration variables | |
elevation = config['Station']['Elevation'] | |
if str(device) == config['Station']['OutAirID']: | |
height = config['Station']['OutAirHeight'] | |
elif str(device) == config['Station']['TempestID']: | |
height = config['Station']['TempestHeight'] | |
# Define required constants | |
P0 = 1013.25 | |
Rd = 287.05 | |
GammaS = 0.0065 | |
g = 9.80665 | |
T0 = 288.15 | |
elevation = float(elevation) + float(height) | |
# Calculate and return sea level pressure | |
SLP = (pressure[0] | |
* (1 + ((P0 / pressure[0])**((Rd * GammaS) / g)) | |
* ((GammaS * elevation) / T0))**(g / (Rd * GammaS)) | |
) | |
return [SLP, 'mb', SLP] | |
def SLPTrend(pressure, obTime, device, apiData, config): | |
""" Calculate the pressure trend from the sea level pressure over the last | |
three hours | |
INPUTS: | |
pressure Current station pressure [mb] | |
obTime Time of latest observation [s] | |
device Device ID that observation originates from | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
Trend Sea level pressure trend [mb] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'mb/hr', '-', '-'] | |
if pressure[0] is None: | |
Logger.warning(f'SLPTrend: {system().log_time()} - pressure is None') | |
return errorOutput | |
elif obTime[0] is None: | |
Logger.warning(f'SLPTrend: {system().log_time()} - obTime is None') | |
return errorOutput | |
# Define index of pressure in websocket packets | |
if str(device) == config['Station']['OutAirID']: | |
index_bucket_a = 1 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 6 | |
# Extract required observations from WeatherFlow API data based on device | |
# type indicated in API call | |
if '24Hrs' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'): | |
data24hrs = apiData[device]['24Hrs'].json()['obs'] | |
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None] | |
apiPres = [ob[index_bucket_a] for ob in data24hrs if ob[index_bucket_a] is not None] | |
try: | |
dTime = [abs(T - (obTime[0] - 3 * 3600)) for T in apiTime] | |
if min(dTime) < 5 * 60: | |
pres3h = [apiPres[dTime.index(min(dTime))], 'mb'] | |
time3h = [apiTime[dTime.index(min(dTime))], 's'] | |
pres0h = pressure | |
time0h = obTime | |
else: | |
Logger.warning(f'SLPTrend: {system().log_time()} - no data in 3 hour window') | |
return errorOutput | |
except Exception as Error: | |
Logger.warning(f'SLPTrend: {system().log_time()} - {Error}') | |
return errorOutput | |
else: | |
return errorOutput | |
# Convert station pressure into sea level pressure | |
pres3h = SLP(pres3h, device, config) | |
pres0h = SLP(pres0h, device, config) | |
# Calculate three hour temperature trend | |
try: | |
Trend = (pres0h[0] - pres3h[0]) / ((time0h[0] - time3h[0]) / 3600) | |
except Exception as Error: | |
Logger.warning(f'SLPTrend: {system().log_time()} - {Error}') | |
return errorOutput | |
# Define pressure trend text | |
if Trend > 2 / 3: | |
TrendTxt = '[color=ff8837ff]Rising rapidly[/color]' | |
elif Trend >= 1 / 3: | |
TrendTxt = '[color=ff8837ff]Rising[/color]' | |
elif Trend <= -2 / 3: | |
TrendTxt = '[color=00a4b4ff]Falling rapidly[/color]' | |
elif Trend <= -1 / 3: | |
TrendTxt = '[color=00a4b4ff]Falling[/color]' | |
else: | |
TrendTxt = '[color=9aba2fff]Steady[/color]' | |
# Define weather tendency based on pressure and trend | |
if pres0h[0] >= 1023: | |
if 'Falling rapidly' in TrendTxt: | |
Tendency = 'Becoming cloudy and warmer' | |
else: | |
Tendency = 'Fair conditions likely' | |
elif 1009 < pres0h[0] < 1023: | |
if 'Falling rapidly' in TrendTxt: | |
Tendency = 'Rainy conditions likely' | |
else: | |
Tendency = 'Conditions unchanged' | |
elif pres0h[0] <= 1009: | |
if 'Falling rapidly' in TrendTxt: | |
Tendency = 'Stormy conditions likely' | |
elif 'Falling' in TrendTxt: | |
Tendency = 'Rainy conditions likely' | |
else: | |
Tendency = 'Becoming clearer and cooler' | |
else: | |
Tendency = '-' | |
# Return pressure trend | |
return [Trend, 'mb/hr', TrendTxt, Tendency] | |
def SLPMax(pressure, obTime, maxPres, device, apiData, config): | |
""" Calculate maximum pressure since midnight station time | |
INPUTS: | |
Time Current observation time [s] | |
Temp Current pressure [mb] | |
maxPres Current maximum pressure [mb] | |
device Device ID that observation originates from | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
maxPres Maximum pressure [mb] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'mb', '-', None, time.time()] | |
if pressure[0] is None: | |
Logger.warning(f'SLPMax: {system().log_time()} - pressure is None') | |
return errorOutput | |
elif obTime[0] is None: | |
Logger.warning(f'SLPMax: {system().log_time()} - obTime is None') | |
return errorOutput | |
# Calculate sea level pressure | |
SLP = derive.SLP(pressure, device, config) | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Define index of temperature in websocket packets | |
if str(device) == config['Station']['OutAirID']: | |
index_bucket_a = 1 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 6 | |
# If console is initialising, download all data for current day using | |
# Weatherflow API and calculate daily maximum and minimum pressure | |
if 'today' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'): | |
dataToday = apiData[device]['today'].json()['obs'] | |
obTime = [item[0] for item in dataToday if item[index_bucket_a] is not None] | |
pressure = [[item[index_bucket_a], 'mb'] for item in dataToday if item[index_bucket_a] is not None] | |
SLP = [derive.SLP(P, device, config) for P in pressure] | |
try: | |
maxPres = [max(SLP)[0], 'mb', obTime[SLP.index(max(SLP))], 's', max(SLP)[0], obTime[SLP.index(max(SLP))]] | |
except Exception as Error: | |
Logger.warning(f'SLPMax: {system().log_time()} - {Error}') | |
maxPres = errorOutput | |
else: | |
maxPres = errorOutput | |
# Else if midnight has passed, reset maximum pressure | |
elif Now.date() > datetime.fromtimestamp(maxPres[5], Tz).date(): | |
maxPres = [SLP[0], 'mb', obTime[0], 's', SLP[0], obTime[0]] | |
# Else if current pressure is greater than maximum recorded pressure, update | |
# maximum pressure | |
elif SLP[0] > maxPres[4]: | |
maxPres = [SLP[0], 'mb', obTime[0], 's', SLP[0], obTime[0]] | |
# Else maximum pressure unchanged, return existing values | |
else: | |
maxPres = [maxPres[4], 'mb', maxPres[2], 's', maxPres[4], obTime[0]] | |
# Return required variables | |
return maxPres | |
def SLPMin(pressure, obTime, minPres, device, apiData, config): | |
""" Calculate minimum pressure since midnight station time | |
INPUTS: | |
pressure Current pressure [mb] | |
obTime Current observation time [s] | |
minPres Current minimum pressure [mb] | |
device Device ID that observation originates from | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
minPres Minumum pressure [mb] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'mb', '-', None, time.time()] | |
if pressure[0] is None: | |
Logger.warning(f'SLPMin: {system().log_time()} - pressure is None') | |
return errorOutput | |
elif obTime[0] is None: | |
Logger.warning(f'SLPMin: {system().log_time()} - obTime is None') | |
return errorOutput | |
# Calculate sea level pressure | |
SLP = derive.SLP(pressure, device, config) | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Define index of temperature in websocket packets | |
if str(device) == config['Station']['OutAirID']: | |
index_bucket_a = 1 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 6 | |
# If console is initialising, download all data for current day using | |
# Weatherflow API and calculate daily maximum and minimum pressure | |
if 'today' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'): | |
dataToday = apiData[device]['today'].json()['obs'] | |
obTime = [item[0] for item in dataToday if item[index_bucket_a] is not None] | |
pressure = [[item[index_bucket_a], 'mb'] for item in dataToday if item[index_bucket_a] is not None] | |
SLP = [derive.SLP(P, device, config) for P in pressure] | |
try: | |
minPres = [min(SLP)[0], 'mb', obTime[SLP.index(min(SLP))], 's', min(SLP)[0], obTime[SLP.index(min(SLP))]] | |
except Exception as Error: | |
Logger.warning(f'SLPMin: {system().log_time()} - {Error}') | |
minPres = errorOutput | |
else: | |
minPres = errorOutput | |
# Else if midnight has passed, reset maximum and minimum pressure | |
elif Now.date() > datetime.fromtimestamp(minPres[5], Tz).date(): | |
minPres = [SLP[0], 'mb', obTime[0], 's', SLP[0], obTime[0]] | |
# Else if current pressure is less than minimum recorded pressure, update | |
# minimum pressure and time | |
elif SLP[0] < minPres[4]: | |
minPres = [SLP[0], 'mb', obTime[0], 's', SLP[0], obTime[0]] | |
# Else minimum pressure unchanged, return existing values | |
else: | |
minPres = [minPres[4], 'mb', minPres[2], 's', minPres[4], obTime[0]] | |
# Return required variables | |
return minPres | |
def tempDiff(outTemp, obTime, device, apiData, config): | |
""" Calculate 24 hour temperature difference | |
INPUTS: | |
outTemp Current temperature [deg C] | |
obTime Current observation time [s] | |
device Device ID that observation originates from | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
dTemp 24 hour temperature difference [deg C] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'dc', '-'] | |
if outTemp[0] is None: | |
Logger.warning(f'tempDiff: {system().log_time()} - outTemp is None') | |
return errorOutput | |
elif obTime[0] is None: | |
Logger.warning(f'tempDiff: {system().log_time()} - obTime is None') | |
return errorOutput | |
# Define index of temperature in websocket packets | |
if str(device) == config['Station']['OutAirID']: | |
index_bucket_a = 2 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 7 | |
# Extract required observations from WeatherFlow API data based on device | |
# type indicated in API call | |
if '24Hrs' in apiData[device] and weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'): | |
data24hrs = apiData[device]['24Hrs'].json()['obs'] | |
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None] | |
apiTemp = [ob[index_bucket_a] for ob in data24hrs if ob[index_bucket_a] is not None] | |
try: | |
dTime = obTime[0] - apiTime[0] | |
if dTime > 86400 - (5 * 60) and dTime < 86400 + (5 * 60): | |
temp24h = apiTemp[0] | |
temp0h = outTemp[0] | |
else: | |
Logger.warning(f'tempDiff: {system().log_time()} - no data in 24 hour window') | |
return errorOutput | |
except Exception as Error: | |
Logger.warning(f'tempDiff: {system().log_time()} - {Error}') | |
return errorOutput | |
else: | |
return errorOutput | |
# Calculate 24 hour temperature Difference | |
try: | |
dTemp = temp0h - temp24h | |
except Exception as Error: | |
Logger.warning(f'tempDiff: {system().log_time()} - {Error}') | |
return errorOutput | |
# Define temperature difference text | |
if abs(dTemp) < 0.05: | |
diffTxt = '[color=c8c8c8ff][/color]' | |
elif dTemp > 0: | |
diffTxt = '[color=f05e40ff] warmer[/color]' | |
elif dTemp < 0: | |
diffTxt = '[color=00a4b4ff] colder[/color]' | |
# Return 24 hour temperature difference | |
return [dTemp, 'dc', diffTxt] | |
def tempTrend(outTemp, obTime, device, apiData, config): | |
""" Calculate 3 hour temperature trend | |
INPUTS: | |
outTemp Current temperature [deg C] | |
obTime Current observation time [s] | |
device Device ID that observation originates from | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
Trend 24 hour temperature difference [deg C] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'c/hr', 'c8c8c8ff'] | |
if outTemp[0] is None: | |
Logger.warning(f'tempTrend: {system().log_time()} - outTemp is None') | |
return errorOutput | |
elif obTime[0] is None: | |
Logger.warning(f'tempTrend: {system().log_time()} - obTime is None') | |
return errorOutput | |
# Define index of temperature in websocket packets | |
if str(device) == config['Station']['OutAirID']: | |
index_bucket_a = 2 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 7 | |
# Extract required observations from WeatherFlow API data based on device | |
# type indicated in API call | |
if '24Hrs' in apiData[device] and weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'): | |
data24hrs = apiData[device]['24Hrs'].json()['obs'] | |
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None] | |
apiTemp = [ob[index_bucket_a] for ob in data24hrs if ob[index_bucket_a] is not None] | |
try: | |
dTime = [abs(T - (obTime[0] - 3 * 3600)) for T in apiTime] | |
if min(dTime) < 5 * 60: | |
temp3h = apiTemp[dTime.index(min(dTime))] | |
time3h = apiTime[dTime.index(min(dTime))] | |
temp0h = outTemp[0] | |
time0h = obTime[0] | |
else: | |
Logger.warning(f'tempTrend: {system().log_time()} - no data in 3 hour window') | |
return errorOutput | |
except Exception as Error: | |
Logger.warning(f'tempTrend: {system().log_time()} - {Error}') | |
return errorOutput | |
else: | |
return errorOutput | |
# Calculate three hour temperature trend | |
try: | |
Trend = (temp0h - temp3h) / ((time0h - time3h) / 3600) | |
except Exception as Error: | |
Logger.warning(f'tempTrend: {system().log_time()} - {Error}') | |
return errorOutput | |
# Define temperature trend color | |
if abs(Trend) < 0.05: | |
Color = 'c8c8c8ff' | |
elif Trend > 0: | |
Color = 'f05e40ff' | |
elif Trend < 1 / 3: | |
Color = '00a4b4ff' | |
# Return temperature trend | |
return [Trend, 'c/hr', Color] | |
def tempMax(Temp, obTime, maxTemp, device, apiData, config): | |
""" Calculate maximum temperature for specified device since midnight | |
station time | |
INPUTS: | |
Temp Current temperature [deg C] | |
obTime Current observation time [s] | |
maxTemp Current maximum temperature [deg C] | |
device Device ID that observation originates from | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
maxTemp Maximum temperature [deg C] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'c', '-', None, time.time()] | |
if Temp[0] is None: | |
Logger.warning(f'tempMax: {system().log_time()} - Temp is None') | |
return errorOutput | |
elif obTime[0] is None: | |
Logger.warning(f'tempMax: {system().log_time()} - obTime is None') | |
return errorOutput | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Define index of temperature in websocket packets | |
if (str(device) == config['Station']['OutAirID'] | |
or str(device) == config['Station']['InAirID']): | |
index_bucket_a = 2 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 7 | |
# If console is initialising, download all data for current day using | |
# Weatherflow API and calculate daily maximum temperature | |
if 'today' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'): | |
dataToday = apiData[device]['today'].json()['obs'] | |
apiTime = [item[0] for item in dataToday if item[index_bucket_a] is not None] | |
apiTemp = [item[index_bucket_a] for item in dataToday if item[index_bucket_a] is not None] | |
try: | |
maxTemp = [max(apiTemp), 'c', apiTime[apiTemp.index(max(apiTemp))], 's', max(apiTemp), apiTime[apiTemp.index(max(apiTemp))]] | |
except Exception as Error: | |
Logger.warning(f'tempMax: {system().log_time()} - {Error}') | |
maxTemp = errorOutput | |
else: | |
maxTemp = errorOutput | |
# Else if midnight has passed, reset maximum temperature | |
elif Now.date() > datetime.fromtimestamp(maxTemp[5], Tz).date(): | |
maxTemp = [Temp[0], 'c', obTime[0], 's', Temp[0], obTime[0]] | |
# Else if current temperature is greater than maximum recorded temperature, | |
# update maximum temperature | |
elif Temp[0] > maxTemp[4]: | |
maxTemp = [Temp[0], 'c', obTime[0], 's', Temp[0], obTime[0]] | |
# Else maximum temperature unchanged, return existing values | |
else: | |
maxTemp = [maxTemp[4], 'c', maxTemp[2], 's', maxTemp[4], obTime[0]] | |
# Return required variables | |
return maxTemp | |
def tempMin(Temp, obTime, minTemp, device, apiData, config): | |
""" Calculate minimum temperature for specified device since midnight | |
station time | |
INPUTS: | |
Temp Current temperature [deg C] | |
obTime Current observation time [s] | |
minTemp Current minimum temperature [deg C] | |
device Device ID | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
minTemp Minumum temperature [deg C] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'c', '-', None, time.time()] | |
if Temp[0] is None: | |
Logger.warning(f'tempMin: {system().log_time()} - Temp is None') | |
return errorOutput | |
elif obTime[0] is None: | |
Logger.warning(f'tempMin: {system().log_time()} - obTime is None') | |
return errorOutput | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Define index of temperature in websocket packets | |
if (str(device) == config['Station']['OutAirID'] | |
or str(device) == config['Station']['InAirID']): | |
index_bucket_a = 2 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 7 | |
# If console is initialising, download all data for current day using | |
# Weatherflow API and calculate daily minimum temperature | |
if 'today' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'): | |
dataToday = apiData[device]['today'].json()['obs'] | |
apiTime = [item[0] for item in dataToday if item[index_bucket_a] is not None] | |
apiTemp = [item[index_bucket_a] for item in dataToday if item[index_bucket_a] is not None] | |
try: | |
minTemp = [min(apiTemp), 'c', apiTime[apiTemp.index(min(apiTemp))], 's', min(apiTemp), apiTime[apiTemp.index(min(apiTemp))]] | |
except Exception as Error: | |
Logger.warning(f'tempMin: {system().log_time()} - {Error}') | |
minTemp = errorOutput | |
else: | |
minTemp = errorOutput | |
# Else if midnight has passed, reset minimum temperature | |
elif Now.date() > datetime.fromtimestamp(minTemp[5], Tz).date(): | |
minTemp = [Temp[0], 'c', obTime[0], 's', Temp[0], obTime[0]] | |
# Else if current temperature is less than minimum recorded temperature, | |
# update minimum temperature | |
elif Temp[0] < minTemp[4]: | |
minTemp = [Temp[0], 'c', obTime[0], 's', Temp[0], obTime[0]] | |
# Else minimum temperature unchanged, return existing values | |
else: | |
minTemp = [minTemp[4], 'c', minTemp[2], 's', minTemp[4], obTime[0]] | |
# Return required variables | |
return minTemp | |
def strikeDeltaT(strikeTime): | |
""" Calculate time since last lightning strike | |
INPUTS: | |
strikeTime Time of last lightning strike [s] | |
OUTPUT: | |
strikeDeltaT Time since last lightning strike [s] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 's', None] | |
if strikeTime[0] is None: | |
Logger.warning(f'strikeDeltaT: {system().log_time()} - strikeTime is None') | |
return errorOutput | |
# Calculate time since last lightning strike | |
deltaT = time.time() - strikeTime[0] | |
deltaT = [deltaT, 's', deltaT] | |
# Return time since and distance to last lightning strike | |
return deltaT | |
def strikeFrequency(obTime, device, apiData, config): | |
""" Calculate lightning strike frequency over the previous 10 minutes and | |
three hours | |
INPUTS: | |
obTime Time of latest observation | |
device Device ID | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
strikeFrequency Strike frequency over the previous 10 [Count] | |
minutes and three hours | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, '/min', None, '/min'] | |
if obTime[0] is None: | |
Logger.warning(f'strikeFreq: {system().log_time()} - obTime is None') | |
return errorOutput | |
# Define index of total lightning strike counts in websocket packets | |
if str(device) == config['Station']['OutAirID']: | |
index_bucket_a = 4 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 15 | |
# Extract lightning strike count over the last three hours. Return None for | |
# strikeFrequency if API call has failed | |
if '24Hrs' in apiData[device] and weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'): | |
data24hrs = apiData[device]['24Hrs'].json()['obs'] | |
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None] | |
try: | |
dTime = [abs(T - (obTime[0] - 3 * 3600)) for T in apiTime] | |
if min(dTime) < 5 * 60: | |
count3h = [ob[index_bucket_a] for ob in data24hrs[dTime.index(min(dTime)):] if ob[index_bucket_a] is not None] | |
else: | |
Logger.warning(f'strikeFreq: {system().log_time()} - no data in 3 hour window') | |
count3h = None | |
except Exception as Error: | |
Logger.warning(f'strikeFreq: {system().log_time()} - {Error}') | |
count3h = None | |
else: | |
count3h = None | |
# Calculate average strike frequency over the last three hours | |
if count3h is not None: | |
activeStrikes = [count for count in count3h if count > 0] | |
if len(activeStrikes) > 0: | |
strikeFrequency3h = [sum(activeStrikes) / len(activeStrikes), '/min'] | |
else: | |
strikeFrequency3h = [0.0, '/min'] | |
else: | |
strikeFrequency3h = [None, '/min'] | |
# Extract lightning strike count over the last 10 minutes. Return None for | |
# strikeFrequency if API call has failed | |
if '24Hrs' in apiData[device] and weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'): | |
data24hrs = apiData[device]['24Hrs'].json()['obs'] | |
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None] | |
try: | |
dTime = [abs(T - (obTime[0] - 600)) for T in apiTime] | |
if min(dTime) < 2 * 60: | |
count10m = [ob[index_bucket_a] for ob in data24hrs[dTime.index(min(dTime)):] if ob[index_bucket_a] is not None] | |
else: | |
Logger.warning(f'strikeFreq: {system().log_time()} - no data in 10 minute window') | |
count10m = None | |
except Exception as Error: | |
Logger.warning(f'strikeFreq: {system().log_time()} - {Error}') | |
count10m = None | |
else: | |
count10m = None | |
# Calculate average strike frequency over the last 10 minutes | |
if count10m is not None: | |
activeStrikes = [count for count in count10m if count > 0] | |
if len(activeStrikes) > 0: | |
strikeFrequency10m = [sum(activeStrikes) / len(activeStrikes), '/min'] | |
else: | |
strikeFrequency10m = [0.0, '/min'] | |
else: | |
strikeFrequency10m = [None, '/min'] | |
# Return strikeFrequency for last 10 minutes and last three hours | |
return strikeFrequency10m + strikeFrequency3h | |
def strikeCount(count, strikeCount, device, apiData, config): | |
""" Calculate the number of lightning strikes for the last day/month/year | |
INPUTS: | |
count Number of lightning strikes in the past minute [Count] | |
strikeCount Dictionary containing fields: | |
Today Number of lightning strikes today [Count] | |
Yesterday Number of lightning strikes in last month [Count] | |
Year Number of lightning strikes in last year [Count] | |
device Device ID | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
strikeCount Dictionary containing fields: | |
Today Number of lightning strikes today [Count] | |
Yesterday Number of lightning strikes in last month [Count] | |
Year Number of lightning strikes in last year [Count] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'count', None, time.time()] | |
if count[0] is None: | |
Logger.warning(f'strikeCount: {system().log_time()} - count is None') | |
todayStrikes = monthStrikes = yearStrikes = errorOutput | |
return {'today': todayStrikes, 'month': monthStrikes, 'year': yearStrikes} | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Define index of total lightning strike counts in websocket packets | |
if str(device) == config['Station']['OutAirID']: | |
index_bucket_a = 4 | |
index_bucket_e = 4 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 15 | |
index_bucket_e = 24 | |
# If console is initialising, download all data for current day using | |
# Weatherflow API and calculate total daily lightning strikes | |
if 'today' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'): | |
dataToday = apiData[device]['today'].json()['obs'] | |
apiStrikes = [item[index_bucket_a] for item in dataToday if item[index_bucket_a] is not None] | |
try: | |
todayStrikes = [sum(x for x in apiStrikes), 'count', sum(x for x in apiStrikes), time.time()] | |
except Exception as Error: | |
Logger.warning(f'strikeCount: {system().log_time()} - {Error}') | |
todayStrikes = errorOutput | |
else: | |
todayStrikes = errorOutput | |
# Else if midnight has passed, reset daily lightning strike count to zero | |
elif Now.date() > datetime.fromtimestamp(strikeCount['today'][3], Tz).date(): | |
todayStrikes = [count[0], 'count', count[0], time.time()] | |
# Else, calculate current daily lightning strike count | |
else: | |
currentCount = strikeCount['today'][2] | |
updatedCount = currentCount + count[0] if count[0] is not None else currentCount | |
todayStrikes = [updatedCount, 'count', updatedCount, time.time()] | |
# If console is initialising and today is the first day on the month, set | |
# monthly lightning strikes to current daily lightning strikes | |
if strikeCount['month'][0] is None and Now.day == 1: | |
monthStrikes = [todayStrikes[0], 'count', todayStrikes[0], time.time()] | |
# Else if console is initialising, calculate total monthly lightning strikes | |
# from the WeatherFlow API data | |
elif 'month' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['month'], 'obs'): | |
dataMonth = apiData[device]['month'].json()['obs'] | |
apiStrikes = [item[index_bucket_e] for item in dataMonth if item[index_bucket_e] is not None] | |
try: | |
monthStrikes = [sum(x for x in apiStrikes), 'count', sum(x for x in apiStrikes), time.time()] | |
if todayStrikes[0] is not None: | |
monthStrikes[0] += todayStrikes[0] | |
monthStrikes[2] += todayStrikes[2] | |
except Exception as Error: | |
Logger.warning(f'strikeCount: {system().log_time()} - {Error}') | |
monthStrikes = errorOutput | |
else: | |
monthStrikes = errorOutput | |
# Else if the end of the month has passed, reset monthly lightning strike | |
# count to zero | |
elif Now.month > datetime.fromtimestamp(strikeCount['month'][3], Tz).month: | |
monthStrikes = [count[0], 'count', count[0], time.time()] | |
# Else, calculate current monthly lightning strike count | |
else: | |
currentCount = strikeCount['month'][2] | |
updatedCount = currentCount + count[0] if count[0] is not None else currentCount | |
monthStrikes = [updatedCount, 'count', updatedCount, time.time()] | |
# If console is initialising and today is the first day on the year, set | |
# yearly lightning strikes to current daily lightning strikes | |
if strikeCount['year'][0] is None and Now.timetuple().tm_yday == 1: | |
yearStrikes = [todayStrikes[0], 'count', todayStrikes[0], time.time()] | |
# Else if console is initialising, calculate total yearly lightning strikes | |
# from the WeatherFlow API data | |
elif 'year' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['year'], 'obs'): | |
dataYear = apiData[device]['year'].json()['obs'] | |
apiStrikes = [item[index_bucket_e] for item in dataYear if item[index_bucket_e] is not None] | |
try: | |
yearStrikes = [sum(x for x in apiStrikes), 'count', sum(x for x in apiStrikes), time.time()] | |
if todayStrikes[0] is not None: | |
yearStrikes[0] += todayStrikes[0] | |
yearStrikes[2] += todayStrikes[2] | |
except Exception as Error: | |
Logger.warning(f'strikeCount: {system().log_time()} - {Error}') | |
yearStrikes = errorOutput | |
else: | |
yearStrikes = errorOutput | |
# Else if the end of the year has passed, reset monthly and yearly lightning | |
# strike count to zero | |
elif Now.year > datetime.fromtimestamp(strikeCount['year'][3], Tz).year: | |
monthStrikes = [count[0], 'count', count[0], time.time()] | |
yearStrikes = [count[0], 'count', count[0], time.time()] | |
# Else, calculate current yearly lightning strike count | |
else: | |
currentCount = strikeCount['year'][2] | |
updatedCount = currentCount + count[0] if count[0] is not None else currentCount | |
yearStrikes = [updatedCount, 'count', updatedCount, time.time()] | |
# Return Daily, Monthly, and Yearly lightning strike counts | |
return {'today': todayStrikes, 'month': monthStrikes, 'year': yearStrikes} | |
def rainRate(minuteRain): | |
""" Calculate the average windspeed since midnight station time | |
INPUTS: | |
windSpd Rain accumulation for the current minute [mm] | |
OUTPUT: | |
rainRate Current instantaneous rain rate [mm/hr] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'mm/hr', '-', None] | |
if minuteRain[0] is None: | |
Logger.warning(f'rainRate: {system().log_time()} - minuteRain is None') | |
return errorOutput | |
# Calculate instantaneous rain rate from instantaneous rain accumulation | |
Rate = minuteRain[0] * 60 | |
# Define rain rate text based on calculated | |
if Rate == 0: | |
RateText = 'Currently Dry' | |
elif Rate < 0.25: | |
RateText = 'Very Light Rain' | |
elif Rate < 1.0: | |
RateText = 'Light Rain' | |
elif Rate < 4.0: | |
RateText = 'Moderate Rain' | |
elif Rate < 16.0: | |
RateText = 'Heavy Rain' | |
elif Rate < 50.0: | |
RateText = 'Very Heavy Rain' | |
else: | |
RateText = 'Extreme Rain' | |
# Return instantaneous rain rate and text | |
return [Rate, 'mm/hr', RateText, Rate] | |
def rainAccumulation(dailyRain, rainAccum, device, apiData, config): | |
""" Calculate the rain accumulation for today/yesterday/month/year | |
INPUTS: | |
dailyRain Daily rain accumulation [mm] | |
rainAccum Dictionary containing fields: | |
Today Rain accumulation for the current day [mm] | |
Yesterday Rain accumulation yesterday [mm] | |
Month Rain accumulation for the current month [mm] | |
Year Rain accumulation for the current year [mm] | |
device Device ID | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
rainAccum Dictionary containing fields: | |
Today Rain accumulation for the current day [mm] | |
Yesterday Rain accumulation yesterday [mm] | |
Month Rain accumulation for the current month [mm] | |
Year Rain accumulation for the current year [mm] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'mm', None, time.time()] | |
if dailyRain[0] is None: | |
Logger.warning(f'rainAccum: {system().log_time()} - dailyRain is None') | |
todayRain = yesterdayRain = monthRain = yearRain = errorOutput | |
return {'today': todayRain, 'yesterday': yesterdayRain, 'month': monthRain, 'year': yearRain} | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Define index of total daily rain accumulation in websocket packets | |
if str(device) == config['Station']['SkyID']: | |
index_bucket_a = 3 | |
index_bucket_e = 3 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 12 | |
index_bucket_e = 28 | |
# Set current daily rainfall accumulation | |
todayRain = [dailyRain[0], 'mm', dailyRain[0], time.time()] | |
# If console is initialising, calculate yesterday's rainfall from the | |
# WeatherFlow API data | |
if 'yesterday' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['yesterday'], 'obs'): | |
yesterdayData = apiData[device]['yesterday'].json()['obs'] | |
rainData = [item[index_bucket_a] for item in yesterdayData if item[index_bucket_a] is not None] | |
try: | |
yesterdayRain = [sum(x for x in rainData), 'mm', sum(x for x in rainData), time.time()] | |
except Exception as Error: | |
Logger.warning(f'rainAccum: {system().log_time()} - {Error}') | |
yesterdayRain = errorOutput | |
else: | |
yesterdayRain = errorOutput | |
# Else if midnight has passed, set yesterday rainfall accumulation equal to | |
# rainAccum['today'] (which still contains yesterday's accumulation) | |
elif Now.date() > datetime.fromtimestamp(rainAccum['today'][3], Tz).date(): | |
yesterdayRain = [rainAccum['today'][2], 'mm', rainAccum['today'][2], time.time()] | |
# Else, set yesterday rainfall accumulation as unchanged | |
else: | |
yesterdayRain = [rainAccum['yesterday'][2], 'mm', rainAccum['yesterday'][2], time.time()] | |
# If console is initialising and today is the first day on the month, set | |
# monthly rainfall to current daily rainfall | |
if rainAccum['month'][0] is None and Now.day == 1: | |
monthRain = [dailyRain[0], 'mm', 0, time.time()] | |
# Else if console is initialising, calculate total monthly rainfall from | |
# the WeatherFlow API data | |
elif 'month' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['month'], 'obs'): | |
monthData = apiData[device]['month'].json()['obs'] | |
rainData = [item[index_bucket_e] for item in monthData if item[index_bucket_e] is not None] | |
try: | |
monthRain = [sum(x for x in rainData), 'mm', sum(x for x in rainData), time.time()] | |
if not math.isnan(dailyRain[0]): | |
monthRain[0] += dailyRain[0] | |
except Exception as Error: | |
Logger.warning(f'rainAccum: {system().log_time()} - {Error}') | |
monthRain = errorOutput | |
else: | |
monthRain = errorOutput | |
# Else if the end of the month has passed, reset monthly rain accumulation | |
# to current daily rain accumulation | |
elif Now.month > datetime.fromtimestamp(rainAccum['month'][3], Tz).month: | |
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0 | |
monthRain = [dailyAccum, 'mm', 0, time.time()] | |
# Else if midnight has passed, permanently add rainAccum['Today'] (which | |
# still contains yesterday's accumulation) and current daily rainfall to | |
# monthly rain accumulation | |
elif Now.date() > datetime.fromtimestamp(rainAccum['month'][3], Tz).date(): | |
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0 | |
monthRain = [rainAccum['month'][2] + rainAccum['today'][2] + dailyAccum, 'mm', rainAccum['month'][2] + rainAccum['today'][2], time.time()] | |
# Else, update current monthly rainfall accumulation | |
else: | |
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0 | |
monthRain = [rainAccum['month'][2] + dailyAccum, 'mm', rainAccum['month'][2], time.time()] | |
# If console is initialising and today is the first day on the year, set | |
# yearly rainfall to current daily rainfall | |
if rainAccum['year'][0] is None and Now.timetuple().tm_yday == 1: | |
yearRain = [dailyRain[0], 'mm', 0, time.time()] | |
# Else if console is initialising, calculate total yearly rainfall from the | |
# WeatherFlow API data | |
elif 'year' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['year'], 'obs'): | |
yearData = apiData[device]['year'].json()['obs'] | |
rainData = [item[index_bucket_e] for item in yearData if item[index_bucket_e] is not None] | |
try: | |
yearRain = [sum(x for x in rainData), 'mm', sum(x for x in rainData), time.time()] | |
if not math.isnan(dailyRain[0]): | |
yearRain[0] += dailyRain[0] | |
except Exception as Error: | |
Logger.warning(f'rainAccum: {system().log_time()} - {Error}') | |
yearRain = errorOutput | |
else: | |
yearRain = errorOutput | |
# Else if the end of the year has passed, reset monthly and yearly rain | |
# accumulation to current daily rain accumulation | |
elif Now.year > datetime.fromtimestamp(rainAccum['year'][3], Tz).year: | |
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0 | |
yearRain = [dailyAccum, 'mm', 0, time.time()] | |
monthRain = [dailyAccum, 'mm', 0, time.time()] | |
# Else if midnight has passed, permanently add rainAccum['Today'] (which | |
# still contains yesterday's accumulation) and current daily rainfall to | |
# yearly rain accumulation | |
elif Now.date() > datetime.fromtimestamp(rainAccum['year'][3], Tz).date(): | |
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0 | |
yearRain = [rainAccum['year'][2] + rainAccum['year'][2] + dailyAccum, 'mm', rainAccum['year'][2] + rainAccum['today'][2], time.time()] | |
# Else, calculate current yearly rain accumulation | |
else: | |
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0 | |
yearRain = [rainAccum['year'][2] + dailyAccum, 'mm', rainAccum['year'][2], time.time()] | |
# Return Daily, Monthly, and Yearly rainfall accumulation totals | |
return {'today': todayRain, 'yesterday': yesterdayRain, 'month': monthRain, 'year': yearRain} | |
def avgWindSpeed(windSpd, avgWind, device, apiData, config): | |
""" Calculate the average windspeed since midnight station time | |
INPUTS: | |
windSpd Current wind speed [m/s] | |
avgWind Current average wind speed since midnight [m/s] | |
device Device ID | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
AvgWind Average wind speed since midnight [m/s] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'mps', None, None, time.time()] | |
if windSpd[0] is None: | |
Logger.warning(f'avgSpeed: {system().log_time()} - windSpd is None') | |
return errorOutput | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Define index of wind speed in websocket packets | |
if str(device) == config['Station']['SkyID']: | |
index_bucket_a = 5 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 2 | |
# If console is initialising, download all data for current day using | |
# Weatherflow API and calculate daily averaged windspeed | |
if 'today' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'): | |
todayData = apiData[device]['today'].json()['obs'] | |
windSpd = [item[index_bucket_a] for item in todayData if item[index_bucket_a] is not None] | |
try: | |
average = sum(x for x in windSpd) / len(windSpd) | |
windAvg = [average, 'mps', average, len(windSpd), time.time()] | |
except Exception as Error: | |
Logger.warning(f'avgSpeed: {system().log_time()} - {Error}') | |
windAvg = errorOutput | |
else: | |
windAvg = errorOutput | |
# Else if midnight has passed, reset daily averaged wind speed | |
elif Now.date() > datetime.fromtimestamp(avgWind[4], Tz).date(): | |
windAvg = [windSpd[0], 'mps', windSpd[0], 1, time.time()] | |
# Else, calculate current daily averaged wind speed | |
else: | |
length = avgWind[3] + 1 | |
currentAvg = avgWind[2] | |
updatedAvg = (length - 1) / length * currentAvg + 1 / length * windSpd[0] | |
windAvg = [updatedAvg, 'mps', updatedAvg, length, time.time()] | |
# Return daily averaged wind speed | |
return windAvg | |
def maxWindGust(windGust, obTime, maxGust, device, apiData, config): | |
""" Calculate the maximum wind gust since midnight station time | |
INPUTS: | |
windGust Current wind gust [m/s] | |
obTime Current observation time [s] | |
maxGust Current maximum wind gust since midnight [m/s] | |
device Device ID | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
maxGust Maximum wind gust since midnight [m/s] | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'mps', '-', None, time.time()] | |
if windGust[0] is None: | |
Logger.warning(f'maxGust: {system().log_time()} - windGust is None') | |
return errorOutput | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Define index of wind speed in websocket packets | |
if str(device) == config['Station']['SkyID']: | |
index_bucket_a = 6 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 3 | |
# If console is initialising, download all data for current day using | |
# Weatherflow API and calculate daily maximum wind gust | |
if 'today' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'): | |
todayData = apiData[device]['today'].json()['obs'] | |
apiTime = [item[0] for item in todayData if item[index_bucket_a] is not None] | |
apiWind = [item[index_bucket_a] for item in todayData if item[index_bucket_a] is not None] | |
try: | |
maxGust = [max(apiWind), 'mps', apiTime[apiWind.index(max(apiWind))], 's', max(apiWind), apiTime[apiWind.index(max(apiWind))]] | |
except Exception as Error: | |
Logger.warning(f'maxGust: {system().log_time()} - {Error}') | |
maxGust = errorOutput | |
else: | |
maxGust = errorOutput | |
# Else if midnight has passed, reset maximum recorded wind gust | |
elif Now.date() > datetime.fromtimestamp(maxGust[5], Tz).date(): | |
maxGust = [windGust[0], 'mps', obTime[0], 's', windGust[0], obTime[0]] | |
# Else if current gust speed is greater than maximum recorded gust speed, | |
# update maximum gust speed | |
elif windGust[0] > maxGust[4]: | |
maxGust = [windGust[0], 'mps', obTime[0], 's', windGust[0], obTime[0]] | |
# Else maximum gust speed is unchanged, return existing value | |
else: | |
maxGust = [maxGust[4], 'mps', maxGust[2], 's', maxGust[4], obTime[0]] | |
# Return maximum wind gust | |
return maxGust | |
def cardinalWindDir(windDir, windSpd=[1, 'mps']): | |
""" Defines the cardinal wind direction from the current wind direction in | |
degrees. Sets the wind direction as "Calm" if current wind speed is zero | |
INPUTS: | |
windDir Current wind direction [degrees] | |
windSpd Current wind speed [m/s] | |
OUTPUT: | |
cardinalWind Cardinal wind direction | |
""" | |
# Return None if required variables are missing | |
errorOutput = [windDir[0], windDir[1], '-', '-'] | |
if windDir[0] is None and windSpd[0] != 0.0: | |
Logger.warning(f'cardWindDir: {system().log_time()} - windDir is None') | |
return errorOutput | |
elif windSpd[0] is None: | |
Logger.warning(f'cardWindDir: {system().log_time()} - windSpd is None') | |
return errorOutput | |
# Define all possible cardinal wind directions and descriptions | |
Direction = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW', 'N'] | |
Description = ['Due North', 'North NE', 'North East', 'East NE', 'Due East', 'East SE', 'South East', 'South SE', | |
'Due South', 'South SW', 'South West', 'West SW', 'Due West', 'West NW', 'North West', 'North NW', | |
'Due North'] | |
# Define actual cardinal wind direction and description based on current | |
# wind direction in degrees | |
if windSpd[0] == 0: | |
Direction = 'Calm' | |
Description = '[color=9aba2fff]Calm[/color]' | |
cardinalWind = [windDir[0], windDir[1], Direction, Description] | |
else: | |
Ind = int(round(windDir[0] / 22.5)) | |
Direction = Direction[Ind] | |
Description = Description[Ind].split()[0] + ' [color=9aba2fff]' + Description[Ind].split()[1] + '[/color]' | |
cardinalWind = [windDir[0], windDir[1], Direction, Description] | |
# Return cardinal wind direction and description | |
return cardinalWind | |
def beaufortScale(windSpd): | |
""" Defines the Beaufort scale value from the current wind speed | |
INPUTS: | |
windSpd Current wind speed [m/s] | |
OUTPUT: | |
beaufortScale Beaufort Scale speed, description, and icon | |
""" | |
# Return None if required variables are missing | |
errorOutput = windSpd + ['-', '-', '-'] | |
if windSpd[0] is None: | |
Logger.warning(f'beaufScale: {system().log_time()} - windSpd is None') | |
return errorOutput | |
# Define Beaufort scale cutoffs and Force numbers | |
Cutoffs = [0.5, 1.5, 3.3, 5.5, 7.9, 10.7, 13.8, 17.1, 20.7, 24.4, 28.4, 32.6] | |
Force = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] | |
Description = ['Calm Conditions', 'Light Air', 'Light Breeze', 'Gentle Breeze', | |
'Moderate Breeze', 'Fresh Breeze', 'Strong Breeze', 'Near Gale Force', | |
'Gale Force', 'Severe Gale Force', 'Storm Force', 'Violent Storm', | |
'Hurricane Force'] | |
# Define Beaufort Scale wind speed, description, and icon | |
Ind = bisect.bisect(Cutoffs, windSpd[0]) | |
Beaufort = [float(Force[Ind]), str(Force[Ind]), Description[Ind]] | |
# Return Beaufort Scale speed, description, and icon | |
return windSpd + Beaufort | |
def UVIndex(uvLevel): | |
""" Defines the UV index from the current UV level | |
INPUTS: | |
uvLevel Current UV level [m/s] | |
OUTPUT: | |
uvIndex UV index | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'index', '-', '#646464'] | |
if uvLevel[0] is None: | |
Logger.warning(f'UVIndex: {system().log_time()} - uvLevel is None') | |
return errorOutput | |
# Define UV Index cutoffs and level descriptions | |
Cutoffs = [0, 3, 6, 8, 11] | |
Level = ['None', 'Low', 'Moderate', 'High', 'Very High', 'Extreme'] | |
# Define UV index colours | |
Grey = '#646464' | |
Green = '#558B2F' | |
Yellow = '#F9A825' | |
Orange = '#EF6C00' | |
Red = '#B71C1C' | |
Violet = '#6A1B9A' | |
Color = [Grey, Green, Yellow, Orange, Red, Violet] | |
# Set the UV index | |
if uvLevel[0] > 0: | |
Ind = bisect.bisect(Cutoffs, round(uvLevel[0], 1)) | |
else: | |
Ind = 0 | |
uvIndex = [round(uvLevel[0], 1), 'index', Level[Ind], Color[Ind]] | |
# Return UV Index icon | |
return uvIndex | |
def peakSunHours(radiation, peakSun, device, apiData, config): | |
""" Calculate peak sun hours since midnight and daily solar potential | |
INPUTS: | |
Radiation Current solar radiation [W/m^2] | |
peakSun Current peak sun hours since midnight [hours] | |
device Device ID | |
apiData WeatherFlow REST API data | |
config Station configuration | |
OUTPUT: | |
peakSun Peak sun hours since midnight and solar potential | |
""" | |
# Return None if required variables are missing | |
errorOutput = [None, 'hrs', '-'] | |
if radiation[0] is None: | |
Logger.warning(f'peakSun: {system().log_time()} - radiation is None') | |
return errorOutput | |
# Define current time in station timezone | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
Now = datetime.now(pytz.utc).astimezone(Tz) | |
# Calculate time of sunrise and sunset or use existing values | |
if peakSun[0] is None or Now > datetime.fromtimestamp(peakSun[5], Tz): | |
Observer = ephem.Observer() | |
Observer.pressure = 0 | |
Observer.lat = str(config['Station']['Latitude']) | |
Observer.lon = str(config['Station']['Longitude']) | |
Observer.horizon = '-0:34' | |
sunrise = Observer.next_rising(ephem.Sun()).datetime().timestamp() | |
sunset = Observer.next_setting(ephem.Sun()).datetime().timestamp() | |
else: | |
sunrise = peakSun[4] | |
sunset = peakSun[5] | |
# Define index of radiation in websocket packets | |
if str(device) == config['Station']['SkyID']: | |
index_bucket_a = 10 | |
elif str(device) == config['Station']['TempestID']: | |
index_bucket_a = 11 | |
# If console is initialising, download all data for current day using | |
# Weatherflow API and calculate Peak Sun Hours | |
if 'today' in apiData[device]: | |
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'): | |
dataToday = apiData[device]['today'].json()['obs'] | |
radiation = [item[index_bucket_a] for item in dataToday if item[index_bucket_a] is not None] | |
try: | |
watthrs = sum([item * (1 / 60) for item in radiation]) | |
peakSun = [watthrs / 1000, 'hrs', watthrs, sunrise, sunset, time.time()] | |
except Exception as Error: | |
Logger.warning(f'peakSun: {system().log_time()} - {Error}') | |
return errorOutput | |
else: | |
return errorOutput | |
# Else if midnight has passed, reset Peak Sun Hours | |
elif Now.date() > datetime.fromtimestamp(peakSun[6], Tz).date(): | |
watthrs = radiation[0] * (1 / 60) | |
peakSun = [watthrs / 1000, 'hrs', watthrs, sunrise, sunset, time.time()] | |
# Else calculate current Peak Sun Hours | |
else: | |
watthrs = peakSun[3] + radiation[0] * (1 / 60) | |
peakSun = [watthrs / 1000, 'hrs', watthrs, sunrise, sunset, time.time()] | |
# Calculate proportion of daylight hours that have passed | |
if datetime.fromtimestamp(sunrise, Tz) <= Now <= datetime.fromtimestamp(sunset, Tz): | |
daylightFactor = (time.time() - sunrise) / (sunset - sunrise) | |
else: | |
daylightFactor = 1 | |
# Define daily solar potential | |
if peakSun[0] / daylightFactor == 0: | |
peakSun.insert(2, '[color=#646464ff]None[/color]') | |
elif peakSun[0] / daylightFactor < 2: | |
peakSun.insert(2, '[color=#4575b4ff]Limited[/color]') | |
elif peakSun[0] / daylightFactor < 4: | |
peakSun.insert(2, '[color=#fee090ff]Moderate[/color]') | |
elif peakSun[0] / daylightFactor < 6: | |
peakSun.insert(2, '[color=#f46d43ff]Good[/color]') | |
else: | |
peakSun.insert(2, '[color=#d73027ff]Excellent[/color]') | |
# Return Peak Sun Hours | |
return peakSun |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Formats and sets the required units of observations displayed on the | |
Raspberry Pi Python console for Weather Flow Smart Home Weather Stations. | |
Copyright (C) 2018-2023 Peter Davis | |
This program is free software: you can redistribute it and/or modify it under | |
the terms of the GNU General Public License as published by the Free Software | |
Foundation, either version 3 of the License, or (at your option) any later | |
version. | |
This program is distributed in the hope that it will be useful, but WITHOUT | |
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License along with | |
this program. If not, see <http://www.gnu.org/licenses/>. | |
""" | |
# Import required modules | |
from lib import derivedVariables as derive | |
from datetime import datetime | |
import pytz | |
def Units(Obs, Unit): | |
""" Sets the required observation units | |
INPUTS: | |
Obs Observations with current units | |
Unit Required output unit | |
OUTPUT: | |
cObs Observation converted into required unit | |
""" | |
# Convert temperature observations | |
cObs = Obs[:] | |
if Unit in ['f', 'c']: | |
for ii, T in enumerate(Obs): | |
if T == 'c': | |
if Unit == 'f': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * (9 / 5) + 32 | |
cObs[ii] = 'f' | |
else: | |
cObs[ii - 1] = Obs[ii - 1] | |
cObs[ii] = 'c' | |
if T in ['dc', 'c/hr']: | |
if Unit == 'f': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * (9 / 5) | |
if T == 'dc': | |
cObs[ii] = 'f' | |
elif T == 'c/hr': | |
cObs[ii] = 'f/hr' | |
else: | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] | |
if T == 'dc': | |
cObs[ii] = 'c' | |
# Convert pressure and pressure trend observations | |
elif Unit in ['inhg', 'mmhg', 'hpa', 'mb']: | |
for ii, P in enumerate(Obs): | |
if P in ['mb', 'mb/hr']: | |
if Unit == 'inhg': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * 0.0295301 | |
if P == 'mb': | |
cObs[ii] = ' inHg' | |
else: | |
cObs[ii] = ' inHg/hr' | |
elif Unit == 'mmhg': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * 0.750063 | |
if P == 'mb': | |
cObs[ii] = ' mmHg' | |
else: | |
cObs[ii] = ' mmHg/hr' | |
elif Unit == 'hpa': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] | |
if P == 'mb': | |
cObs[ii] = ' hPa' | |
else: | |
cObs[ii] = ' hPa/hr' | |
else: | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] | |
if P == 'mb': | |
cObs[ii] = ' mb' | |
else: | |
cObs[ii] = ' mb/hr' | |
# Convert windspeed observations | |
elif Unit in ['mph', 'lfm', 'kts', 'kph', 'bft', 'mps']: | |
for ii, W in enumerate(Obs): | |
if W == 'mps': | |
if Unit == 'mph' or Unit == 'lfm': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * 2.2369362920544 | |
cObs[ii] = 'mph' | |
elif Unit == 'kts': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * 1.9438 | |
cObs[ii] = 'kts' | |
elif Unit == 'kph': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * 3.6 | |
cObs[ii] = 'km/h' | |
elif Unit == 'bft': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = derive.beaufortScale(Obs[ii - 1:ii + 1])[2] | |
cObs[ii] = 'bft' | |
else: | |
cObs[ii - 1] = Obs[ii - 1] | |
cObs[ii] = 'm/s' | |
# Convert wind direction observations | |
elif Unit in ['degrees', 'cardinal']: | |
for ii, W in enumerate(Obs): | |
if W == 'degrees': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
cObs[ii] = '' | |
elif cObs[ii - 1] == 'calm': | |
cObs[ii - 1] = 'Calm' | |
cObs[ii] = '' | |
elif Unit == 'cardinal': | |
cObs[ii - 1] = derive.cardinalWindDir(Obs[ii - 1:ii + 1])[2] | |
cObs[ii] = '' | |
else: | |
cObs[ii - 1] = Obs[ii - 1] | |
cObs[ii] = 'degrees' | |
# Convert rain accumulation and rain rate observations | |
elif Unit in ['in', 'cm', 'mm']: | |
for ii, Prcp in enumerate(Obs): | |
if Prcp in ['mm', 'mm/hr']: | |
if Unit == 'in': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * 0.0393701 | |
if Prcp == 'mm': | |
cObs[ii] = ' in' | |
else: | |
cObs[ii] = ' in/hr' | |
elif Unit == 'cm': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * 0.1 | |
if Prcp == 'mm': | |
cObs[ii] = ' cm' | |
else: | |
cObs[ii] = ' cm/hr' | |
else: | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] | |
if Prcp == 'mm': | |
cObs[ii] = ' mm' | |
else: | |
cObs[ii] = ' mm/hr' | |
# Convert distance observations | |
elif Unit in ['km', 'mi']: | |
for ii, Dist in enumerate(Obs): | |
if Dist == 'km': | |
if Unit == 'mi': | |
if Obs[ii - 1] is not None: | |
cObs[ii - 1] = Obs[ii - 1] * 0.62137 | |
cObs[ii] = 'miles' | |
# Convert other observations | |
elif Unit in ['metric', 'imperial']: | |
for ii, other in enumerate(Obs): | |
if other == 'Wm2': | |
pass | |
elif other == 'index': | |
pass | |
elif other == 'hrs': | |
pass | |
elif other == '/min': | |
pass | |
elif other == 'count': | |
pass | |
elif other == 's': | |
pass | |
elif other == '%': | |
pass | |
# Return converted observations | |
return cObs | |
def Format(Obs, obType, config=[]): | |
""" Formats the observation for display on the console | |
INPUTS: | |
Obs Observations with units | |
obType Observation type | |
OUTPUT: | |
cObs Formatted observation based on specified obType | |
""" | |
# Convert obType to list if required | |
if not isinstance(obType, list): | |
obType = [obType] | |
# Format temperature observations | |
cObs = Obs[:] | |
for Type in obType: | |
if Type == 'Temp': | |
for ii, T in enumerate(Obs): | |
if isinstance(T, str) and T.strip() in ['c', 'f']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
elif round(cObs[ii - 1], 1) == 0.0: | |
cObs[ii - 1] = '{:.1f}'.format(abs(cObs[ii - 1])) | |
else: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
if T.strip() == 'c': | |
cObs[ii] = u'\N{DEGREE CELSIUS}' | |
elif T.strip() == 'f': | |
cObs[ii] = u'\N{DEGREE FAHRENHEIT}' | |
elif isinstance(T, str) and T.strip() in ['c/hr', 'f/hr']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
elif round(cObs[ii - 1], 1) == 0.0: | |
cObs[ii - 1] = '{:.1f}'.format(abs(cObs[ii - 1])) | |
else: | |
cObs[ii - 1] = '{:+.1f}'.format(cObs[ii - 1]) | |
if T.strip() == 'c/hr': | |
cObs[ii] = u'\N{DEGREE CELSIUS}/hr' | |
elif T.strip() == 'f/hr': | |
cObs[ii] = u'\N{DEGREE FAHRENHEIT}/hr' | |
elif Type == 'forecastTemp': | |
for ii, T in enumerate(Obs): | |
if isinstance(T, str) and T.strip() in ['c', 'f']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
elif round(cObs[ii - 1], 1) == 0.0: | |
cObs[ii - 1] = '{:.0f}'.format(abs(cObs[ii - 1])) | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
if T.strip() == 'c': | |
cObs[ii] = u'\N{DEGREE CELSIUS}' | |
elif T.strip() == 'f': | |
cObs[ii] = u'\N{DEGREE FAHRENHEIT}' | |
# Format pressure observations | |
elif Type == 'Pressure': | |
for ii, P in enumerate(Obs): | |
if isinstance(P, str) and P.strip() in ['inHg/hr', 'inHg', 'mmHg/hr', 'mmHg', 'hPa/hr', 'mb/hr', 'hPa', 'mb']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
if P.strip() in ['inHg/hr', 'inHg']: | |
if round(cObs[ii - 1], 3) == 0.0: | |
cObs[ii - 1] = '{:+.3f}'.format(abs(cObs[ii - 1])) | |
else: | |
cObs[ii - 1] = '{:+.3f}'.format(cObs[ii - 1]) | |
elif P.strip() in ['mmHg/hr', 'mmHg']: | |
if round(cObs[ii - 1], 2) == 0.0: | |
cObs[ii - 1] = '{:+.2f}'.format(abs(cObs[ii - 1])) | |
else: | |
cObs[ii - 1] = '{:+.2f}'.format(cObs[ii - 1]) | |
elif P.strip() in ['hPa/hr', 'mb/hr', 'hPa', 'mb']: | |
if round(cObs[ii - 1], 1) == 0.0: | |
cObs[ii - 1] = '{:+.1f}'.format(abs(cObs[ii - 1])) | |
else: | |
cObs[ii - 1] = '{:+.1f}'.format(cObs[ii - 1]) | |
# Format windspeed observations | |
elif Type == 'Wind': | |
for ii, W in enumerate(Obs): | |
if isinstance(W, str) and W.strip() in ['mph', 'kts', 'km/h', 'bft', 'm/s']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
if round(cObs[ii - 1], 1) < 100: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif Type == 'forecastWind': | |
for ii, W in enumerate(Obs): | |
if isinstance(W, str) and W.strip() in ['mph', 'kts', 'km/h', 'bft', 'm/s']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
# Format wind direction observations | |
elif Type == 'Direction': | |
for ii, D in enumerate(Obs): | |
if isinstance(D, str) and D.strip() in ['degrees']: | |
cObs[ii] = u'\u00B0' | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
# Format rain accumulation and rain rate observations | |
elif Type == 'Precip': | |
for ii, Prcp in enumerate(Obs): | |
if isinstance(Prcp, str): | |
if Prcp.strip() == 'mm': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
if cObs[ii - 1] == 0: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif cObs[ii - 1] < 0.127: | |
cObs[ii - 1] = 'Trace' | |
cObs[ii] = '' | |
elif round(cObs[ii - 1], 1) < 10: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif Prcp.strip() == 'cm': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
if cObs[ii - 1] == 0: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif cObs[ii - 1] < 0.0127: | |
cObs[ii - 1] = 'Trace' | |
cObs[ii] = '' | |
elif round(cObs[ii - 1], 2) < 10: | |
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1]) | |
elif round(cObs[ii - 1], 1) < 100: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif Prcp.strip() == 'in': | |
cObs[ii] = u'\u0022' | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
if cObs[ii - 1] == 0: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif cObs[ii - 1] < 0.005: | |
cObs[ii - 1] = 'Trace' | |
cObs[ii] = '' | |
elif round(cObs[ii - 1], 2) < 10: | |
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1]) | |
elif round(cObs[ii - 1], 1) < 100: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif Prcp.strip() == 'mm/hr': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
if cObs[ii - 1] == 0: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif cObs[ii - 1] < 0.1: | |
cObs[ii - 1] = '<0.1' | |
elif round(cObs[ii - 1], 1) < 10: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif Prcp.strip() in ['in/hr', 'cm/hr']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
if cObs[ii - 1] == 0: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
elif cObs[ii - 1] < 0.01: | |
cObs[ii - 1] = '<0.01' | |
elif round(cObs[ii - 1], 2) < 10: | |
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1]) | |
elif round(cObs[ii - 1], 1) < 100: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
# Format humidity observations | |
elif Type == 'Humidity': | |
for ii, H in enumerate(Obs): | |
if isinstance(H, str) and H.strip() == '%': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
# Format solar radiation observations | |
elif Type == 'Radiation': | |
for ii, Rad in enumerate(Obs): | |
if isinstance(Rad, str) and Rad.strip() == 'Wm2': | |
cObs[ii] = ' W/m' + u'\u00B2' | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
# Format UV observations | |
elif Type == 'UV': | |
for ii, UV in enumerate(Obs): | |
if isinstance(UV, str) and UV.strip() == 'index': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
cObs.extend(['-', '#646464']) | |
else: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
# Format Peak Sun Hours observations | |
elif Type == 'peakSun': | |
for ii, psh in enumerate(Obs): | |
if isinstance(psh, str) and psh.strip() == 'hrs': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1]) | |
# Format battery voltage observations | |
elif Type == 'Battery': | |
for ii, V in enumerate(Obs): | |
if isinstance(V, str) and V.strip() == 'v': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1]) | |
# Format lightning strike count observations | |
elif Type == 'StrikeCount': | |
for ii, L in enumerate(Obs): | |
if isinstance(L, str) and L.strip() == 'count': | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
elif cObs[ii - 1] < 1000: | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
else: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1] / 1000) + ' k' | |
# Format lightning strike distance observations | |
elif Type == 'StrikeDistance': | |
for ii, StrikeDist in enumerate(Obs): | |
if isinstance(StrikeDist, str): | |
if StrikeDist.strip() in ['km']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(max(cObs[ii - 1] - 3, 0)) + '-' + '{:.0f}'.format(cObs[ii - 1] + 3) | |
elif StrikeDist.strip() in ['miles']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
cObs[ii - 1] = '{:.0f}'.format(max(cObs[ii - 1] - 3 * 0.62137, 0)) + '-' + '{:.0f}'.format(cObs[ii - 1] + 3 * 0.62137) | |
# Format lightning strike frequency observations | |
elif Type == 'StrikeFrequency': | |
for ii, StrikeFreq in enumerate(Obs): | |
if isinstance(StrikeFreq, str): | |
if StrikeFreq.strip() in ['/min']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
cObs[ii] = ' /min' | |
elif cObs[ii - 1].is_integer(): | |
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1]) | |
cObs[ii] = ' /min' | |
else: | |
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1]) | |
cObs[ii] = ' /min' | |
# Format time difference observations | |
elif Type == 'Time': | |
for ii, Time in enumerate(Obs): | |
if isinstance(Time, str) and Time.strip() in ['s']: | |
if cObs[ii - 1] is None: | |
cObs[ii - 1] = '-' | |
else: | |
Tz = pytz.timezone(config['Station']['Timezone']) | |
if config['Display']['TimeFormat'] == '12 hr': | |
if config['System']['Hardware'] == 'Other': | |
Format = '%#I:%M %p' | |
else: | |
Format = '%-I:%M %p' | |
else: | |
Format = '%H:%M' | |
cObs[ii - 1] = datetime.fromtimestamp(cObs[ii - 1], Tz).strftime(Format) | |
# Format time difference observations | |
elif Type == 'TimeDelta': | |
for ii, Delta in enumerate(Obs): | |
if isinstance(Delta, str) and Delta.strip() in ['s']: | |
if cObs[ii - 1] is None: | |
cObs = ['-', '-', '-', '-', cObs[2]] | |
else: | |
days, remainder = divmod(cObs[ii - 1], 86400) | |
hours, remainder = divmod(remainder, 3600) | |
minutes, seconds = divmod(remainder, 60) | |
if days >= 1: | |
if days == 1: | |
if hours == 1: | |
cObs = ['{:.0f}'.format(days), 'day', '{:.0f}'.format(hours), 'hour', cObs[2]] | |
else: | |
cObs = ['{:.0f}'.format(days), 'day', '{:.0f}'.format(hours), 'hours', cObs[2]] | |
elif days <= 99: | |
if hours == 1: | |
cObs = ['{:.0f}'.format(days), 'days', '{:.0f}'.format(hours), 'hour', cObs[2]] | |
else: | |
cObs = ['{:.0f}'.format(days), 'days', '{:.0f}'.format(hours), 'hours', cObs[2]] | |
elif days >= 100: | |
cObs = ['{:.0f}'.format(days), 'days', '-', '-', cObs[2]] | |
elif hours >= 1: | |
if hours == 1: | |
if minutes == 1: | |
cObs = ['{:.0f}'.format(hours), 'hour', '{:.0f}'.format(minutes), 'min', cObs[2]] | |
else: | |
cObs = ['{:.0f}'.format(hours), 'hour', '{:.0f}'.format(minutes), 'mins', cObs[2]] | |
elif hours > 1: | |
if minutes == 1: | |
cObs = ['{:.0f}'.format(hours), 'hours', '{:.0f}'.format(minutes), 'min', cObs[2]] | |
else: | |
cObs = ['{:.0f}'.format(hours), 'hours', '{:.0f}'.format(minutes), 'mins', cObs[2]] | |
else: | |
if minutes == 0: | |
cObs = ['< 1', 'minute', '-', '-', cObs[2]] | |
elif minutes == 1: | |
cObs = ['{:.0f}'.format(minutes), 'minute', '-', '-', cObs[2]] | |
else: | |
cObs = ['{:.0f}'.format(minutes), 'minutes', '-', '-', cObs[2]] | |
# Return formatted observations | |
return cObs |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Handles Websocket messages received by the Raspberry Pi Python console for | |
WeatherFlow Tempest and Smart Home Weather stations. | |
Copyright (C) 2018-2023 Peter Davis | |
This program is free software: you can redistribute it and/or modify it under | |
the terms of the GNU General Public License as published by the Free Software | |
Foundation, either version 3 of the License, or (at your option) any later | |
version. | |
This program is distributed in the hope that it will be useful, but WITHOUT ANY | |
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A | |
PARTICULAR PURPOSE. See the GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License along with | |
this program. If not, see <http://www.gnu.org/licenses/>. | |
""" | |
# Import required library modules | |
from lib.request_api import weatherflow_api | |
from lib.system import system | |
from lib import derivedVariables as derive | |
from lib import observationFormat as observation | |
from lib import properties | |
# Import required Kivy modules | |
from kivy.logger import Logger | |
from kivy.clock import mainthread | |
from kivy.app import App | |
# Define empty deviceObs dictionary | |
device_obs = {'obTime': [None, 's'], 'pressure': [None, 'mb'], 'outTemp': [None, 'c'], | |
'inTemp': [None, 'c'], 'humidity': [None, '%'], 'windSpd': [None, 'mps'], | |
'windGust': [None, 'mps'], 'windDir': [None, 'degrees'], 'rapidWindSpd': [None, 'mps'], | |
'rapidWindDir': [None, 'degrees'], 'uvIndex': [None, 'index'], 'radiation': [None, 'Wm2'], | |
'minuteRain': [None, 'mm'], 'dailyRain': [None, 'mm'], 'strikeMinute': [None, 'count'], | |
'strikeTime': [None, 's'], 'strikeDist': [None, 'km'], 'strike3hr': [None, 'count'], | |
} | |
# Define empty deriveObs dictionary | |
derive_obs = {'dewPoint': [None, 'c'], 'feelsLike': [None, 'c', '-', '-'], 'outTempMax': [None, 'c', '-'], | |
'outTempMin': [None, 'c', '-'], 'outTempDiff': [None, 'dc', '-'], 'outTempTrend': [None, 'c/hr', 'c8c8c8ff'], | |
'inTempMax': [None, 'c', '-'], 'inTempMin': [None, 'c', '-'], 'SLP': [None, 'mb', None], | |
'SLPTrend': [None, 'mb/hr', '-', '-'], 'SLPMin': [None, 'mb', '-'], 'SLPMax': [None, 'mb', '-'], | |
'windSpd': [None, 'mps', '-', '-', '-'], 'windAvg': [None, 'mps'], 'gustMax': [None, 'mps', '-'], | |
'windDir': [None, 'degrees', '-', '-'], 'rapidWindDir': [None, 'degrees', '-', '-'], 'rainRate': [None, 'mm/hr', '-'], | |
'uvIndex': [None, 'index'], 'peakSun': [None, 'hrs', '-'], 'strikeDeltaT': [None, 's', None], | |
'strikeFreq': [None, '/min', None, '/min'], | |
'strikeCount': {'today': [None, 'count'], | |
'month': [None, 'count'], | |
'year': [None, 'count'] | |
}, | |
'rainAccum': {'today': [None, 'mm'], | |
'yesterday': [None, 'mm'], | |
'month': [None, 'mm'], | |
'year': [None, 'mm'] | |
} | |
} | |
# ============================================================================= | |
# DEFINE 'obsParser' CLASS | |
# ============================================================================= | |
class obsParser(): | |
def __init__(self): | |
# Define instance variables | |
self.display_obs = properties.Obs() | |
self.api_data = {} | |
self.transmit = 1 | |
self.flag_api = [1, 1, 1, 1] | |
# Create reference to app object | |
self.app = App.get_running_app() | |
self.app.obsParser = self | |
# Define device and derived observations dictionary | |
self.device_obs = device_obs.copy() | |
self.derive_obs = derive_obs.copy() | |
def parse_obs_st(self, message, config): | |
""" Parse obs_st Websocket messages from TEMPEST module | |
INPUTS: | |
message obs_sky Websocket message | |
config Console configuration object | |
""" | |
# Extract latest TEMPEST Websocket JSON | |
if 'obs' in message: | |
latestOb = message['obs'][0] | |
else: | |
return | |
# Extract TEMPEST device_id. Initialise API data dictionary | |
device_id = message['device_id'] | |
self.api_data[device_id] = {'flagAPI': self.flag_api[0]} | |
# Discard duplicate TEMPEST Websocket messages | |
if 'obs_st' in self.display_obs: | |
if self.display_obs['obs_st']['obs'][0] == latestOb[0]: | |
return | |
# Extract required observations from latest TEMPEST Websocket JSON | |
self.device_obs['obTime'] = [latestOb[0], 's'] | |
self.device_obs['windSpd'] = [latestOb[2], 'mps'] | |
self.device_obs['windGust'] = [latestOb[3], 'mps'] | |
self.device_obs['windDir'] = [latestOb[4], 'degrees'] | |
self.device_obs['pressure'] = [latestOb[6], 'mb'] | |
self.device_obs['outTemp'] = [latestOb[7], 'c'] | |
self.device_obs['humidity'] = [latestOb[8], '%'] | |
self.device_obs['uvIndex'] = [latestOb[10], 'index'] | |
self.device_obs['radiation'] = [latestOb[11], 'Wm2'] | |
self.device_obs['minuteRain'] = [latestOb[12], 'mm'] | |
self.device_obs['strikeMinute'] = [latestOb[15], 'count'] | |
self.device_obs['dailyRain'] = [latestOb[18], 'mm'] | |
# Extract lightning strike data from the latest TEMPEST Websocket JSON | |
# "Summary" object | |
self.device_obs['strikeTime'] = [message['summary']['strike_last_epoch'] if 'strike_last_epoch' in message['summary'] else None, 's'] | |
self.device_obs['strikeDist'] = [message['summary']['strike_last_dist'] if 'strike_last_dist' in message['summary'] else None, 'km'] | |
self.device_obs['strike3hr'] = [message['summary']['strike_count_3h'] if 'strike_count_3h' in message['summary'] else None, 'count'] | |
# Request required TEMPEST data from the WeatherFlow API | |
self.api_data[device_id]['24Hrs'] = weatherflow_api.last_24h(device_id, latestOb[0], config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['SLPMin'][0] is None | |
or self.derive_obs['SLPMax'][0] is None | |
or self.derive_obs['outTempMin'][0] is None | |
or self.derive_obs['outTempMax'][0] is None | |
or self.derive_obs['windAvg'][0] is None | |
or self.derive_obs['gustMax'][0] is None | |
or self.derive_obs['peakSun'][0] is None | |
or self.derive_obs['strikeCount']['today'][0] is None): | |
self.api_data[device_id]['today'] = weatherflow_api.today(device_id, config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['rainAccum']['yesterday'][0] is None): | |
self.api_data[device_id]['yesterday'] = weatherflow_api.yesterday(device_id, config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['rainAccum']['month'][0] is None | |
or self.derive_obs['strikeCount']['month'][0] is None): | |
self.api_data[device_id]['month'] = weatherflow_api.month(device_id, config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['rainAccum']['year'][0] is None | |
or self.derive_obs['strikeCount']['year'][0] is None): | |
self.api_data[device_id]['year'] = weatherflow_api.year(device_id, config) | |
self.flag_api[0] = 0 | |
# Store latest TEMPEST JSON message | |
self.display_obs['obs_st'] = message | |
# Calculate derived observations | |
self.calcDerivedVariables(device_id, config, 'obs_st') | |
def parse_obs_sky(self, message, config): | |
""" Parse obs_sky Websocket messages from SKY module | |
INPUTS: | |
message obs_sky Websocket message | |
config Console configuration object | |
""" | |
# Extract latest SKY Websocket JSON | |
if 'obs' in message: | |
latestOb = message['obs'][0] | |
else: | |
return | |
# Extract SKY device_id. Initialise API data dictionary | |
device_id = message['device_id'] | |
self.api_data[device_id] = {'flagAPI': self.flag_api[1]} | |
# Discard duplicate SKY Websocket messages | |
if 'obs_sky' in self.display_obs: | |
if self.display_obs['obs_sky']['obs'][0] == latestOb[0]: | |
return | |
# Extract required observations from latest SKY Websocket JSON | |
self.device_obs['uvIndex'] = [latestOb[2], 'index'] | |
self.device_obs['minuteRain'] = [latestOb[3], 'mm'] | |
self.device_obs['windSpd'] = [latestOb[5], 'mps'] | |
self.device_obs['windGust'] = [latestOb[6], 'mps'] | |
self.device_obs['windDir'] = [latestOb[7], 'degrees'] | |
self.device_obs['radiation'] = [latestOb[10], 'Wm2'] | |
self.device_obs['dailyRain'] = [latestOb[11], 'mm'] | |
# Request required SKY data from the WeatherFlow API | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['windAvg'][0] is None | |
or self.derive_obs['gustMax'][0] is None | |
or self.derive_obs['peakSun'][0] is None): | |
self.api_data[device_id]['today'] = weatherflow_api.today(device_id, config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['rainAccum']['yesterday'][0] is None): | |
self.api_data[device_id]['yesterday'] = weatherflow_api.yesterday(device_id, config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['rainAccum']['month'][0] is None): | |
self.api_data[device_id]['month'] = weatherflow_api.month(device_id, config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['rainAccum']['year'][0] is None): | |
self.api_data[device_id]['year'] = weatherflow_api.year(device_id, config) | |
self.flag_api[1] = 0 | |
# Store latest SKY JSON message | |
self.display_obs['obs_sky'] = message | |
# Calculate derived observations | |
self.calcDerivedVariables(device_id, config, 'obs_sky') | |
def parse_obs_out_air(self, message, config): | |
""" Parse obs_air Websocket messages from outdoor AIR module | |
INPUTS: | |
message obs_air Websocket message | |
config Console configuration object | |
""" | |
# Extract latest outdoor AIR Websocket JSON | |
if 'obs' in message: | |
latestOb = message['obs'][0] | |
else: | |
return | |
# Extract outdoor AIR device_id. Initialise API data dictionary | |
device_id = message['device_id'] | |
self.api_data[device_id] = {'flagAPI': self.flag_api[2]} | |
# Discard duplicate outdoor AIR Websocket messages | |
if 'obs_out_air' in self.display_obs: | |
if self.display_obs['obs_out_air']['obs'][0] == latestOb[0]: | |
return | |
# Extract required observations from latest outdoor AIR Websocket JSON | |
self.device_obs['obTime'] = [latestOb[0], 's'] | |
self.device_obs['pressure'] = [latestOb[1], 'mb'] | |
self.device_obs['outTemp'] = [latestOb[2], 'c'] | |
self.device_obs['humidity'] = [latestOb[3], '%'] | |
self.device_obs['strikeMinute'] = [latestOb[4], 'count'] | |
# Extract lightning strike data from the latest outdoor AIR Websocket | |
# JSON "Summary" object | |
self.device_obs['strikeTime'] = [message['summary']['strike_last_epoch'] if 'strike_last_epoch' in message['summary'] else None, 's'] | |
self.device_obs['strikeDist'] = [message['summary']['strike_last_dist'] if 'strike_last_dist' in message['summary'] else None, 'km'] | |
self.device_obs['strike3hr'] = [message['summary']['strike_count_3h'] if 'strike_count_3h' in message['summary'] else None, 'count'] | |
# Request required outdoor AIR data from the WeatherFlow API | |
self.api_data[device_id]['24Hrs'] = weatherflow_api.last_24h(device_id, latestOb[0], config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['SLPMin'][0] is None | |
or self.derive_obs['SLPMax'][0] is None | |
or self.derive_obs['outTempMin'][0] is None | |
or self.derive_obs['outTempMax'][0] is None | |
or self.derive_obs['strikeCount']['today'][0] is None): | |
self.api_data[device_id]['today'] = weatherflow_api.today(device_id, config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['strikeCount']['month'][0] is None): | |
self.api_data[device_id]['month'] = weatherflow_api.month(device_id, config) | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['strikeCount']['year'][0] is None): | |
self.api_data[device_id]['year'] = weatherflow_api.year(device_id, config) | |
self.flag_api[2] = 0 | |
# Store latest outdoor AIR JSON message | |
self.display_obs['obs_out_air'] = message | |
# Calculate derived observations | |
self.calcDerivedVariables(device_id, config, 'obs_out_air') | |
def parse_obs_in_air(self, message, config): | |
""" Parse obs_air Websocket messages from indoor AIR module | |
INPUTS: | |
message obs_air Websocket message | |
config Console configuration object | |
""" | |
# Extract latest indoor AIR Websocket JSON | |
if 'obs' in message: | |
latestOb = message['obs'][0] | |
else: | |
return | |
# Extract indoor AIR device_id. Initialise API data dictionary | |
device_id = message['device_id'] | |
self.api_data[device_id] = {'flagAPI': self.flag_api[3]} | |
# Discard duplicate indoor AIR Websocket messages | |
if 'obs_in_air' in self.display_obs: | |
if self.display_obs['obs_in_air']['obs'][0] == latestOb[0]: | |
return | |
# Extract required observations from latest indoor AIR Websocket JSON | |
self.device_obs['obTime'] = [latestOb[0], 's'] | |
self.device_obs['inTemp'] = [latestOb[2], 'c'] | |
# Request required indoor AIR data from the WeatherFlow API | |
if (self.api_data[device_id]['flagAPI'] | |
or self.derive_obs['inTempMin'][0] is None | |
or self.derive_obs['inTempMax'][0] is None): | |
self.api_data[device_id]['today'] = weatherflow_api.today(device_id, config) | |
self.flag_api[3] = 0 | |
# Store latest indoor AIR JSON message | |
self.display_obs['obs_in_air'] = message | |
# Calculate derived observations | |
self.calcDerivedVariables(device_id, config, 'obs_in_air') | |
def parse_rapid_wind(self, message, config): | |
""" Parse rapid_wind Websocket messages from SKY or TEMPEST module | |
INPUTS: | |
message rapid_wind Websocket message received from | |
SKY or TEMPEST module | |
config Console configuration object | |
""" | |
# Extract latest rapid_wind Websocket JSON | |
if 'ob' in message: | |
latestOb = message['ob'] | |
else: | |
return | |
# Extract device ID | |
device_id = message['device_id'] | |
# Discard duplicate rapid_wind Websocket messages | |
if 'rapid_wind' in self.display_obs: | |
if self.display_obs['rapid_wind']['ob'][0] == latestOb[0]: | |
return | |
# Extract required observations from latest rapid_wind Websocket JSON | |
self.device_obs['rapidWindSpd'] = [latestOb[1], 'mps'] | |
self.device_obs['rapidWindDir'] = [latestOb[2], 'degrees'] | |
# Extract wind direction from previous rapid_wind Websocket JSON | |
if 'rapid_wind' in self.device_obs: | |
previousOb = self.device_obs['rapid_wind']['ob'] | |
rapidWindDirOld = [previousOb[2], 'degrees'] | |
else: | |
rapidWindDirOld = [0, 'degrees'] | |
# If windspeed is zero, freeze direction at last direction of non-zero | |
# wind speed and edit latest rapid_wind Websocket JSON message. | |
if self.device_obs['rapidWindSpd'][0] == 0: | |
self.device_obs['rapidWindDir'] = rapidWindDirOld | |
message['ob'][2] = rapidWindDirOld[0] | |
# Store latest rapid_wind Websocket JSON message | |
self.display_obs['rapid_wind'] = message | |
# Calculate derived observations | |
self.calcDerivedVariables(device_id, config, 'rapid_wind') | |
def parse_evt_strike(self, message, config): | |
""" Parse lightning strike event Websocket messages received from AIR | |
or TEMPEST module | |
INPUTS: | |
message evt_strike Websocket message received from | |
AIR or TEMPEST module | |
config Console configuration object | |
""" | |
# Extract latest evt_strike Websocket JSON | |
if 'evt' in message: | |
latestEvt = message['evt'] | |
else: | |
return | |
# Extract device ID | |
device_id = message['device_id'] | |
# Discard duplicate evt_strike Websocket messages | |
if 'evt_strike' in self.display_obs: | |
if self.display_obs['evt_strike']['evt'][0] == latestEvt[0]: | |
return | |
# Extract required observations from latest evt_strike Websocket JSON | |
self.device_obs['strikeTime'] = [latestEvt[0], 's'] | |
self.device_obs['strikeDist'] = [latestEvt[1], 'km'] | |
# Store latest evt_strike JSON message | |
self.display_obs['evt_strike'] = message | |
# Calculate derived observations | |
self.calcDerivedVariables(device_id, config, 'evt_strike') | |
def calcDerivedVariables(self, device, config, device_type): | |
""" Calculate derived variables from available device observations | |
INPUTS: | |
device Device ID | |
config Console configuration object | |
device_type Device type | |
""" | |
# Derive variables from available obs_out_air and obs_st observations | |
# Derive variables from available obs_out_air and obs_st observations | |
if device_type in ('obs_out_air', 'obs_st'): | |
self.derive_obs['feelsLike'] = derive.feelsLike(self.device_obs['outTemp'], self.device_obs['humidity'], self.device_obs['windSpd'], config) | |
self.derive_obs['dewPoint'] = derive.dewPoint(self.device_obs['outTemp'], self.device_obs['humidity']) | |
self.derive_obs['outTempDiff'] = derive.tempDiff(self.device_obs['outTemp'], self.device_obs['obTime'], device, self.api_data, config) | |
self.derive_obs['outTempTrend'] = derive.tempTrend(self.device_obs['outTemp'], self.device_obs['obTime'], device, self.api_data, config) | |
self.derive_obs['outTempMax'] = derive.tempMax(self.device_obs['outTemp'], self.device_obs['obTime'], self.derive_obs['outTempMax'], device, self.api_data, config) | |
self.derive_obs['outTempMin'] = derive.tempMin(self.device_obs['outTemp'], self.device_obs['obTime'], self.derive_obs['outTempMin'], device, self.api_data, config) | |
self.derive_obs['SLP'] = derive.SLP(self.device_obs['pressure'], device, config) | |
self.derive_obs['SLPTrend'] = derive.SLPTrend(self.device_obs['pressure'], self.device_obs['obTime'], device, self.api_data, config) | |
self.derive_obs['SLPMax'] = derive.SLPMax(self.device_obs['pressure'], self.device_obs['obTime'], self.derive_obs['SLPMax'], device, self.api_data, config) | |
self.derive_obs['SLPMin'] = derive.SLPMin(self.device_obs['pressure'], self.device_obs['obTime'], self.derive_obs['SLPMin'], device, self.api_data, config) | |
self.derive_obs['strikeCount'] = derive.strikeCount(self.device_obs['strikeMinute'], self.derive_obs['strikeCount'], device, self.api_data, config) | |
self.derive_obs['strikeFreq'] = derive.strikeFrequency(self.device_obs['obTime'], device, self.api_data, config) | |
self.derive_obs['strikeDeltaT'] = derive.strikeDeltaT(self.device_obs['strikeTime']) | |
# Derive variables from available obs_sky and obs_st observations | |
if device_type in ('obs_sky', 'obs_st'): | |
self.derive_obs['uvIndex'] = derive.UVIndex(self.device_obs['uvIndex']) | |
self.derive_obs['peakSun'] = derive.peakSunHours(self.device_obs['radiation'], self.derive_obs['peakSun'], device, self.api_data, config) | |
self.derive_obs['windSpd'] = derive.beaufortScale(self.device_obs['windSpd']) | |
self.derive_obs['windDir'] = derive.cardinalWindDir(self.device_obs['windDir'], self.device_obs['windSpd']) | |
self.derive_obs['windAvg'] = derive.avgWindSpeed(self.device_obs['windSpd'], self.derive_obs['windAvg'], device, self.api_data, config) | |
self.derive_obs['gustMax'] = derive.maxWindGust(self.device_obs['windGust'], self.device_obs['obTime'], self.derive_obs['gustMax'], device, self.api_data, config) | |
self.derive_obs['rainRate'] = derive.rainRate(self.device_obs['minuteRain']) | |
self.derive_obs['rainAccum'] = derive.rainAccumulation(self.device_obs['dailyRain'], self.derive_obs['rainAccum'], device, self.api_data, config) | |
# Derive variables from available obs_out_air and obs_st observations | |
if device_type == 'obs_in_air': | |
self.derive_obs['inTempMax'] = derive.tempMax(self.device_obs['inTemp'], self.device_obs['obTime'], self.derive_obs['inTempMax'], device, self.api_data, config) | |
self.derive_obs['inTempMin'] = derive.tempMin(self.device_obs['inTemp'], self.device_obs['obTime'], self.derive_obs['inTempMin'], device, self.api_data, config) | |
# Derive variables from available rapid_wind observations | |
if device_type == 'rapid_wind': | |
self.derive_obs['rapidWindDir'] = derive.cardinalWindDir(self.device_obs['rapidWindDir'], self.device_obs['rapidWindSpd']) | |
# Derive variables from available evt_strike observations | |
if device_type == 'evt_strike': | |
self.derive_obs['strikeDeltaT'] = derive.strikeDeltaT(self.device_obs['strikeTime']) | |
# Format derived observations | |
self.format_derived_variables(config, device_type) | |
def format_derived_variables(self, config, device_type): | |
""" Format derived variables from available device observations | |
INPUTS: | |
config Console configuration object | |
device_type Device type | |
""" | |
# Convert derived variable units from obs_out_air and obs_st observations | |
if device_type in ('obs_out_air', 'obs_st', 'obs_all'): | |
outTemp = observation.Units(self.device_obs['outTemp'], config['Units']['Temp']) | |
feelsLike = observation.Units(self.derive_obs['feelsLike'], config['Units']['Temp']) | |
dewPoint = observation.Units(self.derive_obs['dewPoint'], config['Units']['Temp']) | |
outTempDiff = observation.Units(self.derive_obs['outTempDiff'], config['Units']['Temp']) | |
outTempTrend = observation.Units(self.derive_obs['outTempTrend'], config['Units']['Temp']) | |
outTempMax = observation.Units(self.derive_obs['outTempMax'], config['Units']['Temp']) | |
outTempMin = observation.Units(self.derive_obs['outTempMin'], config['Units']['Temp']) | |
humidity = observation.Units(self.device_obs['humidity'], config['Units']['Other']) | |
SLP = observation.Units(self.derive_obs['SLP'], config['Units']['Pressure']) | |
SLPTrend = observation.Units(self.derive_obs['SLPTrend'], config['Units']['Pressure']) | |
SLPMax = observation.Units(self.derive_obs['SLPMax'], config['Units']['Pressure']) | |
SLPMin = observation.Units(self.derive_obs['SLPMin'], config['Units']['Pressure']) | |
strikeDist = observation.Units(self.device_obs['strikeDist'], config['Units']['Distance']) | |
strikeDeltaT = observation.Units(self.derive_obs['strikeDeltaT'], config['Units']['Other']) | |
strikeFreq = observation.Units(self.derive_obs['strikeFreq'], config['Units']['Other']) | |
strike3hr = observation.Units(self.device_obs['strike3hr'], config['Units']['Other']) | |
strikeToday = observation.Units(self.derive_obs['strikeCount']['today'], config['Units']['Other']) | |
strikeMonth = observation.Units(self.derive_obs['strikeCount']['month'], config['Units']['Other']) | |
strikeYear = observation.Units(self.derive_obs['strikeCount']['year'], config['Units']['Other']) | |
# Convert derived variable units from obs_sky and obs_st observations | |
if device_type in ('obs_sky', 'obs_st', 'obs_all'): | |
rainRate = observation.Units(self.derive_obs['rainRate'], config['Units']['Precip']) | |
todayRain = observation.Units(self.derive_obs['rainAccum']['today'], config['Units']['Precip']) | |
yesterdayRain = observation.Units(self.derive_obs['rainAccum']['yesterday'], config['Units']['Precip']) | |
monthRain = observation.Units(self.derive_obs['rainAccum']['month'], config['Units']['Precip']) | |
yearRain = observation.Units(self.derive_obs['rainAccum']['year'], config['Units']['Precip']) | |
radiation = observation.Units(self.device_obs['radiation'], config['Units']['Other']) | |
uvIndex = observation.Units(self.derive_obs['uvIndex'], config['Units']['Other']) | |
peakSun = observation.Units(self.derive_obs['peakSun'], config['Units']['Other']) | |
windSpd = observation.Units(self.derive_obs['windSpd'], config['Units']['Wind']) | |
windDir = observation.Units(self.derive_obs['windDir'], config['Units']['Direction']) | |
windGust = observation.Units(self.device_obs['windGust'], config['Units']['Wind']) | |
windAvg = observation.Units(self.derive_obs['windAvg'], config['Units']['Wind']) | |
windMax = observation.Units(self.derive_obs['gustMax'], config['Units']['Wind']) | |
# Convert derived variable units from obs_in_air observations | |
if device_type in ('obs_in_air', 'obs_all'): | |
inTemp = observation.Units(self.device_obs['inTemp'], config['Units']['Temp']) | |
inTempMax = observation.Units(self.derive_obs['inTempMax'], config['Units']['Temp']) | |
inTempMin = observation.Units(self.derive_obs['inTempMin'], config['Units']['Temp']) | |
# Convert derived variable units from rapid_wind observations | |
if device_type in ('rapid_wind', 'obs_all'): | |
rapidWindSpd = observation.Units(self.device_obs['rapidWindSpd'], config['Units']['Wind']) | |
rapidWindDir = observation.Units(self.derive_obs['rapidWindDir'], 'degrees') | |
# Convert derived variable units from available evt_strike observations | |
if device_type in ('evt_strike', 'obs_all'): | |
strikeDist = observation.Units(self.device_obs['strikeDist'], config['Units']['Distance']) | |
strikeDeltaT = observation.Units(self.derive_obs['strikeDeltaT'], config['Units']['Other']) | |
# Format derived variables from obs_air and obs_st observations | |
if device_type in ('obs_out_air', 'obs_st', 'obs_all'): | |
self.display_obs['outTemp'] = observation.Format(outTemp, 'Temp') | |
self.display_obs['FeelsLike'] = observation.Format(feelsLike, 'Temp') | |
self.display_obs['DewPoint'] = observation.Format(dewPoint, 'Temp') | |
self.display_obs['outTempDiff'] = observation.Format(outTempDiff, 'Temp') | |
self.display_obs['outTempTrend'] = observation.Format(outTempTrend, 'Temp') | |
self.display_obs['outTempMax'] = observation.Format(outTempMax, ['Temp', 'Time'], config) | |
self.display_obs['outTempMin'] = observation.Format(outTempMin, ['Temp', 'Time'], config) | |
self.display_obs['Humidity'] = observation.Format(humidity, 'Humidity') | |
self.display_obs['SLP'] = observation.Format(SLP, 'Pressure') | |
self.display_obs['SLPTrend'] = observation.Format(SLPTrend, 'Pressure') | |
self.display_obs['SLPMax'] = observation.Format(SLPMax, ['Pressure', 'Time'], config) | |
self.display_obs['SLPMin'] = observation.Format(SLPMin, ['Pressure', 'Time'], config) | |
self.display_obs['StrikeDist'] = observation.Format(strikeDist, 'StrikeDistance') | |
self.display_obs['StrikeDeltaT'] = observation.Format(strikeDeltaT, 'TimeDelta') | |
self.display_obs['StrikeFreq'] = observation.Format(strikeFreq, 'StrikeFrequency') | |
self.display_obs['Strikes3hr'] = observation.Format(strike3hr, 'StrikeCount') | |
self.display_obs['StrikesToday'] = observation.Format(strikeToday, 'StrikeCount') | |
self.display_obs['StrikesMonth'] = observation.Format(strikeMonth, 'StrikeCount') | |
self.display_obs['StrikesYear'] = observation.Format(strikeYear, 'StrikeCount') | |
# Format derived variables from obs_sky and obs_st observations | |
if device_type in ('obs_sky', 'obs_st', 'obs_all'): | |
self.display_obs['Radiation'] = observation.Format(radiation, 'Radiation') | |
self.display_obs['UVIndex'] = observation.Format(uvIndex, 'UV') | |
self.display_obs['peakSun'] = observation.Format(peakSun, 'peakSun') | |
self.display_obs['RainRate'] = observation.Format(rainRate, 'Precip') | |
self.display_obs['TodayRain'] = observation.Format(todayRain, 'Precip') | |
self.display_obs['YesterdayRain'] = observation.Format(yesterdayRain, 'Precip') | |
self.display_obs['MonthRain'] = observation.Format(monthRain, 'Precip') | |
self.display_obs['YearRain'] = observation.Format(yearRain, 'Precip') | |
self.display_obs['WindSpd'] = observation.Format(windSpd, 'Wind') | |
self.display_obs['WindGust'] = observation.Format(windGust, 'Wind') | |
self.display_obs['AvgWind'] = observation.Format(windAvg, 'Wind') | |
self.display_obs['MaxGust'] = observation.Format(windMax, ['Wind', 'Time'], config) | |
self.display_obs['WindDir'] = observation.Format(windDir, 'Direction') | |
# Format derived variables from obs_in_air observations | |
if device_type in ('obs_in_air', 'obs_all'): | |
self.display_obs['inTemp'] = observation.Format(inTemp, 'Temp') | |
self.display_obs['inTempMax'] = observation.Format(inTempMax, ['Temp', 'Time'], config) | |
self.display_obs['inTempMin'] = observation.Format(inTempMin, ['Temp', 'Time'], config) | |
# Format derived variables from rapid_wind observations | |
if device_type in ('rapid_wind', 'obs_all'): | |
self.display_obs['rapidSpd'] = observation.Format(rapidWindSpd, 'Wind') | |
self.display_obs['rapidDir'] = observation.Format(rapidWindDir, 'Direction') | |
# Format derived variables from evt_strike observations | |
if device_type in ('evt_strike', 'obs_all'): | |
self.display_obs['StrikeDist'] = observation.Format(strikeDist, 'StrikeDistance') | |
self.display_obs['StrikeDeltaT'] = observation.Format(strikeDeltaT, 'TimeDelta') | |
# Update display with new variables | |
self.update_display(device_type) | |
def reformat_display(self): | |
while self.app.websocket_client.activeThreads(): | |
pass | |
self.format_derived_variables(self.app.config, 'obs_all') | |
def resetDisplay(self): | |
while self.app.websocket_client.activeThreads(): | |
pass | |
self.display_obs = properties.Obs() | |
self.device_obs = device_obs.copy() | |
self.derive_obs = derive_obs.copy() | |
self.api_data = {} | |
self.update_display('obs_reset') | |
@mainthread | |
def update_display(self, ob_type): | |
""" Update display with new variables derived from latest websocket | |
message | |
INPUTS: | |
ob_type Latest Websocket message type | |
""" | |
# Update display values with new derived observations | |
reference_error = False | |
for Key, Value in list(self.display_obs.items()): | |
if not (ob_type == 'obs_all' and 'rapid' in Key): | |
try: # Don't update rapidWind display when type is 'all' | |
self.app.CurrentConditions.Obs[Key] = Value # as the RapidWind rose is not animated in this case | |
except ReferenceError: | |
if not reference_error: | |
Logger.warning(f'obs_parser: {system().log_time()} - Reference error {ob_type}') | |
reference_error = True | |
# Update display graphics with new derived observations | |
if ob_type == 'rapid_wind': | |
if hasattr(self.app, 'WindSpeedPanel'): | |
for panel in getattr(self.app, 'WindSpeedPanel'): | |
panel.animateWindRose() | |
elif ob_type == 'evt_strike': | |
if self.app.config['Display']['LightningPanel'] == '1': | |
for ii, button in enumerate(self.app.CurrentConditions.button_list): | |
if "Lightning" in button[2]: | |
self.app.CurrentConditions.switchPanel([], button) | |
if hasattr(self.app, 'LightningPanel'): | |
for panel in getattr(self.app, 'LightningPanel'): | |
panel.setLightningBoltIcon() | |
panel.animateLightningBoltIcon() | |
else: | |
if ob_type in ['obs_st', 'obs_air', 'obs_all', 'obs_reset']: | |
if hasattr(self.app, 'TemperaturePanel'): | |
for panel in getattr(self.app, 'TemperaturePanel'): | |
panel.setFeelsLikeIcon() | |
if hasattr(self.app, 'LightningPanel'): | |
for panel in getattr(self.app, 'LightningPanel'): | |
panel.setLightningBoltIcon() | |
if hasattr(self.app, 'BarometerPanel'): | |
for panel in getattr(self.app, 'BarometerPanel'): | |
panel.setBarometerArrow() | |
if ob_type in ['obs_st', 'obs_sky', 'obs_all', 'obs_reset']: | |
if hasattr(self.app, 'WindSpeedPanel'): | |
for panel in getattr(self.app, 'WindSpeedPanel'): | |
panel.setWindIcons() | |
if hasattr(self.app, 'SunriseSunsetPanel'): | |
for panel in getattr(self.app, 'SunriseSunsetPanel'): | |
panel.setUVBackground() | |
if hasattr(self.app, 'RainfallPanel'): | |
for panel in getattr(self.app, 'RainfallPanel'): | |
panel.animate_rain_rate() | |
if hasattr(self.app, 'TemperaturePanel'): | |
for panel in getattr(self.app, 'TemperaturePanel'): | |
panel.setFeelsLikeIcon() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## WeatherFlow PiConsole: Raspberry Pi Python console for WeatherFlow Tempest | |
## and Smart Home Weather stations. | |
## Copyright (C) 2018-2023 Peter Davis | |
## This program is free software: you can redistribute it and/or modify it under | |
## the terms of the GNU General Public License as published by the Free Software | |
## Foundation, either version 3 of the License, or (at your option) any later | |
## version. | |
## This program is distributed in the hope that it will be useful, but WITHOUT | |
## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
## FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | |
## details. | |
## You should have received a copy of the GNU General Public License along with | |
## this program. If not, see <http://www.gnu.org/licenses/>. | |
## ============================================================================= | |
## WIND SPEED AND DIRECTION PANEL AND BUTTON | |
## ============================================================================= | |
<WindSpeedPanel>: | |
## Panel background and title | |
PanelBackground: | |
_panelTitle: 'Wind Speed' | |
_panelStatus: app.CurrentConditions.Status['sky_last_sample'] if app.config['Station']['SkyID'] else app.CurrentConditions.Status['tempest_last_sample'] | |
## Rapid Wind wind speed and direction | |
windRose: | |
Image: | |
source: 'icons/windRose/icons/' + str(round(root.rapidWindDir)) + app.scaleSuffix | |
pos_hint: {'x': 0, 'y': 0} | |
size_hint: (1, 1) | |
keep_ratio: 0 | |
allow_stretch: 1 | |
## Current average wind speed | |
LargeField: | |
text: app.CurrentConditions.Obs['WindSpd'][0] | |
pos_hint: {'x': 3/262, 'y': 95/202} | |
size_hint_x: (60/262) | |
MediumField: | |
text: app.CurrentConditions.Obs['WindSpd'][1] | |
pos_hint: {'x': 3/262, 'y': 78/202} | |
size_hint_x: (60/262) | |
## Daily averaged wind speed | |
SmallField: | |
text: 'Wind' | |
pos_hint: {'x': 3/262, 'y': 178/202} | |
size_hint_x: (65/262) | |
SmallField: | |
text: 'Avg [color=ff8837ff]' + app.CurrentConditions.Obs['AvgWind'][0] + '[/color] ' + app.CurrentConditions.Obs['AvgWind'][1] | |
pos_hint: {'x': 3/262, 'y': 161/202} | |
size_hint_x: (101/262) | |
## Current wind gust | |
LargeField: | |
text: app.CurrentConditions.Obs['WindGust'][0] | |
pos_hint: {'x': 201/262, 'y': 95/202} | |
size_hint_x: (60/262) | |
MediumField: | |
text: app.CurrentConditions.Obs['WindGust'][1] | |
pos_hint: {'x': 201/262, 'y': 78/202} | |
size_hint_x: (60/262) | |
## Maximum wind gust | |
SmallField: | |
text: 'Gust' | |
pos_hint: {'x': 142/262, 'y': 178/202} | |
size_hint_x: (165/262) | |
SmallField: | |
text: 'Max [color=ff8837ff]' + app.CurrentConditions.Obs['MaxGust'][0] + '[/color] ' + app.CurrentConditions.Obs['MaxGust'][1] | |
pos_hint: {'x': 159/262, 'y': 161/202} | |
size_hint_x: (101/262) | |
SmallField: | |
text: '--:--' if not isinstance(app.CurrentConditions.Obs['MaxGust'], list) else app.CurrentConditions.Obs['MaxGust'][2] | |
pos_hint: {'x': 180/262, 'y': 144/202} | |
size_hint_x: (65.5/262) | |
## Current Beaufort scale text and icon | |
Image: | |
source: 'icons/windSpd/' + root.windSpdIcon + app.scaleSuffix | |
pos_hint: {'x': 18/262, 'y': 28/202} | |
size_hint: (30/262, 30/202) | |
keep_ratio: 0 | |
allow_stretch: 1 | |
SmallField: | |
text: app.CurrentConditions.Obs['WindSpd'][4] | |
pos_hint: {'x': 6/262, 'y': 6/202} | |
size_hint: (126/262, 17/202) | |
text_size: self.size | |
halign: 'left' | |
## Current average wind direction | |
Image: | |
source: 'icons/windDir/' + root.windDirIcon + app.scaleSuffix | |
pos_hint: {'x': 216/262, 'y': 28/202} | |
size_hint: (30/262, 30/202) | |
keep_ratio: 0 | |
allow_stretch: 1 | |
SmallField: | |
text: 'Direction: [color=9aba2fff]' + app.CurrentConditions.Obs['WindDir'][0] + app.CurrentConditions.Obs['WindDir'][1] + '[/color]' | |
pos_hint: {'x': 119/262, 'y': 6/202} | |
size_hint: (135/262, 17/202) | |
text_size: self.size | |
halign: 'right' | |
## Rapid wind direction in degrees | |
MediumField: | |
text: app.CurrentConditions.Obs['rapidDir'][0] + app.CurrentConditions.Obs['rapidDir'][1] | |
pos_hint: {'x': 102/262, 'y': 116/202} | |
size_hint_x: (60/262) | |
## Rapid wind speed | |
MediumField: | |
text: app.CurrentConditions.Obs['rapidSpd'][0] + ' [size=' + str(int(self.font_size*0.8)) + ']' + app.CurrentConditions.Obs['rapidSpd'][1] + '[/size]' | |
pos_hint: {'x': 92/262, 'y': 89/202} | |
size_hint_x: (80/262) | |
## Rapid wind direction text | |
SmallField: | |
text: app.CurrentConditions.Obs['rapidDir'][3] | |
pos_hint: {'x': 92/262, 'y': 65.5/202} | |
size_hint_x: (80/262) | |
<WindSpeedButton>: | |
PanelButton: | |
text: 'Wind Speed' | |
on_release: app.CurrentConditions.switchPanel(self) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment