Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
BBC6Music Spotify Playlist Updater (via @BBC6MusicBot)
python-twitter==3.5
spotipy==2.13.0
tqdm==4.47.0
# Credential/config placeholders read by the updater script through
# `from secrets import *`. Fill these in before running; all are empty here.
# NOTE(review): presumably this lives in a local `secrets.py`, which would
# shadow the standard-library `secrets` module -- confirm against the gist layout.

# Twitter user access token pair, passed to twitter.Api as
# access_token_key / access_token_secret.
TWITTER_ACCESS_TOKEN = ""
TWITTER_ACCESS_TOKEN_SECRET = ""
# Twitter application key pair, passed to twitter.Api as
# consumer_key / consumer_secret.
TWITTER_API_KEY = ""
TWITTER_API_SECRET_KEY = ""
# Not referenced by the updater code shown alongside this configuration.
TWITTER_BEARER_TOKEN = ""
# Passed as `scope` when requesting the Spotify user token.
# NOTE(review): presumably must include playlist-modify permissions -- confirm.
SPOTIFY_SCOPES = ""
# Spotify user name: used for the token prompt and for the playlist
# add/remove calls.
SPOTIFY_USERNAME = ""
# Spotify application credentials and redirect URI used for the token prompt.
SPOTIFY_CLIENT_ID = ""
SPOTIFY_CLIENT_SECRET = ""
SPOTIFY_REDIRECT_URL = ""
# ID of the playlist that is read and then synchronised by the updater.
SPOTIFY_PLAYLIST_ID = ""
import html
import logging
import os
import re
import sqlite3
from datetime import datetime
from typing import List, Optional, Tuple

import spotipy
import spotipy.util
import tqdm
import twitter

from secrets import *
# Absolute directory of this script. The log file, the SQLite database and the
# Spotify token cache are all created relative to it.
__location__ = os.path.realpath(
    os.path.join(os.getcwd(), os.path.dirname(__file__))
)

# Send everything from DEBUG upwards to logs.log next to the script, one
# timestamped line per record.
logging.basicConfig(
    level=logging.DEBUG,
    filename=f'{__location__}/logs.log',
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)-8s %(message)s',
)
class TweetyFi:
    """Mirror the songs announced by a Twitter account into a Spotify playlist.

    Each run fetches the account's tweets that are newer than the last one
    stored, extracts ``artist - song`` from every "Now Playing" tweet, looks the
    song up on Spotify, records the play in a local SQLite database and finally
    synchronises the configured playlist with the most played tracks of the
    last month.

    Annotations that mention ``twitter`` types are written as strings so they
    are not evaluated when the class is defined.
    """

    # "Now Playing <artist> - <song><other stuff>"; the song ends at the first
    # '@' or '#' (mentions / hashtags appended by the bot).
    _NOW_PLAYING = re.compile(
        r"Now Playing (?P<artist>.+)\s-\s(?P<song>[^@#]+)(?P<other>.+)?"
    )

    def __init__(self, account_name, sp_token=None):
        """
        Open the per-account database and create the Spotify/Twitter clients.

        :param account_name: Twitter screen name to follow; also names the
            SQLite file (``<account_name>.db`` next to the script).
        :param sp_token: Spotify access token. When ``None`` a token is
            requested via ``spotipy.util.prompt_for_user_token``.
        """
        if sp_token is None:
            sp_token = spotipy.util.prompt_for_user_token(
                SPOTIFY_USERNAME,
                scope=SPOTIFY_SCOPES,
                client_id=SPOTIFY_CLIENT_ID,
                client_secret=SPOTIFY_CLIENT_SECRET,
                redirect_uri=SPOTIFY_REDIRECT_URL,
                cache_path=f"{__location__}/.spotipycache",
            )
        self.db = sqlite3.connect(f'{__location__}/{account_name}.db')
        self.account_name = account_name
        self.sp = spotipy.Spotify(auth=sp_token)
        self.tw = twitter.Api(
            consumer_key=TWITTER_API_KEY,
            consumer_secret=TWITTER_API_SECRET_KEY,
            access_token_key=TWITTER_ACCESS_TOKEN,
            access_token_secret=TWITTER_ACCESS_TOKEN_SECRET,
        )
        self.db_init()

    @staticmethod
    def _format_timestamp(seconds: int) -> str:
        """
        Convert epoch seconds to the local-time ``YYYY-MM-DD HH:MM:SS`` text
        stored in the database. This is the same text sqlite3's implicit
        datetime adapter produced, written explicitly because that adapter is
        deprecated in newer Python versions.
        """
        return datetime.fromtimestamp(seconds).isoformat(" ")

    def run(self) -> None:
        """
        Get all new tweets since last run, parse them, and update playlist
        """
        last_run = self.get_last_tweet_id()
        statuses = self.get_statuses(last_run)
        logging.info(f"Processing {len(statuses)} statuses")
        # Process oldest first (tweet IDs grow over time): if the run dies half
        # way, the newest stored tweet is the last one handled, so the next run
        # resumes from there instead of skipping the unprocessed older tweets.
        pbar = tqdm.tqdm(sorted(statuses, key=lambda s: s.id))
        for status in pbar:
            pbar.set_description(f"Processing: {status.text}")
            self.process_status(status)
        playlist_tracks = self.get_tracks_for_playlist()
        self.update_playlist(playlist_tracks)

    def process_status(self, status: "twitter.models.Status") -> None:
        """
        Parse and log the given status.

        Tweets that were already stored, or that are not "Now Playing" tweets,
        produce no play record.
        """
        if self.log_status(status) == 0:
            # already stored by an earlier run
            return
        try:
            # run the regex to get the name, artist
            song, artist = self.extract_song_info(status)
        except ValueError:
            # The tweet itself stays in the tweets table for debugging, but one
            # unparseable tweet must not abort the whole run.
            logging.warning("Skipping tweet %s: not a 'Now Playing' tweet", status.id)
            return
        # try and get the track from spotify API
        spotify_track = self.get_spotify_track(song, artist)
        # we don't always succeed
        track_id = spotify_track['id'] if spotify_track is not None else None
        # log play regardless, we can deal with missing spotify ID during playlist generation
        self.log_play(status.id, status.created_at_in_seconds, artist, song, track_id)

    def db_init(self) -> None:
        """
        If we're dealing with a fresh DB, create tables to store tweets and song plays.
        Note that we probably don't need to store the tweets, this is for debugging in
        case something goes wrong and I need to look back at the cause.
        """
        c = self.db.cursor()
        c.execute('CREATE TABLE IF NOT EXISTS tweets(id INTEGER NOT NULL PRIMARY KEY, timestamp DATETIME, tweet TEXT, UNIQUE(id))')
        c.execute('CREATE TABLE IF NOT EXISTS plays(play_id INTEGER PRIMARY KEY AUTOINCREMENT, tweet INTEGER, timestamp DATETIME, artist TEXT, song TEXT, track_id TEXT, FOREIGN KEY(tweet) REFERENCES tweets(id))')
        self.db.commit()

    def get_statuses(self, since_id: Optional[int] = None, count: int = 200) -> "List[twitter.models.Status]":
        """
        Retrieve statuses for the given account from the Twitter API.

        :param since_id: only tweets newer than this ID are requested
            (``None`` for no lower bound).
        :param count: maximum number of tweets requested in the single call.
        """
        return self.tw.GetUserTimeline(screen_name=self.account_name, count=count, since_id=since_id)

    def extract_song_info(self, tweet: "twitter.models.Status") -> Tuple[str, str]:
        """
        The @BBC6MusicBot tweets in the form: <artist> - <song><other stuff>, this regex (fairly?) reliably extracts that info.

        :returns: ``(song, artist)``, both stripped of surrounding whitespace.
        :raises ValueError: if the tweet text does not have the expected form.
        """
        # Decode HTML entities before matching: besides "&amp;" this also keeps
        # numeric entities such as "&#39;" from cutting the song off at '#'.
        text = html.unescape(tweet.text)
        match = self._NOW_PLAYING.match(text)
        if match is None:
            raise ValueError(f"Not a 'Now Playing' tweet: {tweet.text!r}")
        return match.group("song").strip(), match.group("artist").strip()

    def get_current_playlist_tracks(self) -> List[str]:
        """
        Retrieve all current tracks from the Spotify playlist.
        We have to go through pagination here.
        """
        offset = 0
        current_tracks = []
        while True:
            page = self.sp.playlist_tracks(SPOTIFY_PLAYLIST_ID, fields='items.track.id', offset=offset)
            items = (page or {}).get('items') or []
            if not items:
                break
            for item in items:
                # Entries without a track or without an ID (Spotify can report
                # these as null) cannot be added or removed by ID, so skip them.
                track = item.get('track') or {}
                if track.get('id') is not None:
                    current_tracks.append(track['id'])
            # advance by the page size, not by the number of usable IDs
            offset += len(items)
        return current_tracks

    def update_playlist(self, playlist_tracks) -> None:
        """
        Sync the existing playlist with the desired playlist -- get current tracks and use set
        differences to determine removals and adds.
        We need to batch this as Spotify API doesn't let you add more than 100 tracks at a time.

        :param playlist_tracks: desired track IDs, best first; new tracks are
            added in this order.
        """
        current = set(self.get_current_playlist_tracks())
        desired = set(playlist_tracks)
        # in current, not in desired (sorted only to make runs reproducible)
        to_remove = sorted(current - desired)
        # in desired, not in current -- keep ranking order, drop repeats
        to_add = [t for t in dict.fromkeys(playlist_tracks) if t not in current]
        max_at_once = 100
        for n in range(0, len(to_remove), max_at_once):
            self.sp.user_playlist_remove_all_occurrences_of_tracks(SPOTIFY_USERNAME, SPOTIFY_PLAYLIST_ID, to_remove[n:n + max_at_once])
        for n in range(0, len(to_add), max_at_once):
            self.sp.user_playlist_add_tracks(SPOTIFY_USERNAME, SPOTIFY_PLAYLIST_ID, to_add[n:n + max_at_once])
        if to_remove or to_add:
            logging.info(f"Removed songs: {', '.join(to_remove)}")
            logging.info(f"Added songs: {', '.join(to_add)}")

    def get_tracks_for_playlist(self, limit: int = 50) -> List[str]:
        """
        Query our SQLite DB to get the most played tracks from the last month. In cases of ties, the
        most recent play wins (second ORDER).

        :param limit: maximum number of track IDs returned (50 by default).
        """
        cur = self.db.cursor()
        cur.execute("""SELECT track_id
                       FROM plays
                       WHERE track_id IS NOT NULL
                       AND timestamp BETWEEN datetime('now', 'localtime', '-1 months') AND datetime('now', 'localtime')
                       GROUP BY track_id
                       ORDER BY count(track_id) DESC,
                       max(timestamp) DESC
                       LIMIT ?;""", (limit,))
        return [row[0] for row in cur.fetchall()]

    def get_spotify_track(self, song, artist) -> Optional[dict]:
        """
        Query Spotify API for the song with the given name and artist and return the first
        matching track object (its 'id' key holds the track ID).
        This fails (and returns None) for missing songs, or for i.e. live recordings
        """
        results = self.sp.search(f"{song} - {artist}", type="track", limit=1)
        items = results['tracks']['items']
        return items[0] if items else None

    def log_play(self, tweet_id: int, timestamp: int, artist: str, song: str, track_id: Optional[str]) -> None:
        """
        Log a song play in the plays table, with reference to the original tweet (for debugging purposes)

        :param timestamp: tweet creation time in epoch seconds.
        :param track_id: Spotify track ID, or ``None`` when the lookup failed.
        """
        cur = self.db.cursor()
        ts = self._format_timestamp(timestamp)
        cur.execute("INSERT INTO plays (timestamp, tweet, artist, song, track_id) VALUES (?, ?, ?, ?, ?)", (ts, tweet_id, artist, song, track_id))
        self.db.commit()

    def log_status(self, status: "twitter.models.Status") -> int:
        """
        Log a tweet, returning the row ID. If the tweet has already been logged then 0 will be returned.
        """
        cur = self.db.cursor()
        ts = self._format_timestamp(status.created_at_in_seconds)
        cur.execute("INSERT OR IGNORE INTO tweets (id, timestamp, tweet) VALUES (?, ?, ?)", (status.id, ts, status.text))
        # rowcount says whether a row was really inserted. lastrowid alone is
        # not reliable here: after an ignored insert it can still carry the
        # rowid of an earlier successful INSERT on this connection.
        rowid = cur.lastrowid if cur.rowcount == 1 else 0
        self.db.commit()
        return rowid

    def get_last_tweet_id(self) -> Optional[int]:
        """
        Get ID of most recently added tweet. We use this when querying twitter API to make sure
        we only get new tweets. Returns None when no tweet has been stored yet.
        """
        cur = self.db.cursor()
        # id breaks ties between tweets created within the same second
        cur.execute("SELECT id FROM tweets ORDER BY timestamp DESC, id DESC LIMIT 1")
        row = cur.fetchone()
        return None if row is None else row[0]
if __name__ == "__main__":
    # Script entry point: follow the @BBC6MusicBot timeline and bring the
    # configured Spotify playlist up to date in a single pass.
    account_name = "BBC6MusicBot"
    tweetyfi = TweetyFi(account_name)
    tweetyfi.run()
@JosephRedfern

This comment has been minimized.

Copy link
Owner Author

@JosephRedfern JosephRedfern commented Oct 15, 2020

Update: I was banned from Twitter for using this script, so I can't advise that you use it. https://redfern.me/banned-from-twitter/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.