Skip to content

Instantly share code, notes, and snippets.

@karlkec
Last active January 16, 2024 17:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save karlkec/7052c0db8eb98ea1bc597e49efcf74d2 to your computer and use it in GitHub Desktop.
Save karlkec/7052c0db8eb98ea1bc597e49efcf74d2 to your computer and use it in GitHub Desktop.
Weatherflow Pi Console mod to display time of max wind gust
""" Returns the derived weather variables required by the Raspberry Pi Python
console for WeatherFlow Tempest and Smart Home Weather stations.
Copyright (C) 2018-2023 Peter Davis
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
# Import required library modules
from lib.request_api import weatherflow_api
from lib.system import system
from lib import derivedVariables as derive
# Import required Python modules
from kivy.logger import Logger
from datetime import datetime
import bisect
import ephem
import math
import pytz
import time
def dewPoint(outTemp, humidity):
""" Calculate the dew point from the temperature and relative humidity
INPUTS:
outTemp Temperature from AIR module [C]
humidity Relative humidity from AIR module [%]
OUTPUT:
DewPoint Dew point [C]
"""
# Return None if required variables are missing
errorOutput = [None, 'c']
if outTemp[0] is None:
Logger.warning(f'dewPoint: {system().log_time()} - outTemp is None')
return errorOutput
elif humidity[0] is None:
Logger.warning(f'dewPoint: {system().log_time()} - humidity is None')
return errorOutput
# Calculate dew point
if humidity[0] > 0:
A = 17.625
B = 243.04
N = B * (math.log(humidity[0] / 100.0) + (A * outTemp[0]) / (B + outTemp[0]))
D = A - math.log(humidity[0] / 100.0) - (A * outTemp[0]) / (B + outTemp[0])
dewPoint = N / D
else:
dewPoint = None
# Return Dew Point
return [dewPoint, 'c']
def feelsLike(outTemp, humidity, windSpd, config):
""" Calculate the Feels Like temperature from the temperature, relative
humidity, and wind speed
INPUTS:
Temp Temperature from AIR module [C]
Humidity Relative humidity from AIR module [%]
windSpd Wind speed from SKY module [m/s]
config Station configuration
OUTPUT:
FeelsLike Feels Like temperature [C]
"""
# Return None if required variables are missing
errorOutput = [None, 'c', '-', '-']
if outTemp[0] is None:
Logger.warning(f'feelsLike: {system().log_time()} - outTemp is None')
return errorOutput
elif humidity[0] is None:
Logger.warning(f'feelsLike: {system().log_time()} - humidity is None')
return errorOutput
elif windSpd[0] is None:
Logger.warning(f'feelsLike: {system().log_time()} - windSpd is None')
return errorOutput
# Convert observation units as required
TempF = [outTemp[0] * (9 / 5) + 32, 'f']
WindMPH = [windSpd[0] * 2.2369362920544, 'mph']
WindKPH = [windSpd[0] * 3.6, 'kph']
# If temperature is less than 10 degrees celcius and wind speed is higher
# than 3 mph, calculate wind chill using the Joint Action Group for
# Temperature Indices formula
if outTemp[0] <= 10 and WindMPH[0] > 3:
WindChill = (+ 13.12 + 0.6215 * outTemp[0]
- 11.37 * (WindKPH[0])**0.16 + 0.3965 * outTemp[0]
* (WindKPH[0])**0.16)
FeelsLike = [WindChill, 'c']
# If temperature is at or above 80 degress farenheit (26.67 C), and humidity
# is at or above 40%, calculate the Heat Index
elif TempF[0] >= 80 and humidity[0] >= 40:
HeatIndex = (-42.379 + (2.04901523 * TempF[0])
+ (10.1433127 * humidity[0])
- (0.22475541 * TempF[0] * humidity[0])
- (6.83783e-3 * TempF[0]**2)
- (5.481717e-2 * humidity[0]**2)
+ (1.22874e-3 * TempF[0]**2 * humidity[0])
+ (8.5282e-4 * TempF[0] * humidity[0]**2)
- (1.99e-6 * TempF[0]**2 * humidity[0]**2))
FeelsLike = [(HeatIndex - 32) * (5 / 9), 'c']
# Else set Feels Like temperature to observed temperature
else:
FeelsLike = outTemp
# Define 'FeelsLike' temperature cutoffs
Cutoffs = [float(item) for item in list(config['FeelsLike'].values())]
# Define 'FeelsLike temperature text and icon
Description = ['Feeling extremely cold', 'Feeling freezing cold', 'Feeling very cold',
'Feeling cold', 'Feeling mild', 'Feeling warm', 'Feeling hot',
'Feeling very hot', 'Feeling extremely hot', '-']
Icon = ['ExtremelyCold', 'FreezingCold', 'VeryCold', 'Cold', 'Mild', 'Warm',
'Hot', 'VeryHot', 'ExtremelyHot', '-']
if config['Units']['Temp'] == 'f':
Ind = bisect.bisect(Cutoffs, FeelsLike[0] * (9 / 5) + 32)
else:
Ind = bisect.bisect(Cutoffs, FeelsLike[0])
# Return 'Feels Like' temperature
return [FeelsLike[0], FeelsLike[1], Description[Ind], Icon[Ind]]
def SLP(pressure, device, config):
""" Calculate the sea level pressure from the station pressure
INPUTS:
pressure Station pressure from AIR/TEMPEST module [mb]
device Device ID that observation originates from
config Station configuration
OUTPUT:
SLP Sea level pressure [mb]
"""
# Return None if required variables are missing
errorOutput = [None, 'mb', None]
if pressure[0] is None:
Logger.warning(f'SLP: {system().log_time()} - pressure is None')
return errorOutput
# Extract required configuration variables
elevation = config['Station']['Elevation']
if str(device) == config['Station']['OutAirID']:
height = config['Station']['OutAirHeight']
elif str(device) == config['Station']['TempestID']:
height = config['Station']['TempestHeight']
# Define required constants
P0 = 1013.25
Rd = 287.05
GammaS = 0.0065
g = 9.80665
T0 = 288.15
elevation = float(elevation) + float(height)
# Calculate and return sea level pressure
SLP = (pressure[0]
* (1 + ((P0 / pressure[0])**((Rd * GammaS) / g))
* ((GammaS * elevation) / T0))**(g / (Rd * GammaS))
)
return [SLP, 'mb', SLP]
def SLPTrend(pressure, obTime, device, apiData, config):
""" Calculate the pressure trend from the sea level pressure over the last
three hours
INPUTS:
pressure Current station pressure [mb]
obTime Time of latest observation [s]
device Device ID that observation originates from
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
Trend Sea level pressure trend [mb]
"""
# Return None if required variables are missing
errorOutput = [None, 'mb/hr', '-', '-']
if pressure[0] is None:
Logger.warning(f'SLPTrend: {system().log_time()} - pressure is None')
return errorOutput
elif obTime[0] is None:
Logger.warning(f'SLPTrend: {system().log_time()} - obTime is None')
return errorOutput
# Define index of pressure in websocket packets
if str(device) == config['Station']['OutAirID']:
index_bucket_a = 1
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 6
# Extract required observations from WeatherFlow API data based on device
# type indicated in API call
if '24Hrs' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'):
data24hrs = apiData[device]['24Hrs'].json()['obs']
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None]
apiPres = [ob[index_bucket_a] for ob in data24hrs if ob[index_bucket_a] is not None]
try:
dTime = [abs(T - (obTime[0] - 3 * 3600)) for T in apiTime]
if min(dTime) < 5 * 60:
pres3h = [apiPres[dTime.index(min(dTime))], 'mb']
time3h = [apiTime[dTime.index(min(dTime))], 's']
pres0h = pressure
time0h = obTime
else:
Logger.warning(f'SLPTrend: {system().log_time()} - no data in 3 hour window')
return errorOutput
except Exception as Error:
Logger.warning(f'SLPTrend: {system().log_time()} - {Error}')
return errorOutput
else:
return errorOutput
# Convert station pressure into sea level pressure
pres3h = SLP(pres3h, device, config)
pres0h = SLP(pres0h, device, config)
# Calculate three hour temperature trend
try:
Trend = (pres0h[0] - pres3h[0]) / ((time0h[0] - time3h[0]) / 3600)
except Exception as Error:
Logger.warning(f'SLPTrend: {system().log_time()} - {Error}')
return errorOutput
# Define pressure trend text
if Trend > 2 / 3:
TrendTxt = '[color=ff8837ff]Rising rapidly[/color]'
elif Trend >= 1 / 3:
TrendTxt = '[color=ff8837ff]Rising[/color]'
elif Trend <= -2 / 3:
TrendTxt = '[color=00a4b4ff]Falling rapidly[/color]'
elif Trend <= -1 / 3:
TrendTxt = '[color=00a4b4ff]Falling[/color]'
else:
TrendTxt = '[color=9aba2fff]Steady[/color]'
# Define weather tendency based on pressure and trend
if pres0h[0] >= 1023:
if 'Falling rapidly' in TrendTxt:
Tendency = 'Becoming cloudy and warmer'
else:
Tendency = 'Fair conditions likely'
elif 1009 < pres0h[0] < 1023:
if 'Falling rapidly' in TrendTxt:
Tendency = 'Rainy conditions likely'
else:
Tendency = 'Conditions unchanged'
elif pres0h[0] <= 1009:
if 'Falling rapidly' in TrendTxt:
Tendency = 'Stormy conditions likely'
elif 'Falling' in TrendTxt:
Tendency = 'Rainy conditions likely'
else:
Tendency = 'Becoming clearer and cooler'
else:
Tendency = '-'
# Return pressure trend
return [Trend, 'mb/hr', TrendTxt, Tendency]
def SLPMax(pressure, obTime, maxPres, device, apiData, config):
""" Calculate maximum pressure since midnight station time
INPUTS:
Time Current observation time [s]
Temp Current pressure [mb]
maxPres Current maximum pressure [mb]
device Device ID that observation originates from
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
maxPres Maximum pressure [mb]
"""
# Return None if required variables are missing
errorOutput = [None, 'mb', '-', None, time.time()]
if pressure[0] is None:
Logger.warning(f'SLPMax: {system().log_time()} - pressure is None')
return errorOutput
elif obTime[0] is None:
Logger.warning(f'SLPMax: {system().log_time()} - obTime is None')
return errorOutput
# Calculate sea level pressure
SLP = derive.SLP(pressure, device, config)
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Define index of temperature in websocket packets
if str(device) == config['Station']['OutAirID']:
index_bucket_a = 1
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 6
# If console is initialising, download all data for current day using
# Weatherflow API and calculate daily maximum and minimum pressure
if 'today' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'):
dataToday = apiData[device]['today'].json()['obs']
obTime = [item[0] for item in dataToday if item[index_bucket_a] is not None]
pressure = [[item[index_bucket_a], 'mb'] for item in dataToday if item[index_bucket_a] is not None]
SLP = [derive.SLP(P, device, config) for P in pressure]
try:
maxPres = [max(SLP)[0], 'mb', obTime[SLP.index(max(SLP))], 's', max(SLP)[0], obTime[SLP.index(max(SLP))]]
except Exception as Error:
Logger.warning(f'SLPMax: {system().log_time()} - {Error}')
maxPres = errorOutput
else:
maxPres = errorOutput
# Else if midnight has passed, reset maximum pressure
elif Now.date() > datetime.fromtimestamp(maxPres[5], Tz).date():
maxPres = [SLP[0], 'mb', obTime[0], 's', SLP[0], obTime[0]]
# Else if current pressure is greater than maximum recorded pressure, update
# maximum pressure
elif SLP[0] > maxPres[4]:
maxPres = [SLP[0], 'mb', obTime[0], 's', SLP[0], obTime[0]]
# Else maximum pressure unchanged, return existing values
else:
maxPres = [maxPres[4], 'mb', maxPres[2], 's', maxPres[4], obTime[0]]
# Return required variables
return maxPres
def SLPMin(pressure, obTime, minPres, device, apiData, config):
""" Calculate minimum pressure since midnight station time
INPUTS:
pressure Current pressure [mb]
obTime Current observation time [s]
minPres Current minimum pressure [mb]
device Device ID that observation originates from
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
minPres Minumum pressure [mb]
"""
# Return None if required variables are missing
errorOutput = [None, 'mb', '-', None, time.time()]
if pressure[0] is None:
Logger.warning(f'SLPMin: {system().log_time()} - pressure is None')
return errorOutput
elif obTime[0] is None:
Logger.warning(f'SLPMin: {system().log_time()} - obTime is None')
return errorOutput
# Calculate sea level pressure
SLP = derive.SLP(pressure, device, config)
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Define index of temperature in websocket packets
if str(device) == config['Station']['OutAirID']:
index_bucket_a = 1
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 6
# If console is initialising, download all data for current day using
# Weatherflow API and calculate daily maximum and minimum pressure
if 'today' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'):
dataToday = apiData[device]['today'].json()['obs']
obTime = [item[0] for item in dataToday if item[index_bucket_a] is not None]
pressure = [[item[index_bucket_a], 'mb'] for item in dataToday if item[index_bucket_a] is not None]
SLP = [derive.SLP(P, device, config) for P in pressure]
try:
minPres = [min(SLP)[0], 'mb', obTime[SLP.index(min(SLP))], 's', min(SLP)[0], obTime[SLP.index(min(SLP))]]
except Exception as Error:
Logger.warning(f'SLPMin: {system().log_time()} - {Error}')
minPres = errorOutput
else:
minPres = errorOutput
# Else if midnight has passed, reset maximum and minimum pressure
elif Now.date() > datetime.fromtimestamp(minPres[5], Tz).date():
minPres = [SLP[0], 'mb', obTime[0], 's', SLP[0], obTime[0]]
# Else if current pressure is less than minimum recorded pressure, update
# minimum pressure and time
elif SLP[0] < minPres[4]:
minPres = [SLP[0], 'mb', obTime[0], 's', SLP[0], obTime[0]]
# Else minimum pressure unchanged, return existing values
else:
minPres = [minPres[4], 'mb', minPres[2], 's', minPres[4], obTime[0]]
# Return required variables
return minPres
def tempDiff(outTemp, obTime, device, apiData, config):
""" Calculate 24 hour temperature difference
INPUTS:
outTemp Current temperature [deg C]
obTime Current observation time [s]
device Device ID that observation originates from
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
dTemp 24 hour temperature difference [deg C]
"""
# Return None if required variables are missing
errorOutput = [None, 'dc', '-']
if outTemp[0] is None:
Logger.warning(f'tempDiff: {system().log_time()} - outTemp is None')
return errorOutput
elif obTime[0] is None:
Logger.warning(f'tempDiff: {system().log_time()} - obTime is None')
return errorOutput
# Define index of temperature in websocket packets
if str(device) == config['Station']['OutAirID']:
index_bucket_a = 2
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 7
# Extract required observations from WeatherFlow API data based on device
# type indicated in API call
if '24Hrs' in apiData[device] and weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'):
data24hrs = apiData[device]['24Hrs'].json()['obs']
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None]
apiTemp = [ob[index_bucket_a] for ob in data24hrs if ob[index_bucket_a] is not None]
try:
dTime = obTime[0] - apiTime[0]
if dTime > 86400 - (5 * 60) and dTime < 86400 + (5 * 60):
temp24h = apiTemp[0]
temp0h = outTemp[0]
else:
Logger.warning(f'tempDiff: {system().log_time()} - no data in 24 hour window')
return errorOutput
except Exception as Error:
Logger.warning(f'tempDiff: {system().log_time()} - {Error}')
return errorOutput
else:
return errorOutput
# Calculate 24 hour temperature Difference
try:
dTemp = temp0h - temp24h
except Exception as Error:
Logger.warning(f'tempDiff: {system().log_time()} - {Error}')
return errorOutput
# Define temperature difference text
if abs(dTemp) < 0.05:
diffTxt = '[color=c8c8c8ff][/color]'
elif dTemp > 0:
diffTxt = '[color=f05e40ff] warmer[/color]'
elif dTemp < 0:
diffTxt = '[color=00a4b4ff] colder[/color]'
# Return 24 hour temperature difference
return [dTemp, 'dc', diffTxt]
def tempTrend(outTemp, obTime, device, apiData, config):
""" Calculate 3 hour temperature trend
INPUTS:
outTemp Current temperature [deg C]
obTime Current observation time [s]
device Device ID that observation originates from
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
Trend 24 hour temperature difference [deg C]
"""
# Return None if required variables are missing
errorOutput = [None, 'c/hr', 'c8c8c8ff']
if outTemp[0] is None:
Logger.warning(f'tempTrend: {system().log_time()} - outTemp is None')
return errorOutput
elif obTime[0] is None:
Logger.warning(f'tempTrend: {system().log_time()} - obTime is None')
return errorOutput
# Define index of temperature in websocket packets
if str(device) == config['Station']['OutAirID']:
index_bucket_a = 2
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 7
# Extract required observations from WeatherFlow API data based on device
# type indicated in API call
if '24Hrs' in apiData[device] and weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'):
data24hrs = apiData[device]['24Hrs'].json()['obs']
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None]
apiTemp = [ob[index_bucket_a] for ob in data24hrs if ob[index_bucket_a] is not None]
try:
dTime = [abs(T - (obTime[0] - 3 * 3600)) for T in apiTime]
if min(dTime) < 5 * 60:
temp3h = apiTemp[dTime.index(min(dTime))]
time3h = apiTime[dTime.index(min(dTime))]
temp0h = outTemp[0]
time0h = obTime[0]
else:
Logger.warning(f'tempTrend: {system().log_time()} - no data in 3 hour window')
return errorOutput
except Exception as Error:
Logger.warning(f'tempTrend: {system().log_time()} - {Error}')
return errorOutput
else:
return errorOutput
# Calculate three hour temperature trend
try:
Trend = (temp0h - temp3h) / ((time0h - time3h) / 3600)
except Exception as Error:
Logger.warning(f'tempTrend: {system().log_time()} - {Error}')
return errorOutput
# Define temperature trend color
if abs(Trend) < 0.05:
Color = 'c8c8c8ff'
elif Trend > 0:
Color = 'f05e40ff'
elif Trend < 1 / 3:
Color = '00a4b4ff'
# Return temperature trend
return [Trend, 'c/hr', Color]
def tempMax(Temp, obTime, maxTemp, device, apiData, config):
""" Calculate maximum temperature for specified device since midnight
station time
INPUTS:
Temp Current temperature [deg C]
obTime Current observation time [s]
maxTemp Current maximum temperature [deg C]
device Device ID that observation originates from
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
maxTemp Maximum temperature [deg C]
"""
# Return None if required variables are missing
errorOutput = [None, 'c', '-', None, time.time()]
if Temp[0] is None:
Logger.warning(f'tempMax: {system().log_time()} - Temp is None')
return errorOutput
elif obTime[0] is None:
Logger.warning(f'tempMax: {system().log_time()} - obTime is None')
return errorOutput
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Define index of temperature in websocket packets
if (str(device) == config['Station']['OutAirID']
or str(device) == config['Station']['InAirID']):
index_bucket_a = 2
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 7
# If console is initialising, download all data for current day using
# Weatherflow API and calculate daily maximum temperature
if 'today' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'):
dataToday = apiData[device]['today'].json()['obs']
apiTime = [item[0] for item in dataToday if item[index_bucket_a] is not None]
apiTemp = [item[index_bucket_a] for item in dataToday if item[index_bucket_a] is not None]
try:
maxTemp = [max(apiTemp), 'c', apiTime[apiTemp.index(max(apiTemp))], 's', max(apiTemp), apiTime[apiTemp.index(max(apiTemp))]]
except Exception as Error:
Logger.warning(f'tempMax: {system().log_time()} - {Error}')
maxTemp = errorOutput
else:
maxTemp = errorOutput
# Else if midnight has passed, reset maximum temperature
elif Now.date() > datetime.fromtimestamp(maxTemp[5], Tz).date():
maxTemp = [Temp[0], 'c', obTime[0], 's', Temp[0], obTime[0]]
# Else if current temperature is greater than maximum recorded temperature,
# update maximum temperature
elif Temp[0] > maxTemp[4]:
maxTemp = [Temp[0], 'c', obTime[0], 's', Temp[0], obTime[0]]
# Else maximum temperature unchanged, return existing values
else:
maxTemp = [maxTemp[4], 'c', maxTemp[2], 's', maxTemp[4], obTime[0]]
# Return required variables
return maxTemp
def tempMin(Temp, obTime, minTemp, device, apiData, config):
""" Calculate minimum temperature for specified device since midnight
station time
INPUTS:
Temp Current temperature [deg C]
obTime Current observation time [s]
minTemp Current minimum temperature [deg C]
device Device ID
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
minTemp Minumum temperature [deg C]
"""
# Return None if required variables are missing
errorOutput = [None, 'c', '-', None, time.time()]
if Temp[0] is None:
Logger.warning(f'tempMin: {system().log_time()} - Temp is None')
return errorOutput
elif obTime[0] is None:
Logger.warning(f'tempMin: {system().log_time()} - obTime is None')
return errorOutput
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Define index of temperature in websocket packets
if (str(device) == config['Station']['OutAirID']
or str(device) == config['Station']['InAirID']):
index_bucket_a = 2
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 7
# If console is initialising, download all data for current day using
# Weatherflow API and calculate daily minimum temperature
if 'today' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'):
dataToday = apiData[device]['today'].json()['obs']
apiTime = [item[0] for item in dataToday if item[index_bucket_a] is not None]
apiTemp = [item[index_bucket_a] for item in dataToday if item[index_bucket_a] is not None]
try:
minTemp = [min(apiTemp), 'c', apiTime[apiTemp.index(min(apiTemp))], 's', min(apiTemp), apiTime[apiTemp.index(min(apiTemp))]]
except Exception as Error:
Logger.warning(f'tempMin: {system().log_time()} - {Error}')
minTemp = errorOutput
else:
minTemp = errorOutput
# Else if midnight has passed, reset minimum temperature
elif Now.date() > datetime.fromtimestamp(minTemp[5], Tz).date():
minTemp = [Temp[0], 'c', obTime[0], 's', Temp[0], obTime[0]]
# Else if current temperature is less than minimum recorded temperature,
# update minimum temperature
elif Temp[0] < minTemp[4]:
minTemp = [Temp[0], 'c', obTime[0], 's', Temp[0], obTime[0]]
# Else minimum temperature unchanged, return existing values
else:
minTemp = [minTemp[4], 'c', minTemp[2], 's', minTemp[4], obTime[0]]
# Return required variables
return minTemp
def strikeDeltaT(strikeTime):
""" Calculate time since last lightning strike
INPUTS:
strikeTime Time of last lightning strike [s]
OUTPUT:
strikeDeltaT Time since last lightning strike [s]
"""
# Return None if required variables are missing
errorOutput = [None, 's', None]
if strikeTime[0] is None:
Logger.warning(f'strikeDeltaT: {system().log_time()} - strikeTime is None')
return errorOutput
# Calculate time since last lightning strike
deltaT = time.time() - strikeTime[0]
deltaT = [deltaT, 's', deltaT]
# Return time since and distance to last lightning strike
return deltaT
def strikeFrequency(obTime, device, apiData, config):
""" Calculate lightning strike frequency over the previous 10 minutes and
three hours
INPUTS:
obTime Time of latest observation
device Device ID
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
strikeFrequency Strike frequency over the previous 10 [Count]
minutes and three hours
"""
# Return None if required variables are missing
errorOutput = [None, '/min', None, '/min']
if obTime[0] is None:
Logger.warning(f'strikeFreq: {system().log_time()} - obTime is None')
return errorOutput
# Define index of total lightning strike counts in websocket packets
if str(device) == config['Station']['OutAirID']:
index_bucket_a = 4
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 15
# Extract lightning strike count over the last three hours. Return None for
# strikeFrequency if API call has failed
if '24Hrs' in apiData[device] and weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'):
data24hrs = apiData[device]['24Hrs'].json()['obs']
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None]
try:
dTime = [abs(T - (obTime[0] - 3 * 3600)) for T in apiTime]
if min(dTime) < 5 * 60:
count3h = [ob[index_bucket_a] for ob in data24hrs[dTime.index(min(dTime)):] if ob[index_bucket_a] is not None]
else:
Logger.warning(f'strikeFreq: {system().log_time()} - no data in 3 hour window')
count3h = None
except Exception as Error:
Logger.warning(f'strikeFreq: {system().log_time()} - {Error}')
count3h = None
else:
count3h = None
# Calculate average strike frequency over the last three hours
if count3h is not None:
activeStrikes = [count for count in count3h if count > 0]
if len(activeStrikes) > 0:
strikeFrequency3h = [sum(activeStrikes) / len(activeStrikes), '/min']
else:
strikeFrequency3h = [0.0, '/min']
else:
strikeFrequency3h = [None, '/min']
# Extract lightning strike count over the last 10 minutes. Return None for
# strikeFrequency if API call has failed
if '24Hrs' in apiData[device] and weatherflow_api.verify_response(apiData[device]['24Hrs'], 'obs'):
data24hrs = apiData[device]['24Hrs'].json()['obs']
apiTime = [ob[0] for ob in data24hrs if ob[index_bucket_a] is not None]
try:
dTime = [abs(T - (obTime[0] - 600)) for T in apiTime]
if min(dTime) < 2 * 60:
count10m = [ob[index_bucket_a] for ob in data24hrs[dTime.index(min(dTime)):] if ob[index_bucket_a] is not None]
else:
Logger.warning(f'strikeFreq: {system().log_time()} - no data in 10 minute window')
count10m = None
except Exception as Error:
Logger.warning(f'strikeFreq: {system().log_time()} - {Error}')
count10m = None
else:
count10m = None
# Calculate average strike frequency over the last 10 minutes
if count10m is not None:
activeStrikes = [count for count in count10m if count > 0]
if len(activeStrikes) > 0:
strikeFrequency10m = [sum(activeStrikes) / len(activeStrikes), '/min']
else:
strikeFrequency10m = [0.0, '/min']
else:
strikeFrequency10m = [None, '/min']
# Return strikeFrequency for last 10 minutes and last three hours
return strikeFrequency10m + strikeFrequency3h
def strikeCount(count, strikeCount, device, apiData, config):
""" Calculate the number of lightning strikes for the last day/month/year
INPUTS:
count Number of lightning strikes in the past minute [Count]
strikeCount Dictionary containing fields:
Today Number of lightning strikes today [Count]
Yesterday Number of lightning strikes in last month [Count]
Year Number of lightning strikes in last year [Count]
device Device ID
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
strikeCount Dictionary containing fields:
Today Number of lightning strikes today [Count]
Yesterday Number of lightning strikes in last month [Count]
Year Number of lightning strikes in last year [Count]
"""
# Return None if required variables are missing
errorOutput = [None, 'count', None, time.time()]
if count[0] is None:
Logger.warning(f'strikeCount: {system().log_time()} - count is None')
todayStrikes = monthStrikes = yearStrikes = errorOutput
return {'today': todayStrikes, 'month': monthStrikes, 'year': yearStrikes}
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Define index of total lightning strike counts in websocket packets
if str(device) == config['Station']['OutAirID']:
index_bucket_a = 4
index_bucket_e = 4
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 15
index_bucket_e = 24
# If console is initialising, download all data for current day using
# Weatherflow API and calculate total daily lightning strikes
if 'today' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'):
dataToday = apiData[device]['today'].json()['obs']
apiStrikes = [item[index_bucket_a] for item in dataToday if item[index_bucket_a] is not None]
try:
todayStrikes = [sum(x for x in apiStrikes), 'count', sum(x for x in apiStrikes), time.time()]
except Exception as Error:
Logger.warning(f'strikeCount: {system().log_time()} - {Error}')
todayStrikes = errorOutput
else:
todayStrikes = errorOutput
# Else if midnight has passed, reset daily lightning strike count to zero
elif Now.date() > datetime.fromtimestamp(strikeCount['today'][3], Tz).date():
todayStrikes = [count[0], 'count', count[0], time.time()]
# Else, calculate current daily lightning strike count
else:
currentCount = strikeCount['today'][2]
updatedCount = currentCount + count[0] if count[0] is not None else currentCount
todayStrikes = [updatedCount, 'count', updatedCount, time.time()]
# If console is initialising and today is the first day on the month, set
# monthly lightning strikes to current daily lightning strikes
if strikeCount['month'][0] is None and Now.day == 1:
monthStrikes = [todayStrikes[0], 'count', todayStrikes[0], time.time()]
# Else if console is initialising, calculate total monthly lightning strikes
# from the WeatherFlow API data
elif 'month' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['month'], 'obs'):
dataMonth = apiData[device]['month'].json()['obs']
apiStrikes = [item[index_bucket_e] for item in dataMonth if item[index_bucket_e] is not None]
try:
monthStrikes = [sum(x for x in apiStrikes), 'count', sum(x for x in apiStrikes), time.time()]
if todayStrikes[0] is not None:
monthStrikes[0] += todayStrikes[0]
monthStrikes[2] += todayStrikes[2]
except Exception as Error:
Logger.warning(f'strikeCount: {system().log_time()} - {Error}')
monthStrikes = errorOutput
else:
monthStrikes = errorOutput
# Else if the end of the month has passed, reset monthly lightning strike
# count to zero
elif Now.month > datetime.fromtimestamp(strikeCount['month'][3], Tz).month:
monthStrikes = [count[0], 'count', count[0], time.time()]
# Else, calculate current monthly lightning strike count
else:
currentCount = strikeCount['month'][2]
updatedCount = currentCount + count[0] if count[0] is not None else currentCount
monthStrikes = [updatedCount, 'count', updatedCount, time.time()]
# If console is initialising and today is the first day on the year, set
# yearly lightning strikes to current daily lightning strikes
if strikeCount['year'][0] is None and Now.timetuple().tm_yday == 1:
yearStrikes = [todayStrikes[0], 'count', todayStrikes[0], time.time()]
# Else if console is initialising, calculate total yearly lightning strikes
# from the WeatherFlow API data
elif 'year' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['year'], 'obs'):
dataYear = apiData[device]['year'].json()['obs']
apiStrikes = [item[index_bucket_e] for item in dataYear if item[index_bucket_e] is not None]
try:
yearStrikes = [sum(x for x in apiStrikes), 'count', sum(x for x in apiStrikes), time.time()]
if todayStrikes[0] is not None:
yearStrikes[0] += todayStrikes[0]
yearStrikes[2] += todayStrikes[2]
except Exception as Error:
Logger.warning(f'strikeCount: {system().log_time()} - {Error}')
yearStrikes = errorOutput
else:
yearStrikes = errorOutput
# Else if the end of the year has passed, reset monthly and yearly lightning
# strike count to zero
elif Now.year > datetime.fromtimestamp(strikeCount['year'][3], Tz).year:
monthStrikes = [count[0], 'count', count[0], time.time()]
yearStrikes = [count[0], 'count', count[0], time.time()]
# Else, calculate current yearly lightning strike count
else:
currentCount = strikeCount['year'][2]
updatedCount = currentCount + count[0] if count[0] is not None else currentCount
yearStrikes = [updatedCount, 'count', updatedCount, time.time()]
# Return Daily, Monthly, and Yearly lightning strike counts
return {'today': todayStrikes, 'month': monthStrikes, 'year': yearStrikes}
def rainRate(minuteRain):
""" Calculate the average windspeed since midnight station time
INPUTS:
windSpd Rain accumulation for the current minute [mm]
OUTPUT:
rainRate Current instantaneous rain rate [mm/hr]
"""
# Return None if required variables are missing
errorOutput = [None, 'mm/hr', '-', None]
if minuteRain[0] is None:
Logger.warning(f'rainRate: {system().log_time()} - minuteRain is None')
return errorOutput
# Calculate instantaneous rain rate from instantaneous rain accumulation
Rate = minuteRain[0] * 60
# Define rain rate text based on calculated
if Rate == 0:
RateText = 'Currently Dry'
elif Rate < 0.25:
RateText = 'Very Light Rain'
elif Rate < 1.0:
RateText = 'Light Rain'
elif Rate < 4.0:
RateText = 'Moderate Rain'
elif Rate < 16.0:
RateText = 'Heavy Rain'
elif Rate < 50.0:
RateText = 'Very Heavy Rain'
else:
RateText = 'Extreme Rain'
# Return instantaneous rain rate and text
return [Rate, 'mm/hr', RateText, Rate]
def rainAccumulation(dailyRain, rainAccum, device, apiData, config):
""" Calculate the rain accumulation for today/yesterday/month/year
INPUTS:
dailyRain Daily rain accumulation [mm]
rainAccum Dictionary containing fields:
Today Rain accumulation for the current day [mm]
Yesterday Rain accumulation yesterday [mm]
Month Rain accumulation for the current month [mm]
Year Rain accumulation for the current year [mm]
device Device ID
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
rainAccum Dictionary containing fields:
Today Rain accumulation for the current day [mm]
Yesterday Rain accumulation yesterday [mm]
Month Rain accumulation for the current month [mm]
Year Rain accumulation for the current year [mm]
"""
# Return None if required variables are missing
errorOutput = [None, 'mm', None, time.time()]
if dailyRain[0] is None:
Logger.warning(f'rainAccum: {system().log_time()} - dailyRain is None')
todayRain = yesterdayRain = monthRain = yearRain = errorOutput
return {'today': todayRain, 'yesterday': yesterdayRain, 'month': monthRain, 'year': yearRain}
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Define index of total daily rain accumulation in websocket packets
if str(device) == config['Station']['SkyID']:
index_bucket_a = 3
index_bucket_e = 3
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 12
index_bucket_e = 28
# Set current daily rainfall accumulation
todayRain = [dailyRain[0], 'mm', dailyRain[0], time.time()]
# If console is initialising, calculate yesterday's rainfall from the
# WeatherFlow API data
if 'yesterday' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['yesterday'], 'obs'):
yesterdayData = apiData[device]['yesterday'].json()['obs']
rainData = [item[index_bucket_a] for item in yesterdayData if item[index_bucket_a] is not None]
try:
yesterdayRain = [sum(x for x in rainData), 'mm', sum(x for x in rainData), time.time()]
except Exception as Error:
Logger.warning(f'rainAccum: {system().log_time()} - {Error}')
yesterdayRain = errorOutput
else:
yesterdayRain = errorOutput
# Else if midnight has passed, set yesterday rainfall accumulation equal to
# rainAccum['today'] (which still contains yesterday's accumulation)
elif Now.date() > datetime.fromtimestamp(rainAccum['today'][3], Tz).date():
yesterdayRain = [rainAccum['today'][2], 'mm', rainAccum['today'][2], time.time()]
# Else, set yesterday rainfall accumulation as unchanged
else:
yesterdayRain = [rainAccum['yesterday'][2], 'mm', rainAccum['yesterday'][2], time.time()]
# If console is initialising and today is the first day on the month, set
# monthly rainfall to current daily rainfall
if rainAccum['month'][0] is None and Now.day == 1:
monthRain = [dailyRain[0], 'mm', 0, time.time()]
# Else if console is initialising, calculate total monthly rainfall from
# the WeatherFlow API data
elif 'month' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['month'], 'obs'):
monthData = apiData[device]['month'].json()['obs']
rainData = [item[index_bucket_e] for item in monthData if item[index_bucket_e] is not None]
try:
monthRain = [sum(x for x in rainData), 'mm', sum(x for x in rainData), time.time()]
if not math.isnan(dailyRain[0]):
monthRain[0] += dailyRain[0]
except Exception as Error:
Logger.warning(f'rainAccum: {system().log_time()} - {Error}')
monthRain = errorOutput
else:
monthRain = errorOutput
# Else if the end of the month has passed, reset monthly rain accumulation
# to current daily rain accumulation
elif Now.month > datetime.fromtimestamp(rainAccum['month'][3], Tz).month:
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0
monthRain = [dailyAccum, 'mm', 0, time.time()]
# Else if midnight has passed, permanently add rainAccum['Today'] (which
# still contains yesterday's accumulation) and current daily rainfall to
# monthly rain accumulation
elif Now.date() > datetime.fromtimestamp(rainAccum['month'][3], Tz).date():
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0
monthRain = [rainAccum['month'][2] + rainAccum['today'][2] + dailyAccum, 'mm', rainAccum['month'][2] + rainAccum['today'][2], time.time()]
# Else, update current monthly rainfall accumulation
else:
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0
monthRain = [rainAccum['month'][2] + dailyAccum, 'mm', rainAccum['month'][2], time.time()]
# If console is initialising and today is the first day on the year, set
# yearly rainfall to current daily rainfall
if rainAccum['year'][0] is None and Now.timetuple().tm_yday == 1:
yearRain = [dailyRain[0], 'mm', 0, time.time()]
# Else if console is initialising, calculate total yearly rainfall from the
# WeatherFlow API data
elif 'year' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['year'], 'obs'):
yearData = apiData[device]['year'].json()['obs']
rainData = [item[index_bucket_e] for item in yearData if item[index_bucket_e] is not None]
try:
yearRain = [sum(x for x in rainData), 'mm', sum(x for x in rainData), time.time()]
if not math.isnan(dailyRain[0]):
yearRain[0] += dailyRain[0]
except Exception as Error:
Logger.warning(f'rainAccum: {system().log_time()} - {Error}')
yearRain = errorOutput
else:
yearRain = errorOutput
# Else if the end of the year has passed, reset monthly and yearly rain
# accumulation to current daily rain accumulation
elif Now.year > datetime.fromtimestamp(rainAccum['year'][3], Tz).year:
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0
yearRain = [dailyAccum, 'mm', 0, time.time()]
monthRain = [dailyAccum, 'mm', 0, time.time()]
# Else if midnight has passed, permanently add rainAccum['Today'] (which
# still contains yesterday's accumulation) and current daily rainfall to
# yearly rain accumulation
elif Now.date() > datetime.fromtimestamp(rainAccum['year'][3], Tz).date():
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0
yearRain = [rainAccum['year'][2] + rainAccum['year'][2] + dailyAccum, 'mm', rainAccum['year'][2] + rainAccum['today'][2], time.time()]
# Else, calculate current yearly rain accumulation
else:
dailyAccum = dailyRain[0] if not math.isnan(dailyRain[0]) else 0
yearRain = [rainAccum['year'][2] + dailyAccum, 'mm', rainAccum['year'][2], time.time()]
# Return Daily, Monthly, and Yearly rainfall accumulation totals
return {'today': todayRain, 'yesterday': yesterdayRain, 'month': monthRain, 'year': yearRain}
def avgWindSpeed(windSpd, avgWind, device, apiData, config):
""" Calculate the average windspeed since midnight station time
INPUTS:
windSpd Current wind speed [m/s]
avgWind Current average wind speed since midnight [m/s]
device Device ID
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
AvgWind Average wind speed since midnight [m/s]
"""
# Return None if required variables are missing
errorOutput = [None, 'mps', None, None, time.time()]
if windSpd[0] is None:
Logger.warning(f'avgSpeed: {system().log_time()} - windSpd is None')
return errorOutput
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Define index of wind speed in websocket packets
if str(device) == config['Station']['SkyID']:
index_bucket_a = 5
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 2
# If console is initialising, download all data for current day using
# Weatherflow API and calculate daily averaged windspeed
if 'today' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'):
todayData = apiData[device]['today'].json()['obs']
windSpd = [item[index_bucket_a] for item in todayData if item[index_bucket_a] is not None]
try:
average = sum(x for x in windSpd) / len(windSpd)
windAvg = [average, 'mps', average, len(windSpd), time.time()]
except Exception as Error:
Logger.warning(f'avgSpeed: {system().log_time()} - {Error}')
windAvg = errorOutput
else:
windAvg = errorOutput
# Else if midnight has passed, reset daily averaged wind speed
elif Now.date() > datetime.fromtimestamp(avgWind[4], Tz).date():
windAvg = [windSpd[0], 'mps', windSpd[0], 1, time.time()]
# Else, calculate current daily averaged wind speed
else:
length = avgWind[3] + 1
currentAvg = avgWind[2]
updatedAvg = (length - 1) / length * currentAvg + 1 / length * windSpd[0]
windAvg = [updatedAvg, 'mps', updatedAvg, length, time.time()]
# Return daily averaged wind speed
return windAvg
def maxWindGust(windGust, obTime, maxGust, device, apiData, config):
""" Calculate the maximum wind gust since midnight station time
INPUTS:
windGust Current wind gust [m/s]
obTime Current observation time [s]
maxGust Current maximum wind gust since midnight [m/s]
device Device ID
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
maxGust Maximum wind gust since midnight [m/s]
"""
# Return None if required variables are missing
errorOutput = [None, 'mps', '-', None, time.time()]
if windGust[0] is None:
Logger.warning(f'maxGust: {system().log_time()} - windGust is None')
return errorOutput
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Define index of wind speed in websocket packets
if str(device) == config['Station']['SkyID']:
index_bucket_a = 6
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 3
# If console is initialising, download all data for current day using
# Weatherflow API and calculate daily maximum wind gust
if 'today' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'):
todayData = apiData[device]['today'].json()['obs']
apiTime = [item[0] for item in todayData if item[index_bucket_a] is not None]
apiWind = [item[index_bucket_a] for item in todayData if item[index_bucket_a] is not None]
try:
maxGust = [max(apiWind), 'mps', apiTime[apiWind.index(max(apiWind))], 's', max(apiWind), apiTime[apiWind.index(max(apiWind))]]
except Exception as Error:
Logger.warning(f'maxGust: {system().log_time()} - {Error}')
maxGust = errorOutput
else:
maxGust = errorOutput
# Else if midnight has passed, reset maximum recorded wind gust
elif Now.date() > datetime.fromtimestamp(maxGust[5], Tz).date():
maxGust = [windGust[0], 'mps', obTime[0], 's', windGust[0], obTime[0]]
# Else if current gust speed is greater than maximum recorded gust speed,
# update maximum gust speed
elif windGust[0] > maxGust[4]:
maxGust = [windGust[0], 'mps', obTime[0], 's', windGust[0], obTime[0]]
# Else maximum gust speed is unchanged, return existing value
else:
maxGust = [maxGust[4], 'mps', maxGust[2], 's', maxGust[4], obTime[0]]
# Return maximum wind gust
return maxGust
def cardinalWindDir(windDir, windSpd=[1, 'mps']):
""" Defines the cardinal wind direction from the current wind direction in
degrees. Sets the wind direction as "Calm" if current wind speed is zero
INPUTS:
windDir Current wind direction [degrees]
windSpd Current wind speed [m/s]
OUTPUT:
cardinalWind Cardinal wind direction
"""
# Return None if required variables are missing
errorOutput = [windDir[0], windDir[1], '-', '-']
if windDir[0] is None and windSpd[0] != 0.0:
Logger.warning(f'cardWindDir: {system().log_time()} - windDir is None')
return errorOutput
elif windSpd[0] is None:
Logger.warning(f'cardWindDir: {system().log_time()} - windSpd is None')
return errorOutput
# Define all possible cardinal wind directions and descriptions
Direction = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW', 'N']
Description = ['Due North', 'North NE', 'North East', 'East NE', 'Due East', 'East SE', 'South East', 'South SE',
'Due South', 'South SW', 'South West', 'West SW', 'Due West', 'West NW', 'North West', 'North NW',
'Due North']
# Define actual cardinal wind direction and description based on current
# wind direction in degrees
if windSpd[0] == 0:
Direction = 'Calm'
Description = '[color=9aba2fff]Calm[/color]'
cardinalWind = [windDir[0], windDir[1], Direction, Description]
else:
Ind = int(round(windDir[0] / 22.5))
Direction = Direction[Ind]
Description = Description[Ind].split()[0] + ' [color=9aba2fff]' + Description[Ind].split()[1] + '[/color]'
cardinalWind = [windDir[0], windDir[1], Direction, Description]
# Return cardinal wind direction and description
return cardinalWind
def beaufortScale(windSpd):
""" Defines the Beaufort scale value from the current wind speed
INPUTS:
windSpd Current wind speed [m/s]
OUTPUT:
beaufortScale Beaufort Scale speed, description, and icon
"""
# Return None if required variables are missing
errorOutput = windSpd + ['-', '-', '-']
if windSpd[0] is None:
Logger.warning(f'beaufScale: {system().log_time()} - windSpd is None')
return errorOutput
# Define Beaufort scale cutoffs and Force numbers
Cutoffs = [0.5, 1.5, 3.3, 5.5, 7.9, 10.7, 13.8, 17.1, 20.7, 24.4, 28.4, 32.6]
Force = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Description = ['Calm Conditions', 'Light Air', 'Light Breeze', 'Gentle Breeze',
'Moderate Breeze', 'Fresh Breeze', 'Strong Breeze', 'Near Gale Force',
'Gale Force', 'Severe Gale Force', 'Storm Force', 'Violent Storm',
'Hurricane Force']
# Define Beaufort Scale wind speed, description, and icon
Ind = bisect.bisect(Cutoffs, windSpd[0])
Beaufort = [float(Force[Ind]), str(Force[Ind]), Description[Ind]]
# Return Beaufort Scale speed, description, and icon
return windSpd + Beaufort
def UVIndex(uvLevel):
""" Defines the UV index from the current UV level
INPUTS:
uvLevel Current UV level [m/s]
OUTPUT:
uvIndex UV index
"""
# Return None if required variables are missing
errorOutput = [None, 'index', '-', '#646464']
if uvLevel[0] is None:
Logger.warning(f'UVIndex: {system().log_time()} - uvLevel is None')
return errorOutput
# Define UV Index cutoffs and level descriptions
Cutoffs = [0, 3, 6, 8, 11]
Level = ['None', 'Low', 'Moderate', 'High', 'Very High', 'Extreme']
# Define UV index colours
Grey = '#646464'
Green = '#558B2F'
Yellow = '#F9A825'
Orange = '#EF6C00'
Red = '#B71C1C'
Violet = '#6A1B9A'
Color = [Grey, Green, Yellow, Orange, Red, Violet]
# Set the UV index
if uvLevel[0] > 0:
Ind = bisect.bisect(Cutoffs, round(uvLevel[0], 1))
else:
Ind = 0
uvIndex = [round(uvLevel[0], 1), 'index', Level[Ind], Color[Ind]]
# Return UV Index icon
return uvIndex
def peakSunHours(radiation, peakSun, device, apiData, config):
""" Calculate peak sun hours since midnight and daily solar potential
INPUTS:
Radiation Current solar radiation [W/m^2]
peakSun Current peak sun hours since midnight [hours]
device Device ID
apiData WeatherFlow REST API data
config Station configuration
OUTPUT:
peakSun Peak sun hours since midnight and solar potential
"""
# Return None if required variables are missing
errorOutput = [None, 'hrs', '-']
if radiation[0] is None:
Logger.warning(f'peakSun: {system().log_time()} - radiation is None')
return errorOutput
# Define current time in station timezone
Tz = pytz.timezone(config['Station']['Timezone'])
Now = datetime.now(pytz.utc).astimezone(Tz)
# Calculate time of sunrise and sunset or use existing values
if peakSun[0] is None or Now > datetime.fromtimestamp(peakSun[5], Tz):
Observer = ephem.Observer()
Observer.pressure = 0
Observer.lat = str(config['Station']['Latitude'])
Observer.lon = str(config['Station']['Longitude'])
Observer.horizon = '-0:34'
sunrise = Observer.next_rising(ephem.Sun()).datetime().timestamp()
sunset = Observer.next_setting(ephem.Sun()).datetime().timestamp()
else:
sunrise = peakSun[4]
sunset = peakSun[5]
# Define index of radiation in websocket packets
if str(device) == config['Station']['SkyID']:
index_bucket_a = 10
elif str(device) == config['Station']['TempestID']:
index_bucket_a = 11
# If console is initialising, download all data for current day using
# Weatherflow API and calculate Peak Sun Hours
if 'today' in apiData[device]:
if weatherflow_api.verify_response(apiData[device]['today'], 'obs'):
dataToday = apiData[device]['today'].json()['obs']
radiation = [item[index_bucket_a] for item in dataToday if item[index_bucket_a] is not None]
try:
watthrs = sum([item * (1 / 60) for item in radiation])
peakSun = [watthrs / 1000, 'hrs', watthrs, sunrise, sunset, time.time()]
except Exception as Error:
Logger.warning(f'peakSun: {system().log_time()} - {Error}')
return errorOutput
else:
return errorOutput
# Else if midnight has passed, reset Peak Sun Hours
elif Now.date() > datetime.fromtimestamp(peakSun[6], Tz).date():
watthrs = radiation[0] * (1 / 60)
peakSun = [watthrs / 1000, 'hrs', watthrs, sunrise, sunset, time.time()]
# Else calculate current Peak Sun Hours
else:
watthrs = peakSun[3] + radiation[0] * (1 / 60)
peakSun = [watthrs / 1000, 'hrs', watthrs, sunrise, sunset, time.time()]
# Calculate proportion of daylight hours that have passed
if datetime.fromtimestamp(sunrise, Tz) <= Now <= datetime.fromtimestamp(sunset, Tz):
daylightFactor = (time.time() - sunrise) / (sunset - sunrise)
else:
daylightFactor = 1
# Define daily solar potential
if peakSun[0] / daylightFactor == 0:
peakSun.insert(2, '[color=#646464ff]None[/color]')
elif peakSun[0] / daylightFactor < 2:
peakSun.insert(2, '[color=#4575b4ff]Limited[/color]')
elif peakSun[0] / daylightFactor < 4:
peakSun.insert(2, '[color=#fee090ff]Moderate[/color]')
elif peakSun[0] / daylightFactor < 6:
peakSun.insert(2, '[color=#f46d43ff]Good[/color]')
else:
peakSun.insert(2, '[color=#d73027ff]Excellent[/color]')
# Return Peak Sun Hours
return peakSun
""" Formats and sets the required units of observations displayed on the
Raspberry Pi Python console for Weather Flow Smart Home Weather Stations.
Copyright (C) 2018-2023 Peter Davis
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
# Import required modules
from lib import derivedVariables as derive
from datetime import datetime
import pytz
def Units(Obs, Unit):
""" Sets the required observation units
INPUTS:
Obs Observations with current units
Unit Required output unit
OUTPUT:
cObs Observation converted into required unit
"""
# Convert temperature observations
cObs = Obs[:]
if Unit in ['f', 'c']:
for ii, T in enumerate(Obs):
if T == 'c':
if Unit == 'f':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * (9 / 5) + 32
cObs[ii] = 'f'
else:
cObs[ii - 1] = Obs[ii - 1]
cObs[ii] = 'c'
if T in ['dc', 'c/hr']:
if Unit == 'f':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * (9 / 5)
if T == 'dc':
cObs[ii] = 'f'
elif T == 'c/hr':
cObs[ii] = 'f/hr'
else:
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1]
if T == 'dc':
cObs[ii] = 'c'
# Convert pressure and pressure trend observations
elif Unit in ['inhg', 'mmhg', 'hpa', 'mb']:
for ii, P in enumerate(Obs):
if P in ['mb', 'mb/hr']:
if Unit == 'inhg':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * 0.0295301
if P == 'mb':
cObs[ii] = ' inHg'
else:
cObs[ii] = ' inHg/hr'
elif Unit == 'mmhg':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * 0.750063
if P == 'mb':
cObs[ii] = ' mmHg'
else:
cObs[ii] = ' mmHg/hr'
elif Unit == 'hpa':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1]
if P == 'mb':
cObs[ii] = ' hPa'
else:
cObs[ii] = ' hPa/hr'
else:
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1]
if P == 'mb':
cObs[ii] = ' mb'
else:
cObs[ii] = ' mb/hr'
# Convert windspeed observations
elif Unit in ['mph', 'lfm', 'kts', 'kph', 'bft', 'mps']:
for ii, W in enumerate(Obs):
if W == 'mps':
if Unit == 'mph' or Unit == 'lfm':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * 2.2369362920544
cObs[ii] = 'mph'
elif Unit == 'kts':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * 1.9438
cObs[ii] = 'kts'
elif Unit == 'kph':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * 3.6
cObs[ii] = 'km/h'
elif Unit == 'bft':
if Obs[ii - 1] is not None:
cObs[ii - 1] = derive.beaufortScale(Obs[ii - 1:ii + 1])[2]
cObs[ii] = 'bft'
else:
cObs[ii - 1] = Obs[ii - 1]
cObs[ii] = 'm/s'
# Convert wind direction observations
elif Unit in ['degrees', 'cardinal']:
for ii, W in enumerate(Obs):
if W == 'degrees':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
cObs[ii] = ''
elif cObs[ii - 1] == 'calm':
cObs[ii - 1] = 'Calm'
cObs[ii] = ''
elif Unit == 'cardinal':
cObs[ii - 1] = derive.cardinalWindDir(Obs[ii - 1:ii + 1])[2]
cObs[ii] = ''
else:
cObs[ii - 1] = Obs[ii - 1]
cObs[ii] = 'degrees'
# Convert rain accumulation and rain rate observations
elif Unit in ['in', 'cm', 'mm']:
for ii, Prcp in enumerate(Obs):
if Prcp in ['mm', 'mm/hr']:
if Unit == 'in':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * 0.0393701
if Prcp == 'mm':
cObs[ii] = ' in'
else:
cObs[ii] = ' in/hr'
elif Unit == 'cm':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * 0.1
if Prcp == 'mm':
cObs[ii] = ' cm'
else:
cObs[ii] = ' cm/hr'
else:
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1]
if Prcp == 'mm':
cObs[ii] = ' mm'
else:
cObs[ii] = ' mm/hr'
# Convert distance observations
elif Unit in ['km', 'mi']:
for ii, Dist in enumerate(Obs):
if Dist == 'km':
if Unit == 'mi':
if Obs[ii - 1] is not None:
cObs[ii - 1] = Obs[ii - 1] * 0.62137
cObs[ii] = 'miles'
# Convert other observations
elif Unit in ['metric', 'imperial']:
for ii, other in enumerate(Obs):
if other == 'Wm2':
pass
elif other == 'index':
pass
elif other == 'hrs':
pass
elif other == '/min':
pass
elif other == 'count':
pass
elif other == 's':
pass
elif other == '%':
pass
# Return converted observations
return cObs
def Format(Obs, obType, config=[]):
""" Formats the observation for display on the console
INPUTS:
Obs Observations with units
obType Observation type
OUTPUT:
cObs Formatted observation based on specified obType
"""
# Convert obType to list if required
if not isinstance(obType, list):
obType = [obType]
# Format temperature observations
cObs = Obs[:]
for Type in obType:
if Type == 'Temp':
for ii, T in enumerate(Obs):
if isinstance(T, str) and T.strip() in ['c', 'f']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
elif round(cObs[ii - 1], 1) == 0.0:
cObs[ii - 1] = '{:.1f}'.format(abs(cObs[ii - 1]))
else:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
if T.strip() == 'c':
cObs[ii] = u'\N{DEGREE CELSIUS}'
elif T.strip() == 'f':
cObs[ii] = u'\N{DEGREE FAHRENHEIT}'
elif isinstance(T, str) and T.strip() in ['c/hr', 'f/hr']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
elif round(cObs[ii - 1], 1) == 0.0:
cObs[ii - 1] = '{:.1f}'.format(abs(cObs[ii - 1]))
else:
cObs[ii - 1] = '{:+.1f}'.format(cObs[ii - 1])
if T.strip() == 'c/hr':
cObs[ii] = u'\N{DEGREE CELSIUS}/hr'
elif T.strip() == 'f/hr':
cObs[ii] = u'\N{DEGREE FAHRENHEIT}/hr'
elif Type == 'forecastTemp':
for ii, T in enumerate(Obs):
if isinstance(T, str) and T.strip() in ['c', 'f']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
elif round(cObs[ii - 1], 1) == 0.0:
cObs[ii - 1] = '{:.0f}'.format(abs(cObs[ii - 1]))
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
if T.strip() == 'c':
cObs[ii] = u'\N{DEGREE CELSIUS}'
elif T.strip() == 'f':
cObs[ii] = u'\N{DEGREE FAHRENHEIT}'
# Format pressure observations
elif Type == 'Pressure':
for ii, P in enumerate(Obs):
if isinstance(P, str) and P.strip() in ['inHg/hr', 'inHg', 'mmHg/hr', 'mmHg', 'hPa/hr', 'mb/hr', 'hPa', 'mb']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
if P.strip() in ['inHg/hr', 'inHg']:
if round(cObs[ii - 1], 3) == 0.0:
cObs[ii - 1] = '{:+.3f}'.format(abs(cObs[ii - 1]))
else:
cObs[ii - 1] = '{:+.3f}'.format(cObs[ii - 1])
elif P.strip() in ['mmHg/hr', 'mmHg']:
if round(cObs[ii - 1], 2) == 0.0:
cObs[ii - 1] = '{:+.2f}'.format(abs(cObs[ii - 1]))
else:
cObs[ii - 1] = '{:+.2f}'.format(cObs[ii - 1])
elif P.strip() in ['hPa/hr', 'mb/hr', 'hPa', 'mb']:
if round(cObs[ii - 1], 1) == 0.0:
cObs[ii - 1] = '{:+.1f}'.format(abs(cObs[ii - 1]))
else:
cObs[ii - 1] = '{:+.1f}'.format(cObs[ii - 1])
# Format windspeed observations
elif Type == 'Wind':
for ii, W in enumerate(Obs):
if isinstance(W, str) and W.strip() in ['mph', 'kts', 'km/h', 'bft', 'm/s']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
if round(cObs[ii - 1], 1) < 100:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif Type == 'forecastWind':
for ii, W in enumerate(Obs):
if isinstance(W, str) and W.strip() in ['mph', 'kts', 'km/h', 'bft', 'm/s']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
# Format wind direction observations
elif Type == 'Direction':
for ii, D in enumerate(Obs):
if isinstance(D, str) and D.strip() in ['degrees']:
cObs[ii] = u'\u00B0'
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
# Format rain accumulation and rain rate observations
elif Type == 'Precip':
for ii, Prcp in enumerate(Obs):
if isinstance(Prcp, str):
if Prcp.strip() == 'mm':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
if cObs[ii - 1] == 0:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif cObs[ii - 1] < 0.127:
cObs[ii - 1] = 'Trace'
cObs[ii] = ''
elif round(cObs[ii - 1], 1) < 10:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif Prcp.strip() == 'cm':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
if cObs[ii - 1] == 0:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif cObs[ii - 1] < 0.0127:
cObs[ii - 1] = 'Trace'
cObs[ii] = ''
elif round(cObs[ii - 1], 2) < 10:
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1])
elif round(cObs[ii - 1], 1) < 100:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif Prcp.strip() == 'in':
cObs[ii] = u'\u0022'
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
if cObs[ii - 1] == 0:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif cObs[ii - 1] < 0.005:
cObs[ii - 1] = 'Trace'
cObs[ii] = ''
elif round(cObs[ii - 1], 2) < 10:
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1])
elif round(cObs[ii - 1], 1) < 100:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif Prcp.strip() == 'mm/hr':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
if cObs[ii - 1] == 0:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif cObs[ii - 1] < 0.1:
cObs[ii - 1] = '<0.1'
elif round(cObs[ii - 1], 1) < 10:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif Prcp.strip() in ['in/hr', 'cm/hr']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
if cObs[ii - 1] == 0:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
elif cObs[ii - 1] < 0.01:
cObs[ii - 1] = '<0.01'
elif round(cObs[ii - 1], 2) < 10:
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1])
elif round(cObs[ii - 1], 1) < 100:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
# Format humidity observations
elif Type == 'Humidity':
for ii, H in enumerate(Obs):
if isinstance(H, str) and H.strip() == '%':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
# Format solar radiation observations
elif Type == 'Radiation':
for ii, Rad in enumerate(Obs):
if isinstance(Rad, str) and Rad.strip() == 'Wm2':
cObs[ii] = ' W/m' + u'\u00B2'
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
# Format UV observations
elif Type == 'UV':
for ii, UV in enumerate(Obs):
if isinstance(UV, str) and UV.strip() == 'index':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
cObs.extend(['-', '#646464'])
else:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
# Format Peak Sun Hours observations
elif Type == 'peakSun':
for ii, psh in enumerate(Obs):
if isinstance(psh, str) and psh.strip() == 'hrs':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1])
# Format battery voltage observations
elif Type == 'Battery':
for ii, V in enumerate(Obs):
if isinstance(V, str) and V.strip() == 'v':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
cObs[ii - 1] = '{:.2f}'.format(cObs[ii - 1])
# Format lightning strike count observations
elif Type == 'StrikeCount':
for ii, L in enumerate(Obs):
if isinstance(L, str) and L.strip() == 'count':
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
elif cObs[ii - 1] < 1000:
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
else:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1] / 1000) + ' k'
# Format lightning strike distance observations
elif Type == 'StrikeDistance':
for ii, StrikeDist in enumerate(Obs):
if isinstance(StrikeDist, str):
if StrikeDist.strip() in ['km']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
cObs[ii - 1] = '{:.0f}'.format(max(cObs[ii - 1] - 3, 0)) + '-' + '{:.0f}'.format(cObs[ii - 1] + 3)
elif StrikeDist.strip() in ['miles']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
cObs[ii - 1] = '{:.0f}'.format(max(cObs[ii - 1] - 3 * 0.62137, 0)) + '-' + '{:.0f}'.format(cObs[ii - 1] + 3 * 0.62137)
# Format lightning strike frequency observations
elif Type == 'StrikeFrequency':
for ii, StrikeFreq in enumerate(Obs):
if isinstance(StrikeFreq, str):
if StrikeFreq.strip() in ['/min']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
cObs[ii] = ' /min'
elif cObs[ii - 1].is_integer():
cObs[ii - 1] = '{:.0f}'.format(cObs[ii - 1])
cObs[ii] = ' /min'
else:
cObs[ii - 1] = '{:.1f}'.format(cObs[ii - 1])
cObs[ii] = ' /min'
# Format time difference observations
elif Type == 'Time':
for ii, Time in enumerate(Obs):
if isinstance(Time, str) and Time.strip() in ['s']:
if cObs[ii - 1] is None:
cObs[ii - 1] = '-'
else:
Tz = pytz.timezone(config['Station']['Timezone'])
if config['Display']['TimeFormat'] == '12 hr':
if config['System']['Hardware'] == 'Other':
Format = '%#I:%M %p'
else:
Format = '%-I:%M %p'
else:
Format = '%H:%M'
cObs[ii - 1] = datetime.fromtimestamp(cObs[ii - 1], Tz).strftime(Format)
# Format time difference observations
elif Type == 'TimeDelta':
for ii, Delta in enumerate(Obs):
if isinstance(Delta, str) and Delta.strip() in ['s']:
if cObs[ii - 1] is None:
cObs = ['-', '-', '-', '-', cObs[2]]
else:
days, remainder = divmod(cObs[ii - 1], 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
if days >= 1:
if days == 1:
if hours == 1:
cObs = ['{:.0f}'.format(days), 'day', '{:.0f}'.format(hours), 'hour', cObs[2]]
else:
cObs = ['{:.0f}'.format(days), 'day', '{:.0f}'.format(hours), 'hours', cObs[2]]
elif days <= 99:
if hours == 1:
cObs = ['{:.0f}'.format(days), 'days', '{:.0f}'.format(hours), 'hour', cObs[2]]
else:
cObs = ['{:.0f}'.format(days), 'days', '{:.0f}'.format(hours), 'hours', cObs[2]]
elif days >= 100:
cObs = ['{:.0f}'.format(days), 'days', '-', '-', cObs[2]]
elif hours >= 1:
if hours == 1:
if minutes == 1:
cObs = ['{:.0f}'.format(hours), 'hour', '{:.0f}'.format(minutes), 'min', cObs[2]]
else:
cObs = ['{:.0f}'.format(hours), 'hour', '{:.0f}'.format(minutes), 'mins', cObs[2]]
elif hours > 1:
if minutes == 1:
cObs = ['{:.0f}'.format(hours), 'hours', '{:.0f}'.format(minutes), 'min', cObs[2]]
else:
cObs = ['{:.0f}'.format(hours), 'hours', '{:.0f}'.format(minutes), 'mins', cObs[2]]
else:
if minutes == 0:
cObs = ['< 1', 'minute', '-', '-', cObs[2]]
elif minutes == 1:
cObs = ['{:.0f}'.format(minutes), 'minute', '-', '-', cObs[2]]
else:
cObs = ['{:.0f}'.format(minutes), 'minutes', '-', '-', cObs[2]]
# Return formatted observations
return cObs
""" Handles Websocket messages received by the Raspberry Pi Python console for
WeatherFlow Tempest and Smart Home Weather stations.
Copyright (C) 2018-2023 Peter Davis
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
# Import required library modules
from lib.request_api import weatherflow_api
from lib.system import system
from lib import derivedVariables as derive
from lib import observationFormat as observation
from lib import properties
# Import required Kivy modules
from kivy.logger import Logger
from kivy.clock import mainthread
from kivy.app import App
# Define empty deviceObs dictionary
device_obs = {'obTime': [None, 's'], 'pressure': [None, 'mb'], 'outTemp': [None, 'c'],
'inTemp': [None, 'c'], 'humidity': [None, '%'], 'windSpd': [None, 'mps'],
'windGust': [None, 'mps'], 'windDir': [None, 'degrees'], 'rapidWindSpd': [None, 'mps'],
'rapidWindDir': [None, 'degrees'], 'uvIndex': [None, 'index'], 'radiation': [None, 'Wm2'],
'minuteRain': [None, 'mm'], 'dailyRain': [None, 'mm'], 'strikeMinute': [None, 'count'],
'strikeTime': [None, 's'], 'strikeDist': [None, 'km'], 'strike3hr': [None, 'count'],
}
# Define empty deriveObs dictionary
derive_obs = {'dewPoint': [None, 'c'], 'feelsLike': [None, 'c', '-', '-'], 'outTempMax': [None, 'c', '-'],
'outTempMin': [None, 'c', '-'], 'outTempDiff': [None, 'dc', '-'], 'outTempTrend': [None, 'c/hr', 'c8c8c8ff'],
'inTempMax': [None, 'c', '-'], 'inTempMin': [None, 'c', '-'], 'SLP': [None, 'mb', None],
'SLPTrend': [None, 'mb/hr', '-', '-'], 'SLPMin': [None, 'mb', '-'], 'SLPMax': [None, 'mb', '-'],
'windSpd': [None, 'mps', '-', '-', '-'], 'windAvg': [None, 'mps'], 'gustMax': [None, 'mps', '-'],
'windDir': [None, 'degrees', '-', '-'], 'rapidWindDir': [None, 'degrees', '-', '-'], 'rainRate': [None, 'mm/hr', '-'],
'uvIndex': [None, 'index'], 'peakSun': [None, 'hrs', '-'], 'strikeDeltaT': [None, 's', None],
'strikeFreq': [None, '/min', None, '/min'],
'strikeCount': {'today': [None, 'count'],
'month': [None, 'count'],
'year': [None, 'count']
},
'rainAccum': {'today': [None, 'mm'],
'yesterday': [None, 'mm'],
'month': [None, 'mm'],
'year': [None, 'mm']
}
}
# =============================================================================
# DEFINE 'obsParser' CLASS
# =============================================================================
class obsParser():
def __init__(self):
# Define instance variables
self.display_obs = properties.Obs()
self.api_data = {}
self.transmit = 1
self.flag_api = [1, 1, 1, 1]
# Create reference to app object
self.app = App.get_running_app()
self.app.obsParser = self
# Define device and derived observations dictionary
self.device_obs = device_obs.copy()
self.derive_obs = derive_obs.copy()
def parse_obs_st(self, message, config):
""" Parse obs_st Websocket messages from TEMPEST module
INPUTS:
message obs_sky Websocket message
config Console configuration object
"""
# Extract latest TEMPEST Websocket JSON
if 'obs' in message:
latestOb = message['obs'][0]
else:
return
# Extract TEMPEST device_id. Initialise API data dictionary
device_id = message['device_id']
self.api_data[device_id] = {'flagAPI': self.flag_api[0]}
# Discard duplicate TEMPEST Websocket messages
if 'obs_st' in self.display_obs:
if self.display_obs['obs_st']['obs'][0] == latestOb[0]:
return
# Extract required observations from latest TEMPEST Websocket JSON
self.device_obs['obTime'] = [latestOb[0], 's']
self.device_obs['windSpd'] = [latestOb[2], 'mps']
self.device_obs['windGust'] = [latestOb[3], 'mps']
self.device_obs['windDir'] = [latestOb[4], 'degrees']
self.device_obs['pressure'] = [latestOb[6], 'mb']
self.device_obs['outTemp'] = [latestOb[7], 'c']
self.device_obs['humidity'] = [latestOb[8], '%']
self.device_obs['uvIndex'] = [latestOb[10], 'index']
self.device_obs['radiation'] = [latestOb[11], 'Wm2']
self.device_obs['minuteRain'] = [latestOb[12], 'mm']
self.device_obs['strikeMinute'] = [latestOb[15], 'count']
self.device_obs['dailyRain'] = [latestOb[18], 'mm']
# Extract lightning strike data from the latest TEMPEST Websocket JSON
# "Summary" object
self.device_obs['strikeTime'] = [message['summary']['strike_last_epoch'] if 'strike_last_epoch' in message['summary'] else None, 's']
self.device_obs['strikeDist'] = [message['summary']['strike_last_dist'] if 'strike_last_dist' in message['summary'] else None, 'km']
self.device_obs['strike3hr'] = [message['summary']['strike_count_3h'] if 'strike_count_3h' in message['summary'] else None, 'count']
# Request required TEMPEST data from the WeatherFlow API
self.api_data[device_id]['24Hrs'] = weatherflow_api.last_24h(device_id, latestOb[0], config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['SLPMin'][0] is None
or self.derive_obs['SLPMax'][0] is None
or self.derive_obs['outTempMin'][0] is None
or self.derive_obs['outTempMax'][0] is None
or self.derive_obs['windAvg'][0] is None
or self.derive_obs['gustMax'][0] is None
or self.derive_obs['peakSun'][0] is None
or self.derive_obs['strikeCount']['today'][0] is None):
self.api_data[device_id]['today'] = weatherflow_api.today(device_id, config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['rainAccum']['yesterday'][0] is None):
self.api_data[device_id]['yesterday'] = weatherflow_api.yesterday(device_id, config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['rainAccum']['month'][0] is None
or self.derive_obs['strikeCount']['month'][0] is None):
self.api_data[device_id]['month'] = weatherflow_api.month(device_id, config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['rainAccum']['year'][0] is None
or self.derive_obs['strikeCount']['year'][0] is None):
self.api_data[device_id]['year'] = weatherflow_api.year(device_id, config)
self.flag_api[0] = 0
# Store latest TEMPEST JSON message
self.display_obs['obs_st'] = message
# Calculate derived observations
self.calcDerivedVariables(device_id, config, 'obs_st')
def parse_obs_sky(self, message, config):
""" Parse obs_sky Websocket messages from SKY module
INPUTS:
message obs_sky Websocket message
config Console configuration object
"""
# Extract latest SKY Websocket JSON
if 'obs' in message:
latestOb = message['obs'][0]
else:
return
# Extract SKY device_id. Initialise API data dictionary
device_id = message['device_id']
self.api_data[device_id] = {'flagAPI': self.flag_api[1]}
# Discard duplicate SKY Websocket messages
if 'obs_sky' in self.display_obs:
if self.display_obs['obs_sky']['obs'][0] == latestOb[0]:
return
# Extract required observations from latest SKY Websocket JSON
self.device_obs['uvIndex'] = [latestOb[2], 'index']
self.device_obs['minuteRain'] = [latestOb[3], 'mm']
self.device_obs['windSpd'] = [latestOb[5], 'mps']
self.device_obs['windGust'] = [latestOb[6], 'mps']
self.device_obs['windDir'] = [latestOb[7], 'degrees']
self.device_obs['radiation'] = [latestOb[10], 'Wm2']
self.device_obs['dailyRain'] = [latestOb[11], 'mm']
# Request required SKY data from the WeatherFlow API
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['windAvg'][0] is None
or self.derive_obs['gustMax'][0] is None
or self.derive_obs['peakSun'][0] is None):
self.api_data[device_id]['today'] = weatherflow_api.today(device_id, config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['rainAccum']['yesterday'][0] is None):
self.api_data[device_id]['yesterday'] = weatherflow_api.yesterday(device_id, config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['rainAccum']['month'][0] is None):
self.api_data[device_id]['month'] = weatherflow_api.month(device_id, config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['rainAccum']['year'][0] is None):
self.api_data[device_id]['year'] = weatherflow_api.year(device_id, config)
self.flag_api[1] = 0
# Store latest SKY JSON message
self.display_obs['obs_sky'] = message
# Calculate derived observations
self.calcDerivedVariables(device_id, config, 'obs_sky')
def parse_obs_out_air(self, message, config):
""" Parse obs_air Websocket messages from outdoor AIR module
INPUTS:
message obs_air Websocket message
config Console configuration object
"""
# Extract latest outdoor AIR Websocket JSON
if 'obs' in message:
latestOb = message['obs'][0]
else:
return
# Extract outdoor AIR device_id. Initialise API data dictionary
device_id = message['device_id']
self.api_data[device_id] = {'flagAPI': self.flag_api[2]}
# Discard duplicate outdoor AIR Websocket messages
if 'obs_out_air' in self.display_obs:
if self.display_obs['obs_out_air']['obs'][0] == latestOb[0]:
return
# Extract required observations from latest outdoor AIR Websocket JSON
self.device_obs['obTime'] = [latestOb[0], 's']
self.device_obs['pressure'] = [latestOb[1], 'mb']
self.device_obs['outTemp'] = [latestOb[2], 'c']
self.device_obs['humidity'] = [latestOb[3], '%']
self.device_obs['strikeMinute'] = [latestOb[4], 'count']
# Extract lightning strike data from the latest outdoor AIR Websocket
# JSON "Summary" object
self.device_obs['strikeTime'] = [message['summary']['strike_last_epoch'] if 'strike_last_epoch' in message['summary'] else None, 's']
self.device_obs['strikeDist'] = [message['summary']['strike_last_dist'] if 'strike_last_dist' in message['summary'] else None, 'km']
self.device_obs['strike3hr'] = [message['summary']['strike_count_3h'] if 'strike_count_3h' in message['summary'] else None, 'count']
# Request required outdoor AIR data from the WeatherFlow API
self.api_data[device_id]['24Hrs'] = weatherflow_api.last_24h(device_id, latestOb[0], config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['SLPMin'][0] is None
or self.derive_obs['SLPMax'][0] is None
or self.derive_obs['outTempMin'][0] is None
or self.derive_obs['outTempMax'][0] is None
or self.derive_obs['strikeCount']['today'][0] is None):
self.api_data[device_id]['today'] = weatherflow_api.today(device_id, config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['strikeCount']['month'][0] is None):
self.api_data[device_id]['month'] = weatherflow_api.month(device_id, config)
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['strikeCount']['year'][0] is None):
self.api_data[device_id]['year'] = weatherflow_api.year(device_id, config)
self.flag_api[2] = 0
# Store latest outdoor AIR JSON message
self.display_obs['obs_out_air'] = message
# Calculate derived observations
self.calcDerivedVariables(device_id, config, 'obs_out_air')
def parse_obs_in_air(self, message, config):
""" Parse obs_air Websocket messages from indoor AIR module
INPUTS:
message obs_air Websocket message
config Console configuration object
"""
# Extract latest indoor AIR Websocket JSON
if 'obs' in message:
latestOb = message['obs'][0]
else:
return
# Extract indoor AIR device_id. Initialise API data dictionary
device_id = message['device_id']
self.api_data[device_id] = {'flagAPI': self.flag_api[3]}
# Discard duplicate indoor AIR Websocket messages
if 'obs_in_air' in self.display_obs:
if self.display_obs['obs_in_air']['obs'][0] == latestOb[0]:
return
# Extract required observations from latest indoor AIR Websocket JSON
self.device_obs['obTime'] = [latestOb[0], 's']
self.device_obs['inTemp'] = [latestOb[2], 'c']
# Request required indoor AIR data from the WeatherFlow API
if (self.api_data[device_id]['flagAPI']
or self.derive_obs['inTempMin'][0] is None
or self.derive_obs['inTempMax'][0] is None):
self.api_data[device_id]['today'] = weatherflow_api.today(device_id, config)
self.flag_api[3] = 0
# Store latest indoor AIR JSON message
self.display_obs['obs_in_air'] = message
# Calculate derived observations
self.calcDerivedVariables(device_id, config, 'obs_in_air')
def parse_rapid_wind(self, message, config):
""" Parse rapid_wind Websocket messages from SKY or TEMPEST module
INPUTS:
message rapid_wind Websocket message received from
SKY or TEMPEST module
config Console configuration object
"""
# Extract latest rapid_wind Websocket JSON
if 'ob' in message:
latestOb = message['ob']
else:
return
# Extract device ID
device_id = message['device_id']
# Discard duplicate rapid_wind Websocket messages
if 'rapid_wind' in self.display_obs:
if self.display_obs['rapid_wind']['ob'][0] == latestOb[0]:
return
# Extract required observations from latest rapid_wind Websocket JSON
self.device_obs['rapidWindSpd'] = [latestOb[1], 'mps']
self.device_obs['rapidWindDir'] = [latestOb[2], 'degrees']
# Extract wind direction from previous rapid_wind Websocket JSON
if 'rapid_wind' in self.device_obs:
previousOb = self.device_obs['rapid_wind']['ob']
rapidWindDirOld = [previousOb[2], 'degrees']
else:
rapidWindDirOld = [0, 'degrees']
# If windspeed is zero, freeze direction at last direction of non-zero
# wind speed and edit latest rapid_wind Websocket JSON message.
if self.device_obs['rapidWindSpd'][0] == 0:
self.device_obs['rapidWindDir'] = rapidWindDirOld
message['ob'][2] = rapidWindDirOld[0]
# Store latest rapid_wind Websocket JSON message
self.display_obs['rapid_wind'] = message
# Calculate derived observations
self.calcDerivedVariables(device_id, config, 'rapid_wind')
def parse_evt_strike(self, message, config):
""" Parse lightning strike event Websocket messages received from AIR
or TEMPEST module
INPUTS:
message evt_strike Websocket message received from
AIR or TEMPEST module
config Console configuration object
"""
# Extract latest evt_strike Websocket JSON
if 'evt' in message:
latestEvt = message['evt']
else:
return
# Extract device ID
device_id = message['device_id']
# Discard duplicate evt_strike Websocket messages
if 'evt_strike' in self.display_obs:
if self.display_obs['evt_strike']['evt'][0] == latestEvt[0]:
return
# Extract required observations from latest evt_strike Websocket JSON
self.device_obs['strikeTime'] = [latestEvt[0], 's']
self.device_obs['strikeDist'] = [latestEvt[1], 'km']
# Store latest evt_strike JSON message
self.display_obs['evt_strike'] = message
# Calculate derived observations
self.calcDerivedVariables(device_id, config, 'evt_strike')
def calcDerivedVariables(self, device, config, device_type):
""" Calculate derived variables from available device observations
INPUTS:
device Device ID
config Console configuration object
device_type Device type
"""
# Derive variables from available obs_out_air and obs_st observations
# Derive variables from available obs_out_air and obs_st observations
if device_type in ('obs_out_air', 'obs_st'):
self.derive_obs['feelsLike'] = derive.feelsLike(self.device_obs['outTemp'], self.device_obs['humidity'], self.device_obs['windSpd'], config)
self.derive_obs['dewPoint'] = derive.dewPoint(self.device_obs['outTemp'], self.device_obs['humidity'])
self.derive_obs['outTempDiff'] = derive.tempDiff(self.device_obs['outTemp'], self.device_obs['obTime'], device, self.api_data, config)
self.derive_obs['outTempTrend'] = derive.tempTrend(self.device_obs['outTemp'], self.device_obs['obTime'], device, self.api_data, config)
self.derive_obs['outTempMax'] = derive.tempMax(self.device_obs['outTemp'], self.device_obs['obTime'], self.derive_obs['outTempMax'], device, self.api_data, config)
self.derive_obs['outTempMin'] = derive.tempMin(self.device_obs['outTemp'], self.device_obs['obTime'], self.derive_obs['outTempMin'], device, self.api_data, config)
self.derive_obs['SLP'] = derive.SLP(self.device_obs['pressure'], device, config)
self.derive_obs['SLPTrend'] = derive.SLPTrend(self.device_obs['pressure'], self.device_obs['obTime'], device, self.api_data, config)
self.derive_obs['SLPMax'] = derive.SLPMax(self.device_obs['pressure'], self.device_obs['obTime'], self.derive_obs['SLPMax'], device, self.api_data, config)
self.derive_obs['SLPMin'] = derive.SLPMin(self.device_obs['pressure'], self.device_obs['obTime'], self.derive_obs['SLPMin'], device, self.api_data, config)
self.derive_obs['strikeCount'] = derive.strikeCount(self.device_obs['strikeMinute'], self.derive_obs['strikeCount'], device, self.api_data, config)
self.derive_obs['strikeFreq'] = derive.strikeFrequency(self.device_obs['obTime'], device, self.api_data, config)
self.derive_obs['strikeDeltaT'] = derive.strikeDeltaT(self.device_obs['strikeTime'])
# Derive variables from available obs_sky and obs_st observations
if device_type in ('obs_sky', 'obs_st'):
self.derive_obs['uvIndex'] = derive.UVIndex(self.device_obs['uvIndex'])
self.derive_obs['peakSun'] = derive.peakSunHours(self.device_obs['radiation'], self.derive_obs['peakSun'], device, self.api_data, config)
self.derive_obs['windSpd'] = derive.beaufortScale(self.device_obs['windSpd'])
self.derive_obs['windDir'] = derive.cardinalWindDir(self.device_obs['windDir'], self.device_obs['windSpd'])
self.derive_obs['windAvg'] = derive.avgWindSpeed(self.device_obs['windSpd'], self.derive_obs['windAvg'], device, self.api_data, config)
self.derive_obs['gustMax'] = derive.maxWindGust(self.device_obs['windGust'], self.device_obs['obTime'], self.derive_obs['gustMax'], device, self.api_data, config)
self.derive_obs['rainRate'] = derive.rainRate(self.device_obs['minuteRain'])
self.derive_obs['rainAccum'] = derive.rainAccumulation(self.device_obs['dailyRain'], self.derive_obs['rainAccum'], device, self.api_data, config)
# Derive variables from available obs_out_air and obs_st observations
if device_type == 'obs_in_air':
self.derive_obs['inTempMax'] = derive.tempMax(self.device_obs['inTemp'], self.device_obs['obTime'], self.derive_obs['inTempMax'], device, self.api_data, config)
self.derive_obs['inTempMin'] = derive.tempMin(self.device_obs['inTemp'], self.device_obs['obTime'], self.derive_obs['inTempMin'], device, self.api_data, config)
# Derive variables from available rapid_wind observations
if device_type == 'rapid_wind':
self.derive_obs['rapidWindDir'] = derive.cardinalWindDir(self.device_obs['rapidWindDir'], self.device_obs['rapidWindSpd'])
# Derive variables from available evt_strike observations
if device_type == 'evt_strike':
self.derive_obs['strikeDeltaT'] = derive.strikeDeltaT(self.device_obs['strikeTime'])
# Format derived observations
self.format_derived_variables(config, device_type)
def format_derived_variables(self, config, device_type):
""" Format derived variables from available device observations
INPUTS:
config Console configuration object
device_type Device type
"""
# Convert derived variable units from obs_out_air and obs_st observations
if device_type in ('obs_out_air', 'obs_st', 'obs_all'):
outTemp = observation.Units(self.device_obs['outTemp'], config['Units']['Temp'])
feelsLike = observation.Units(self.derive_obs['feelsLike'], config['Units']['Temp'])
dewPoint = observation.Units(self.derive_obs['dewPoint'], config['Units']['Temp'])
outTempDiff = observation.Units(self.derive_obs['outTempDiff'], config['Units']['Temp'])
outTempTrend = observation.Units(self.derive_obs['outTempTrend'], config['Units']['Temp'])
outTempMax = observation.Units(self.derive_obs['outTempMax'], config['Units']['Temp'])
outTempMin = observation.Units(self.derive_obs['outTempMin'], config['Units']['Temp'])
humidity = observation.Units(self.device_obs['humidity'], config['Units']['Other'])
SLP = observation.Units(self.derive_obs['SLP'], config['Units']['Pressure'])
SLPTrend = observation.Units(self.derive_obs['SLPTrend'], config['Units']['Pressure'])
SLPMax = observation.Units(self.derive_obs['SLPMax'], config['Units']['Pressure'])
SLPMin = observation.Units(self.derive_obs['SLPMin'], config['Units']['Pressure'])
strikeDist = observation.Units(self.device_obs['strikeDist'], config['Units']['Distance'])
strikeDeltaT = observation.Units(self.derive_obs['strikeDeltaT'], config['Units']['Other'])
strikeFreq = observation.Units(self.derive_obs['strikeFreq'], config['Units']['Other'])
strike3hr = observation.Units(self.device_obs['strike3hr'], config['Units']['Other'])
strikeToday = observation.Units(self.derive_obs['strikeCount']['today'], config['Units']['Other'])
strikeMonth = observation.Units(self.derive_obs['strikeCount']['month'], config['Units']['Other'])
strikeYear = observation.Units(self.derive_obs['strikeCount']['year'], config['Units']['Other'])
# Convert derived variable units from obs_sky and obs_st observations
if device_type in ('obs_sky', 'obs_st', 'obs_all'):
rainRate = observation.Units(self.derive_obs['rainRate'], config['Units']['Precip'])
todayRain = observation.Units(self.derive_obs['rainAccum']['today'], config['Units']['Precip'])
yesterdayRain = observation.Units(self.derive_obs['rainAccum']['yesterday'], config['Units']['Precip'])
monthRain = observation.Units(self.derive_obs['rainAccum']['month'], config['Units']['Precip'])
yearRain = observation.Units(self.derive_obs['rainAccum']['year'], config['Units']['Precip'])
radiation = observation.Units(self.device_obs['radiation'], config['Units']['Other'])
uvIndex = observation.Units(self.derive_obs['uvIndex'], config['Units']['Other'])
peakSun = observation.Units(self.derive_obs['peakSun'], config['Units']['Other'])
windSpd = observation.Units(self.derive_obs['windSpd'], config['Units']['Wind'])
windDir = observation.Units(self.derive_obs['windDir'], config['Units']['Direction'])
windGust = observation.Units(self.device_obs['windGust'], config['Units']['Wind'])
windAvg = observation.Units(self.derive_obs['windAvg'], config['Units']['Wind'])
windMax = observation.Units(self.derive_obs['gustMax'], config['Units']['Wind'])
# Convert derived variable units from obs_in_air observations
if device_type in ('obs_in_air', 'obs_all'):
inTemp = observation.Units(self.device_obs['inTemp'], config['Units']['Temp'])
inTempMax = observation.Units(self.derive_obs['inTempMax'], config['Units']['Temp'])
inTempMin = observation.Units(self.derive_obs['inTempMin'], config['Units']['Temp'])
# Convert derived variable units from rapid_wind observations
if device_type in ('rapid_wind', 'obs_all'):
rapidWindSpd = observation.Units(self.device_obs['rapidWindSpd'], config['Units']['Wind'])
rapidWindDir = observation.Units(self.derive_obs['rapidWindDir'], 'degrees')
# Convert derived variable units from available evt_strike observations
if device_type in ('evt_strike', 'obs_all'):
strikeDist = observation.Units(self.device_obs['strikeDist'], config['Units']['Distance'])
strikeDeltaT = observation.Units(self.derive_obs['strikeDeltaT'], config['Units']['Other'])
# Format derived variables from obs_air and obs_st observations
if device_type in ('obs_out_air', 'obs_st', 'obs_all'):
self.display_obs['outTemp'] = observation.Format(outTemp, 'Temp')
self.display_obs['FeelsLike'] = observation.Format(feelsLike, 'Temp')
self.display_obs['DewPoint'] = observation.Format(dewPoint, 'Temp')
self.display_obs['outTempDiff'] = observation.Format(outTempDiff, 'Temp')
self.display_obs['outTempTrend'] = observation.Format(outTempTrend, 'Temp')
self.display_obs['outTempMax'] = observation.Format(outTempMax, ['Temp', 'Time'], config)
self.display_obs['outTempMin'] = observation.Format(outTempMin, ['Temp', 'Time'], config)
self.display_obs['Humidity'] = observation.Format(humidity, 'Humidity')
self.display_obs['SLP'] = observation.Format(SLP, 'Pressure')
self.display_obs['SLPTrend'] = observation.Format(SLPTrend, 'Pressure')
self.display_obs['SLPMax'] = observation.Format(SLPMax, ['Pressure', 'Time'], config)
self.display_obs['SLPMin'] = observation.Format(SLPMin, ['Pressure', 'Time'], config)
self.display_obs['StrikeDist'] = observation.Format(strikeDist, 'StrikeDistance')
self.display_obs['StrikeDeltaT'] = observation.Format(strikeDeltaT, 'TimeDelta')
self.display_obs['StrikeFreq'] = observation.Format(strikeFreq, 'StrikeFrequency')
self.display_obs['Strikes3hr'] = observation.Format(strike3hr, 'StrikeCount')
self.display_obs['StrikesToday'] = observation.Format(strikeToday, 'StrikeCount')
self.display_obs['StrikesMonth'] = observation.Format(strikeMonth, 'StrikeCount')
self.display_obs['StrikesYear'] = observation.Format(strikeYear, 'StrikeCount')
# Format derived variables from obs_sky and obs_st observations
if device_type in ('obs_sky', 'obs_st', 'obs_all'):
self.display_obs['Radiation'] = observation.Format(radiation, 'Radiation')
self.display_obs['UVIndex'] = observation.Format(uvIndex, 'UV')
self.display_obs['peakSun'] = observation.Format(peakSun, 'peakSun')
self.display_obs['RainRate'] = observation.Format(rainRate, 'Precip')
self.display_obs['TodayRain'] = observation.Format(todayRain, 'Precip')
self.display_obs['YesterdayRain'] = observation.Format(yesterdayRain, 'Precip')
self.display_obs['MonthRain'] = observation.Format(monthRain, 'Precip')
self.display_obs['YearRain'] = observation.Format(yearRain, 'Precip')
self.display_obs['WindSpd'] = observation.Format(windSpd, 'Wind')
self.display_obs['WindGust'] = observation.Format(windGust, 'Wind')
self.display_obs['AvgWind'] = observation.Format(windAvg, 'Wind')
self.display_obs['MaxGust'] = observation.Format(windMax, ['Wind', 'Time'], config)
self.display_obs['WindDir'] = observation.Format(windDir, 'Direction')
# Format derived variables from obs_in_air observations
if device_type in ('obs_in_air', 'obs_all'):
self.display_obs['inTemp'] = observation.Format(inTemp, 'Temp')
self.display_obs['inTempMax'] = observation.Format(inTempMax, ['Temp', 'Time'], config)
self.display_obs['inTempMin'] = observation.Format(inTempMin, ['Temp', 'Time'], config)
# Format derived variables from rapid_wind observations
if device_type in ('rapid_wind', 'obs_all'):
self.display_obs['rapidSpd'] = observation.Format(rapidWindSpd, 'Wind')
self.display_obs['rapidDir'] = observation.Format(rapidWindDir, 'Direction')
# Format derived variables from evt_strike observations
if device_type in ('evt_strike', 'obs_all'):
self.display_obs['StrikeDist'] = observation.Format(strikeDist, 'StrikeDistance')
self.display_obs['StrikeDeltaT'] = observation.Format(strikeDeltaT, 'TimeDelta')
# Update display with new variables
self.update_display(device_type)
def reformat_display(self):
while self.app.websocket_client.activeThreads():
pass
self.format_derived_variables(self.app.config, 'obs_all')
def resetDisplay(self):
while self.app.websocket_client.activeThreads():
pass
self.display_obs = properties.Obs()
self.device_obs = device_obs.copy()
self.derive_obs = derive_obs.copy()
self.api_data = {}
self.update_display('obs_reset')
@mainthread
def update_display(self, ob_type):
""" Update display with new variables derived from latest websocket
message
INPUTS:
ob_type Latest Websocket message type
"""
# Update display values with new derived observations
reference_error = False
for Key, Value in list(self.display_obs.items()):
if not (ob_type == 'obs_all' and 'rapid' in Key):
try: # Don't update rapidWind display when type is 'all'
self.app.CurrentConditions.Obs[Key] = Value # as the RapidWind rose is not animated in this case
except ReferenceError:
if not reference_error:
Logger.warning(f'obs_parser: {system().log_time()} - Reference error {ob_type}')
reference_error = True
# Update display graphics with new derived observations
if ob_type == 'rapid_wind':
if hasattr(self.app, 'WindSpeedPanel'):
for panel in getattr(self.app, 'WindSpeedPanel'):
panel.animateWindRose()
elif ob_type == 'evt_strike':
if self.app.config['Display']['LightningPanel'] == '1':
for ii, button in enumerate(self.app.CurrentConditions.button_list):
if "Lightning" in button[2]:
self.app.CurrentConditions.switchPanel([], button)
if hasattr(self.app, 'LightningPanel'):
for panel in getattr(self.app, 'LightningPanel'):
panel.setLightningBoltIcon()
panel.animateLightningBoltIcon()
else:
if ob_type in ['obs_st', 'obs_air', 'obs_all', 'obs_reset']:
if hasattr(self.app, 'TemperaturePanel'):
for panel in getattr(self.app, 'TemperaturePanel'):
panel.setFeelsLikeIcon()
if hasattr(self.app, 'LightningPanel'):
for panel in getattr(self.app, 'LightningPanel'):
panel.setLightningBoltIcon()
if hasattr(self.app, 'BarometerPanel'):
for panel in getattr(self.app, 'BarometerPanel'):
panel.setBarometerArrow()
if ob_type in ['obs_st', 'obs_sky', 'obs_all', 'obs_reset']:
if hasattr(self.app, 'WindSpeedPanel'):
for panel in getattr(self.app, 'WindSpeedPanel'):
panel.setWindIcons()
if hasattr(self.app, 'SunriseSunsetPanel'):
for panel in getattr(self.app, 'SunriseSunsetPanel'):
panel.setUVBackground()
if hasattr(self.app, 'RainfallPanel'):
for panel in getattr(self.app, 'RainfallPanel'):
panel.animate_rain_rate()
if hasattr(self.app, 'TemperaturePanel'):
for panel in getattr(self.app, 'TemperaturePanel'):
panel.setFeelsLikeIcon()
## WeatherFlow PiConsole: Raspberry Pi Python console for WeatherFlow Tempest
## and Smart Home Weather stations.
## Copyright (C) 2018-2023 Peter Davis
## This program is free software: you can redistribute it and/or modify it under
## the terms of the GNU General Public License as published by the Free Software
## Foundation, either version 3 of the License, or (at your option) any later
## version.
## This program is distributed in the hope that it will be useful, but WITHOUT
## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
## FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
## details.
## You should have received a copy of the GNU General Public License along with
## this program. If not, see <http://www.gnu.org/licenses/>.
## =============================================================================
## WIND SPEED AND DIRECTION PANEL AND BUTTON
## =============================================================================
<WindSpeedPanel>:
## Panel background and title
PanelBackground:
_panelTitle: 'Wind Speed'
_panelStatus: app.CurrentConditions.Status['sky_last_sample'] if app.config['Station']['SkyID'] else app.CurrentConditions.Status['tempest_last_sample']
## Rapid Wind wind speed and direction
windRose:
Image:
source: 'icons/windRose/icons/' + str(round(root.rapidWindDir)) + app.scaleSuffix
pos_hint: {'x': 0, 'y': 0}
size_hint: (1, 1)
keep_ratio: 0
allow_stretch: 1
## Current average wind speed
LargeField:
text: app.CurrentConditions.Obs['WindSpd'][0]
pos_hint: {'x': 3/262, 'y': 95/202}
size_hint_x: (60/262)
MediumField:
text: app.CurrentConditions.Obs['WindSpd'][1]
pos_hint: {'x': 3/262, 'y': 78/202}
size_hint_x: (60/262)
## Daily averaged wind speed
SmallField:
text: 'Wind'
pos_hint: {'x': 3/262, 'y': 178/202}
size_hint_x: (65/262)
SmallField:
text: 'Avg [color=ff8837ff]' + app.CurrentConditions.Obs['AvgWind'][0] + '[/color] ' + app.CurrentConditions.Obs['AvgWind'][1]
pos_hint: {'x': 3/262, 'y': 161/202}
size_hint_x: (101/262)
## Current wind gust
LargeField:
text: app.CurrentConditions.Obs['WindGust'][0]
pos_hint: {'x': 201/262, 'y': 95/202}
size_hint_x: (60/262)
MediumField:
text: app.CurrentConditions.Obs['WindGust'][1]
pos_hint: {'x': 201/262, 'y': 78/202}
size_hint_x: (60/262)
## Maximum wind gust
SmallField:
text: 'Gust'
pos_hint: {'x': 142/262, 'y': 178/202}
size_hint_x: (165/262)
SmallField:
text: 'Max [color=ff8837ff]' + app.CurrentConditions.Obs['MaxGust'][0] + '[/color] ' + app.CurrentConditions.Obs['MaxGust'][1]
pos_hint: {'x': 159/262, 'y': 161/202}
size_hint_x: (101/262)
SmallField:
text: '--:--' if not isinstance(app.CurrentConditions.Obs['MaxGust'], list) else app.CurrentConditions.Obs['MaxGust'][2]
pos_hint: {'x': 180/262, 'y': 144/202}
size_hint_x: (65.5/262)
## Current Beaufort scale text and icon
Image:
source: 'icons/windSpd/' + root.windSpdIcon + app.scaleSuffix
pos_hint: {'x': 18/262, 'y': 28/202}
size_hint: (30/262, 30/202)
keep_ratio: 0
allow_stretch: 1
SmallField:
text: app.CurrentConditions.Obs['WindSpd'][4]
pos_hint: {'x': 6/262, 'y': 6/202}
size_hint: (126/262, 17/202)
text_size: self.size
halign: 'left'
## Current average wind direction
Image:
source: 'icons/windDir/' + root.windDirIcon + app.scaleSuffix
pos_hint: {'x': 216/262, 'y': 28/202}
size_hint: (30/262, 30/202)
keep_ratio: 0
allow_stretch: 1
SmallField:
text: 'Direction: [color=9aba2fff]' + app.CurrentConditions.Obs['WindDir'][0] + app.CurrentConditions.Obs['WindDir'][1] + '[/color]'
pos_hint: {'x': 119/262, 'y': 6/202}
size_hint: (135/262, 17/202)
text_size: self.size
halign: 'right'
## Rapid wind direction in degrees
MediumField:
text: app.CurrentConditions.Obs['rapidDir'][0] + app.CurrentConditions.Obs['rapidDir'][1]
pos_hint: {'x': 102/262, 'y': 116/202}
size_hint_x: (60/262)
## Rapid wind speed
MediumField:
text: app.CurrentConditions.Obs['rapidSpd'][0] + ' [size=' + str(int(self.font_size*0.8)) + ']' + app.CurrentConditions.Obs['rapidSpd'][1] + '[/size]'
pos_hint: {'x': 92/262, 'y': 89/202}
size_hint_x: (80/262)
## Rapid wind direction text
SmallField:
text: app.CurrentConditions.Obs['rapidDir'][3]
pos_hint: {'x': 92/262, 'y': 65.5/202}
size_hint_x: (80/262)
<WindSpeedButton>:
PanelButton:
text: 'Wind Speed'
on_release: app.CurrentConditions.switchPanel(self)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment