Skip to content

Instantly share code, notes, and snippets.

@ChristianTremblay
Last active January 26, 2021 21:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ChristianTremblay/4b38a42cb212e8ef69d2c55d41c5bb8a to your computer and use it in GitHub Desktop.
Save ChristianTremblay/4b38a42cb212e8ef69d2c55d41c5bb8a to your computer and use it in GitHub Desktop.
Weather BACnet device using OpenWeatherMap API
import atoma, requests
import BAC0
import re
from weatherlink import data, xref
from bacpypes.basetypes import EngineeringUnits, DateTime
from bacpypes.primitivedata import CharacterString, Date, Time
from BAC0.core.devices.local.models import (
analog_input,
datetime_value,
character_string,
)
from BAC0.core.devices.local.object import ObjectFactory
from BAC0.core.devices.local.models import make_state_text
from BAC0.tasks.RecurringTask import RecurringTask
import time
def start_device():
    """Create the local BACnet device and declare its weather objects.

    Starts a ``BAC0.lite`` instance (device id 10032, UDP port 47809),
    waits one second, declares 23 analog inputs (instances 0-22) and one
    datetime value named ``Last_Update``, then registers the declared
    objects on the device.

    Returns:
        The ``BAC0.lite`` instance the objects were added to.
    """
    print("Starting BACnet device")
    device = BAC0.lite(deviceId=10032, port=47809)
    time.sleep(1)

    # (name, description, engineering units); the BACnet instance number
    # is the position in this list.
    analog_points = [
        ("Inside_Temp", "most recent valid inside temp **(°F)**", "degreesFahrenheit"),
        ("Outside_Temp", "most recent valid temperature **(°F)**", "degreesFahrenheit"),
        ("Current_Pressure", "Current Pressure in bars", "bars"),
        (
            "Inside_Humidity",
            "most recent valid inside humidity **(%RH)**",
            "percentRelativeHumidity",
        ),
        (
            "Outside_Humidity",
            "most recent valid humidity **(%RH)**",
            "percentRelativeHumidity",
        ),
        ("Current_Wind_Speed", "Current Wind Speed in km/h", "kilometersPerHour"),
        (
            "Current_Wind_Speed_10_min_Avg",
            "average wind speed over last 10 min **(mph)**",
            "milesPerHour",
        ),
        ("Wind_Dir", "Wind Direction", "degreesAngular"),
    ]

    # Rain counters: five accumulation periods, each exposed in three units.
    # NOTE(review): every rain point carries the same "rain rate" description
    # text, including the storm/day/month/year totals -- looks copy-pasted;
    # confirm the intended wording before changing it.
    rain_description = "most recent valid rain rate **(counts/hour)**"
    rain_units = (("clicks", "noUnits"), ("in", "inches"), ("mm", "millimeters"))
    for period in ("rate", "storm", "day", "month", "year"):
        for suffix, units in rain_units:
            analog_points.append(
                ("rain_{}_{}".format(period, suffix), rain_description, units)
            )

    # Analog Values
    for instance, (name, description, units) in enumerate(analog_points):
        analog_input(
            instance=instance,
            name=name,
            description=description,
            presentValue=0,
            properties={"units": units},
        )

    # DateTime Values
    # Only the factory returned by the last declaration is used for
    # registration; presumably the BAC0 factory accumulates every object
    # declared above -- TODO confirm against the BAC0 local-object docs.
    declared = datetime_value(instance=1, name="Last_Update")
    declared.add_objects_to_application(device)
    return device
class App:
    """Holder for the BACnet device and access point to weather readings.

    The class body runs once, when the class is defined: it starts the
    BACnet device and calls ``data()`` a first time.
    """

    # Started at class-definition time, so importing this module already
    # brings the device up.
    dev = start_device()
    # Result is kept only under a throwaway name; the call itself is what
    # matters here.
    _ = data()

    # Every access calls ``data()`` again and returns its result.
    meteo = property(lambda self: data())


# Module-level instance used by ``update``.
app = App()
def update():
    """Copy the latest weather readings onto the BACnet objects.

    Reads ``app.meteo`` once, then for every ``reading key -> object name``
    pair in ``xref`` writes the reading to the object's ``presentValue``
    and clears its status flags. Each point is handled independently: a
    reading that is missing or cannot be written sets the second status
    flag (``fault`` in the BACnet StatusFlags order in_alarm, fault,
    overridden, out_of_service) on that point only, and the remaining
    points are still refreshed.

    Finally the ``Last_Update`` object receives the date and time of the
    reading's ``"ts"`` entry.
    """
    # Local name deliberately differs from the imported ``data`` callable.
    readings = app.meteo

    # Update values
    for key, object_name in xref.items():
        try:
            app.dev[object_name].presentValue = readings[key]
            app.dev[object_name].statusFlags = [0, 0, 0, 0]
        except Exception:
            # Best effort: flag this point as faulty and keep going so one
            # bad reading does not freeze every other point.
            app.dev[object_name].statusFlags = [0, 1, 0, 0]

    stamp = readings["ts"]
    app.dev["Last_Update"].presentValue.date = Date(stamp.date().isoformat())
    app.dev["Last_Update"].presentValue.time = Time(stamp.time().isoformat())
def main():
    """Schedule ``update`` as a recurring task and keep the process alive.

    Builds a ``RecurringTask`` around ``update`` with ``delay=60``, starts
    it, then loops forever sleeping 1 ms per iteration. The function never
    returns.
    """
    refresher = RecurringTask(update, delay=60)
    refresher.start()

    # Park the main thread; the recurring task does the actual work.
    while True:
        time.sleep(0.001)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
#!/usr/bin/env python
from BAC0.core.utils.notes import note_and_log
import requests, json, time
from datetime import datetime
@note_and_log
class OpenWeather:
    """Fetch and expose one city's weather report from OpenWeatherMap.

    The request URL is assembled once in ``__init__`` from the base URL and
    API key stored in ``api.key``, the city name and the unit system.
    ``update()`` downloads a fresh report into ``self.data``; the read-only
    properties pull individual fields out of that report.

    After ``update()`` has run, ``self.data`` always holds a report: the
    fresh one, the previous one if the download failed, or ``blank`` if
    there was never a successful download.
    """

    # Placeholder report with the same layout the properties read from;
    # used when no report has ever been downloaded successfully.
    blank = {
        "coord": {"lon": -92.11, "lat": 46.78},
        "weather": [
            {"id": 800, "main": "Unknown", "description": "Unknown", "icon": "01d"}
        ],
        "base": "stations",
        "main": {
            "temp": 0,
            "feels_like": 0,
            "temp_min": 0,
            "temp_max": 0,
            "pressure": 0,
            "humidity": 0,
        },
        "visibility": 0,
        "wind": {"speed": 0, "deg": 0},
        "clouds": {"all": 1},
        "dt": 0,
        "sys": {"type": 1, "id": 3903, "country": "US", "sunrise": 0, "sunset": 0},
        "timezone": 0,
        "id": 0,
        "name": "Unknown",
        "cod": 0,
    }

    # Seconds to wait for the HTTP request before treating it as failed;
    # without a timeout a stalled connection would block the caller forever.
    timeout = 10

    def __init__(self, city="duluth", units="imperial"):
        """Build the request URL and download a first report.

        Args:
            city: City name sent as the ``q`` query parameter.
            units: Unit system sent as the ``units`` query parameter.
        """
        # NOTE(review): relies on the "openweathermap" entry of api.key
        # listing the base URL first and the API key second -- confirm the
        # file layout.
        self.base_url, self.api_key = self.get_api_key_and_base_url().values()
        self.city_name = city
        self.complete_url = (
            self.base_url
            + "appid="
            + self.api_key
            + "&q="
            + self.city_name
            + "&units="
            + units
        )
        self.data = None
        self.update()

    def update(self):
        """Download a fresh report and return the report now in effect.

        On any request, HTTP-status or JSON-decoding failure the error is
        logged and ``self.data`` falls back to the previous report, or to
        ``blank`` when there is none, so the properties stay usable.

        Returns:
            The report stored in ``self.data`` after the call.
        """
        previous = self.data
        try:
            response = requests.get(self.complete_url, timeout=self.timeout)
            # An error status (bad key, unknown city, ...) carries an error
            # payload, not a weather report; treat it as a failed update.
            response.raise_for_status()
            self.data = response.json()
        except (requests.RequestException, ValueError) as error:
            self._log.error("Error updating weather | {}".format(error))
            if previous is None:
                self._log.warning("Using blank template")
                self.data = self.blank
            else:
                self._log.warning("Using last time report")
                self.data = previous
        return self.data

    def get_api_key_and_base_url(self):
        """Return the ``"openweathermap"`` entry of the JSON file ``api.key``.

        The file is read from the current working directory.

        Raises:
            OSError: If ``api.key`` cannot be opened.
            ValueError: If the file is not valid JSON.
            KeyError: If the ``"openweathermap"`` entry is missing.
        """
        with open("api.key", "r", encoding="utf-8") as file:
            d = json.load(file)
        return d["openweathermap"]

    @property
    def timestamp(self):
        """Report time (``dt``) as a naive local ``datetime``."""
        return datetime.fromtimestamp(self.data["dt"])

    @property
    def sunrise(self):
        """Sunrise (``sys.sunrise``) as a naive local ``datetime``."""
        return datetime.fromtimestamp(self.data["sys"]["sunrise"])

    @property
    def sunset(self):
        """Sunset (``sys.sunset``) as a naive local ``datetime``."""
        return datetime.fromtimestamp(self.data["sys"]["sunset"])

    @property
    def temp(self):
        """Temperature (``main.temp``), as reported for the requested units."""
        return self.data["main"]["temp"]

    @property
    def hum(self):
        """Humidity (``main.humidity``)."""
        return self.data["main"]["humidity"]

    @property
    def press(self):
        """Pressure (``main.pressure``)."""
        return self.data["main"]["pressure"]

    @property
    def windspd(self):
        """Wind speed (``wind.speed``), as reported for the requested units."""
        return self.data["wind"]["speed"]

    @property
    def winddir(self):
        """Wind direction (``wind.deg``)."""
        return self.data["wind"]["deg"]

    @property
    def cloudcov(self):
        """Cloud cover (``clouds.all``)."""
        return self.data["clouds"]["all"]

    @property
    def descrip(self):
        """Description text of the first entry in ``weather``."""
        return self.data["weather"][0]["description"]

    @property
    def city(self):
        """City name (``name``) as returned in the report."""
        return self.data["name"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment