Skip to content

Instantly share code, notes, and snippets.

@mdellavo
Created June 30, 2010 00:44
Show Gist options
  • Save mdellavo/458056 to your computer and use it in GitHub Desktop.
Save mdellavo/458056 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# http://nettech.wikia.com/wiki/Grooveshark_Internal_API
# http://github.com/magcius/groovespark/
import cookielib, hashlib, json, os, urllib, urllib2, uuid, sys, pprint, random, shutil, re
from optparse import OptionParser
# One cookie jar for the whole module. Installing the opener globally makes
# every plain urllib2.urlopen() call in this file go through the cookie
# processor, so cookies set by the server end up in `cookiejar`.
cookiejar = cookielib.LWPCookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
urllib2.install_opener(opener)


def sha1(x):
    """Return the hexadecimal SHA-1 digest of the byte string *x*."""
    return hashlib.sha1(x).hexdigest()
class GrooveShark(object):
    """Small client for the Grooveshark internal JSON API.

    Every API call is a JSON document POSTed to ``API_ROOT``. Constructing an
    instance performs network requests for any of ``uuid``, ``session``,
    ``token`` and ``country`` that is not supplied by the caller.
    """

    API_ROOT = 'cowbell.grooveshark.com'
    USER_AGENT = 'Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.1 (KHTML, like Gecko) Chrome/6.0.433.0 Safari/534.1'
    CLIENT = 'gslite'
    VERSION = '20100412.45'

    def __init__(self, uuid=None, session=None, token=None, country=None, debug=False):
        """Set up the client, fetching whatever credentials were not given.

        The assignment order matters: the token request needs the session,
        and make_request() only adds the header fields that are already set
        on the instance at the time it is called.
        """
        self.debug = debug
        # The `uuid` parameter shadows the uuid module only inside this
        # method; get_uuid() is a separate scope and still sees the module.
        self.uuid = uuid or self.get_uuid()
        self.session = session or self.get_session()
        self.token = token or self.get_token()
        self.country = country or self.get_country()

    def get_uuid(self):
        """Return a freshly generated random UUID as an upper-case string."""
        return str(uuid.uuid4()).upper()

    def get_session(self):
        """Request the listen page and return the PHPSESSID cookie value.

        The module-level cookie jar is cleared first. Returns None when the
        server did not set a matching cookie.
        """
        cookiejar.clear()
        # Only the cookies matter here; the body is never read, so release
        # the connection immediately instead of leaving it to the GC.
        res = urllib2.urlopen(urllib2.Request('http://listen.grooveshark.com'))
        res.close()
        for cookie in cookiejar:
            if cookie.domain == '.grooveshark.com' and cookie.name == 'PHPSESSID':
                if self.debug:
                    print('session: %s' % cookie.value)
                return cookie.value
        return None

    def get_token(self):
        """Fetch the communication token for the current session over HTTPS.

        The secret key sent to the server is the MD5 hex digest of the
        session id, so ``self.session`` must already be set.
        """
        params = {
            'secretKey': hashlib.md5(self.session).hexdigest(),
            'tokenRequired': False,
        }
        res = self.do_request('service.php', 'getCommunicationToken', params, https=True)
        if self.debug:
            print('token: %s' % res)
        return res

    def generate_salt(self, method):
        """Return the per-request token for *method*.

        The value is a random 6-hex-digit salt followed by the SHA-1 hex
        digest of ``method:token:quitStealinMahShit:salt``.
        """
        salt = '%06x' % random.getrandbits(24)
        return salt + sha1('%s:%s:quitStealinMahShit:%s' % (method, self.token, salt))

    def make_request(self, script, method, params, https=False):
        """Build (but do not send) the urllib2.Request for an API call.

        script -- server-side script name, e.g. 'more.php'
        method -- API method name; used as the URL query string and in the body
        params -- dict sent as the 'parameters' member of the JSON body
        https  -- use the https scheme instead of http
        """
        scheme = 'https' if https else 'http'
        url = '%s://%s/%s?%s' % (scheme, self.API_ROOT, script, method)
        http_headers = {
            'content-type': 'application/json',
            'User-Agent': self.USER_AGENT,
        }
        header = {
            'uuid': self.uuid,
            'client': self.CLIENT,
            'clientRevision': self.VERSION,
            'privacy': 1,
        }
        # getattr() with a default because this method is also used from
        # __init__ before session/token/country have been assigned.
        if getattr(self, 'session', None):
            header['session'] = self.session
        if getattr(self, 'token', None):
            header['token'] = self.generate_salt(method)
        if getattr(self, 'country', None):
            header['country'] = self.country
        payload = {
            'header': header,
            'method': method,
            'parameters': params,
        }
        payload_s = json.dumps(payload)
        if self.debug:
            print('>>> %s' % payload_s)
        return urllib2.Request(url, payload_s, http_headers)

    def do_request(self, script, method, params, https=False):
        """Send an API call and return the 'result' member of the JSON reply.

        Returns None when the decoded reply has no 'result' key.
        """
        req = self.make_request(script, method, params, https)
        if self.debug:
            print('%s %s' % (req.get_method(), req.get_full_url()))
        res = urllib2.urlopen(req)
        try:
            data = res.read()
        finally:
            # Close the response even if reading fails part-way.
            res.close()
        if self.debug:
            print(data)
        rv = json.loads(data)
        if self.debug:
            pprint.pprint(rv)
        return rv.get('result')

    def get_country(self):
        """Return the result of the 'getCountry' API call."""
        return self.do_request('more.php', 'getCountry', {})

    def search(self, **query):
        """Run 'getSearchResultsEx' with *query* as its parameters.

        Returns the nested 'result' member of the call's result.
        """
        return self.do_request('more.php', 'getSearchResultsEx', query).get('result')

    def get_stream_key(self, song_id):
        """Return the result of 'getStreamKeyFromSongIDEx' for *song_id*."""
        params = {
            'songID': song_id,
            'prefetch': True,
            'mobile': False,
            'country': self.country,
        }
        return self.do_request('more.php', 'getStreamKeyFromSongIDEx', params)

    def download_stream(self, stream_host, stream_key, local_path):
        """POST *stream_key* to *stream_host* and save the reply to *local_path*.

        Both the HTTP response and the destination file are closed before
        returning, including when the copy fails.
        """
        url = 'http://%s/stream.php' % stream_host
        params = {
            'streamKey': stream_key,
        }
        headers = {
            'content-type': 'application/x-www-form-urlencoded',
            'User-Agent': self.USER_AGENT,
        }
        req = urllib2.Request(url, urllib.urlencode(params), headers)
        res = urllib2.urlopen(req)
        try:
            # `with` flushes and closes the file so the data is fully on
            # disk by the time this method returns.
            with open(local_path, 'wb') as dest:
                shutil.copyfileobj(res, dest)
        finally:
            res.close()

    def get_song_token(self, song_id):
        """Return the 'Token' member of the 'getTokenForSong' result."""
        params = {'songID': song_id, 'country': self.country}
        return self.do_request('more.php', 'getTokenForSong', params).get('Token')

    def get_song_info(self, token):
        """Return the result of 'getSongFromToken' for a song *token*."""
        params = {'token': token, 'country': self.country}
        return self.do_request('more.php', 'getSongFromToken', params)
def print_results(results):
    """Print one formatted line per search result, in sorted order.

    Each result must be a mapping with the keys SongID, ArtistName,
    AlbumName and Name. Lines are sorted as plain strings after formatting.
    """
    template = "%(SongID)10s\t%(ArtistName)s - %(AlbumName)s - %(Name)s"
    # sorted() consumes the generator completely, so every result is
    # formatted before the first line is printed.
    for line in sorted(template % entry for entry in results):
        print(line)
def print0_results(results):
    """Print the SongID of every result on one line, separated by spaces.

    Ids are formatted with ``%s`` so numeric ids are accepted as well as
    strings; joining them directly would raise TypeError for non-strings.
    """
    print(' '.join('%s' % result['SongID'] for result in results))
def main(options, args):
    """Run the command named by ``args[0]`` once per remaining argument.

    options -- parsed option values; ``debug``, ``print0`` and ``fetch`` are read
    args    -- positional arguments: a command name ('fetch', 'search' or
               'about') followed by the values to run it on

    Returns the process exit status: 0 on success, 1 when the command is
    missing or unknown.
    """
    # Song ids queued by do_search when --fetch is given.
    song_ids = []

    def do_fetch(gs, arg):
        """Download the song with id *arg* unless its file already exists."""
        print('>>> Fetching info for "%s"' % arg)
        token = gs.get_song_token(arg)
        info = gs.get_song_info(token)
        # Drop every character that is not a word character, '-' or space
        # so the song description can be used as a file name.
        basename = re.sub(r'[^\w\- ]+', '', '%(ArtistName)s - %(AlbumName)s - %(Name)s' % info)
        filename = basename + '.mp3'
        if os.path.isfile(filename):
            print('>>> File already exists "%s"' % basename)
            return
        print('>>> Downloading "%s"' % basename)
        stream_info = gs.get_stream_key(arg)
        gs.download_stream(stream_info['ip'], stream_info['streamKey'], filename)

    def do_search(gs, arg):
        """Search songs matching *arg*, print them, and queue ids if fetching."""
        print('>>> Searching for "%s"' % arg)
        results = gs.search(type='Songs', query=arg)
        if options.print0:
            print0_results(results)
        else:
            print_results(results)
        if options.fetch:
            song_ids.extend([i['SongID'] for i in results])

    def do_about(gs, arg):
        """Pretty-print the song info for the song with id *arg*."""
        print('>>> Fetching info for "%s"' % arg)
        token = gs.get_song_token(arg)
        info = gs.get_song_info(token)
        pprint.pprint(info)

    commands = {
        'fetch': do_fetch,
        'search': do_search,
        'about': do_about,
    }

    if not args:
        print('Bad Command: no command given')
        return 1

    command = commands.get(args[0])
    if command is None:
        print('Bad Command %s' % args[0])
        return 1

    for arg in args[1:]:
        print('>>> Initializing')
        command(GrooveShark(debug=options.debug), arg)

    # song_ids is only ever filled when options.fetch is set, so no extra
    # check is needed; queued ids are fetched once, after all arguments
    # have been processed.
    for song_id in song_ids:
        do_fetch(GrooveShark(debug=options.debug), str(song_id))

    return 0
if __name__ == '__main__':
    # Each row: (short flag, long flag, destination attribute, help text).
    # All three options are boolean switches that default to False.
    flag_table = (
        ('-d', '--debug', 'debug', 'enable debugging'),
        ('-0', '--print0', 'print0', 'print on a single line'),
        ('-f', '--fetch', 'fetch', 'fetch songs found'),
    )
    parser = OptionParser()
    for short_flag, long_flag, dest_name, help_text in flag_table:
        parser.add_option(short_flag, long_flag, action='store_true',
                          dest=dest_name, default=False, help=help_text)
    options, args = parser.parse_args()
    # main() returns the exit status for the process.
    sys.exit(main(options, args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment