Created
June 30, 2010 00:44
-
-
Save mdellavo/458056 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# http://nettech.wikia.com/wiki/Grooveshark_Internal_API | |
# http://github.com/magcius/groovespark/ | |
import cookielib, hashlib, json, os, urllib, urllib2, uuid, sys, pprint, random, shutil, re | |
from optparse import OptionParser | |
# Module-wide cookie jar. The opener built on top of it is installed globally,
# so every plain urllib2.urlopen() call in this file stores and sends cookies
# through this jar.
cookiejar = cookielib.LWPCookieJar()
_cookie_handler = urllib2.HTTPCookieProcessor(cookiejar)
opener = urllib2.build_opener(_cookie_handler)
urllib2.install_opener(opener)


def sha1(x):
    """Return the hexadecimal SHA-1 digest of *x*."""
    digest = hashlib.sha1(x)
    return digest.hexdigest()
class GrooveShark(object): | |
API_ROOT = 'cowbell.grooveshark.com' | |
USER_AGENT = 'Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.1 (KHTML, like Gecko) Chrome/6.0.433.0 Safari/534.1' | |
CLIENT = 'gslite' | |
VERSION = '20100412.45' | |
def __init__(self, uuid=None, session=None, token=None, country=None, debug=False): | |
self.debug = debug | |
self.uuid = uuid or self.get_uuid() | |
self.session = session or self.get_session() | |
self.token = token or self.get_token() | |
self.country = country or self.get_country() | |
def get_uuid(self): | |
return str(uuid.uuid4()).upper() | |
def get_session(self): | |
cookiejar.clear() | |
req = urllib2.urlopen(urllib2.Request('http://listen.grooveshark.com')) | |
for cookie in cookiejar: | |
if cookie.domain == '.grooveshark.com' and cookie.name == 'PHPSESSID': | |
if self.debug: | |
print 'session:', cookie.value | |
return cookie.value | |
def get_token(self): | |
params = { | |
'secretKey' : hashlib.md5(self.session).hexdigest(), | |
'tokenRequired' : False | |
} | |
res = self.do_request('service.php', 'getCommunicationToken', params, https=True) | |
if self.debug: | |
print 'token:', res | |
return res | |
def generate_salt(self, method): | |
salt = "%06x" % random.getrandbits(24) | |
return salt + sha1('%s:%s:quitStealinMahShit:%s' % (method, self.token, salt)) | |
def make_request(self, script, method, params, https=False): | |
url = '%s://%s/%s?%s' % ('https' if https else 'http', self.API_ROOT, script, method) | |
http_headers = { | |
'content-type' : 'application/json', | |
'User-Agent' : self.USER_AGENT | |
} | |
payload = { | |
'header' : { | |
'uuid' : self.uuid, | |
'client' : self.CLIENT, | |
'clientRevision' : self.VERSION, | |
'privacy' : 1 | |
}, | |
'method' : method, | |
'parameters' : params | |
} | |
if getattr(self, 'session', None): | |
payload['header']['session'] = self.session | |
if getattr(self, 'token', None): | |
payload['header']['token'] = self.generate_salt(method) | |
if getattr(self, 'country', None): | |
payload['header']['country'] = self.country | |
payload_s = json.dumps(payload) | |
if self.debug: | |
print '>>>', payload_s | |
return urllib2.Request(url, payload_s, http_headers) | |
def do_request(self, script, method, params, https=False): | |
req = self.make_request(script, method, params, https) | |
if self.debug: | |
print req.get_method(), req.get_full_url() | |
data = urllib2.urlopen(req).read() | |
if self.debug: | |
print data | |
rv = json.loads(data) | |
if self.debug: | |
pprint.pprint(rv) | |
return rv.get('result') | |
def get_country(self): | |
return self.do_request('more.php', 'getCountry', {}) | |
def search(self, **query): | |
return self.do_request('more.php', 'getSearchResultsEx', query).get('result') | |
def get_stream_key(self, song_id): | |
params = { 'songID' : song_id, | |
'prefetch' : True, | |
'mobile' : False, | |
'country' : self.country } | |
return self.do_request('more.php', 'getStreamKeyFromSongIDEx', params) | |
def download_stream(self, stream_host, stream_key, local_path): | |
url = 'http://%s/stream.php' % stream_host | |
params = { | |
'streamKey' : stream_key | |
} | |
headers = { | |
'content-type' : 'application/x-www-form-urlencoded', | |
'User-Agent' : self.USER_AGENT | |
} | |
req = urllib2.Request(url, urllib.urlencode(params), headers) | |
res = urllib2.urlopen(req) | |
dest = open(local_path, 'wb') | |
shutil.copyfileobj(res, dest) | |
def get_song_token(self, song_id): | |
return self.do_request('more.php', 'getTokenForSong', {'songID': song_id, 'country': self.country}).get('Token') | |
def get_song_info(self, token): | |
return self.do_request('more.php', 'getSongFromToken', {'token': token, 'country': self.country}) | |
def print_results(results):
    """Print one formatted line per search result, in sorted order.

    Each entry of *results* must be a mapping with the keys ``SongID``,
    ``ArtistName``, ``AlbumName`` and ``Name``. The lines are sorted as
    strings after formatting.
    """
    template = "%(SongID)10s\t%(ArtistName)s - %(AlbumName)s - %(Name)s"
    for line in sorted([template % song for song in results]):
        print(line)
def print0_results(results):
    """Print the ``SongID`` of every result on one line, separated by spaces.

    IDs are formatted with ``%s`` before joining so that non-string IDs
    (for example integers decoded from JSON) do not make ``str.join`` raise
    ``TypeError``.
    """
    print(' '.join('%s' % (item['SongID'],) for item in results))
def main(options, args):
    """Run the sub-command named by ``args[0]`` on each remaining argument.

    Supported commands are ``fetch`` (download a song by id), ``search``
    (list songs matching a query) and ``about`` (pretty-print song info).
    With ``options.fetch`` set, every song found by ``search`` is downloaded
    once all arguments have been processed.

    Args:
        options: parsed options providing ``debug``, ``print0`` and ``fetch``.
        args: positional arguments; the command followed by its arguments.

    Returns:
        The process exit status: 0 on success, 1 when the command is missing
        or unknown.
    """
    # Song ids collected by do_search for the optional fetch pass below.
    song_ids = []

    def do_fetch(gs, arg):
        # Download song id `arg` as "<artist> - <album> - <name>.mp3" in the
        # current directory, unless that file already exists.
        print('>>> Fetching info for "%s"' % arg)
        token = gs.get_song_token(arg)
        info = gs.get_song_info(token)
        if not info:
            print('>>> No song info found for "%s"' % arg)
            return
        # Keep only word characters, dashes and spaces in the file name.
        basename = re.sub(r'[^\w\- ]+', '', '%(ArtistName)s - %(AlbumName)s - %(Name)s' % info)
        filename = basename + '.mp3'
        if os.path.isfile(filename):
            print('>>> File already exists "%s"' % basename)
            return
        print('>>> Downloading "%s"' % basename)
        stream_info = gs.get_stream_key(arg)
        gs.download_stream(stream_info['ip'], stream_info['streamKey'], filename)

    def do_search(gs, arg):
        # Print the songs matching `arg`; remember their ids when fetching.
        print('>>> Searching for "%s"' % arg)
        results = gs.search(type='Songs', query=arg) or []
        if options.print0:
            print0_results(results)
        else:
            print_results(results)
        if options.fetch:
            song_ids.extend([i['SongID'] for i in results])

    def do_about(gs, arg):
        # Pretty-print the song info for song id `arg`.
        print('>>> Fetching info for "%s"' % arg)
        token = gs.get_song_token(arg)
        info = gs.get_song_info(token)
        pprint.pprint(info)

    commands = {
        'fetch': do_fetch,
        'search': do_search,
        'about': do_about,
    }

    if not args:
        print('No command given (expected one of: %s)' % ', '.join(sorted(commands)))
        return 1

    command = commands.get(args[0])
    if command is None:
        print('Bad Command %s' % (args[0],))
        return 1

    for arg in args[1:]:
        # Each argument gets a fresh client, i.e. a new handshake.
        print('>>> Initializing')
        command(GrooveShark(debug=options.debug), arg)

    # song_ids is only filled by do_search when options.fetch is set.
    for song_id in song_ids:
        do_fetch(GrooveShark(debug=options.debug), str(song_id))
    return 0
if __name__ == '__main__':
    # Script entry point: register the three boolean flags, parse the command
    # line and exit with whatever status main() returns.
    flag_specs = (
        ('-d', '--debug', 'enable debugging'),
        ('-0', '--print0', 'print on a single line'),
        ('-f', '--fetch', 'fetch songs found'),
    )
    parser = OptionParser()
    for short_flag, long_flag, help_text in flag_specs:
        parser.add_option(short_flag, long_flag,
                          action='store_true',
                          dest=long_flag.lstrip('-'),
                          default=False,
                          help=help_text)
    options, args = parser.parse_args()
    sys.exit(main(options, args))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment