Skip to content

Instantly share code, notes, and snippets.

@ajfisher
Last active December 4, 2018 20:42
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ajfisher/4335388 to your computer and use it in GitHub Desktop.
Save ajfisher/4335388 to your computer and use it in GitHub Desktop.
Set up some Christmas lights to pulse depending on keywords being made on twitter
# config for twitter.
# put your twitter keys in here
consumer_key = ""
consumer_secret=""
access_token = ""
access_token_secret = ""
# set up the various keywords and their mapping.
# Each of these is a bauble on the tree. The light node
# specifies which arduino to use.
# Period is the time to illuminate
# kewords is a list of twitter keywords to track for this bauble
# channel is the specific PWM channel to be used
# Debug prints data to stdout when it finds a keyword it should illuminate on
keyword_set = {
"tree": {
"light_node": 0,
"light_period": 500,
"keywords": ["xmas tree", "christmas tree",],
"channel": 0,
"debug": False,
},
"snowflake": {
"light_node": 0,
"light_period": 5000,
"keywords": ["snowflake", "snow flake", "white christmas",],
"channel": 1,
"debug": False,
},
"santa": {
"light_node": 0,
"light_period": 500,
"keywords": ["santa claus", "father christmas",],
"channel": 2,
"debug": False,
},
"family": {
"light_node": 0,
"light_period": 5000,
"keywords": ["family christmas",],
"channel": 3,
"debug": False,
},
"present": {
"light_node": 1,
"light_period": 1000,
"keywords": ["xmas present", "christmas present",],
"channel": 0,
"debug": False,
},
"rudolph": {
"light_node": 1,
"light_period": 1000,
"keywords": ["rudolph",],
"channel": 1,
"debug": False,
},
"love": {
"light_node": 1,
"light_period": 5000,
"keywords": ["love christmas","love xmas"],
"channel": 2,
"debug": False,
},
"xmas": {
"light_node": 1,
"light_period": 1000,
"keywords": ["merry xmas",],
"channel": 3,
"debug": False,
},
}
//board:duemilanove
// Arduino serial pulser.
// Pulses a PWM pin supplied over Serial for the duration that was sent.
#include <stdint.h>
//#define DEBUG
#define CHANNEL0 9
#define CHANNEL1 6
#define CHANNEL2 5
#define CHANNEL3 3
#define NO_CHANNELS 4
#define DECAY_TIME 8 // number of cycles to run before doing a decay loop
#define UINT16_MAX (65535U)
#define NODE 1 // used to define which node we're looking at. TODO move this to EEPROM and burn in
byte channels[] = {CHANNEL0, CHANNEL1, CHANNEL2, CHANNEL3};
long channel_time_values[NO_CHANNELS]; // used to hold the time to stay lit
uint8_t channel_light_val[NO_CHANNELS]; // used to hold the light value
enum LIGHT_DIRECTION {
FADE_DOWN = -1,
HOLD = 0,
FADE_UP = 1
} channel_light_dir[NO_CHANNELS]; // used to hold the direction
uint8_t current_channel = 0;
long current_value = 0;
enum STATE {
WAITING_DATA,
WAITING_VALUE,
DISCLOSE_NODE
} state = WAITING_DATA;
void setup() {
// we're going to be talking to the serial line so best we open it.
Serial.begin(9600);
Serial.print("Lightnode:");
Serial.println(NODE);
Serial.flush();
#ifdef DEBUG
Serial.println("Serial pulser.");
Serial.println("Send a channel number then a duration in msec");
Serial.println("Send CR to complete");
#endif
// set up the PWM pins for use.
for (uint8_t i=0; i< NO_CHANNELS; i++) {
pinMode(channels[i], OUTPUT);
digitalWrite(channels[i], LOW);
channel_light_val[i] = 0;
}
}
void loop() {
// check to see if anything is coming in on the serial line and if there
// is then process it.
if(Serial.available()){
char ch = Serial.read();
if (ch >= '0' && ch<= '9') {
if (state == WAITING_DATA) {
current_channel = (current_channel * 10) + (ch - '0');
#ifdef DEBUG
Serial.print("Got a char: ");
Serial.println(ch);
#endif
} else if (state == WAITING_VALUE) {
current_value = (current_value * 10) + (ch - '0');
}
} else if (ch == 32) { // space so we split the channel stuff.
if (state == WAITING_DATA) {
state = WAITING_VALUE;
#ifdef DEBUG
Serial.println("Got a space");
#endif
}
} else if (ch == 'n') {
//channel has asked to disclose our name
state = DISCLOSE_NODE;
} else if (ch == 10) {
// newline so end of val.
if (state == DISCLOSE_NODE) {
Serial.print("Lightnode:");
Serial.println(NODE);
Serial.flush();
state = WAITING_DATA;
} else if (state == WAITING_VALUE) {
state = WAITING_DATA;
#ifdef DEBUG
Serial.print("Got a new line ");
Serial.print("Channel: ");
Serial.print(current_channel);
Serial.print("msecs: ");
Serial.println(current_value);
#endif
pulse(current_channel, current_value);
current_value = 0;
current_channel = 0;
}
}
}
// decay the values on the channels appropriately.
for (int i = 0; i < NO_CHANNELS; i++) {
channel_time_values[i]--;
if (channel_time_values[i] < 0) {
channel_time_values[i] = 0;
if (channel_light_val[i] > 0 && channel_light_dir[i] == HOLD) {
channel_light_dir[i] = FADE_DOWN;
#ifdef DEBUG
Serial.print("Fading down channel: ");
Serial.println(i);
#endif
}
}
if (channel_light_dir[i] == FADE_UP) {
if (channel_light_val[i] < 255) {
channel_light_val[i] += channel_light_dir[i];
} else {
channel_light_dir[i] = HOLD;
#ifdef DEBUG
Serial.print("HOLDING HIGH channel: ");
Serial.println(i);
#endif
}
} else if (channel_light_dir[i] == FADE_DOWN) {
if (channel_light_val[i] > 0) {
channel_light_val[i] += channel_light_dir[i];
} else {
channel_light_dir[i] = HOLD;
#ifdef DEBUG
Serial.print("HOLDING LOW channel: ");
Serial.println(i);
#endif
}
}
// now we actually know what the value should be write it to the pin.
analogWrite(channels[i], channel_light_val[i]);
}
delay(1);
}
void pulse(uint8_t channel, uint16_t msecs ) {
// this function adds the value of a pulse onto the specified channel
if ((channel_time_values[channel]+ msecs) > UINT16_MAX ) {
channel_time_values[channel] = UINT16_MAX;
} else {
channel_time_values[channel] += msecs;
#ifdef DEBUG
Serial.print("Setting time value to: ");
Serial.println(channel_time_values[channel]);
#endif
}
channel_light_dir[channel] = FADE_UP;
#ifdef DEBUG
Serial.print("Fading up channel: ");
Serial.println(channel);
#endif
}
Copyright (c) Andrew Fisher and individual contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of ajfisher-lights nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
argparse==1.2.1
distribute==0.6.24
pyserial==2.6
tweepy==1.12
wsgiref==0.1.2
# Twitter connector to talk to the arduino driven LEDs over serial.
import sys
from time import sleep
import threading
import serial
import tweepy
from config import *
INTERFACES = ("/dev/ttyUSB", "/dev/ttyACM")
BAUD_RATE = 9600
NO_INTERFACES = 4
HEADER_STRING = "Lightnode:"
conns = {} #has all the serial connections
master_keywords = []
keyword_bauble_map = {} #sets up a map between the keywords as keys to specific baubles
baubles = []
class Bauble():
"""
This class represents one of the baubles. It contains a reference
to the serial node that controls it and the keywords associated with it
"""
light_period = 0 # msecs that the light should illuminate on keyword
light_node = None # which node are you looking at
channel = None # which PWM channel are you talking to
serial_conn = None # which serial connection is this bauble connected to
keywords = [] # keywords to track.
debug = False # used to determine to display tweet or not on this bauble
def __init__(self, light_period=500, light_node=0, channel=0, keywords=[],
connections = {}, debug = False):
"""
Set up the bauble with the various params set
"""
# set up the object
self.light_period = light_period
self.light_node = light_node
self.channel = channel
self.keywords = keywords
self.debug = debug
for serialport in connections:
if (connections[serialport]["light_node"] == self.light_node):
self.serial_conn = connections[serialport]["ser"]
def __str__(self):
return "%s on (LN)%s:(C)%s" % (self.keywords, self.light_node, self.channel)
def pulse(self):
"""
This function pulses the bauble according to the settings
"""
if self.serial_conn is not None:
self.serial_conn.write("%s %s\n" % (self.channel, self.light_period) )
else:
print "Tried to write to an invalid connection %s" % self.keywords
class XmasListener(tweepy.StreamListener):
"""
Class set up to listen to and handle the messages coming from twitter and
hitting the Serial connections associated with it
"""
keyword_mapping = None
def on_status(self, status):
if status.retweeted:
# get rid of any retweets
return
if (status.text[0:1] in ('@', 'R', '#')):
#gets rid of quoted RTs, @replies and # spam
return
# clean status up to work with
status.text = status.text.lower().strip('#:.,?!"')
# now we work out where to route it.
for keyword in self.keyword_mapping:
if keyword in status.text:
# we've found one
bauble = self.keyword_mapping[keyword]
if bauble.debug:
print "-----------GOT A TWEET --------------"
print status.text
print "Pulsing bauble %s for KW: %s" % (bauble, keyword)
bauble.pulse()
def on_error(self, status_code):
print >> sys.stderr, 'Encountered error on %s with status code:' % (self.bauble, status_code)
return True # Don't kill the stream
def on_timeout(self):
print >> sys.stderr, 'Timeout...'
return True # Don't kill the stream
def on_limit(self, track):
print "-------RATE LIMITED --------"
print "this filter %s just got rate limited" % track
sleep(5)
return True # don't kill the stream.
def initialise_interfaces():
"""
Iterates through the specified interfaces and then opens all the s.ports
and tries to ignore anything a bit wacky.
"""
for interface in INTERFACES:
for port in range(0, NO_INTERFACES):
serialport = "%s%s" % (interface, port)
try:
ser = serial.Serial(serialport, BAUD_RATE, timeout=5)
print "Connected to interface %s" % serialport
except serial.SerialException as e:
print e
continue;
# test the serial connection and make sure you get what we want.
ser.flush() # get the data now.
header = ser.readline()
if header in (None, ""):
print "Connection timed out. Let's try to get a name"
ser.write("n\n")
header = ser.readline()
print "We got this back: %s" % header
lightnode = None
if HEADER_STRING in header:
lightnode = header.split(':')[1]
try:
lightnode = int(lightnode)
except ValueError:
print "Total garbage off %s" % serialport
ser.close()
continue
else:
print "This is not the interface we're looking for: %s" % serialport
ser.close()
continue
conns[serialport] = {
"ser": ser,
"light_node": lightnode,
}
if __name__ == '__main__':
print "Connecting serial devices"
initialise_interfaces()
print "Connecting to twitter"
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
print "Setting up the baubles"
for keyword in keyword_set:
#print "Keyword: %s Set: %s" % (keyword, keyword_set[keyword])
item = keyword_set[keyword]
bauble = Bauble(
light_period = item["light_period"],
light_node = item["light_node"],
channel = item["channel"],
keywords = item["keywords"],
debug = item["debug"],
connections = conns,
)
# this is used to map the keywords themselves to the baubles. This
# makes it much faster to look up the keywords when we process the
# tweets on the listener - we also use a keyword set so we can look
# up a bit more broadly.
for key in item["keywords"]:
keyword_bauble_map[key] = bauble
baubles.append(bauble)
# set up the master keywords list so we have a big list to push
# to the tweepy streaming tracker
if master_keywords is None:
master_keywords = item["keywords"][:] ## add all items in a new list
else:
master_keywords.extend(item["keywords"])
print "Authed, tracking keywords"
# now we call the listener with all the baubles/
xmas_listener = XmasListener()
xmas_listener.keyword_mapping = keyword_bauble_map
sapi = tweepy.streaming.Stream(auth, xmas_listener)
run_app = True
while (run_app):
try:
sapi.filter(track=master_keywords)
except KeyboardInterrupt as kbd:
print "Keyboard interrupted - shutdown"
print "Disconnect Twitter"
sapi.disconnect()
print "Closing the serial connections"
for serialport in conns:
print "Closing: %s" % serialport
conns[serialport]["ser"].close()
run_app = False
except Exception as e:
print "There was a problem %s" % e
sapi.disconnect()

Twitter Controlled Christmas Lights

Author: Andrew Fisher Version: 0.1 Date: 19 December, 2012

Overview

This project was done to liven up my Christmas tree and make it a bit more responsive. It was inspired by something similar I saw on Instructables some time ago and decided to do on my own tree. The project gave me a good use case for a combination of a RaspberryPi as well as Arduinos being able to combine them together into a low cost and low-footprint embedded project that plays to the strengths of both.

TODO

  • Add a python script to be able to interrogate tweets coming off the stream api
  • Add threading to be able to process multiple keywords at once and send messages to serial
  • Set up a file with routing between keywords and specific lights
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment