Last active
September 18, 2015 13:39
-
-
Save linguinee/e5585aa360f21007df53 to your computer and use it in GitHub Desktop.
Add music to a Spotify playlist from past Hangouts messages.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals | |
import json | |
import os | |
import re | |
import sys | |
from apiclient.discovery import build | |
from apiclient.errors import HttpError as YouTubeHTTPError | |
from requests.exceptions import HTTPError as SoundcloudHTTPError | |
from spotipy.client import SpotifyException | |
import soundcloud | |
import spotipy | |
import spotipy.util | |
# Configuration is taken from the environment. Every variable except
# SPOTIFY_TOKEN is required; a missing one raises KeyError at start-up.
SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
SPOTIFY_PLAYLIST_ID = os.environ["SPOTIFY_PLAYLIST_ID"]
SPOTIFY_REDIRECT_URI = os.environ["SPOTIFY_REDIRECT_URI"]
# Optional: when unset or empty, the script falls back to the interactive
# authorization flow and prints the token it obtains.
SPOTIFY_TOKEN = os.environ.get("SPOTIFY_TOKEN", "")
SPOTIFY_USER = os.environ["SPOTIFY_USER"]
SOUNDCLOUD_CLIENT_ID = os.environ["SOUNDCLOUD_CLIENT_ID"]
YOUTUBE_API_KEY = os.environ["YOUTUBE_API_KEY"]
class SpotifyTrack(object):
    """Plain record describing one Spotify track.

    Attributes:
        id: the Spotify track id.
        name: the track name.
        artist: the artist name supplied by the caller.
    """

    def __init__(self, track_id, track_name, track_artist):
        self.id = track_id
        self.name = track_name
        self.artist = track_artist

    def __repr__(self):
        # Uses {!r} so the result stays ASCII-only even for non-ASCII names.
        return "SpotifyTrack(id={!r}, name={!r}, artist={!r})".format(
            self.id, self.name, self.artist)
# Command line: <hangouts_file> <conversation_id>.
if len(sys.argv) != 3:
    # Report misuse on stderr with a non-zero status so callers can detect it.
    sys.stderr.write(
        "Usage: %s hangouts_file conversation_id\n" % (sys.argv[0],))
    sys.exit(1)
hangouts_file = sys.argv[1]
conversation_id = sys.argv[2]

# Load the Hangouts export (JSON).
with open(hangouts_file) as data:
    hangouts = json.load(data)

# Pick the event list of the conversation whose id was given.
conversation = None
conversations = hangouts["conversation_state"]
for c in conversations:
    if c["conversation_id"]["id"] == conversation_id:
        conversation = c["conversation_state"]["event"]
        break
if not conversation:
    sys.stderr.write(
        "Could not find conversation with id %s\n" % (conversation_id,))
    sys.exit(1)
# Soundcloud and YouTube are read-only.
soundcloud_client = soundcloud.Client(client_id=SOUNDCLOUD_CLIENT_ID)
youtube_client = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)

# Spotify needs user auth in order to modify the playlists.
spotify_token = SPOTIFY_TOKEN
if not spotify_token:
    # No token configured: run the interactive authorization flow.
    spotify_token = spotipy.util.prompt_for_user_token(
        SPOTIFY_USER,
        scope="playlist-modify-public playlist-modify-private",
        client_id=SPOTIFY_CLIENT_ID,
        client_secret=SPOTIFY_CLIENT_SECRET,
        redirect_uri=SPOTIFY_REDIRECT_URI)
    if not spotify_token:
        # prompt_for_user_token yields nothing when authorization fails;
        # stop here instead of failing on the string concatenation below.
        sys.stderr.write(
            "Unable to obtain a Spotify token for user %s\n" % (SPOTIFY_USER,))
        sys.exit(1)
    # Printed so it can be exported as SPOTIFY_TOKEN for later runs.
    print("SPOTIFY_TOKEN=" + spotify_token)
spotify_client = spotipy.Spotify(auth=spotify_token)
def main():
    """Feed the segments of every regular chat message to process_message."""
    for event in conversation:
        if event["event_type"] != "REGULAR_CHAT_MESSAGE":
            continue
        try:
            segments = event["chat_message"]["message_content"]["segment"]
        except KeyError:
            # embed, not an actual message
            continue
        process_message(segments)
def process_message(segments):
    """Act on the bot commands and music links contained in one message.

    ``segments`` is the list of segment dicts of a chat message. Only
    segments whose "type" is "TEXT" or "LINK" are looked at, and their
    "text" value is read.

    * A text segment equal to "/bot spotify remove " arms removal for the
      next Spotify link in the same message.
    * Any other "/bot spotify <query>" text (except the on/off/playlist/
      remove/help commands) is searched on Spotify and added.
    * Spotify track links are added (or removed, when armed); YouTube and
      SoundCloud links are resolved to a title which is then searched.

    The status of every add/remove is printed. Returns None.
    """
    # Bot commands that must not be treated as a search query.
    non_query_commands = (
        "/bot spotify on", "/bot spotify off", "/bot spotify playlist",
        "/bot spotify remove", "/bot spotify help")
    spotify_remove = False

    # Messages are divided into segments, with links separate from text.
    for segment in segments:
        kind = segment["type"]

        # In text segments, only remove and query commands matter.
        if kind == "TEXT":
            text = segment["text"]
            if text == "/bot spotify remove ":
                spotify_remove = True
            elif "/bot spotify " in text and text not in non_query_commands:
                parts = re.split(r"/bot spotify\s+", text)
                if len(parts) > 1 and parts[1]:
                    add_to_spotify(parts[1])
            continue

        # Check for music links in link segments.
        if kind != "LINK":
            continue
        link = extract_music_link(segment["text"])
        if not link:
            continue

        if "spotify" in link:
            if spotify_remove:
                # Print the returned status so a failed removal is visible.
                print(remove_from_playlist(link))
                spotify_remove = False
            else:
                t = spotify_client.track(link)
                track = SpotifyTrack(
                    t["id"], t["name"], t["artists"][0]["name"])
                # Print the returned status so a failed add is visible.
                print(add_to_playlist(track))
            continue

        if "youtube" in link or "youtu.be" in link:
            query = title_from_youtube(link)
        elif "soundcloud" in link:
            query = title_from_soundcloud(link)
        else:
            continue

        if query:
            add_to_spotify(query)
        else:
            print("Unable to get the song title for {}.".format(link))
# Compiled once: optional scheme, optional subdomain, one of the supported
# music hosts (dots escaped so they only match a literal "."), then a
# non-empty path.
_MUSIC_LINK_RE = re.compile(
    r"(https?://)?([a-z0-9.]*?\.)?"
    r"(youtube\.com/|youtu\.be/|soundcloud\.com/|spotify\.com/track/)"
    r"([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])")


def extract_music_link(link):
    """Return ``link`` if it points at a supported music host, else "".

    Supported hosts are YouTube (youtube.com, youtu.be), SoundCloud and
    Spotify track URLs. A link without a scheme is returned with
    "https://" prepended.
    """
    matched = _MUSIC_LINK_RE.match(link)
    if not matched:
        return ""
    # Group 1 is the scheme; it is None when the link had none.
    return link if matched.group(1) else "https://" + link
def add_to_spotify(query):
    """Search Spotify for the query and add the first result to the playlist.

    Prints a status line (the result of the add, or a "no tracks found"
    notice). Returns None.
    """
    found = search_spotify(query)
    if not found:
        print("No tracks found for '{}'.".format(query))
        return
    print(add_to_playlist(found))
def search_spotify(query):
    """Search Spotify for the query, loosening it pass by pass.

    The cleaned query is searched first; each later pass strips more text
    from the groups and searches again. Returns the first truthy result of
    _search, or the result of the final pass.
    """
    # NOTE: all three lists are matched as plain substrings, not whole words.
    cut_after = ("official", "with", "prod", "by", "from")
    strip_out = ("freestyle", "acoustic", "original", "&")
    noisy_terms = ("live", "session", "sessions", "edit", "premiere", "cover",
                   "lyric", "lyrics", "records", "release", "video", "audio",
                   "in the open")

    # Pass 1: the cleaned groups as they are.
    groups = _clean(query)
    hit = _search(groups)
    if hit:
        return hit

    # Pass 2: discard hashtags and mentions, then everything in a group
    # following certain words.
    groups = [
        " ".join(re.sub("(@[A-Za-z0-9]+)|(#[A-Za-z0-9]+)", " ", g).split())
        for g in groups
    ]
    for marker in cut_after:
        groups = [re.split(marker, g, flags=re.IGNORECASE)[0] for g in groups]
    hit = _search(groups)
    if hit:
        return hit

    # Pass 3: discard certain words.
    for word in strip_out:
        groups = [
            re.sub(re.escape(word), "", g, flags=re.IGNORECASE)
            for g in groups
        ]
    hit = _search(groups)
    if hit:
        return hit

    # Pass 4: aggressively discard whole groups.
    groups = [
        g for g in groups
        if not any(term in g.lower() for term in noisy_terms)
    ]
    return _search(groups)
def _clean(query): | |
"""Splits the query into groups and attempts to remove extraneous groups | |
unrelated to the song title/artist. Returns a list of groups.""" | |
# Blacklists. | |
bl_exact = ["official", "audio", "audio\s+stream", "lyric", "lyrics", | |
"with\s+lyrics?", "explicit", "clean", "explicit\s+version", | |
"clean\s+version", "original\s+version", "hq", "hd", "mv", "m/v", | |
"interscope", "4ad"] | |
bl_following = ["official\s+video", "official\s+music", "official\s+audio", | |
"official\s+lyric", "official\s+lyrics", "official\s+clip", | |
"video\s+lyric", "video\s+lyrics", "video\s+clip", | |
"full\s+video"] | |
# Split into groups. | |
gs = list(filter( | |
None, | |
re.split(u"\s*[-‐‒–—―−~\(\)\[\]\{\}\<\>\|‖¦:;‘’“”\"«»„‚‘]+\s*", | |
query))) | |
# Discard groups that match with anything in the blacklists. | |
gs[:] = [g for g in gs if g.lower() not in bl_exact] | |
for b in bl_following: | |
gs[:] = [re.split(b, g, flags=re.IGNORECASE)[0] for g in gs] | |
# Discard featured artists. | |
gs[:] = [re.split("(f(ea)?t(.|\s+))(?i)", g)[0] for g in gs] | |
return gs | |
def _search(groups): | |
try: | |
query = " ".join(filter(None, groups)) | |
print "......Searching Spotify for {}".format(query) | |
results = spotify_client.search(query) | |
except SpotifyException as e: | |
print "Error when searching Spotify: {}".format(e) | |
return "" | |
if results["tracks"]["total"]: | |
t = results["tracks"]["items"][0] | |
return SpotifyTrack(t["id"], t["name"], t["artists"][0]["name"]) | |
else: | |
return None | |
def add_to_playlist(track):
    """Add the track to the configured playlist and return a status string.

    Any existing occurrences of the track are removed from the playlist
    before it is added, so it is not listed twice.
    """
    user, playlist_id = SPOTIFY_USER, SPOTIFY_PLAYLIST_ID
    try:
        spotify_client.user_playlist_remove_all_occurrences_of_tracks(
            user, playlist_id, [track.id])
        spotify_client.user_playlist_add_tracks(
            user, playlist_id, [track.id])
    except SpotifyException as e:
        return "Unable to add track: {}".format(e)
    return "Added {} by {}".format(track.name, track.artist)
def remove_from_playlist(track):
    """Remove all occurrences of the track from the configured playlist.

    ``track`` is the track reference handed to the Spotify client; the
    caller in this file passes the track's link. The playlist is the one
    named by SPOTIFY_USER / SPOTIFY_PLAYLIST_ID, the same one
    add_to_playlist writes to.

    Returns a status string describing the removal or the failure.
    """
    try:
        spotify_client.user_playlist_remove_all_occurrences_of_tracks(
            SPOTIFY_USER, SPOTIFY_PLAYLIST_ID, [track])
        # Looked up only to name the track in the status message.
        info = spotify_client.track(track)
    except SpotifyException as e:
        return "Unable to remove track: {}".format(e)
    return "Removed track {} by {}.".format(
        info["name"], info["artists"][0]["name"])
def title_from_youtube(url):
    """Return the title of the YouTube video behind ``url``, or "".

    "" is returned (after printing the reason) when no 11-character video
    id can be extracted, the API response has no items, or the request
    raises an HTTP error.
    """
    # Regex by mantish from http://stackoverflow.com/a/9102270 to get the
    # video id from a YouTube URL.
    found = re.match(
        r"^.*(youtu.be\/|v\/|u\/\w\/|embed\/|watch\?v=|\&v=)([^#\&\?]*).*", url)
    video_id = found.group(2) if found else ""
    if len(video_id) != 11:
        print("Unable to extract video id: {}".format(url))
        return ""

    # YouTube response is JSON.
    try:
        request = youtube_client.videos().list(part="snippet", id=video_id)
        response = request.execute()
    except YouTubeHTTPError as e:
        print("Unable to get video entry: {}".format(e))
        return ""

    items = response.get("items", [])
    if not items:
        print("YouTube response was empty: {}".format(response))
        return ""
    return items[0]["snippet"]["title"]
def title_from_soundcloud(url):
    """Resolve ``url`` through the SoundCloud client and return its title.

    Returns "" (after printing the error) when the request raises an HTTP
    error.
    """
    try:
        resolved = soundcloud_client.get("/resolve", url=url)
    except SoundcloudHTTPError as e:
        print("Unable to resolve url {}, {}".format(url, e))
        return ""
    return resolved.title
# Process the conversation only when the file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment