Skip to content

Instantly share code, notes, and snippets.

@holys
Forked from PeterDing/xiami_vip_linux.py
Created September 26, 2013 05:41
Show Gist options
  • Save holys/6710245 to your computer and use it in GitHub Desktop.
Save holys/6710245 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2
# vim: set fileencoding=utf8
import re, sys, os, random, datetime, time, json, urllib2, logging
from mutagen.id3 import ID3,TRCK,TIT2,TALB,TPE1,APIC
from HTMLParser import HTMLParser
parser = HTMLParser()
s = u'\x1b[1;%dm%s\x1b[0m' # terminual color template
email = ''
password = ''
#############################################################
# Xiami api for android
#{{{
# url_action_fav = "http://www.xiami.com/app/android/fav?id=%s&type=%s"
# url_action_unfav = "http://www.xiami.com/app/android/unfav?id=%s&type=%s"
# url_album = "http://www.xiami.com/app/android/album?id=%s&uid=%s"
# url_song = "http://www.xiami.com/app/android/song?id=%s&uid=%s"
# url_artist = "http://www.xiami.com/app/android/artist?id=%s"
# url_artist_albums = "http://www.xiami.com/app/android/artist-albums?id=%s&page=%s"
# url_artist_radio = "http://www.xiami.com/app/android/radio-artist?id=%s"
# url_artist_top_song = "http://www.xiami.com/app/android/artist-topsongs?id=%s"
# url_artsit_similars = "http://www.xiami.com/app/android/artist-similar?id=%s"
# url_collect = "http://www.xiami.com/app/android/collect?id=%s&uid=%s"
# url_grade = "http://www.xiami.com/app/android/grade?id=%s&grade=%s"
# url_lib_albums = "http://www.xiami.com/app/android/lib-albums?uid=%s&page=%s"
# url_lib_artists = "http://www.xiami.com/app/android/lib-artists?uid=%s&page=%s"
# url_lib_collects = "http://www.xiami.com/app/android/lib-collects?uid=%s&page=%s"
# url_lib_songs = "http://www.xiami.com/app/android/lib-songs?uid=%s&page=%s"
# url_myplaylist = "http://www.xiami.com/app/android/myplaylist?uid=%s"
# url_myradiosongs = "http://www.xiami.com/app/android/lib-rnd?uid=%s"
# url_playlog = "http://www.xiami.com/app/android/playlog?id=%s&uid=%s"
# url_push_songs = "http://www.xiami.com/app/android/push-songs?uid=%s&deviceid=%s"
# url_radio = "http://www.xiami.com/app/android/radio?id=%s&uid=%s"
# url_radio_categories = "http://www.xiami.com/app/android/radio-category"
# url_radio_similar = "http://www.xiami.com/app/android/radio-similar?id=%s&uid=%s"
# url_rndsongs = "http://www.xiami.com/app/android/rnd?uid=%s"
# url_search_all = "http://www.xiami.com/app/android/searchv1?key=%s"
# url_search_parts = "http://www.xiami.com/app/android/search-part?key=%s&type=%s&page=%s"
#}}}
#############################################################
############################################################
# Xiami api for android
# {{{
url_song = "http://www.xiami.com/app/android/song?id=%s"
url_album = "http://www.xiami.com/app/android/album?id=%s"
url_collect = "http://www.xiami.com/app/android/collect?id=%s"
url_artist_albums = "http://www.xiami.com/app/android/artist-albums?id=%s&page=%s"
url_artist_top_song = "http://www.xiami.com/app/android/artist-topsongs?id=%s"
url_lib_songs = "http://www.xiami.com/app/android/lib-songs?uid=%s&page=%s"
# }}}
############################################################
############################################################
# wget exit status
wget_es = {
0:"No problems occurred.",
2:"User interference.",
1<<8:"Generic error code.",
2<<8:"Parse error---for instance, when parsing command-line optio.wgetrc or .netrc...",
3<<8:"File I/O error.",
4<<8:"Network failure.",
5<<8:"SSL verification failure.",
6<<8:"Username/password authentication failure.",
7<<8:"Protocol errors.",
8<<8:"Server issued an error response."
}
############################################################
def decry(row, encryed_url):
url = encryed_url
urllen = len(url)
rows = int(row)
cols_base = urllen / rows # basic column count
rows_ex = urllen % rows # count of rows that have 1 more column
matrix = []
for r in xrange(rows):
length = cols_base + 1 if r < rows_ex else cols_base
matrix.append(url[:length])
url = url[length:]
url = ''
for i in xrange(urllen):
url += matrix[i % rows][i / rows]
return urllib.unquote(url).replace('^', '0')
def modificate_text(text):
text = parser.unescape(text)
text = re.sub(r'\s*:\s*', ' - ', text) # for FAT file system
text = re.sub(r'//*', '-', text)
text = re.sub(r' *', ' ', text)
text = text.replace('/', '-')
text = text.replace('\\', '-')
return text
def modificate_file_name_for_wget(file_name):
file_name = file_name.replace('?', '') # for FAT file system
file_name = file_name.replace('"', '\'') # for FAT file system
return file_name
def z_index(song_infos):
size = len(song_infos)
if size <= 9:
return 1
elif size >= 10 and size <= 99:
return 2
elif size >= 100 and size <= 999:
return 3
else:
return 1
#############################################################
# from https://gist.github.com/lepture/1014329
#############################################################
# {{{
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2011, lepture.com
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of the author nor the names of its contributors
# may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import urllib
import httplib
from contextlib import closing
from Cookie import SimpleCookie
ua = 'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.95 Safari/537.36'
#ua = 'Mozilla/5.0 (Linux; Android 4.0.4; Galaxy Nexus Build/IMM76B) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.133 Mobile Safari/535.19'
checkin_headers = {
'User-Agent': ua,
'Content-Length': '0',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
'Host': 'www.xiami.com',
'Origin': 'http://www.xiami.com/',
'Referer': 'http://www.xiami.com/',
'Content-Length': '0',
}
class xiami_login(object):
def __init__(self, email, password):
self.email = email
self.password = password
self._auth = None
def login(self):
print 'login ....'
_form = {
'email': self.email,
'password': self.password,
'LoginButton': '登录',
}
data = urllib.urlencode(_form)
headers = {'User-Agent': ua}
headers['Referer'] = 'http://www.xiami.com/web/login'
headers['Content-Type'] = 'application/x-www-form-urlencoded'
with closing(httplib.HTTPConnection('www.xiami.com')) as conn:
conn.request('POST', '/web/login', data, headers)
res = conn.getresponse()
cookie = res.getheader('Set-Cookie')
self._auth = SimpleCookie(cookie)['member_auth'].value
print 'login success'
return self._auth
def checkin(self):
if not self._auth:
self.login()
headers = checkin_headers
headers['Cookie'] = 'member_auth=%s; t_sign_auth=1' % self._auth
with closing(httplib.HTTPConnection('www.xiami.com')) as conn:
conn.request('POST', '/task/signin', None, headers)
res = conn.getresponse()
return res.read()
# }}}
########################################################
class xiami(object):
def __init__(self, url):
self.url = url
self.song_infos = []
self.json_url = ''
self.dir_ = os.getcwd().decode('utf8')
self.cookie = ''
self.template_wgets = 'wget -nv -U "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.95 Safari/537.36" -O "%s" %s'
self.template_song = 'http://www.xiami.com/song/gethqsong/sid/%s'
self.showcollect_id = ''
self.album_id = ''
self.artist_id = ''
self.song_id = ''
self.user_id = ''
def init(self, email=email, password=password):
home = os.path.expanduser('~')
cookie_dir = os.path.join(home, '.Xiami.cookie') # directory of login_time and member_auth cookie of xiami.com
xm = xiami_login(email, password)
if os.path.exists(cookie_dir):
f = open(cookie_dir).read().split('\n')
#tm = datetime.datetime.now()
#tm = time.mktime(tm.timetuple())
tm = datetime.datetime.now().strftime('%s') # strftime('%s') only works on linux.
tm_s = (int(tm) - int(f[0])) > 18000 # 5 hours later, login after next loginning
xm._auth = f[1]
member_auth = xm._auth
auth = xm.checkin()
if auth != '0' or tm_s:
member_auth = xm.login()
if member_auth != f[1]: print ' --> member_auth is new one.'
tm = datetime.datetime.now().strftime('%s')
open(cookie_dir, 'w').write(tm + '\n' + member_auth)
self.member_auth = member_auth
else:
self.member_auth = member_auth
else:
member_auth = xm.login()
#tm = datetime.datetime.now()
#tm = time.mktime(tm.timetuple())
tm = datetime.datetime.now().strftime('%s')
open(cookie_dir, 'w').write(tm + '\n' + member_auth)
self.member_auth = member_auth
def get_durl(self, id_):
api_json = self.opener.open(self.template_song % id_).read()
j = json.loads(api_json)
t = j['location']
row = t[0]
encryed_url = t[1:]
durl = decry(row, encryed_url)
return durl
def modified_id3(self, file_name, info):
id3 = ID3()
id3.add(TRCK(encoding=3, text=info['track']))
id3.add(TIT2(encoding=3, text=info['song_name']))
id3.add(TALB(encoding=3, text=info['album_name']))
id3.add(TPE1(encoding=3, text=info['artist_name']))
id3.add(APIC(encoding=3, mime='->', type=3, desc=u'Cover', data=info['album_pic_url']))
id3.save(file_name)
def url_parser(self):
if '/showcollect/' in self.url:
self.showcollect_id = re.search(r'/showcollect/id/(\d+)', self.url).group(1)
self.json_url = url_collect % self.showcollect_id
print '正在分析精选集信息 ...'
self.get_collect_infos()
elif '/album/' in self.url:
self.album_id = re.search(r'/album/(\d+)', self.url).group(1)
self.json_url = url_album % self.album_id
print '正在分析专辑信息 ...'
self.get_album_infos()
elif '/artist/' in self.url:
self.artist_id = re.search(r'/artist/(\d+)', self.url).group(1)
code = raw_input('输入 a 下载该艺术家所有专辑.\n输入 t 下载该艺术家top 20歌曲.\n>')
if code == 'a':
self.json_url = url_artist_albums % (self.artist_id, '%s')
print '正在分析艺术家专辑信息 ...'
self.get_artist_albums_infos()
elif code == 't':
self.json_url = url_artist_top_song % self.artist_id
print '正在分析艺术家top20信息 ...'
self.get_artist_top_20_songs_infos()
else:
print ' --> Over'
elif '/song/' in self.url:
self.song_id = re.search(r'/song/(\d+)', self.url).group(1)
self.json_url = url_song % self.song_id
print '正在分析歌曲信息 ...'
self.get_song_infos()
elif '/u/' in self.url:
self.user_id = re.search(r'/u/(\d+)', self.url).group(1)
self.json_url = url_lib_songs % (self.user_id, '%s')
print '正在分析用户歌曲库信息 ...'
self.get_user_songs_infos()
else:
print '请正确输入虾米网址.'
def get_song_infos(self):
logging.info('url -> http://www.xiami.com/song/%s' % self.song_id)
api_json = self.opener.open(self.json_url).read()
j = json.loads(api_json)
song_info = {}
song_info['song_id'] = j['song']['song_id']
song_info['index'] = u''
song_info['track'] = u''
song_info['song_name'] = modificate_text(j['song']['song_name'])
song_info['album_name'] = modificate_text(j['song']['album_name'])
song_info['artist_name'] = modificate_text(j['song']['artist_name'])
song_info['album_pic_url'] = re.sub(r'_\d*', '', j['song']['album_logo'])
file_name = song_info['song_name'] + ' - ' + song_info['artist_name'] + '.mp3'
song_info['file_name'] = file_name
# song_info['low_mp3'] = i['location']
self.song_infos.append(song_info)
self.download()
def get_album_infos(self):
logging.info('url -> http://www.xiami.com/album/%s' % self.album_id)
api_json = self.opener.open(self.json_url).read()
j = json.loads(api_json)
d = modificate_text(j['album']['title'] + ' - ' + j['album']['artist_name'])
self.dir_ = os.path.join(os.getcwd().decode('utf8'), d)
cd_serial_auth = j['album']['songs'][-1]['cd_serial'] > u'1'
z = 0
if not cd_serial_auth:
z = z_index(j['album']['songs'])
for i in j['album']['songs']:
song_info = {}
song_info['song_id'] = i['song_id']
song_info['track'] = i['track']
song_info['song_name'] = modificate_text(i['name'])
song_info['cd_serial'] = i['cd_serial']
song_info['artist_name'] = modificate_text(i['artist_name'])
song_info['album_pic_url'] = re.sub(r'_\d*', '', i['album_logo'])
if song_info['cd_serial'] and cd_serial_auth:
file_name = '[CD-' + song_info['cd_serial'] + '] ' + song_info['track'] + '.' + song_info['song_name'] + ' - ' + song_info['artist_name'] + '.mp3'
song_info['file_name'] = file_name
song_info['album_name'] = modificate_text(i['title'] + ' [CD-' + i['cd_serial'] + ']')
else:
file_name = song_info['track'].zfill(z) + '.' + song_info['song_name'] + ' - ' + song_info['artist_name'] + '.mp3'
song_info['file_name'] = file_name
song_info['album_name'] = modificate_text(i['title'])
# song_info['low_mp3'] = i['location']
self.song_infos.append(song_info)
self.download()
def get_collect_infos(self):
logging.info('url -> http://www.xiami.com/song/showcollect/id/%s' % self.showcollect_id)
api_json = self.opener.open(self.json_url).read()
j = json.loads(api_json)
z = z_index(j['collect']['songs'])
d = modificate_text(j['collect']['name'])
self.dir_ = os.path.join(os.getcwd().decode('utf8'), d)
ii = 1
for i in j['collect']['songs']:
song_info = {}
song_info['song_id'] = i['song_id']
song_info['index'] = unicode(ii).zfill(z)
song_info['track'] = unicode(ii)
song_info['song_name'] = modificate_text(i['name'])
song_info['album_name'] = modificate_text(i['title'])
song_info['artist_name'] = modificate_text(i['artist_name'])
song_info['album_pic_url'] = re.sub(r'_\d*', '', i['album_logo'])
file_name = song_info['index'] + '.' + song_info['song_name'] + ' - ' + song_info['artist_name'] + '.mp3'
song_info['file_name'] = file_name
# song_info['low_mp3'] = i['location']
self.song_infos.append(song_info)
ii += 1
self.download()
def get_artist_albums_infos(self):
ii = 1
while True:
api_json = self.opener.open(self.json_url % str(ii)).read()
j = json.loads(api_json)
if j['albums']:
for i in j['albums']:
id_ = i['album_id']
self.json_url = url_album % id_
self.dir_ = ''
self.song_infos = []
self.get_album_infos()
else:
break
ii += 1
def get_artist_top_20_songs_infos(self):
logging.info('url (top20) -> http://www.xiami.com/artist/%s' % self.artist_id)
api_json = self.opener.open(self.json_url).read()
j = json.loads(api_json)
z = z_index(j['songs'])
d = modificate_text(j['songs'][0]['artist_name'] + u' - top 20')
self.dir_ = os.path.join(os.getcwd().decode('utf8'), d)
ii = 1
for i in j['songs']:
song_info = {}
song_info['song_id'] = i['song_id']
song_info['index'] = unicode(ii).zfill(z)
song_info['track'] = unicode(ii)
song_info['song_name'] = modificate_text(i['name'])
song_info['album_name'] = modificate_text(i['title'])
song_info['artist_name'] = modificate_text(i['artist_name'])
song_info['album_pic_url'] = re.sub(r'_\d*', '', i['album_logo'])
file_name = song_info['index'] + '.' + song_info['song_name'] + ' - ' + song_info['artist_name'] + '.mp3'
song_info['file_name'] = file_name
# song_info['low_mp3'] = i['location']
self.song_infos.append(song_info)
ii += 1
self.download()
def get_user_songs_infos(self):
logging.info('url -> http://www.xiami.com/u/%s' % self.user_id)
ii = 1
while True:
api_json = self.opener.open(self.json_url % str(ii)).read()
j = json.loads(api_json)
if j['songs']:
for i in j['songs']:
song_info = {}
song_info['song_id'] = i['song_id']
song_info['song_name'] = modificate_text(i['name'])
song_info['album_name'] = modificate_text(i['title'])
song_info['artist_name'] = modificate_text(i['artist_name'])
song_info['album_pic_url'] = re.sub(r'_\d*', '', i['album_logo'])
# song_info['low_mp3'] = i['location']
self.song_infos.append(song_info)
else:
break
ii += 1
z = z_index(self.song_infos)
for i in xrange(len(self.song_infos)):
self.song_infos[i]['index'] = unicode(i+1).zfill(z)
self.song_infos[i]['track'] = unicode(i+1)
file_name = self.song_infos[i]['index'] + '.' + self.song_infos[i]['song_name'] + ' - ' + self.song_infos[i]['artist_name'] + '.mp3'
self.song_infos[i]['file_name'] = file_name
self.dir_ = os.path.join(os.getcwd().decode('utf8'), u'虾米用户 %s 收藏的歌曲' % self.user_id)
self.download()
def download(self):
dir_ = modificate_file_name_for_wget(self.dir_)
cwd = os.getcwd().decode('utf8')
csongs = len(self.song_infos)
if dir_ != cwd:
if os.path.exists(dir_):
exist_file_number = len([i for i in os.walk(dir_).next()[2] if i[-4:] == '.mp3'])
if 0 < exist_file_number <= csongs:
self.song_infos = self.song_infos[exist_file_number-1:]
csongs = len(self.song_infos)
#print '请将', u'\x1b[1;91m%s\x1b[0m' % dir_, '删除后下载.'
#sys.exit(0)
else:
os.mkdir(dir_)
print ' >>>', csongs, '首歌曲将要下载.'
logging.info('directory: %s' % dir_)
logging.info('total songs: %d' % csongs)
ii = 1
for i in self.song_infos:
logging.info(' #%d -> %s' % (ii, i['file_name'].encode('utf8')))
num = random.randint(0,100) % 7
col = s % (num + 90, i['file_name'])
print u'\n ++ 正在下载: %s' % col
durl = self.get_durl(i['song_id'])
t = modificate_file_name_for_wget(i['file_name'])
file_name = os.path.join(dir_, t)
file_name_for_wget = file_name.replace('`', '\`')
wget = self.template_wgets % (file_name_for_wget, durl)
wget = wget.encode('utf8')
status = os.system(wget)
time.sleep(20)
if status == 1024:
iii = 0
while iii < 3:
print u' # Error 4 (Network failure), 10秒后从新尝试下载.'
os.remove(file_name)
time.sleep(10)
status = os.system(wget)
if status == 0:
break
else:
iii += 1
if status != 0: # other http-errors, such as 302.
wget_exit_status_info = wget_es[status]
logging.info(' \\\n \\->WARN: status: %d (%s), command: %s' % (status, wget_exit_status_info, wget))
print '\n\n ----### \x1b[1;91mERROR\x1b[0m ==> \x1b[1;91m%d (%s)\x1b[0m ###--- \n\n' % (status, wget_exit_status_info)
print ' ===> ', wget
break
self.modified_id3(file_name, i)
ii += 1
#if status not in [0, 2048]: # other http-errors, such as 302.
#logging.info('WARN: status: %d, command: %s' % (status, wget))
#print '\n\n ----### \x1b[1;91mERROR\x1b[0m ==> \x1b[1;91m%d\x1b[0m ###--- \n\n' % status
#print ' ===> ', wget
#break
#self.modified_id3(file_name, i)
#ii += 1
#else:
#print ' >>>', csongs, '首歌曲将要下载.'
#logging.info('directory: %s' % os.getcwd())
#logging.info('total songs: %d' % csongs)
#ii = 1
#for i in self.song_infos:
#logging.info(' #%d -> %s' % (ii, i['file_name'].encode('utf8')))
#num = random.randint(0,100) % 7
#col = s % (num + 90, i['file_name'])
#print u'\n ++ 正在下载: %s' % col
#durl = self.get_durl(i['song_id'])
#file_name = i['file_name']
#wget = self.template_wgets % (file_name.replace('"', '\''), durl)
#wget = wget.encode('utf8')
#status = os.system(wget)
##time.sleep(10)
#if status not in [0, 2048]: # other http-errors, such as 302.
#logging.info('WARN: status: %d, command: %s' % (status, wget))
#print '\n\n ----### \x1b[1;91mERROR\x1b[0m ==> \x1b[1;91m%d\x1b[0m ###--- \n\n' % status
#print ' ===> ', wget
#break
#self.modified_id3(file_name, i)
#ii += 1
def main(url):
x = xiami(url)
x.init()
opener = urllib2.build_opener()
opener.addheaders = [('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'), ('User-Agent', ua), ('Cookie', 'member_auth=%s' % x.member_auth)]
x.opener = opener
x.url_parser()
logging.info(' ########### work is over ###########\n')
if __name__ == '__main__':
log_file = os.path.join(os.path.expanduser('~'), '.Xiami.log')
logging.basicConfig(filename=log_file, level=10, format='%(asctime)s %(message)s')
print s % (91, u'程序运行日志在 %s' % log_file)
argv = sys.argv
try:
main(argv[1])
except IndexError:
print 'Usage:\n xiami_vip.py url -- download anything of xiami.com'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment