Skip to content

Instantly share code, notes, and snippets.

@linguinee
Last active September 18, 2015 13:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linguinee/e5585aa360f21007df53 to your computer and use it in GitHub Desktop.
Save linguinee/e5585aa360f21007df53 to your computer and use it in GitHub Desktop.
Add music to a Spotify playlist from past Hangouts messages.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import json
import os
import re
import sys
from apiclient.discovery import build
from apiclient.errors import HttpError as YouTubeHTTPError
from requests.exceptions import HTTPError as SoundcloudHTTPError
from spotipy.client import SpotifyException
import soundcloud
import spotipy
import spotipy.util
# Credentials and playlist coordinates come from the environment so that no
# secrets live in the source. A missing required variable raises KeyError
# at import time.
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
SPOTIFY_PLAYLIST_ID = os.environ["SPOTIFY_PLAYLIST_ID"]
SPOTIFY_REDIRECT_URI = os.environ["SPOTIFY_REDIRECT_URI"]
# Optional: when unset (or empty) the Spotify client setup later in this file
# falls back to an interactive authorisation prompt. Indexing os.environ
# directly would abort with KeyError before that fallback could ever run.
SPOTIFY_TOKEN = os.environ.get("SPOTIFY_TOKEN", "")
SPOTIFY_USER = os.environ["SPOTIFY_USER"]
SOUNDCLOUD_CLIENT_ID = os.environ["SOUNDCLOUD_CLIENT_ID"]
YOUTUBE_API_KEY = os.environ["YOUTUBE_API_KEY"]
class SpotifyTrack(object):
    """Small record holding the three track fields this script uses.

    Attributes:
        id: Track identifier, stored exactly as passed in.
        name: Track title.
        artist: Artist name (callers in this file pass the first listed
            artist of the track).
    """

    def __init__(self, track_id, track_name, track_artist):
        self.id = track_id
        self.name = track_name
        self.artist = track_artist

    def __repr__(self):
        # Handy when a track shows up in a log line or a debugger.
        return "SpotifyTrack(id={!r}, name={!r}, artist={!r})".format(
            self.id, self.name, self.artist)
# Command line: <hangouts_file> <conversation_id>. Anything else is a usage
# error; exit with a non-zero status so calling scripts can detect it.
if len(sys.argv) == 3:
    hangouts_file = sys.argv[1]
    conversation_id = sys.argv[2]
else:
    print("Usage: %s hangouts_file conversation_id" % (sys.argv[0],))
    sys.exit(1)
# Load the Hangouts export and pick out the event list of the conversation
# whose id was given on the command line.
with open(hangouts_file) as data:
    hangouts = json.load(data)
conversation = None
conversations = hangouts["conversation_state"]
for c in conversations:
    if c["conversation_id"]["id"] == conversation_id:
        conversation = c["conversation_state"]["event"]
        break
# Compare against None: a conversation that exists but has no events is not
# the same thing as an unknown id. A lookup failure is an error, so exit
# with a non-zero status.
if conversation is None:
    print("Could not find conversation with id %s" % (conversation_id,))
    sys.exit(1)
# Soundcloud and YouTube are read-only.
soundcloud_client = soundcloud.Client(client_id=SOUNDCLOUD_CLIENT_ID)
youtube_client = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
# Spotify needs user auth in order to modify the playlists.
if SPOTIFY_TOKEN:
    spotify_client = spotipy.Spotify(auth=SPOTIFY_TOKEN)
else:
    spotify_token = spotipy.util.prompt_for_user_token(
        SPOTIFY_USER,
        scope="playlist-modify-public playlist-modify-private",
        client_id=SPOTIFY_CLIENT_ID,
        client_secret=SPOTIFY_CLIENT_SECRET,
        redirect_uri=SPOTIFY_REDIRECT_URI)
    # The prompt yields no token when authorisation fails; stop with a clear
    # message instead of crashing on the string concatenation below.
    if not spotify_token:
        print("Unable to obtain a Spotify token for user %s" % (SPOTIFY_USER,))
        sys.exit(1)
    # Echo the token so it can be exported as SPOTIFY_TOKEN for later runs.
    print("SPOTIFY_TOKEN=" + spotify_token)
    spotify_client = spotipy.Spotify(auth=spotify_token)
def main():
    """Feed the segments of every regular chat message to process_message.

    Events of any other type are skipped, as are chat events that lack the
    chat_message / message_content / segment path (embeds rather than
    actual messages).
    """
    for event in conversation:
        if event["event_type"] != "REGULAR_CHAT_MESSAGE":
            continue
        try:
            segments = event["chat_message"]["message_content"]["segment"]
        except KeyError:
            # embed, not an actual message
            continue
        process_message(segments)
def process_message(segments):
    """Act on the bot commands and music links found in one chat message.

    Messages are divided into segments, with links separate from text.

    * A TEXT segment equal to "/bot spotify remove " arms removal for the
      next Spotify link in the same message.
    * Any other TEXT segment containing "/bot spotify <query>" (and not one
      of the reserved sub-commands) is searched on Spotify and added.
    * A LINK segment pointing at Spotify is added (or removed, when armed);
      YouTube and Soundcloud links are resolved to a title which is then
      searched on Spotify.

    Args:
        segments: iterable of dicts, each with a "type" and a "text" key.

    Returns:
        None. The outcome of every add/remove is printed.
    """
    # Sub-commands of the chat bot that must not be treated as search queries.
    reserved = ("/bot spotify on", "/bot spotify off",
                "/bot spotify playlist", "/bot spotify remove",
                "/bot spotify help")
    spotify_remove = False
    for segment in segments:
        kind = segment["type"]
        # In text segments, only remove and query commands matter.
        if kind == "TEXT":
            text = segment["text"]
            if text == "/bot spotify remove ":
                spotify_remove = True
                continue
            if "/bot spotify " in text and text not in reserved:
                parts = re.split(r"/bot spotify\s+", text)
                if len(parts) > 1 and parts[1]:
                    add_to_spotify(parts[1])
            continue
        if kind != "LINK":
            continue
        # Check for music links in link segments.
        link = extract_music_link(segment["text"])
        if not link:
            continue
        if "spotify" in link:
            if spotify_remove:
                # Report the outcome instead of dropping the status string.
                print(remove_from_playlist(link))
                spotify_remove = False
                continue
            try:
                t = spotify_client.track(link)
            except SpotifyException as e:
                # One bad link must not abort the whole conversation.
                print("Unable to look up track {}: {}".format(link, e))
                continue
            track = SpotifyTrack(t["id"], t["name"], t["artists"][0]["name"])
            print(add_to_playlist(track))
            continue
        if "youtube" in link or "youtu.be" in link:
            query = title_from_youtube(link)
        elif "soundcloud" in link:
            query = title_from_soundcloud(link)
        else:
            continue
        if query:
            add_to_spotify(query)
        else:
            print("Unable to get the song title for {}.".format(link))
# Compiled once at import time. The dots in the domain names are escaped so
# that they only match a literal ".", not any character.
_MUSIC_LINK_RE = re.compile(
    r"(https?://)?([a-z0-9.]*?\.)?"
    r"(youtube\.com/|youtu\.be/|soundcloud\.com/|spotify\.com/track/)"
    r"([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])")


def extract_music_link(link):
    """Return ``link`` as an absolute URL if it points at a music service.

    Recognised are YouTube (youtube.com, youtu.be), Soundcloud and Spotify
    track links, with or without a scheme and an optional subdomain.

    Args:
        link: text of a link segment.

    Returns:
        The link itself when it already starts with http:// or https://,
        the link prefixed with "https://" when it has no scheme, or an
        empty string when it is not a recognised music link.
    """
    if not _MUSIC_LINK_RE.match(link):
        return ""
    if link.startswith(("http://", "https://")):
        return link
    return "https://" + link
def add_to_spotify(query):
    """Search Spotify for ``query`` and put the first hit on the playlist.

    The outcome is printed rather than returned: the status string from
    add_to_playlist when a track was found, otherwise a "No tracks found"
    notice. The function itself returns None.
    """
    found = search_spotify(query)
    if not found:
        print("No tracks found for '{}'.".format(query))
        return
    print(add_to_playlist(found))
def search_spotify(query):
    """Search Spotify for ``query``, loosening the query until something hits.

    The query is first cleaned into groups by _clean and searched as is.
    Each following stage strips more from the groups and searches again:

    1. drop @mentions and #hashtags, and cut each group at the first
       occurrence of a "trailing credit" word (official, with, prod, ...);
    2. delete a few qualifier words (freestyle, acoustic, ...);
    3. drop every group that contains a word typical of video titles.

    Args:
        query: free-form text, e.g. a video title or a chat command argument.

    Returns:
        The first search result as produced by _search, or a falsy value
        when no stage finds anything.
    """
    truncate_after = ["official", "with", "prod", "by", "from"]
    remove_words = ["freestyle", "acoustic", "original", "&"]
    drop_if_contains = ["live", "session", "sessions", "edit", "premiere",
                        "cover", "lyric", "lyrics", "records", "release",
                        "video", "audio", "in the open"]

    groups = _clean(query)
    result = _search(groups)
    if result:
        return result

    # Discard hashtags and mentions, collapsing the whitespace left behind.
    groups = [
        " ".join(re.sub(r"(@[A-Za-z0-9]+)|(#[A-Za-z0-9]+)", " ", g).split())
        for g in groups]
    # Discard everything in a group following certain words. The word
    # boundaries keep e.g. "by" from cutting "Baby" down to "Ba".
    for word in truncate_after:
        pattern = re.compile(r"\b" + re.escape(word) + r"\b", re.IGNORECASE)
        groups = [pattern.split(g, 1)[0] for g in groups]
    result = _search(groups)
    if result:
        return result

    # Discard certain words.
    for word in remove_words:
        pattern = re.compile(re.escape(word), re.IGNORECASE)
        groups = [pattern.sub("", g) for g in groups]
    result = _search(groups)
    if result:
        return result

    # Last resort: aggressively discard whole groups on a substring match.
    groups = [g for g in groups
              if not any(word in g.lower() for word in drop_if_contains)]
    return _search(groups)
def _clean(query):
"""Splits the query into groups and attempts to remove extraneous groups
unrelated to the song title/artist. Returns a list of groups."""
# Blacklists.
bl_exact = ["official", "audio", "audio\s+stream", "lyric", "lyrics",
"with\s+lyrics?", "explicit", "clean", "explicit\s+version",
"clean\s+version", "original\s+version", "hq", "hd", "mv", "m/v",
"interscope", "4ad"]
bl_following = ["official\s+video", "official\s+music", "official\s+audio",
"official\s+lyric", "official\s+lyrics", "official\s+clip",
"video\s+lyric", "video\s+lyrics", "video\s+clip",
"full\s+video"]
# Split into groups.
gs = list(filter(
None,
re.split(u"\s*[-‐‒–—―−~\(\)\[\]\{\}\<\>\|‖¦:;‘’“”\"«»„‚‘]+\s*",
query)))
# Discard groups that match with anything in the blacklists.
gs[:] = [g for g in gs if g.lower() not in bl_exact]
for b in bl_following:
gs[:] = [re.split(b, g, flags=re.IGNORECASE)[0] for g in gs]
# Discard featured artists.
gs[:] = [re.split("(f(ea)?t(.|\s+))(?i)", g)[0] for g in gs]
return gs
def _search(groups):
    """Search Spotify for the joined groups and return the first track.

    Args:
        groups: list of strings; empty entries are ignored.

    Returns:
        A SpotifyTrack built from the first result, or None when the search
        fails or finds nothing. (A failed search used to return "" while an
        empty result returned None; both are falsy, so callers that only
        test truthiness are unaffected.)
    """
    query = " ".join(filter(None, groups))
    print("......Searching Spotify for {}".format(query))
    try:
        results = spotify_client.search(query)
    except SpotifyException as e:
        print("Error when searching Spotify: {}".format(e))
        return None
    tracks = results["tracks"]
    # Guard against a non-zero total with an empty page of items.
    if not tracks["total"] or not tracks["items"]:
        return None
    t = tracks["items"][0]
    return SpotifyTrack(t["id"], t["name"], t["artists"][0]["name"])
def add_to_playlist(track):
    """Add ``track`` to the configured playlist and return a status string.

    Every existing occurrence of the track is removed from the playlist
    before it is added. A SpotifyException from either call is turned into
    an "Unable to add track" message instead of propagating.
    """
    try:
        spotify_client.user_playlist_remove_all_occurrences_of_tracks(
            SPOTIFY_USER, SPOTIFY_PLAYLIST_ID, [track.id])
        spotify_client.user_playlist_add_tracks(
            SPOTIFY_USER, SPOTIFY_PLAYLIST_ID, [track.id])
        status = "Added {} by {}".format(track.name, track.artist)
    except SpotifyException as err:
        status = "Unable to add track: {}".format(err)
    return status
def remove_from_playlist(track):
    """Remove every occurrence of ``track`` from the configured playlist.

    The playlist is the one identified by SPOTIFY_USER and
    SPOTIFY_PLAYLIST_ID, the same one add_to_playlist writes to. (This
    function used to call chat_playlist(), which is not defined anywhere in
    this file, and therefore raised NameError.)

    Args:
        track: track reference, passed through unchanged to the Spotify
            client (the caller in this file passes a Spotify track link).

    Returns:
        A status string describing the removal, or the error if the Spotify
        client raised SpotifyException.
    """
    try:
        spotify_client.user_playlist_remove_all_occurrences_of_tracks(
            SPOTIFY_USER, SPOTIFY_PLAYLIST_ID, [track])
        # Look the track up only to produce a readable status message.
        details = spotify_client.track(track)
        return "Removed track {} by {}.".format(
            details["name"], details["artists"][0]["name"])
    except SpotifyException as e:
        return "Unable to remove track: {}".format(e)
def title_from_youtube(url):
    """Return the title of the YouTube video behind ``url``.

    The video id is pulled out of the URL and must be exactly 11 characters
    long; the title is then read from the snippet of the first item in the
    videos().list response. Every failure (no usable id, empty response,
    HTTP error) prints a diagnostic and yields an empty string.
    """
    # Regex by mantish from http://stackoverflow.com/a/9102270 to get the
    # video id from a YouTube URL.
    found = re.match(
        r"^.*(youtu.be\/|v\/|u\/\w\/|embed\/|watch\?v=|\&v=)([^#\&\?]*).*",
        url)
    if not found or len(found.group(2)) != 11:
        print("Unable to extract video id: {}".format(url))
        return ""
    video_id = found.group(2)

    # YouTube response is JSON.
    try:
        request = youtube_client.videos().list(part="snippet", id=video_id)
        response = request.execute()
        items = response.get("items", [])
        if not items:
            print("YouTube response was empty: {}".format(response))
            return ""
        return items[0]["snippet"]["title"]
    except YouTubeHTTPError as e:
        print("Unable to get video entry: {}".format(e))
        return ""
def title_from_soundcloud(url):
    """Return the title of the Soundcloud resource that ``url`` resolves to.

    The URL is handed to the client's "/resolve" endpoint. When that raises
    an HTTP error a diagnostic is printed and an empty string is returned.
    """
    try:
        title = soundcloud_client.get("/resolve", url=url).title
    except SoundcloudHTTPError as err:
        print("Unable to resolve url {}, {}".format(url, err))
        return ""
    return title
# Walk the conversation only when run as a script. Note that the module-level
# setup earlier in this file (argument parsing, loading the export, creating
# the API clients) still executes on a plain import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment