public
Last active

Set up some Christmas lights to pulse depending on keywords being made on twitter

  • Download Gist
LICENSE
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
Copyright (c) Andrew Fisher and individual contributors.
All rights reserved.
 
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
 
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
 
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
 
3. Neither the name of ajfisher-lights nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
 
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
config.py.sample
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
# config for twitter.
 
# put your twitter keys in here
consumer_key = ""
consumer_secret=""
access_token = ""
access_token_secret = ""
 
# set up the various keywords and their mapping.
# Each of these is a bauble on the tree. The light node
# specifies which arduino to use.
# Period is the time to illuminate
# kewords is a list of twitter keywords to track for this bauble
# channel is the specific PWM channel to be used
# Debug prints data to stdout when it finds a keyword it should illuminate on
keyword_set = {
"tree": {
"light_node": 0,
"light_period": 500,
"keywords": ["xmas tree", "christmas tree",],
"channel": 0,
"debug": False,
},
 
"snowflake": {
"light_node": 0,
"light_period": 5000,
"keywords": ["snowflake", "snow flake", "white christmas",],
"channel": 1,
"debug": False,
},
 
"santa": {
"light_node": 0,
"light_period": 500,
"keywords": ["santa claus", "father christmas",],
"channel": 2,
"debug": False,
},
"family": {
"light_node": 0,
"light_period": 5000,
"keywords": ["family christmas",],
"channel": 3,
"debug": False,
},
"present": {
"light_node": 1,
"light_period": 1000,
"keywords": ["xmas present", "christmas present",],
"channel": 0,
"debug": False,
},
 
"rudolph": {
"light_node": 1,
"light_period": 1000,
"keywords": ["rudolph",],
"channel": 1,
"debug": False,
},
 
"love": {
"light_node": 1,
"light_period": 5000,
"keywords": ["love christmas","love xmas"],
"channel": 2,
"debug": False,
},
 
"xmas": {
"light_node": 1,
"light_period": 1000,
"keywords": ["merry xmas",],
"channel": 3,
"debug": False,
},
}
generic_serial_pulser/generic_serial_pulser.ino
Arduino
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
//board:duemilanove
// Arduino serial pulser.
 
// Pulses a PWM pin supplied over Serial for the duration that was sent.
#include <stdint.h>
 
//#define DEBUG
#define CHANNEL0 9
#define CHANNEL1 6
#define CHANNEL2 5
#define CHANNEL3 3
 
#define NO_CHANNELS 4
 
#define DECAY_TIME 8 // number of cycles to run before doing a decay loop
 
#define UINT16_MAX (65535U)
 
 
#define NODE 1 // used to define which node we're looking at. TODO move this to EEPROM and burn in
 
byte channels[] = {CHANNEL0, CHANNEL1, CHANNEL2, CHANNEL3};
 
long channel_time_values[NO_CHANNELS]; // used to hold the time to stay lit
uint8_t channel_light_val[NO_CHANNELS]; // used to hold the light value
 
enum LIGHT_DIRECTION {
FADE_DOWN = -1,
HOLD = 0,
FADE_UP = 1
} channel_light_dir[NO_CHANNELS]; // used to hold the direction
 
uint8_t current_channel = 0;
 
long current_value = 0;
 
enum STATE {
WAITING_DATA,
WAITING_VALUE,
DISCLOSE_NODE
} state = WAITING_DATA;
 
 
 
void setup() {
 
// we're going to be talking to the serial line so best we open it.
Serial.begin(9600);
Serial.print("Lightnode:");
Serial.println(NODE);
Serial.flush();
#ifdef DEBUG
Serial.println("Serial pulser.");
Serial.println("Send a channel number then a duration in msec");
Serial.println("Send CR to complete");
#endif
 
// set up the PWM pins for use.
for (uint8_t i=0; i< NO_CHANNELS; i++) {
pinMode(channels[i], OUTPUT);
digitalWrite(channels[i], LOW);
channel_light_val[i] = 0;
}
}
 
 
void loop() {
 
// check to see if anything is coming in on the serial line and if there
// is then process it.
 
if(Serial.available()){
 
char ch = Serial.read();
if (ch >= '0' && ch<= '9') {
if (state == WAITING_DATA) {
current_channel = (current_channel * 10) + (ch - '0');
#ifdef DEBUG
Serial.print("Got a char: ");
Serial.println(ch);
#endif
} else if (state == WAITING_VALUE) {
current_value = (current_value * 10) + (ch - '0');
}
} else if (ch == 32) { // space so we split the channel stuff.
if (state == WAITING_DATA) {
state = WAITING_VALUE;
#ifdef DEBUG
Serial.println("Got a space");
#endif
}
} else if (ch == 'n') {
//channel has asked to disclose our name
state = DISCLOSE_NODE;
} else if (ch == 10) {
// newline so end of val.
if (state == DISCLOSE_NODE) {
Serial.print("Lightnode:");
Serial.println(NODE);
Serial.flush();
state = WAITING_DATA;
} else if (state == WAITING_VALUE) {
state = WAITING_DATA;
#ifdef DEBUG
Serial.print("Got a new line ");
Serial.print("Channel: ");
Serial.print(current_channel);
Serial.print("msecs: ");
Serial.println(current_value);
#endif
pulse(current_channel, current_value);
current_value = 0;
current_channel = 0;
}
}
}
 
// decay the values on the channels appropriately.
 
for (int i = 0; i < NO_CHANNELS; i++) {
channel_time_values[i]--;
if (channel_time_values[i] < 0) {
channel_time_values[i] = 0;
if (channel_light_val[i] > 0 && channel_light_dir[i] == HOLD) {
channel_light_dir[i] = FADE_DOWN;
#ifdef DEBUG
Serial.print("Fading down channel: ");
Serial.println(i);
#endif
}
}
 
if (channel_light_dir[i] == FADE_UP) {
if (channel_light_val[i] < 255) {
channel_light_val[i] += channel_light_dir[i];
} else {
channel_light_dir[i] = HOLD;
#ifdef DEBUG
Serial.print("HOLDING HIGH channel: ");
Serial.println(i);
#endif
}
} else if (channel_light_dir[i] == FADE_DOWN) {
if (channel_light_val[i] > 0) {
channel_light_val[i] += channel_light_dir[i];
} else {
channel_light_dir[i] = HOLD;
#ifdef DEBUG
Serial.print("HOLDING LOW channel: ");
Serial.println(i);
#endif
}
}
// now we actually know what the value should be write it to the pin.
analogWrite(channels[i], channel_light_val[i]);
}
 
delay(1);
 
}
 
void pulse(uint8_t channel, uint16_t msecs ) {
// this function adds the value of a pulse onto the specified channel
 
if ((channel_time_values[channel]+ msecs) > UINT16_MAX ) {
channel_time_values[channel] = UINT16_MAX;
} else {
channel_time_values[channel] += msecs;
#ifdef DEBUG
Serial.print("Setting time value to: ");
Serial.println(channel_time_values[channel]);
#endif
}
channel_light_dir[channel] = FADE_UP;
 
#ifdef DEBUG
Serial.print("Fading up channel: ");
Serial.println(channel);
#endif
}
requirements.txt
1 2 3 4 5
argparse==1.2.1
distribute==0.6.24
pyserial==2.6
tweepy==1.12
wsgiref==0.1.2
twitter-lights.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
# Twitter connector to talk to the arduino driven LEDs over serial.
 
import sys
from time import sleep
import threading
 
import serial
import tweepy
 
from config import *
 
INTERFACES = ("/dev/ttyUSB", "/dev/ttyACM")
BAUD_RATE = 9600
 
NO_INTERFACES = 4
 
HEADER_STRING = "Lightnode:"
 
conns = {} #has all the serial connections
master_keywords = []
keyword_bauble_map = {} #sets up a map between the keywords as keys to specific baubles
 
baubles = []
 
class Bauble():
"""
This class represents one of the baubles. It contains a reference
to the serial node that controls it and the keywords associated with it
"""
 
light_period = 0 # msecs that the light should illuminate on keyword
light_node = None # which node are you looking at
channel = None # which PWM channel are you talking to
serial_conn = None # which serial connection is this bauble connected to
keywords = [] # keywords to track.
debug = False # used to determine to display tweet or not on this bauble
 
def __init__(self, light_period=500, light_node=0, channel=0, keywords=[],
connections = {}, debug = False):
"""
Set up the bauble with the various params set
"""
# set up the object
self.light_period = light_period
self.light_node = light_node
self.channel = channel
self.keywords = keywords
self.debug = debug
 
for serialport in connections:
if (connections[serialport]["light_node"] == self.light_node):
self.serial_conn = connections[serialport]["ser"]
 
def __str__(self):
return "%s on (LN)%s:(C)%s" % (self.keywords, self.light_node, self.channel)
def pulse(self):
"""
This function pulses the bauble according to the settings
"""
if self.serial_conn is not None:
self.serial_conn.write("%s %s\n" % (self.channel, self.light_period) )
else:
print "Tried to write to an invalid connection %s" % self.keywords
 
 
class XmasListener(tweepy.StreamListener):
"""
Class set up to listen to and handle the messages coming from twitter and
hitting the Serial connections associated with it
"""
 
keyword_mapping = None
 
def on_status(self, status):
 
if status.retweeted:
# get rid of any retweets
return
 
if (status.text[0:1] in ('@', 'R', '#')):
#gets rid of quoted RTs, @replies and # spam
return
 
 
# clean status up to work with
status.text = status.text.lower().strip('#:.,?!"')
 
# now we work out where to route it.
for keyword in self.keyword_mapping:
if keyword in status.text:
# we've found one
bauble = self.keyword_mapping[keyword]
if bauble.debug:
print "-----------GOT A TWEET --------------"
print status.text
print "Pulsing bauble %s for KW: %s" % (bauble, keyword)
bauble.pulse()
 
def on_error(self, status_code):
print >> sys.stderr, 'Encountered error on %s with status code:' % (self.bauble, status_code)
return True # Don't kill the stream
 
def on_timeout(self):
print >> sys.stderr, 'Timeout...'
return True # Don't kill the stream
 
def on_limit(self, track):
print "-------RATE LIMITED --------"
print "this filter %s just got rate limited" % track
sleep(5)
return True # don't kill the stream.
 
def initialise_interfaces():
"""
Iterates through the specified interfaces and then opens all the s.ports
and tries to ignore anything a bit wacky.
"""
 
for interface in INTERFACES:
for port in range(0, NO_INTERFACES):
serialport = "%s%s" % (interface, port)
 
try:
ser = serial.Serial(serialport, BAUD_RATE, timeout=5)
print "Connected to interface %s" % serialport
except serial.SerialException as e:
print e
continue;
 
# test the serial connection and make sure you get what we want.
ser.flush() # get the data now.
header = ser.readline()
if header in (None, ""):
print "Connection timed out. Let's try to get a name"
ser.write("n\n")
header = ser.readline()
 
print "We got this back: %s" % header
 
lightnode = None
if HEADER_STRING in header:
lightnode = header.split(':')[1]
try:
lightnode = int(lightnode)
except ValueError:
print "Total garbage off %s" % serialport
ser.close()
continue
else:
print "This is not the interface we're looking for: %s" % serialport
ser.close()
continue
 
conns[serialport] = {
"ser": ser,
"light_node": lightnode,
}
 
 
if __name__ == '__main__':
print "Connecting serial devices"
initialise_interfaces()
 
print "Connecting to twitter"
 
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
 
print "Setting up the baubles"
 
for keyword in keyword_set:
#print "Keyword: %s Set: %s" % (keyword, keyword_set[keyword])
item = keyword_set[keyword]
bauble = Bauble(
light_period = item["light_period"],
light_node = item["light_node"],
channel = item["channel"],
keywords = item["keywords"],
debug = item["debug"],
connections = conns,
)
 
# this is used to map the keywords themselves to the baubles. This
# makes it much faster to look up the keywords when we process the
# tweets on the listener - we also use a keyword set so we can look
# up a bit more broadly.
for key in item["keywords"]:
keyword_bauble_map[key] = bauble
 
baubles.append(bauble)
 
# set up the master keywords list so we have a big list to push
# to the tweepy streaming tracker
if master_keywords is None:
master_keywords = item["keywords"][:] ## add all items in a new list
else:
master_keywords.extend(item["keywords"])
 
print "Authed, tracking keywords"
 
# now we call the listener with all the baubles/
xmas_listener = XmasListener()
xmas_listener.keyword_mapping = keyword_bauble_map
 
sapi = tweepy.streaming.Stream(auth, xmas_listener)
 
run_app = True
while (run_app):
try:
sapi.filter(track=master_keywords)
 
except KeyboardInterrupt as kbd:
print "Keyboard interrupted - shutdown"
print "Disconnect Twitter"
sapi.disconnect()
 
print "Closing the serial connections"
for serialport in conns:
print "Closing: %s" % serialport
conns[serialport]["ser"].close()
 
run_app = False
 
except Exception as e:
print "There was a problem %s" % e
sapi.disconnect()
twitter-xmas-lights.md
Markdown

Twitter Controlled Christmas Lights

Author: Andrew Fisher Version: 0.1 Date: 19 December, 2012

Overview

This project was done to liven up my Christmas tree and make it a bit more responsive. It was inspired by something similar I saw on Instructables some time ago and decided to do on my own tree. The project gave me a good use case for a combination of a RaspberryPi as well as Arduinos being able to combine them together into a low cost and low-footprint embedded project that plays to the strengths of both.

TODO

  • Add a python script to be able to interrogate tweets coming off the stream api
  • Add threading to be able to process multiple keywords at once and send messages to serial
  • Set up a file with routing between keywords and specific lights

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.