Last active
October 7, 2017 10:06
-
-
Save balzer82/2c96d807242ab7338ba3 to your computer and use it in GitHub Desktop.
Tinkerforge Weather Station plus Tweepy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import socket | |
import sys | |
import time | |
from datetime import timedelta | |
try: | |
import tweepy | |
except: | |
print('Tweepy ist nicht installiert! Probier mal \'pip install tweepy\'') | |
import math | |
import logging as log | |
log.basicConfig(level=log.INFO) | |
try: | |
from tinkerforge.ip_connection import IPConnection | |
from tinkerforge.ip_connection import Error | |
from tinkerforge.brick_master import Master | |
from tinkerforge.bricklet_lcd_20x4 import LCD20x4 | |
from tinkerforge.bricklet_ambient_light import AmbientLight | |
from tinkerforge.bricklet_humidity import Humidity | |
from tinkerforge.bricklet_barometer import Barometer | |
except: | |
print('Tinkerforge Python Bindings noch nicht installiert. Probier mal \'pip install tinkerforge\'') | |
class WeatherStation: | |
HOST = "localhost" | |
PORT = 4223 | |
ipcon = None | |
lcd = None | |
al = None | |
hum = None | |
baro = None | |
def __init__(self): | |
self.ipcon = IPConnection() | |
while True: | |
try: | |
self.ipcon.connect(WeatherStation.HOST, WeatherStation.PORT) | |
break | |
except Error as e: | |
log.error('Connection Error: ' + str(e.description)) | |
time.sleep(1) | |
except socket.error as e: | |
log.error('Socket error: ' + str(e)) | |
time.sleep(1) | |
self.ipcon.register_callback(IPConnection.CALLBACK_ENUMERATE, | |
self.cb_enumerate) | |
self.ipcon.register_callback(IPConnection.CALLBACK_CONNECTED, | |
self.cb_connected) | |
while True: | |
try: | |
self.ipcon.enumerate() | |
break | |
except Error as e: | |
log.error('Enumerate Error: ' + str(e.description)) | |
time.sleep(1) | |
# Init Weather Data Dict | |
self.data = {} | |
def cb_illuminance(self, illuminance): | |
if self.lcd is not None: | |
text = 'Illuminanc %6.0f lx' % (illuminance/10.0) | |
self.lcd.write_line(0, 0, text) | |
#log.info('Write to line 0: ' + text) | |
# Fill the Data Dictionary | |
self.data['Helligkeit'] = '%+dLux' % (illuminance/10.0) | |
def cb_humidity(self, humidity): | |
if self.lcd is not None: | |
text = 'Humidity %6.0f %%' % (humidity/10.0) | |
self.lcd.write_line(1, 0, text) | |
#log.info('Write to line 1: ' + text) | |
# Fill the Data Dictionary | |
self.data['rel. Luftfeuchte'] = '%+d%%' % (humidity/10.0) | |
def cb_air_pressure(self, air_pressure): | |
if self.lcd is not None: | |
text = 'Air Press %7.0f mb' % (air_pressure/1000.0) | |
self.lcd.write_line(2, 0, text) | |
#log.info('Write to line 2: ' + text) | |
# Fill the Data Dictionary | |
self.data['Luftdruck'] = '%+dmbar' % (air_pressure/1000.0) | |
try: | |
temperature = self.baro.get_chip_temperature() | |
except Error as e: | |
log.error('Could not get temperature: ' + str(e.description)) | |
return | |
# \xDF == ° on LCD 20x4 charset | |
text = 'Temperature %5.0f \xDFC' % (temperature/100.0) | |
self.lcd.write_line(3, 0, text) | |
#log.info('Write to line 3: ' + text.replace('\xDF', '°')) | |
# Fill the Data Dictionary | |
self.data['Temp'] = '%+d%sC' % (temperature/100.0, unichr(176)) | |
def cb_enumerate(self, uid, connected_uid, position, hardware_version, | |
firmware_version, device_identifier, enumeration_type): | |
if enumeration_type == IPConnection.ENUMERATION_TYPE_CONNECTED or \ | |
enumeration_type == IPConnection.ENUMERATION_TYPE_AVAILABLE: | |
if device_identifier == LCD20x4.DEVICE_IDENTIFIER: | |
try: | |
self.lcd = LCD20x4(uid, self.ipcon) | |
self.lcd.clear_display() | |
self.lcd.backlight_on() | |
log.info('LCD 20x4 initialized') | |
except Error as e: | |
log.error('LCD 20x4 init failed: ' + str(e.description)) | |
self.lcd = None | |
elif device_identifier == AmbientLight.DEVICE_IDENTIFIER: | |
try: | |
self.al = AmbientLight(uid, self.ipcon) | |
self.al.set_illuminance_callback_period(1000) | |
self.al.register_callback(self.al.CALLBACK_ILLUMINANCE, | |
self.cb_illuminance) | |
log.info('Ambient Light initialized') | |
except Error as e: | |
log.error('Ambient Light init failed: ' + str(e.description)) | |
self.al = None | |
elif device_identifier == Humidity.DEVICE_IDENTIFIER: | |
try: | |
self.hum = Humidity(uid, self.ipcon) | |
self.hum.set_humidity_callback_period(1000) | |
self.hum.register_callback(self.hum.CALLBACK_HUMIDITY, | |
self.cb_humidity) | |
log.info('Humidity initialized') | |
except Error as e: | |
log.error('Humidity init failed: ' + str(e.description)) | |
self.hum = None | |
elif device_identifier == Barometer.DEVICE_IDENTIFIER: | |
try: | |
self.baro = Barometer(uid, self.ipcon) | |
self.baro.set_air_pressure_callback_period(1000) | |
self.baro.register_callback(self.baro.CALLBACK_AIR_PRESSURE, | |
self.cb_air_pressure) | |
log.info('Barometer initialized') | |
except Error as e: | |
log.error('Barometer init failed: ' + str(e.description)) | |
self.baro = None | |
def cb_connected(self, connected_reason): | |
if connected_reason == IPConnection.CONNECT_REASON_AUTO_RECONNECT: | |
log.info('Auto Reconnect') | |
while True: | |
try: | |
self.ipcon.enumerate() | |
break | |
except Error as e: | |
log.error('Enumerate Error: ' + str(e.description)) | |
time.sleep(1) | |
def init_twitter(): | |
CONSUMER_KEY = '' | |
CONSUMER_SECRET = '' | |
ACCESS_KEY = '' | |
ACCESS_SECRET = '' | |
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) | |
auth.set_access_token(ACCESS_KEY, ACCESS_SECRET) | |
try: | |
api = tweepy.API(auth) | |
except Error as e: | |
log.error('Keine Verbindung zu Twitter: ' + str(e.description)) | |
return api | |
def tweetstats(twitter, tweetdata): | |
TIMEFORMAT = "%d.%m.%Y um %H:%MUhr" | |
timeStamp = time.strftime(TIMEFORMAT) | |
# Uptime aufm RaspberryPi ermitteln | |
# http://planzero.org/blog/2012/01/26/system_uptime_in_python,_a_better_way | |
''' | |
with open('/proc/uptime', 'r') as f: | |
uptime_seconds = float(f.readline().split()[0]) | |
uptime = str(timedelta(seconds = uptime_seconds)) | |
''' | |
message = 'Garten am %s:\n' % (timeStamp) | |
for key, val in tweetdata.iteritems(): | |
message += ' %s %s\n' % (val, key) | |
try: | |
log.info('Tweet: %s (%i Zeichen)' % (message, len(message))) | |
# http://docs.tweepy.org/en/latest/api.html#API.update_status | |
twitter.update_status(status=message, place_id='7b5986414898c846') | |
except Exception, e: | |
print 'Fehler:', e | |
if __name__ == "__main__": | |
log.info('Weather Station: Start') | |
weather_station = WeatherStation() | |
twitter = init_twitter() | |
tweetdata = weather_station.data | |
time.sleep(10) # let em fill all values of the Data Dict | |
while True: | |
if (int(time.strftime('%M')) % 10) == 0: # jede volle 10 Minuten | |
try: | |
tweetstats(twitter, tweetdata) | |
time.sleep(60*10-1) # knapp 10min nix machen | |
except Exception, e: | |
print 'Fehler beim Tweeten: ', e | |
if weather_station.ipcon != None: | |
weather_station.ipcon.disconnect() | |
log.info('Weather Station: End') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment