Skip to content

Instantly share code, notes, and snippets.

@balzer82
Last active October 7, 2017 10:06
Show Gist options
  • Save balzer82/2c96d807242ab7338ba3 to your computer and use it in GitHub Desktop.
Save balzer82/2c96d807242ab7338ba3 to your computer and use it in GitHub Desktop.
Tinkerforge Weather Station plus Tweepy
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import sys
import time
from datetime import timedelta
# Tweepy is optional at import time: a missing package only produces a hint,
# and the script keeps loading. Only ImportError is handled so that any other
# failure inside the package is not silently hidden.
try:
    import tweepy
except ImportError:
    print('Tweepy ist nicht installiert! Probier mal \'pip install tweepy\'')
import math
import logging as log
log.basicConfig(level=log.INFO)
# The Tinkerforge bindings are imported as a group; if they are missing the
# user gets an install hint. Only ImportError is handled so that unrelated
# errors raised while importing the bindings still surface.
try:
    from tinkerforge.ip_connection import IPConnection
    from tinkerforge.ip_connection import Error
    from tinkerforge.brick_master import Master
    from tinkerforge.bricklet_lcd_20x4 import LCD20x4
    from tinkerforge.bricklet_ambient_light import AmbientLight
    from tinkerforge.bricklet_humidity import Humidity
    from tinkerforge.bricklet_barometer import Barometer
except ImportError:
    print('Tinkerforge Python Bindings noch nicht installiert. Probier mal \'pip install tinkerforge\'')
class WeatherStation:
    """Connect to a Tinkerforge stack and collect its sensor readings.

    Every reading delivered by a sensor callback is written to the LCD (when
    one has been enumerated) and stored as a formatted string in ``self.data``
    under the keys 'Helligkeit', 'rel. Luftfeuchte', 'Luftdruck' and 'Temp'.
    """

    HOST = "localhost"
    PORT = 4223

    # Device handles; each one is filled in by cb_enumerate once the matching
    # device has been reported, and reset to None if its setup fails.
    ipcon = None
    lcd = None
    al = None
    hum = None
    baro = None

    def __init__(self):
        """Connect to HOST:PORT, register the callbacks and start enumeration.

        Connecting and enumerating are retried once per second until they
        succeed, so this constructor blocks while the daemon is unreachable.
        """
        # Create the dict first: the sensor callbacks write into it and may
        # fire as soon as enumeration has been triggered below.
        self.data = {}

        self.ipcon = IPConnection()
        while True:
            try:
                self.ipcon.connect(WeatherStation.HOST, WeatherStation.PORT)
                break
            except Error as e:
                log.error('Connection Error: ' + str(e.description))
                time.sleep(1)
            except socket.error as e:
                log.error('Socket error: ' + str(e))
                time.sleep(1)

        self.ipcon.register_callback(IPConnection.CALLBACK_ENUMERATE,
                                     self.cb_enumerate)
        self.ipcon.register_callback(IPConnection.CALLBACK_CONNECTED,
                                     self.cb_connected)
        self._enumerate_until_ok()

    def _enumerate_until_ok(self):
        """Trigger device enumeration, retrying every second on Error."""
        while True:
            try:
                self.ipcon.enumerate()
                return
            except Error as e:
                log.error('Enumerate Error: ' + str(e.description))
                time.sleep(1)

    def cb_illuminance(self, illuminance):
        """Show and store a new illuminance value (raw value / 10, labelled lx)."""
        lux = illuminance / 10.0
        if self.lcd is not None:
            self.lcd.write_line(0, 0, 'Illuminanc %6.0f lx' % lux)
        # Stored independently of the LCD so the value is available even when
        # no display is attached.
        self.data['Helligkeit'] = '%+dLux' % lux

    def cb_humidity(self, humidity):
        """Show and store a new relative humidity value (raw value / 10, in %)."""
        percent = humidity / 10.0
        if self.lcd is not None:
            self.lcd.write_line(1, 0, 'Humidity %6.0f %%' % percent)
        self.data['rel. Luftfeuchte'] = '%+d%%' % percent

    def cb_air_pressure(self, air_pressure):
        """Show and store a new air pressure value plus the chip temperature.

        The pressure is the raw value / 1000 (labelled mb / mbar). The
        temperature is read from the barometer on every pressure callback and
        is the raw value / 100 (labelled degrees C). If the temperature cannot
        be read, only the pressure is updated.
        """
        mbar = air_pressure / 1000.0
        if self.lcd is not None:
            self.lcd.write_line(2, 0, 'Air Press %7.0f mb' % mbar)
        self.data['Luftdruck'] = '%+dmbar' % mbar

        if self.baro is None:
            # No barometer handle to ask for the temperature.
            return
        try:
            temperature = self.baro.get_chip_temperature()
        except Error as e:
            log.error('Could not get temperature: ' + str(e.description))
            return

        celsius = temperature / 100.0
        if self.lcd is not None:
            # \xDF == degree sign on LCD 20x4 charset
            self.lcd.write_line(3, 0, 'Temperature %5.0f \xDFC' % celsius)
        # u'\xb0' is the degree sign; a literal is used instead of unichr(),
        # which only exists on Python 2.
        self.data['Temp'] = '%+d%sC' % (celsius, u'\xb0')

    def cb_enumerate(self, uid, connected_uid, position, hardware_version,
                     firmware_version, device_identifier, enumeration_type):
        """Set up each known device when it is reported as connected/available.

        LCD, ambient light, humidity and barometer devices are recognised by
        their device identifier. Sensors get a 1000 ms callback period and
        their value callback registered. A failed setup is logged and the
        corresponding handle is reset to None.
        """
        if enumeration_type not in (IPConnection.ENUMERATION_TYPE_CONNECTED,
                                    IPConnection.ENUMERATION_TYPE_AVAILABLE):
            return

        if device_identifier == LCD20x4.DEVICE_IDENTIFIER:
            try:
                self.lcd = LCD20x4(uid, self.ipcon)
                self.lcd.clear_display()
                self.lcd.backlight_on()
                log.info('LCD 20x4 initialized')
            except Error as e:
                log.error('LCD 20x4 init failed: ' + str(e.description))
                self.lcd = None
        elif device_identifier == AmbientLight.DEVICE_IDENTIFIER:
            try:
                self.al = AmbientLight(uid, self.ipcon)
                self.al.set_illuminance_callback_period(1000)
                self.al.register_callback(self.al.CALLBACK_ILLUMINANCE,
                                          self.cb_illuminance)
                log.info('Ambient Light initialized')
            except Error as e:
                log.error('Ambient Light init failed: ' + str(e.description))
                self.al = None
        elif device_identifier == Humidity.DEVICE_IDENTIFIER:
            try:
                self.hum = Humidity(uid, self.ipcon)
                self.hum.set_humidity_callback_period(1000)
                self.hum.register_callback(self.hum.CALLBACK_HUMIDITY,
                                           self.cb_humidity)
                log.info('Humidity initialized')
            except Error as e:
                log.error('Humidity init failed: ' + str(e.description))
                self.hum = None
        elif device_identifier == Barometer.DEVICE_IDENTIFIER:
            try:
                self.baro = Barometer(uid, self.ipcon)
                self.baro.set_air_pressure_callback_period(1000)
                self.baro.register_callback(self.baro.CALLBACK_AIR_PRESSURE,
                                            self.cb_air_pressure)
                log.info('Barometer initialized')
            except Error as e:
                log.error('Barometer init failed: ' + str(e.description))
                self.baro = None

    def cb_connected(self, connected_reason):
        """Re-run enumeration after an automatic reconnect."""
        if connected_reason == IPConnection.CONNECT_REASON_AUTO_RECONNECT:
            log.info('Auto Reconnect')
            self._enumerate_until_ok()
def init_twitter(consumer_key='', consumer_secret='',
                 access_key='', access_secret=''):
    """Build a Tweepy API object from the given OAuth credentials.

    All four credentials default to the empty string, matching the blank
    placeholders this function previously hard-coded.

    Returns the ``tweepy.API`` instance, or ``None`` if setting it up raised
    an exception (the error is logged).
    """
    # Catch-all at this setup boundary: any failure is logged and reported
    # to the caller as None instead of leaving the result undefined.
    try:
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_key, access_secret)
        return tweepy.API(auth)
    except Exception as e:
        log.error('Keine Verbindung zu Twitter: ' + str(e))
        return None
def tweetstats(twitter, tweetdata, place_id='7b5986414898c846'):
    """Compose a status message from the weather data and post it.

    Args:
        twitter: object providing ``update_status(status=..., place_id=...)``.
        tweetdata: mapping of label -> formatted value; each entry becomes
            one line ' <value> <label>' of the message.
        place_id: place id passed along with the status update.

    Returns:
        The composed message text. Errors while logging or posting are
        printed and do not propagate.
    """
    timestamp = time.strftime("%d.%m.%Y um %H:%MUhr")
    lines = [' %s %s\n' % (val, key) for key, val in tweetdata.items()]
    message = 'Garten am %s:\n' % timestamp + ''.join(lines)

    try:
        log.info('Tweet: %s (%i Zeichen)', message, len(message))
        # http://docs.tweepy.org/en/latest/api.html#API.update_status
        twitter.update_status(status=message, place_id=place_id)
    except Exception as e:
        print('Fehler: %s' % e)
    return message
def main():
    """Run the weather station and tweet its readings every full 10 minutes.

    Runs until interrupted with Ctrl+C; the IP connection is disconnected on
    the way out.
    """
    log.info('Weather Station: Start')
    weather_station = WeatherStation()
    twitter = init_twitter()
    tweetdata = weather_station.data
    time.sleep(10)  # give the sensor callbacks time to fill the data dict

    try:
        while True:
            if int(time.strftime('%M')) % 10 == 0:  # every full 10 minutes
                try:
                    tweetstats(twitter, tweetdata)
                    time.sleep(60 * 10 - 1)  # idle for just under 10 minutes
                    continue
                except Exception as e:
                    print('Fehler beim Tweeten:  %s' % e)
            # Poll once per second instead of spinning at full CPU load
            # while waiting for the next 10-minute mark.
            time.sleep(1)
    except KeyboardInterrupt:
        log.info('Weather Station: interrupted')
    finally:
        if weather_station.ipcon is not None:
            weather_station.ipcon.disconnect()
        log.info('Weather Station: End')


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment