Skip to content

Instantly share code, notes, and snippets.

@benemorius
Created February 6, 2017 18:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benemorius/f4bbd89191f7c73678b233dcfec30d09 to your computer and use it in GitHub Desktop.
Save benemorius/f4bbd89191f7c73678b233dcfec30d09 to your computer and use it in GitHub Desktop.
Receives data from an Argent 80422 and other sensors and sends it to WeeWX over UDP.
#!/usr/bin/python3
import threading
import Adafruit_ADS1x15
import RPi.GPIO as GPIO
import time
from datetime import datetime
import json
import socket
import Adafruit_BMP.BMP280 as BMP280
from aosong import am2315
class am2315_i2cfix(am2315.Sensor):
    """``am2315.Sensor`` variant whose I2C bus number is fixed to 1.

    NOTE(review): presumably the base class calls ``pi_i2c_bus_number`` to
    decide which I2C bus to open -- confirm against the aosong library.
    """

    def pi_i2c_bus_number(self):
        """Return the I2C bus number, which is always 1."""
        bus_number = 1
        return bus_number
## Weewx: UDP destination that DataManager sends observation packets to
WEEWX_ADDRESS = '127.0.0.1'
WEEWX_PORT = '8888'  # kept as a string; DataManager.__init__ converts it with int()
## GPIO: input pin numbers; DataManager.__init__ selects GPIO.BCM numbering
WIND_GPIO = 21
RAIN_GPIO = 20
## ADC: used to read the wind direction input
GAIN = 1  # passed as gain= to adc.read_adc()
ADC_WIND_DIRECTION_CHANNEL = 1
adc = Adafruit_ADS1x15.ADS1115(address=0x49)
## wind
# Multiplied by (ticks / elapsed seconds) to obtain the wind speed in km/h.
KMH_PER_TICK_SECOND = 2.4
# Held (via `with`) around every access to DataManager.wind_ticks, both in
# the GPIO callback and in the polling loop.
wind_lock = threading.Semaphore()
## rain
# Multiplied by the rain tick count to convert ticks to millimetres.
MM_PER_TICK = 0.2794
# Held (via `with`) around every access to DataManager.rain_ticks.
rain_lock = threading.Semaphore()
## pressure
INHG_PER_PA = 0.0002953  # multiplied with the BMP280 pressure reading (Pa)
bmp = BMP280.BMP280()  # also read for the enclosure temperature
## humidity
am = am2315_i2cfix()  # also read for the outdoor temperature
class DataManager(threading.Thread):
    """Daemon thread that polls the sensors and forwards readings over UDP.

    Wind and rain ticks are counted by GPIO falling-edge callbacks. ``run``
    repeatedly reads every sensor, prints a one-line summary and sends the
    readings as a JSON object in one UDP datagram to
    ``WEEWX_ADDRESS``:``WEEWX_PORT``.

    A sensor that fails to read yields ``None``; such a reading is shown as
    ``n/a`` in the summary and left out of the packet, so a single failing
    sensor does not stop the thread.
    """

    # Millibars per inch of mercury. Together with INHG_PER_PA it converts the
    # pressure reading from Pa to mbar (presumably because the weewx METRIC
    # unit system expects mbar -- confirm against the receiving driver).
    MBAR_PER_INHG = 33.863881588173335

    # Seconds to sleep between two observation packets.
    POLL_INTERVAL_SECONDS = 2

    def __init__(self):
        """Set up the counters, the destination and the GPIO tick inputs."""
        super().__init__(daemon=True, name="DataManager")
        self.wind_ticks = 0
        self.rain_ticks = 0
        # Monotonic timestamps of the last counter resets; restarted in run().
        self.last_wind_update = time.monotonic()
        self.last_rain_update = time.monotonic()
        self.destination_address = WEEWX_ADDRESS
        self.destination_port = int(WEEWX_PORT)
        GPIO.setmode(GPIO.BCM)
        for pin in (WIND_GPIO, RAIN_GPIO):
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        for pin in (WIND_GPIO, RAIN_GPIO):
            GPIO.add_event_detect(
                pin, GPIO.FALLING, callback=self._gpio_callback, bouncetime=10)

    def run(self):
        """Poll all sensors forever, printing and sending each set of readings."""
        started = time.monotonic()
        self.last_wind_update = started
        self.last_rain_update = started
        destination = (self.destination_address, self.destination_port)
        # One socket for the lifetime of the thread, closed when the loop ends,
        # instead of leaking a new socket on every iteration.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            while True:
                humidity = self._get_humidity()
                pressure_pa = self._get_pressure()
                temperature_c = self._get_temperature()
                enclosure_temperature_c = self._get_enclosure_temperature()
                wind_direction = self._get_wind_direction()
                wind_speed = self._get_wind_speed()
                rain_fall = self._get_rain()
                print(self._format_summary(
                    wind_speed, wind_direction, rain_fall, temperature_c,
                    humidity, pressure_pa, enclosure_temperature_c))
                observations = self._build_observations(
                    humidity, pressure_pa, temperature_c,
                    enclosure_temperature_c, wind_speed, rain_fall)
                packet = json.dumps(observations)
                sock.sendto(packet.encode("utf-8"), destination)
                time.sleep(self.POLL_INTERVAL_SECONDS)

    @staticmethod
    def _format_reading(value, spec):
        """Format ``value`` with the format string ``spec``; ``None`` gives 'n/a'."""
        if value is None:
            return "n/a"
        return spec.format(value)

    def _format_summary(self, wind_speed, wind_direction, rain_fall,
                        temperature_c, humidity, pressure_pa,
                        enclosure_temperature_c):
        """Return the console summary line; missing readings appear as 'n/a'."""
        show = self._format_reading
        return (
            "wind speed: {} km/h, wind direction: {} deg, rain fall: {} mm, "
            "temperature: {} C, humidity: {}%, pressure: {} Pa, "
            "enclosure temperature: {} C"
        ).format(
            show(wind_speed, "{:.1f}"),
            show(wind_direction, "{}"),
            show(rain_fall, "{:.3f}"),
            show(temperature_c, "{}"),
            show(humidity, "{:.1f}"),
            show(pressure_pa, "{:.0f}"),
            show(enclosure_temperature_c, "{:.2f}"),
        )

    def _build_observations(self, humidity, pressure_pa, temperature_c,
                            enclosure_temperature_c, wind_speed, rain_fall):
        """Return the observation dict that is serialised into the UDP packet.

        Readings that are ``None`` are omitted. The wind direction is not
        part of the packet because ``_get_wind_direction`` only returns a
        placeholder so far.
        """
        pressure = None
        if pressure_pa is not None:
            pressure = pressure_pa * INHG_PER_PA * self.MBAR_PER_INHG
        observations = {
            'dateTime': time.time(),
            'usUnits': 'weewx.METRIC',
        }
        readings = (
            ('pressure', pressure),
            ('outTemp', temperature_c),
            ('windSpeed', wind_speed),
            ('outHumidity', humidity),
            ('rain', rain_fall),
            ('inTemp', enclosure_temperature_c),
        )
        for key, value in readings:
            if value is not None:
                observations[key] = value
        return observations

    @staticmethod
    def _read_float(label, reader):
        """Return ``float(reader())``, or ``None`` if the read fails.

        Sensor reads are best effort: any exception is reported on stdout
        (tagged with ``label``) and turned into ``None``. ``Exception`` is
        caught broadly because the sensor libraries' error types are not
        visible here, but KeyboardInterrupt/SystemExit are no longer swallowed.
        """
        try:
            return float(reader())
        except Exception as err:
            print("failed to read {}: {}".format(label, err))
            return None

    @staticmethod
    def _scaled_rate(scale, ticks, seconds):
        """Return ``scale * ticks / seconds`` as a float.

        Returns 0.0 when ``seconds`` is not positive, so two polls landing on
        the same timestamp cannot raise ZeroDivisionError.
        """
        if seconds <= 0:
            return 0.0
        return float(scale * ticks / seconds)

    def _get_humidity(self):
        """Return the AM2315 humidity reading as a float, or None on failure."""
        return self._read_float("humidity", lambda: am.humidity())

    def _get_temperature(self):
        """Return the AM2315 temperature reading as a float, or None on failure."""
        return self._read_float("temperature", lambda: am.temperature())

    def _get_pressure(self):
        """Return the BMP280 pressure reading as a float, or None on failure."""
        return self._read_float("pressure", lambda: bmp.read_pressure())

    def _get_wind_direction(self):
        """Read the wind-direction ADC channel and return a placeholder.

        Returns 361 after a successful ADC read (the conversion from the raw
        reading to degrees is not implemented yet) or None if the read fails.
        """
        try:
            raw_voltage = adc.read_adc(ADC_WIND_DIRECTION_CHANNEL, gain=GAIN)
        except Exception as err:
            print("failed to read wind direction: {}".format(err))
            return None
        print("adc: {}".format(raw_voltage))
        # TODO ugly nonlinear function with no equation provided by datasheet :(
        return 361

    def _get_wind_speed(self):
        """Return the wind speed in km/h since the previous call and reset the count."""
        with wind_lock:
            now = time.monotonic()
            seconds = now - self.last_wind_update
            ticks = self.wind_ticks
            self.last_wind_update = now
            self.wind_ticks = 0
        return self._scaled_rate(KMH_PER_TICK_SECOND, ticks, seconds)

    def _get_rain(self):
        """Return the rain figure since the previous call and reset the count.

        The interval is measured from ``last_rain_update``. It used to be
        measured from ``last_wind_update``, which ``_get_wind_speed`` had
        just reset, so the interval was close to zero and the result was
        hugely inflated.

        NOTE(review): the value is a rate (mm per second, divided by 10),
        whereas ``run`` prints it as "mm" and sends it as ``rain``; confirm
        the unit the receiver expects before relying on it.
        """
        with rain_lock:
            now = time.monotonic()
            seconds = now - self.last_rain_update
            ticks = self.rain_ticks
            self.last_rain_update = now
            self.rain_ticks = 0
        mm_per_second = self._scaled_rate(MM_PER_TICK, ticks, seconds)
        return float(mm_per_second / 10)

    def _get_enclosure_temperature(self):
        """Return the BMP280 temperature reading as a float, or None on failure."""
        return self._read_float(
            "enclosure temperature", lambda: bmp.read_temperature())

    def _gpio_callback(self, channel):
        """Count one tick for the wind or rain input that triggered ``channel``."""
        if channel == WIND_GPIO:
            with wind_lock:
                self.wind_ticks += 1
        elif channel == RAIN_GPIO:
            with rain_lock:
                self.rain_ticks += 1
def main():
    """Create the DataManager thread, start it and wait for it to finish."""
    manager = DataManager()
    manager.start()
    manager.join()


# Run main() only when the file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment