Skip to content

Instantly share code, notes, and snippets.

@xively-gists
Last active December 17, 2015 06:28
Show Gist options
  • Save xively-gists/5565335 to your computer and use it in GitHub Desktop.
Save xively-gists/5565335 to your computer and use it in GitHub Desktop.
Xively Raspberry Pi tutorial 1
#!/usr/bin/env python
import os
import xively
import subprocess
import time
import datetime
import requests
# Required configuration, read from environment variables. A missing FEED_ID
# or API_KEY raises KeyError at startup so a misconfigured run fails fast.
FEED_ID = os.environ["FEED_ID"]
API_KEY = os.environ["API_KEY"]
# DEBUG is optional: any non-empty value turns on progress messages. Using
# .get() avoids a KeyError when the variable is unset, and the fallback is the
# real False constant (the lowercase name `false` is undefined in Python).
DEBUG = os.environ.get("DEBUG") or False
# initialize api client
api = xively.XivelyAPIClient(API_KEY)
# function to read the 1 minute load average from /proc/loadavg
def read_loadavg():
    """Return the 1 minute load average as a string such as "0.15".

    The value is the first whitespace-separated field of /proc/loadavg.
    The file is read directly rather than through a shell/awk pipeline, so
    the result carries no trailing newline and no extra process is spawned.

    Raises:
        IOError/OSError: if /proc/loadavg cannot be opened.
    """
    if DEBUG:
        print("Reading load average")
    with open("/proc/loadavg") as loadavg_file:
        # First field is the 1 minute average; the remaining fields
        # are not used by this script.
        return loadavg_file.read().split()[0]
# function to return a datastream object. This either creates a new datastream,
# or returns an existing one
def get_datastream(feed):
    """Return the "load_avg" datastream of *feed*, creating it when needed.

    Args:
        feed: object exposing ``datastreams.get(id)`` and
            ``datastreams.create(id, tags=...)``.

    Returns:
        The existing datastream if the lookup succeeds, otherwise a newly
        created one tagged "load_01".
    """
    try:
        datastream = feed.datastreams.get("load_avg")
    except Exception:
        # Any lookup failure is treated as "datastream does not exist yet".
        # Exception (rather than a bare except) keeps Ctrl-C and SystemExit
        # from being swallowed and turned into a create call.
        if DEBUG:
            print("Creating new datastream")
        return feed.datastreams.create("load_avg", tags="load_01")
    if DEBUG:
        print("Found existing datastream")
    return datastream
# main program entry point - runs continuously updating our datastream with the
# current 1 minute load average
def run():
    """Push the current load average to the Xively datastream every 10 seconds.

    Fetches the feed identified by FEED_ID, obtains the datastream through
    get_datastream(), clears its min/max values, then loops forever: read the
    load average, set it as the current value with a UTC timestamp, and call
    update(). An HTTPError from update() is reported and the loop continues;
    any other exception propagates to the caller.
    """
    print("Starting Xively tutorial script")
    feed = api.feeds.get(FEED_ID)
    datastream = get_datastream(feed)
    datastream.max_value = None
    datastream.min_value = None
    while True:
        load_avg = read_loadavg()
        if DEBUG:
            print("Updating Xively feed with value: %s" % load_avg)
        datastream.current_value = load_avg
        datastream.at = datetime.datetime.utcnow()
        try:
            datastream.update()
        except requests.HTTPError as e:
            # errno/strerror are not populated on requests' HTTPError, so
            # report the HTTP status code (when a response is attached) and
            # the exception's own message instead.
            response = getattr(e, "response", None)
            status = getattr(response, "status_code", None)
            print("HTTPError({0}): {1}".format(status, e))
        time.sleep(10)
# Start the update loop when the script is executed; run() loops until it is
# interrupted or an unhandled exception is raised.
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment