Skip to content

Instantly share code, notes, and snippets.

@azimut
Last active August 29, 2015 14:05
Show Gist options
  • Save azimut/339799de2940730e9f61 to your computer and use it in GitHub Desktop.
Save azimut/339799de2940730e9f61 to your computer and use it in GitHub Desktop.
Last.fm exporter modified to get track, album and artist info

lastfm-export

Modified version of the original last-export script that also obtains album, artist and track info. Just run it and wait. It might take a while (hours) to complete, because sending requests too quickly could get us blocked from the API.

Usage

# lastfm-export.py -u <LASTFM_USERNAME>

What to do next?

#!/usr/bin/env python
#-*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Script for exporting tracks through audioscrobbler API.
Usage: lastexport.py -u USER [-o OUTFILE] [-p STARTPAGE] [-s SERVER] [-t TYPE]
"""
import urllib2, urllib, sys, time, re
import xml.etree.ElementTree as ET
from optparse import OptionParser
import json
import pickle
import os
__version__ = '0.0.4'


def _load_cache(path):
    """Return the dict pickled at *path*, or a new empty dict if the file does not exist.

    The file handle is closed as soon as the pickle has been read.
    """
    if not os.path.exists(path):
        return dict()
    # NOTE: unpickling runs arbitrary code from the file; these are expected to be
    # the cache files written by main() in the current directory, nothing else.
    with open(path, 'rb') as cache_file:
        return pickle.load(cache_file)


# Metadata caches keyed by MusicBrainz id, reloaded from the previous run so that
# already-fetched artists/tracks/albums are not requested again.
artist_dict = _load_cache('artist.p')
track_dict = _load_cache('track.p')
album_dict = _load_cache('album.p')
def get_options(parser):
    """Register the command line options on *parser*, parse them and return the settings.

    Returns a tuple (username, outfile, startpage, server, tracktype) where
    tracktype is the API listing name derived from --type: "lovedtracks",
    "bannedtracks", or "recenttracks" for anything else.
    Exits the program when no user name was given.
    """
    option_specs = (
        (("-u", "--user"),
         dict(dest="username", default=None,
              help="User name.")),
        (("-o", "--outfile"),
         dict(dest="outfile", default="exported_tracks.txt",
              help="Output file, default is exported_tracks.txt")),
        (("-p", "--page"),
         dict(dest="startpage", type="int", default="1",
              help="Page to start fetching tracks from, default is 1")),
        (("-s", "--server"),
         dict(dest="server", default="last.fm",
              help="Server to fetch track info from, default is last.fm")),
        (("-t", "--type"),
         dict(dest="infotype", default="scrobbles",
              help="Type of information to export, scrobbles|loved|banned, default is scrobbles")),
    )
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)

    options, _positional = parser.parse_args()
    if not options.username:
        sys.exit("User name not specified, see --help")

    # Unknown --type values fall back to the scrobble listing.
    listing_by_type = {"loved": "lovedtracks", "banned": "bannedtracks"}
    infotype = listing_by_type.get(options.infotype, "recenttracks")
    return options.username, options.outfile, options.startpage, options.server, infotype
def connect_server(server, username, startpage, sleep_func=time.sleep, tracktype='recenttracks'):
    """Fetch one page of a user's track listing and return the raw XML response body.

    server     -- "last.fm", "libre.fm", or the host name / URL of another
                  server exposing the same /2.0/ API.
    username   -- user whose tracks are requested.
    startpage  -- page number to request.
    sleep_func -- callable used to wait between retries.
    tracktype  -- listing name appended to "user.get" to form the API method.

    The request is retried with growing delays; once every retry has failed
    the last error is re-raised.
    """
    generic_key = ('lastexport.py-%s' % __version__).ljust(32, '-')
    if server == "libre.fm":
        baseurl = 'http://alpha.libre.fm/2.0/?'
        api_key, limit = generic_key, 200
    elif server == "last.fm":
        baseurl = 'http://ws.audioscrobbler.com/2.0/?'
        api_key, limit = 'e38cc7822bd7476fe4083e36ee69748e', 50
    else:
        # Only add a scheme when none was given, so "https://host" is left intact
        # instead of becoming "http://https://host".
        if not server.startswith(('http://', 'https://')):
            server = 'http://%s' % server
        baseurl = server + '/2.0/?'
        api_key, limit = generic_key, 200
    urlvars = dict(method='user.get%s' % tracktype,
                   api_key=api_key,
                   user=username,
                   page=startpage,
                   limit=limit)
    url = baseurl + urllib.urlencode(urlvars)

    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        print("Failed to open page %s" % urlvars['page'])
        raise last_exc

    # Close the connection even if reading the body fails.
    try:
        response = f.read()
    finally:
        f.close()
    #bad hack to fix bad xml: drop the UTF-8 encoded U+FFFE bytes
    response = response.replace('\xef\xbf\xbe', '')
    return response
def connect_server_artist(server, mbid, artist, sleep_func=time.sleep):
    """Request artist.getinfo for one artist and return the raw JSON response body.

    server     -- must be "last.fm"; no other server is supported here.
    mbid       -- MusicBrainz id of the artist.
    artist     -- artist name.
    sleep_func -- callable used to wait between retries.

    Raises ValueError for an unsupported server, and re-raises the last
    network error once every retry has failed.
    """
    if server != "last.fm":
        # Without this guard the URL pieces below would simply be unbound.
        raise ValueError("Artist info can only be fetched from last.fm, not %s" % server)
    urlvars = dict(method='artist.getinfo',
                   api_key='e38cc7822bd7476fe4083e36ee69748e',
                   format='json',
                   artist=(artist or '').encode('utf-8'),
                   mbid=mbid)
    url = 'http://ws.audioscrobbler.com/2.0/?' + urllib.urlencode(urlvars)
    print(url)

    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # This request has no page number; report the URL that could not be opened.
        print("Failed to open page %s" % url)
        raise last_exc

    # Close the connection even if reading the body fails.
    try:
        return f.read()
    finally:
        f.close()
def connect_server_track(server, mbid, track, artist, sleep_func=time.sleep):
    """Request track.getInfo for one track and return the raw JSON response body.

    server     -- must be "last.fm"; no other server is supported here.
    mbid       -- MusicBrainz id of the track.
    track      -- track title.
    artist     -- artist name.
    sleep_func -- callable used to wait between retries.

    Raises ValueError for an unsupported server, and re-raises the last
    network error once every retry has failed.
    """
    if server != "last.fm":
        # Without this guard the URL pieces below would simply be unbound.
        raise ValueError("Track info can only be fetched from last.fm, not %s" % server)
    urlvars = dict(method='track.getInfo',
                   api_key='e38cc7822bd7476fe4083e36ee69748e',
                   format='json',
                   track=(track or '').encode('utf-8'),
                   artist=(artist or '').encode('utf-8'),
                   mbid=mbid)
    url = 'http://ws.audioscrobbler.com/2.0/?' + urllib.urlencode(urlvars)
    print(url)

    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # This request has no page number; report the URL that could not be opened.
        print("Failed to open page %s" % url)
        raise last_exc

    # Close the connection even if reading the body fails.
    try:
        return f.read()
    finally:
        f.close()
def connect_server_album(server, mbid, album, artist, sleep_func=time.sleep):
    """Request album.getinfo for one album and return the raw JSON response body.

    server     -- must be "last.fm"; no other server is supported here.
    mbid       -- MusicBrainz id of the album.
    album      -- album title.
    artist     -- artist name.
    sleep_func -- callable used to wait between retries.

    Raises ValueError for an unsupported server, and re-raises the last
    network error once every retry has failed.
    """
    if server != "last.fm":
        # Without this guard the URL pieces below would simply be unbound.
        raise ValueError("Album info can only be fetched from last.fm, not %s" % server)
    urlvars = dict(method='album.getinfo',
                   api_key='e38cc7822bd7476fe4083e36ee69748e',
                   format='json',
                   mbid=mbid,
                   album=(album or '').encode('utf-8'),
                   artist=(artist or '').encode('utf-8'))
    url = 'http://ws.audioscrobbler.com/2.0/?' + urllib.urlencode(urlvars)
    print(url)

    for interval in (1, 5, 10, 62, 240, 480, 1200, 2400):
        try:
            f = urllib2.urlopen(url)
            break
        except Exception as e:
            last_exc = e
            print("Exception occured, retrying in %ds: %s" % (interval, e))
            sleep_func(interval)
    else:
        # This request has no page number; report the URL that could not be opened.
        print("Failed to open page %s" % url)
        raise last_exc

    # Close the connection even if reading the body fails.
    try:
        return f.read()
    finally:
        f.close()
def get_pageinfo(response, tracktype='recenttracks'):
    """Return how many pages of tracks the XML *response* reports.

    response  -- XML document containing a <tracktype> element.
    tracktype -- name of the listing element carrying the totalPages attribute.

    Raises ValueError when the listing element or its totalPages attribute is
    missing, quoting the text of an <error> element if the document has one.
    """
    xmlpage = ET.fromstring(response)
    listing = xmlpage.find(tracktype)
    if listing is None:
        # presumably a failed request carries an <error> element - include its text if so
        error = xmlpage.find('error')
        if error is not None and error.text:
            detail = error.text
        else:
            detail = 'no <%s> element in response' % tracktype
        raise ValueError("Unexpected response from server: %s" % detail)
    totalpages = listing.attrib.get('totalPages')
    if totalpages is None:
        raise ValueError("Response does not report totalPages for %s" % tracktype)
    return int(totalpages)
def get_tracklist(response):
    """Parse an XML page and return a list of every <track> element it contains."""
    xmlpage = ET.fromstring(response)
    # Element.getiterator() is deprecated and gone in Python 3.9; iter() is its
    # replacement. Materialise it so callers still receive a list.
    return list(xmlpage.iter('track'))
def parse_artist(response_dict):
    """Flatten a decoded artist.getinfo response into {mbid: info}.

    response_dict -- decoded JSON; must contain response_dict['artist']['mbid'].

    The info dict holds 'yearfrom', 'yearto', 'placeformed', 'listeners',
    'playcount' and 'tag0'..'tag4'; anything absent is stored as ''.
    """
    artist = response_dict['artist']
    mbid = artist['mbid']

    bio = artist.get('bio', {})
    # 'formation' is either a single object or a list of them; normalise to a list.
    formation = bio.get('formationlist', {}).get('formation', {})
    if isinstance(formation, dict):
        formation = [formation]
    elif not isinstance(formation, list):
        formation = []

    stats = artist.get('stats', {})
    info = {
        # First formation's start year and last formation's end year; an empty
        # list yields '' instead of an IndexError.
        'yearfrom': formation[0].get('yearfrom', '') if formation else '',
        'yearto': formation[-1].get('yearto', '') if formation else '',
        'placeformed': bio.get('placeformed', ''),
        'listeners': stats.get('listeners', ''),
        'playcount': stats.get('playcount', ''),
    }

    # 'tags' can arrive as a whitespace-only string when there are no tags (the
    # exact padding apparently varies), so any non-dict value means "no tags".
    tags = artist.get('tags', {})
    tag_entries = tags.get('tag', []) if isinstance(tags, dict) else []
    if isinstance(tag_entries, dict):
        # A lone tag comes as a single object rather than a list.
        tag_entries = [tag_entries]
    elif not isinstance(tag_entries, list):
        tag_entries = []
    for n_tag in range(5):
        if n_tag < len(tag_entries):
            info['tag%d' % n_tag] = tag_entries[n_tag].get('name', '')
        else:
            info['tag%d' % n_tag] = ''

    return {mbid: info}
def parse_trackinfo(response_dict):
    """Flatten a decoded track.getInfo response into (mbid, {mbid: info}).

    response_dict -- decoded JSON; must contain response_dict['track']['mbid'].

    The info dict holds 'duration', 'listeners', 'playcount' and
    'tag0'..'tag4'; anything absent is stored as ''.
    """
    track = response_dict['track']
    myid = track['mbid']
    info = {
        'duration': track.get('duration', ''),
        'listeners': track.get('listeners', ''),
        'playcount': track.get('playcount', ''),
    }

    # 'toptags' can arrive as a whitespace-only string when there are no tags
    # (the exact padding apparently varies), so any non-dict value means "no tags".
    toptags = track.get('toptags', {})
    tag_entries = toptags.get('tag', []) if isinstance(toptags, dict) else []
    if isinstance(tag_entries, dict):
        # A lone tag comes as a single object rather than a list.
        tag_entries = [tag_entries]
    elif not isinstance(tag_entries, list):
        # Covers a missing/None 'tag', which used to reach len(None).
        tag_entries = []
    for n_tag in range(5):
        if n_tag < len(tag_entries):
            info['tag%d' % n_tag] = tag_entries[n_tag].get('name', '')
        else:
            info['tag%d' % n_tag] = ''

    return myid, {myid: info}
def parse_album(response_dict):
    """Flatten a decoded album.getinfo response into (mbid, {mbid: info}).

    The info dict holds 'releasedate', 'listeners' and 'playcount', each
    defaulting to '' when the response does not provide it.
    """
    album = response_dict['album']
    album_id = album['mbid']
    wanted_fields = ('releasedate', 'listeners', 'playcount')
    details = dict((field, album.get(field, '')) for field in wanted_fields)
    return album_id, {album_id: details}
def _is_info_payload(payload, key):
    """Return True when *payload* is a JSON object holding *key* and no 'message' entry."""
    return isinstance(payload, dict) and 'message' not in payload and key in payload


def parse_track(trackelement, username):
    """Turn one <track> element into a flat output row, enriched with cached metadata.

    trackelement -- ElementTree element for a single track.
    username     -- not used by this function.

    Artist, track and album details are looked up by MusicBrainz id in the
    module-level artist_dict / track_dict / album_dict caches; ids not cached
    yet are fetched first, from the server named by the module-level `server`
    variable (assigned in the script's __main__ section).

    Returns a list: date, track name, artist name, album name, track
    duration/listeners/playcount/tag0-4, track mbid, artist yearfrom/yearto/
    placeformed/listeners/playcount/tag0-4, artist mbid, album releasedate/
    playcount/listeners, album mbid. None values are replaced by ''.
    """
    artist_el = trackelement.find('artist')
    if len(artist_el):
        #artist info is nested in loved/banned tracks xml
        artistname = artist_el.find('name').text
        artistmbid = artist_el.find('mbid').text
    else:
        artistname = artist_el.text
        artistmbid = artist_el.get('mbid')

    album_el = trackelement.find('album')
    if album_el is None:
        #no album info for loved/banned tracks
        albumname = ''
        albummbid = ''
    else:
        albumname = album_el.text
        albummbid = album_el.get('mbid')

    trackname = trackelement.find('name').text
    trackmbid = trackelement.find('mbid').text
    date = trackelement.find('date').get('uts')

    # Each response is decoded once. Error payloads (anything that is not an
    # object with the expected key, or that carries a 'message') are skipped,
    # leaving the related columns blank.
    if artistmbid and artistmbid not in artist_dict:
        payload = json.loads(connect_server_artist(server, artistmbid, artistname))
        print(payload)
        if _is_info_payload(payload, 'artist'):
            artist_dict.update(parse_artist(payload))
    if trackmbid and trackmbid not in track_dict:
        payload = json.loads(connect_server_track(server, trackmbid, trackname, artistname))
        print(payload)
        if _is_info_payload(payload, 'track'):
            _, parsed_track = parse_trackinfo(payload)
            track_dict.update(parsed_track)
    if albummbid and albummbid not in album_dict:
        payload = json.loads(connect_server_album(server, albummbid, albumname, artistname))
        print(payload)
        if _is_info_payload(payload, 'album'):
            _, parsed_album = parse_album(payload)
            album_dict.update(parsed_album)

    tag_fields = ('tag0', 'tag1', 'tag2', 'tag3', 'tag4')
    track_info = track_dict.get(trackmbid, {})
    artist_info = artist_dict.get(artistmbid, {})
    album_info = album_dict.get(albummbid, {})

    output = [date, trackname, artistname, albumname]
    output.extend(track_info.get(field, '')
                  for field in ('duration', 'listeners', 'playcount') + tag_fields)
    output.append(trackmbid)
    output.extend(artist_info.get(field, '')
                  for field in ('yearfrom', 'yearto', 'placeformed', 'listeners', 'playcount') + tag_fields)
    output.append(artistmbid)
    output.extend(album_info.get(field, '')
                  for field in ('releasedate', 'playcount', 'listeners'))
    output.append(albummbid)

    # Missing XML text / attributes come back as None; write them as empty fields.
    return ['' if value is None else value for value in output]
def write_tracks(tracks, outfileobj):
    """Write each track row to *outfileobj* as one tab-separated, UTF-8 encoded line."""
    for row in tracks:
        line = "\t".join(row)
        line += "\n"
        outfileobj.write(line.encode('utf-8'))
def get_tracks(server, username, startpage=1, sleep_func=time.sleep, tracktype='recenttracks'):
    """Generate (page, totalpages, tracks) for every page from *startpage* onwards.

    tracks is the list of parsed rows for that page, leaving out the track
    that is currently playing. Raises ValueError when *startpage* lies beyond
    the last page. Waits half a second (via *sleep_func*) after each page.
    """
    first_response = connect_server(server, username, startpage, sleep_func, tracktype)
    totalpages = get_pageinfo(first_response, tracktype)

    if startpage > totalpages:
        raise ValueError("First page (%s) is higher than total pages (%s)." % (startpage, totalpages))

    for page in range(startpage, totalpages + 1):
        # The first page was already downloaded to learn the page count; reuse it.
        if page == startpage:
            response = first_response
        else:
            response = connect_server(server, username, page, sleep_func, tracktype)

        # A missing or empty "nowplaying" attribute marks a finished scrobble.
        tracks = [parse_track(element, username)
                  for element in get_tracklist(response)
                  if not element.attrib.get("nowplaying")]

        yield page, totalpages, tracks
        sleep_func(.5)
def main(server, username, startpage, outfile, infotype='recenttracks'):
    """Export a user's tracks page by page and append them to *outfile*.

    server    -- server to fetch the listing from.
    username  -- user whose tracks are exported.
    startpage -- first page to fetch.
    outfile   -- path of the file the rows are appended to.
    infotype  -- listing to export; 'recenttracks' rows are de-duplicated by
                 timestamp, other listings are kept row by row.

    After every page the metadata caches are pickled to artist.p, track.p and
    album.p. Whatever was collected is written out even when fetching stops
    early; a ValueError ends the program with its message.
    """
    trackdict = dict()
    page = startpage  # for case of exception
    totalpages = -1  # ditto
    n = 0
    caches = (('artist.p', artist_dict),
              ('track.p', track_dict),
              ('album.p', album_dict))
    try:
        for page, totalpages, tracks in get_tracks(server, username, startpage, tracktype=infotype):
            print("Got page %s of %s.." % (page, totalpages))
            # Persist the caches per page so an interrupted run keeps its lookups;
            # `with` makes sure every file is flushed and closed.
            for cache_path, cache in caches:
                with open(cache_path, 'wb') as cache_file:
                    pickle.dump(cache, cache_file)
            for track in tracks:
                if infotype == 'recenttracks':
                    trackdict.setdefault(track[0], track)
                else:
                    #Can not use timestamp as key for loved/banned tracks as it's not unique
                    n += 1
                    trackdict.setdefault(n, track)
    except ValueError as e:
        # sys.exit rather than the interactive-only `exit` helper from site.
        sys.exit(e)
    finally:
        with open(outfile, 'a') as outfileobj:
            tracks = sorted(trackdict.values(), reverse=True)
            write_tracks(tracks, outfileobj)
            print("Wrote page %s-%s of %s to file %s" % (startpage, page, totalpages, outfile))
if __name__ == "__main__":
    # These stay module-level names on purpose: parse_track() reads `server`
    # as a global rather than receiving it as an argument.
    username, outfile, startpage, server, infotype = get_options(OptionParser())
    main(server=server, username=username, startpage=startpage,
         outfile=outfile, infotype=infotype)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment