# Created February 5, 2011 07:52
# My basic hack to pull down all of my track history from Last.FM into XML and CSV.
# It even has resume/top-off functionality.
import urllib
import UnicodeCSV
from xml.etree import ElementTree as ET
def ask_user(question):
    """Prompt with *question* and report whether the reply means "yes".

    Returns True only when the typed reply starts with 'y' or 'Y';
    an empty reply (just pressing Enter) counts as "no".
    """
    reply = raw_input(question)
    # Slicing keeps the empty-reply case safe: ''[:1] is '' rather than an error.
    return reply[:1].upper() == 'Y'
# Last.fm API key sent with every request.
apikey = '7b93d3395ddf82a0ae762a8d4f11981d'
# Request URL template. The fetch loop fills in %(username)s, %(apikey)s and
# %(pagenum)s, and the resume logic appends "&from=..." / "&to=..." filters.
# NOTE(review): this template was an empty string in this copy of the file, so
# every request URL came out as ''. It has been reconstructed from the Last.fm
# user.getRecentTracks API and the format keys the fetch loop supplies; the
# original may also have set a page size ("limit") -- confirm before relying
# on page numbers from an earlier run.
fetchurl = ('http://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks'
            '&user=%(username)s&api_key=%(apikey)s&page=%(pagenum)s')
username = raw_input('Username? ')
outfile = raw_input('Outfile? [tracks.xml] ')
if outfile == "":
    outfile = "tracks.xml"
# A blank, non-numeric or out-of-range answer falls back to the first page
# instead of aborting the script with a ValueError.
try:
    pagenum = int(raw_input('Start page? [1] '))
except ValueError:
    pagenum = 1
if pagenum < 1:
    pagenum = 1
# Main script body: optionally reuse an earlier XML/CSV export, then fetch
# pages of recent tracks from fetchurl and write them out.
# NOTE(review): this copy of the script has lost its indentation and several
# statements (see the notes below); it will not run as written.
xml = None
#Check for existing files
# NOTE(review): the `except IOError:` further down has no visible matching
# `try:`; one appears to be missing around here.
print "Checking for existing XML file..."
# The file object passed to ET.parse() is never explicitly closed.
xml = ET.parse(open(outfile, 'r'))
if(ask_user("There appears to already be an existing XML file. Append to that file (y/n)? ")):
if(ask_user("Shall I only pull records newer than the most recent existing record (y/n)? ")):
# Find the largest `uts` attribute among the stored tracks' `date` elements
# and append it to the request URL as "&from=".
# A track element without a `date` child would make t.find('date') return
# None and raise AttributeError here.
maxuts = 0
for t in xml.findall("recenttracks/track"):
if(int(t.find('date').attrib['uts']) > maxuts):
maxuts = int(t.find('date').attrib['uts'])
fetchurl += "&from=" + str(maxuts)
if(ask_user("Are you resuming, and I should pull only older than the oldest (y/n)? ")):
# Same scan for the smallest `uts`, appended as "&to=".
# 1e10000 overflows to float infinity, used as the starting "minimum"; if no
# track is found, str(minuts) yields "inf" in the URL.
minuts = 1e10000
for t in xml.findall("recenttracks/track"):
if(int(t.find('date').attrib['uts']) < minuts):
minuts = int(t.find('date').attrib['uts'])
fetchurl += "&to=" + str(minuts)
# NOTE(review): as written this discards the tree that was just parsed;
# presumably it belonged to an `else:` branch (user declined to append) -- confirm.
xml = None
except IOError:
print "Not found."
# No usable existing tree: start from an empty ElementTree.
# NOTE(review): no root element is created in the visible code -- confirm
# whether lines are missing here.
if(xml == None):
xml = ET.ElementTree()
# `rt` is not referenced again in the visible code.
rt = xml.find('recenttracks')
# `appending` records whether rows are added to an existing CSV file.
appending = False
csvhandle = None
# NOTE(review): another `try:` appears to be missing here (see the
# `except IOError:` below).
print "Checking for existing CSV file..."
# Opened for reading only to detect that the file exists; this handle is
# rebound below without being closed.
csvhandle = open(outfile + '.csv', 'r')
if(ask_user("There appears to already be an existing CSV file. Append to that file (y/n)? ")):
csvhandle = open(outfile + '.csv', 'ab')
appending = True
except IOError:
print "Not found."
#If we're overwriting, create the file
if(not appending):
csvhandle = open(outfile + '.csv','wb')
csvfile = UnicodeCSV.UnicodeWriter(csvhandle)
# NOTE(review): with indentation lost it is unclear whether `pagenum = 1` is
# the body of this `if`; if it is, the start page entered earlier is ignored
# whenever the CSV file is not being appended to.
if(not appending):
pagenum = 1
# `done` is the outer-loop exit flag. In the visible code it is only set when
# the user chooses to abort, and nothing visible increments pagenum.
done = 0
while (not done):
print "Fetching page " + str(pagenum) + "..."
skip = False
success = False
# Retry loop: repeat the request until it succeeds or the user gives up.
while(not success and not skip):
# Fill the URL template with the current user, API key and page number.
url = fetchurl % \
{"username": username, "apikey": apikey, "pagenum": pagenum}
print url
# NOTE(review): the `try:` for the `except IOError:` below is not visible.
curpage = urllib.urlopen(url)
success = True
except IOError:
if(ask_user("I/O Error. Try again? (y/n)")):
# NOTE(review): `skip = False` immediately followed by `skip = True` suggests
# a missing `else:` between them (here and in the HTTP-error case below).
skip = False
skip = True
# Any HTTP status other than 200 is treated as a failure the user may retry.
# NOTE(review): if urlopen() raised, `curpage` may not be bound here -- the
# original nesting cannot be recovered from this copy.
if(curpage.getcode() != 200):
if(ask_user("HTTP error " + str(curpage.getcode()) + ". Try again? (y/n)")):
skip = False
skip = True
# Parse the response and report how many pages follow this one, using the
# `totalPages` attribute of the `recenttracks` element.
curxml = ET.parse(curpage)
print str(int(curxml.find("recenttracks").attrib["totalPages"]) - pagenum) + " pages remaining."
newtracks = curxml.findall("recenttracks/track")
for t in newtracks:
# NOTE(review): the dict literal below is never closed in this copy, and the
# following `print` formats a "playtime" key that is not among the visible
# entries; the operand after the trailing backslash is missing too. Any
# statement that wrote the track to the CSV file is not visible either.
trackdata = {
"track": t.findtext('name'),
"artist": t.findtext('artist'),
"album": t.findtext('album'),
"timestamp": t.find('date').attrib['uts'],
print "Imported \"%(track)s\" by %(artist)s, played on %(playtime)s" % \
xml.write(outfile, "UTF-8")
# pagenum is an int at this point, so concatenating it to a str below would
# raise TypeError; it needs str(pagenum).
if(not success):
if(ask_user("We had problems on page " + pagenum + ". Abort (y/n)? ")):
done = True
# Taken from the "Examples" section of the Python 2 csv module documentation.
import csv, codecs, cStringIO
class UTF8Recoder:
    """
    Iterator that reads an encoded stream and reencodes the input to UTF-8.

    f is a byte stream in the given encoding; each call to next() returns
    the next line of f as a UTF-8 encoded byte string.
    """
    def __init__(self, f, encoding):
        # Wrap the raw stream in a decoding reader for the source encoding.
        self.reader = codecs.getreader(encoding)(f)

    def __iter__(self):
        return self

    def next(self):
        # Decode one line via the stream reader, then re-encode it as UTF-8.
        # StopIteration from the underlying reader propagates at end of input.
        return next(self.reader).encode("utf-8")
class UnicodeReader:
    """
    A CSV reader which will iterate over lines in the CSV file "f",
    which is encoded in the given encoding.

    Each row is returned as a list of unicode strings. dialect and any
    extra keyword arguments are passed straight to csv.reader.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # csv.reader is fed UTF-8 bytes regardless of the file's own
        # encoding; UTF8Recoder does the conversion line by line.
        f = UTF8Recoder(f, encoding)
        self.reader = csv.reader(f, dialect=dialect, **kwds)

    def next(self):
        # Parse the next CSV row, then decode every cell back to unicode.
        row = next(self.reader)
        return [unicode(s, "utf-8") for s in row]

    def __iter__(self):
        return self
class UnicodeWriter:
A CSV writer which will write rows to CSV file "f",
which is encoded in the given encoding.
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
# Redirect output to a queue
self.queue = cStringIO.StringIO()
self.writer = csv.writer(self.queue, dialect=dialect, **kwds) = f
self.encoder = codecs.getincrementalencoder(encoding)()
def writerow(self, row):
self.writer.writerow([s.encode("utf-8") for s in row])
# Fetch UTF-8 output from the queue ...
data = self.queue.getvalue()
data = data.decode("utf-8")
# ... and reencode it into the target encoding
data = self.encoder.encode(data)
# write to the target stream
# empty queue
def writerows(self, rows):
for row in rows:
