@cincodenada
Created February 5, 2011 07:52
My basic hack to pull down all of my track history from Last.FM into XML and CSV. It even has resume/top-off functionality.
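At its core the script just pages through Last.FM's user.getrecenttracks XML API and records each track's name, artist, album, and timestamp. A minimal sketch of a single request (Python 2, with a placeholder username and API key) looks like this:

import urllib
from xml.etree import ElementTree as ET

# Placeholder credentials: substitute a real Last.FM username and API key
url = ('http://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks'
       '&user=someuser&api_key=YOUR_API_KEY&page=1')
tree = ET.parse(urllib.urlopen(url))
for track in tree.findall('recenttracks/track'):
    print track.findtext('name'), 'by', track.findtext('artist')

The full script below wraps this in prompts, retry handling, and the resume/top-off logic (via the API's from/to parameters).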
import urllib
import UnicodeCSV
from xml.etree import ElementTree as ET

def ask_user(question):
    cont = raw_input(question)
    return (cont != "" and cont[0].upper() == 'Y')

apikey = '7b93d3395ddf82a0ae762a8d4f11981d'
fetchurl = 'http://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks&user=%(username)s&api_key=%(apikey)s&page=%(pagenum)d'

username = raw_input('Username? ')
outfile = raw_input('Outfile? [tracks.xml] ')
if(outfile == ""):
    outfile = "tracks.xml"
pagenum = raw_input('Start page? [1] ')
if(pagenum == "" or int(pagenum) < 1):
    pagenum = 1
else:
    pagenum = int(pagenum)
xml = None

# Check for existing files
print "Checking for existing XML file..."
try:
    xml = ET.parse(open(outfile, 'r'))
    if(ask_user("There appears to already be an existing XML file. Append to that file (y/n)? ")):
        if(ask_user("Shall I only pull records newer than the most recent existing record (y/n)? ")):
            maxuts = 0
            for t in xml.findall("recenttracks/track"):
                if(int(t.find('date').attrib['uts']) > maxuts):
                    maxuts = int(t.find('date').attrib['uts'])
            fetchurl += "&from=" + str(maxuts)
        else:
            if(ask_user("Are you resuming, and I should pull only older than the oldest (y/n)? ")):
                minuts = 1e10000
                for t in xml.findall("recenttracks/track"):
                    if(int(t.find('date').attrib['uts']) < minuts):
                        minuts = int(t.find('date').attrib['uts'])
                fetchurl += "&to=" + str(minuts)
    else:
        xml = None
except IOError:
    print "Not found."
if(xml == None):
    xml = ET.ElementTree()
    xml._setroot(ET.XML('<lfm><recenttracks></recenttracks></lfm>'))
rt = xml.find('recenttracks')

appending = False
csvhandle = None
try:
    print "Checking for existing CSV file..."
    csvhandle = open(outfile + '.csv', 'r')
    if(ask_user("There appears to already be an existing CSV file. Append to that file (y/n)? ")):
        csvhandle.close()
        csvhandle = open(outfile + '.csv', 'ab')
        appending = True
except IOError:
    print "Not found."

# If we're overwriting, create the file
if(not appending):
    csvhandle = open(outfile + '.csv', 'wb')
csvfile = UnicodeCSV.UnicodeWriter(csvhandle)
if(not appending):
    csvfile.writerow(['track', 'artist', 'album', 'timestamp'])
done = False
while(not done):
    print "Fetching page " + str(pagenum) + "..."
    skip = False
    success = False
    while(not success and not skip):
        try:
            url = fetchurl % \
                {"username": username, "apikey": apikey, "pagenum": pagenum}
            print url
            curpage = urllib.urlopen(url)
            success = True
        except IOError:
            if(ask_user("I/O Error. Try again? (y/n)")):
                skip = False
            else:
                skip = True
        # Only check the HTTP status if the request itself went through
        if(success and curpage.getcode() != 200):
            if(ask_user("HTTP error " + str(curpage.getcode()) + ". Try again? (y/n)")):
                success = False
            else:
                skip = True
    if(success):
        curxml = ET.parse(curpage)
        totalpages = int(curxml.find("recenttracks").attrib["totalPages"])
        print str(totalpages - pagenum) + " pages remaining."
        newtracks = curxml.findall("recenttracks/track")
        for t in newtracks:
            if(t.find('date') is None):
                # Skip the "now playing" entry, which has no timestamp
                continue
            rt.append(t)
            trackdata = {
                "track": t.findtext('name'),
                "artist": t.findtext('artist'),
                "album": t.findtext('album'),
                "timestamp": t.find('date').attrib['uts'],
                "playtime": t.findtext('date')
            }
            csvfile.writerow([trackdata['track'], trackdata['artist'], trackdata['album'], trackdata['timestamp']])
            print "Imported \"%(track)s\" by %(artist)s, played on %(playtime)s" % \
                trackdata
        xml.write(outfile, "UTF-8")
        # Stop once the final page has been saved
        if(pagenum >= totalpages):
            done = True
        pagenum += 1
    if(not success):
        if(ask_user("We had problems on page " + str(pagenum) + ". Abort (y/n)? ")):
            done = True
# UnicodeCSV.py: the helper module imported by the script above.
# Taken from http://docs.python.org/library/csv.html#examples
import csv, codecs, cStringIO
class UTF8Recoder:
    """
    Iterator that reads an encoded stream and reencodes the input to UTF-8
    """
    def __init__(self, f, encoding):
        self.reader = codecs.getreader(encoding)(f)

    def __iter__(self):
        return self

    def next(self):
        return self.reader.next().encode("utf-8")
class UnicodeReader:
    """
    A CSV reader which will iterate over lines in the CSV file "f",
    which is encoded in the given encoding.
    """
    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        f = UTF8Recoder(f, encoding)
        self.reader = csv.reader(f, dialect=dialect, **kwds)

    def next(self):
        row = self.reader.next()
        return [unicode(s, "utf-8") for s in row]

    def __iter__(self):
        return self
class UnicodeWriter:
    """
    A CSV writer which will write rows to CSV file "f",
    which is encoded in the given encoding.
    """
    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()

    def writerow(self, row):
        self.writer.writerow([s.encode("utf-8") for s in row])
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        data = data.decode("utf-8")
        # ... and reencode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        self.stream.write(data)
        # empty queue
        self.queue.truncate(0)

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)
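A quick usage sketch for the writer above, assuming this module is saved as UnicodeCSV.py so the main script's import resolves; the file name and rows here are hypothetical demo values, not part of the original gist:

if __name__ == '__main__':
    # Hypothetical demo: write a header and one row containing non-ASCII text
    handle = open('demo.csv', 'wb')
    writer = UnicodeWriter(handle)
    writer.writerows([
        [u'track', u'artist', u'album', u'timestamp'],
        [u'J\xf3ga', u'Bj\xf6rk', u'Homogenic', u'1296888000'],
    ])
    handle.close()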