Created
July 4, 2020 09:25
-
-
Save JosephRedfern/b7c3079d4889e2ebab72c40a05cce06b to your computer and use it in GitHub Desktop.
BBC6Music Spotify Playlist Updater (via @BBC6MusicBot)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
python-twitter==3.5 | |
spotipy==2.13.0 | |
tqdm==4.47.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
TWITTER_ACCESS_TOKEN = "" | |
TWITTER_ACCESS_TOKEN_SECRET = "" | |
TWITTER_API_KEY = "" | |
TWITTER_API_SECRET_KEY = "" | |
TWITTER_BEARER_TOKEN = "" | |
SPOTIFY_SCOPES = "" | |
SPOTIFY_USERNAME = "" | |
SPOTIFY_CLIENT_ID = "" | |
SPOTIFY_CLIENT_SECRET = "" | |
SPOTIFY_REDIRECT_URL = "" | |
SPOTIFY_PLAYLIST_ID = "" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sqlite3 | |
from datetime import datetime | |
from typing import List, Tuple | |
import logging | |
import tqdm | |
import twitter | |
import spotipy | |
import spotipy.util | |
from secrets import * | |
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__))) | |
logging.basicConfig(filename=f'{__location__}/logs.log', | |
format='%(asctime)s %(levelname)-8s %(message)s', | |
level=logging.DEBUG, | |
datefmt='%Y-%m-%d %H:%M:%S') | |
class TweetyFi: | |
def __init__(self, account_name, sp_token=None): | |
if sp_token is None: | |
sp_token = spotipy.util.prompt_for_user_token(SPOTIFY_USERNAME, scope=SPOTIFY_SCOPES, client_id=SPOTIFY_CLIENT_ID, client_secret=SPOTIFY_CLIENT_SECRET, redirect_uri=SPOTIFY_REDIRECT_URL, cache_path=f"{__location__}/.spotipycache") | |
self.db = sqlite3.connect(f'{__location__}/{account_name}.db') | |
self.account_name = account_name | |
self.sp = spotipy.Spotify(auth=sp_token) | |
self.tw = twitter.Api(consumer_key=TWITTER_API_KEY, | |
consumer_secret=TWITTER_API_SECRET_KEY, | |
access_token_key=TWITTER_ACCESS_TOKEN, | |
access_token_secret=TWITTER_ACCESS_TOKEN_SECRET) | |
self.db_init() | |
def run(self) -> None: | |
""" | |
Get all new tweets since last run, parse them, and update playlist | |
""" | |
last_run = self.get_last_tweet_id() | |
statuses = self.get_statuses(last_run) | |
logging.info(f"Processing {len(statuses)} statuses") | |
pbar = tqdm.tqdm(statuses) | |
for status in pbar: | |
pbar.set_description(f"Processing: {status.text}") | |
self.process_status(status) | |
playlist_tracks = self.get_tracks_for_playlist() | |
self.update_playlist(playlist_tracks) | |
def process_status(self, status: twitter.models.Status) -> None: | |
""" | |
Parse and log the given status. | |
""" | |
already_added = (self.log_status(status) == 0) | |
if not already_added: | |
# run the regex to get the name, artist | |
song, artist = self.extract_song_info(status) | |
# try and get the track from spotify API | |
spotify_track = self.get_spotify_track(song, artist) | |
# we don't always succeed | |
track_id = spotify_track['id'] if spotify_track is not None else None | |
# log play regardless, we can deal with missing spotify ID during playlist generation | |
self.log_play(status.id, status.created_at_in_seconds, artist, song, track_id) | |
def db_init(self) -> None: | |
""" | |
If we're dealing with a fresh DB, create tables to store tweets and song plays. | |
Note that we probably don't need to store the tweets, this is for debugging in | |
case something goes wrong and I need to look back at the cause. | |
""" | |
c = self.db.cursor() | |
sql = 'CREATE TABLE IF NOT EXISTS tweets(id INTEGER NOT NULL PRIMARY KEY, timestamp DATETIME, tweet TEXT, UNIQUE(id))' | |
c.execute(sql) | |
sql = 'CREATE TABLE IF NOT EXISTS plays(play_id INTEGER PRIMARY KEY AUTOINCREMENT, tweet INTEGER, timestamp DATETIME, artist TEXT, song TEXT, track_id TEXT, FOREIGN KEY(tweet) REFERENCES tweets(id))' | |
c.execute(sql) | |
self.db.commit() | |
def get_statuses(self, since_id:int=None, count:int=200) -> List[twitter.models.Status]: | |
""" | |
Retrieve statuses for the given account from the Twitter API. | |
""" | |
return self.tw.GetUserTimeline(screen_name=self.account_name, count=count, since_id=since_id) | |
def extract_song_info(self, tweet: twitter.models.Status) -> Tuple[str, str]: | |
""" | |
The @BBC6MusicBot tweets in the form: <artist> - <song><other stuff>, this regex (fairly?) reliably extracts that info. | |
""" | |
match = re.match(r"Now Playing (?P<artist>.+)\s-\s(?P<song>[^@#]+)(?P<other>.+)?", tweet.text) | |
song, artist = match.group("song"), match.group("artist") | |
# sometimes @BBC6MusicBot seems to bork up ampersands. there are probably others here and maybe | |
# we should ultimately use some html entity decoder? | |
song = song.replace("&", "&") | |
artist = artist.replace("&", "&") | |
song = song.strip() | |
artist = artist.strip() | |
return song, artist | |
def get_current_playlist_tracks(self) -> List[str]: | |
""" | |
Retrieve all current tracks from the Spotify playlist. | |
We have to go through pagination here. | |
""" | |
offset = 0 | |
current_tracks = [] | |
while True: | |
pl = self.sp.playlist_tracks(SPOTIFY_PLAYLIST_ID, fields='items.track.id', offset=offset) | |
if 'items' in pl: | |
current_tracks += [t['track']['id'] for t in pl['items']] | |
offset += len(pl['items']) | |
if len(pl['items']) == 0: | |
break | |
return current_tracks | |
def update_playlist(self, playlist_tracks) -> None: | |
""" | |
Sync the existing playlist with the desired playlist -- get current tracks and do set intersection stuff | |
to determine removals and adds. | |
We need to batch this as Spotify API doesn't let you add more than 100 tracks at a time. | |
""" | |
current_tracks = self.get_current_playlist_tracks() | |
to_remove = list(set(current_tracks) - set(playlist_tracks))# in current, not in desired | |
to_add = list(set(playlist_tracks) - set(current_tracks)) # in desired, not in current | |
max_at_once = 100 | |
for n in range(0, len(to_remove), max_at_once): | |
self.sp.user_playlist_remove_all_occurrences_of_tracks(SPOTIFY_USERNAME, SPOTIFY_PLAYLIST_ID, to_remove[n:n+max_at_once]) | |
for n in range(0, len(to_add), max_at_once): | |
self.sp.user_playlist_add_tracks(SPOTIFY_USERNAME, SPOTIFY_PLAYLIST_ID, to_add[n:n+max_at_once]) | |
if len(to_remove) > 0 or len(to_add) > 0: | |
logging.info(f"Removed songs: {', '.join(to_remove)}") | |
logging.info(f"Added songs: {', '.join(to_add)}") | |
def get_tracks_for_playlist(self) -> List[str]: | |
""" | |
Query our SQLite DB to get the 50 most played tracks from the last month. In cases of ties, the | |
most recent play wins (second ORDER). | |
""" | |
cur = self.db.cursor() | |
cur.execute("""SELECT track_id | |
FROM plays | |
WHERE track_id IS NOT NULL | |
AND timestamp BETWEEN datetime('now', 'localtime', '-1 months') AND datetime('now', 'localtime') | |
GROUP BY track_id | |
ORDER BY count(track_id) DESC, | |
max(timestamp) DESC | |
LIMIT 50;""") | |
rows = cur.fetchall() | |
return [r[0] for r in rows] | |
def get_spotify_track(self, song, artist) -> str: | |
""" | |
Query Spotify API to get track ID of the song with the given name and artist. | |
This fails (and returns None) for missing songs, or for i.e. live recordings | |
""" | |
results = self.sp.search(f"{song} - {artist}", type="track", limit=1) | |
if len(results['tracks']['items']) > 0: | |
return results['tracks']['items'][0] | |
else: | |
return None | |
def log_play(self, tweet_id: int, timestamp: int, artist: str, song: str, track_id: str) -> None: | |
""" | |
Log a song play in the plays table, with reference to the original tweet (for debugging purposes) | |
""" | |
cur = self.db.cursor() | |
ts = datetime.fromtimestamp(timestamp) | |
cur.execute("INSERT INTO plays (timestamp, tweet, artist, song, track_id) VALUES (?, ?, ?, ?, ?)", (ts, tweet_id, artist, song, track_id)) | |
self.db.commit() | |
def log_status(self, status: twitter.models.Status) -> None: | |
""" | |
Log a tweet, returning the row ID. If the tweet has already been logged then 0 will be returned. | |
""" | |
cur = self.db.cursor() | |
ts = datetime.fromtimestamp(status.created_at_in_seconds) | |
cur.execute("INSERT OR IGNORE INTO tweets (id, timestamp, tweet) VALUES (?, ?, ?)", (status.id, ts, status.text)) | |
rowid = cur.lastrowid | |
self.db.commit() | |
return rowid | |
def get_last_tweet_id(self) -> int: | |
""" | |
Get ID of most recently added tweet. We use this when querying twitter API to make sure | |
we only get new tweets. | |
""" | |
cur = self.db.cursor() | |
cur.execute("SELECT id FROM tweets ORDER BY timestamp DESC LIMIT 1") | |
rows = cur.fetchall() | |
if len(rows) == 0: | |
return None | |
return rows[0][0] | |
if __name__ == "__main__": | |
account_name = "BBC6MusicBot" | |
tweetyfi = TweetyFi(account_name) | |
tweetyfi.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Update: I was banned for Twitter for using this script, so I can't advise that you use it. https://redfern.me/banned-from-twitter/