Created
January 7, 2010 22:50
-
-
Save swinton/271686 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Have OS X play a song on Spotify via a Twitter request | |
This work is licensed under the Creative Commons Attribution-Noncommercial 2.0 UK: England & Wales License. To view a copy of this license, visit http://creativecommons.org/licenses/by-nc/2.0/uk/ or send a letter to Creative Commons, 171 Second Street, Suite 300, San Francisco, California, 94105, USA. | |
See: http://www.nixonmcinnes.co.uk/2010/01/07/crowd-sourcing-the-office-stereo-using-twitter-and-spotify/ | |
usage: | |
tweetify.py -u <user> [-p <password>] -t <comma-separated list of keywords to track>
""" | |
# Uses the Twitter streaming API, Spotimeta (http://pypi.python.org/pypi/spotimeta/) and TweetStream (http://bitbucket.org/runeh/tweetstream/src/) | |
import anyjson, getopt, getpass, httplib, os, re, socket, spotimeta, subprocess, sys, tweetstream, urllib2 | |
from threading import Timer | |
from urllib import urlencode | |
#
# 'default_bufsize = 0' trick via http://blog.persistent.info/2009/08/twitter-streaming-api-from-python.html
# (presumably so the streamed tweets are not held back in a read buffer)
#
socket._fileobject.default_bufsize = 0

#
# Module-wide state shared by the functions below
#
urlopener = None  # opener used for tweeting; stays None until init_urlopener() runs
requests = []     # pending song requests: queue() inserts at the front, next() pops from the end
timer = None      # Timer that will call next(); None means nothing is playing
def init_urlopener(user, passwd):
    """
    Prepare the shared opener that tweet() posts through.

    Registers `user` / `passwd` as HTTP Basic credentials for
    'http://twitter.com' (any realm) and stores the resulting urllib2
    opener in the module-level `urlopener`.
    """
    global urlopener
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, 'http://twitter.com', user, passwd)
    urlopener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def tweet(msg):
    """
    Tweets the specified message, so long as the urlopener has been initialized

    Does nothing while the module-level `urlopener` is still None. Errors
    raised by the HTTP request itself are not caught here and propagate to
    the caller.
    """
    if urlopener is None:
        return
    #
    # Track and artist names can arrive as non-ASCII unicode objects, which
    # Python 2's urlencode() cannot handle; send them as UTF-8 bytes instead
    #
    if not isinstance(msg, str):
        msg = msg.encode('utf-8')
    sys.stdout.write("Tweeting %s...\n" % msg)
    response = urlopener.open('http://twitter.com/statuses/update.json', urlencode({'status': msg}))
    #
    # The reply body is not needed; close it so the connection is released
    #
    response.close()
def get_spotl_url(uri):
    """
    Returns a spo.tl URL, given a Spotify track URI

    This is best-effort: on any failure (network error, timeout, unexpected
    reply) a note is written to stderr and an empty string is returned, so
    the caller can still build its message without the short link.
    """
    try:
        #
        # Bound the wait so a slow shortener cannot stall playback
        #
        conn = urllib2.urlopen('http://spo.tl/api/1/shorten.json',
                               data=urlencode({'uri': uri}),
                               timeout=10)
        try:
            payload = conn.read()
        finally:
            # Always release the connection, even if reading fails
            conn.close()
        res = anyjson.deserialize(payload)
        if res['status'] == 'OK':
            return 'http://spo.tl/%s' % res['result']['token']
    except Exception as exc:
        sys.stderr.write("Could not shorten %s: %s\n" % (uri, exc))
    return ''
def play(song):
    """
    Play the song on Spotify

    `song` is a tuple of (spotify_track_uri, (screen_name, name)); only the
    URI and the screen name are used here.

    When the track lookup fails, or the track is the Rick Astley song that
    is refused on purpose, nothing is played and next() is scheduled a tenth
    of a second later. Otherwise the URI is handed to the Spotify
    application, next() is scheduled after the track's reported length
    (assumed to be in seconds -- confirm against spotimeta), and a
    "Now playing" tweet is sent.
    """
    global timer
    sys.stdout.write("Playing %s...\n" % str(song))
    uri = song[0]
    requester = song[1][0]
    #
    # Get the song info (from Spotify API). Everything needed later is read
    # inside the try, so an incomplete reply is treated like "track not
    # found" rather than failing after playback has already started
    #
    track = None
    try:
        result = spotimeta.lookup(uri)['result']
        title = result['name']
        artist = result['artist']['name']
        length = result['length']
        if artist == 'Rick Astley' and title == 'Never Gonna Give You Up':
            tweet("Ah ah ah, @%s no Rickrolling! :P" % requester)
        else:
            track = (title, artist, length)
    except Exception as exc:
        sys.stderr.write("Skipping %s: %s\n" % (uri, exc))
    if track is None:
        #
        # Track not found (or refused), try and play next song
        # (wait a tenth of a second to avoid being black-listed by Spotify API)
        #
        timer = Timer(0.1, next)
        timer.start()
        return
    #
    # Track found! Play it
    #
    subprocess.call(['open', '-g', '/Applications/Spotify.app', uri])
    #
    # Setup timer to play next song, set timer to expire at end of this song
    #
    timer = Timer(length, next)
    timer.start()
    #
    # Tweet what's now playing. The song is already playing and the timer is
    # armed, so a failed announcement must not propagate and disturb that state
    #
    try:
        msg = 'Now playing "%s", by "%s", as requested by @%s %s' % (title, artist, requester, get_spotl_url(uri))
        tweet(msg)
    except Exception as exc:
        sys.stderr.write("Could not announce %s: %s\n" % (uri, exc))
def next():
    """
    Plays next requested song

    Takes the oldest pending request off the end of `requests` and hands it
    to play(). When the queue is empty, `timer` is reset to None, which
    marks the player as idle. If play() itself fails, the error is reported
    on stderr and the player is likewise marked idle.
    """
    global timer
    try:
        song = requests.pop()
    except IndexError:
        #
        # Nothing to play :(
        #
        timer = None
        return
    try:
        play(song)
    except Exception as exc:
        #
        # Report the failure instead of hiding it, then fall back to idle
        #
        sys.stderr.write("Failed to play %s: %s\n" % (str(song), exc))
        timer = None
def queue(song):
    """
    Put a requested song at the front of the pending-request list.

    The list is consumed from its other end, so requests are played in the
    order they were received.
    """
    sys.stdout.write("Queueing %s...\n" % str(song))
    requests[:0] = [song]
def listen(user, passwd, keywords):
    """
    Listen to what songs should be played and queue them

    Connects to the Twitter track stream with the given credentials and
    keywords, and for every incoming tweet that contains a Spotify track
    URI either plays the song at once (nothing playing, `timer` is None) or
    adds it to the request queue. Runs until the stream ends; a keyboard
    interrupt exits the program with status 0.
    """
    #
    # Announce what we're tracking
    #
    sys.stdout.write("Tracking %s...\n" % ", ".join(keywords))
    #
    # Compile regex
    #
    track_uri = re.compile(r'\b(spotify:track:\S+)\b')
    try:
        #
        # Connect to stream and loop through tweets
        #
        for status in tweetstream.TrackStream(user, passwd, keywords):
            #
            # Not every stream message is a tweet (e.g. deletion or limit
            # notices); skip anything without text or requester info
            #
            try:
                text = status['text']
                author = status['user']
                requester = (author['screen_name'], author['name'])
            except (KeyError, TypeError):
                continue
            #
            # Extract the spotify track URI from the tweet
            #
            found = track_uri.search(text)
            if found is None:
                #
                # No Spotify track URI found, continue
                #
                continue
            #
            # Construct the 'song' from Spotify URI and requester info
            #
            song = (found.group(1), requester)
            #
            # Play the song immediately, or queue it
            #
            if timer is None:
                #
                # Nothing playing
                #
                play(song)
            else:
                #
                # Something playing
                #
                queue(song)
    #
    # Handle keyboard interrupt, just exit gracefully
    #
    except KeyboardInterrupt:
        sys.exit(0)
def usage():
    """
    Write the one-line command synopsis to standard output.
    """
    synopsis = "tweetify.py -u <user> [-p <password>] -t <comma-seperated list of keywords to track>"
    sys.stdout.write(synopsis + "\n")
def main(argv):
    """
    Parse the command line and start listening for song requests.

    `argv` is the argument list without the program name. Recognised
    options are -u/--user, -p/--password and -t/--track (comma-separated
    keywords). Prints the usage line and exits with status 2 when the
    options cannot be parsed, or when the user or a non-empty keyword list
    is missing. When no password is given it is prompted for.
    """
    try:
        opts, _ = getopt.getopt(argv, "u:p:t:", ["user=", "password=", "track="])
    except getopt.GetoptError:
        usage()
        sys.exit(2)
    user = None
    passwd = None
    track = None
    for opt, value in opts:
        if opt in ("-u", "--user"):
            user = value
        elif opt in ("-p", "--password"):
            passwd = value
        elif opt in ("-t", "--track"):
            #
            # Tolerate spaces around the commas and ignore empty entries,
            # so "a, b," tracks exactly 'a' and 'b'
            #
            track = [kw.strip() for kw in value.split(",") if kw.strip()]
    if user is None or not track:
        usage()
        sys.exit(2)
    if passwd is None:
        passwd = getpass.getpass()
    init_urlopener(user, passwd)
    listen(user, passwd, track)


if __name__ == '__main__':
    main(sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment