Skip to content

Instantly share code, notes, and snippets.

@iydon
Created November 17, 2019 14:25
Show Gist options
  • Save iydon/25e1ac04f2ab97d98c7893e6a3a174dd to your computer and use it in GitHub Desktop.
Save iydon/25e1ac04f2ab97d98c7893e6a3a174dd to your computer and use it in GitHub Desktop.
网易云歌词爬虫 (低效)
#!/usr/bin/python3
# -*- encoding: utf-8 -*-
'''
@File : netease_music.py
@Time : 2019/11/13
@Author : Vian Lee
@Contact : <no contact>
@Docstring : <no docstring>
'''
from collections import defaultdict
from random import gauss
from time import sleep
import pickle
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
class NetEase:
    '''NetEase Cloud Music lyric crawler that paces itself like a human visitor.

    :Public API:
        - doit(): start crawling
        - result: crawled data, ``{album_name: {song_name: lyric}}``
        - current_url: URL currently loaded in the browser
        - to_pickle(filename): save the data as pickle
        - to_json(filename): save the data as JSON
    '''
    # Artist album-listing page; ``{}`` receives the artist id.
    _HOME = 'https://music.163.com/#/artist/album?id={}'
    # Text encoding used when writing the JSON dump.
    _ENCODE = 'utf-8'
    # Attribute / tag / method names looked up on web elements.
    _FLAG_HREF = 'href'
    _FLAG_LINK = 'a'
    _FLAG_INNER_TEXT = 'innerText'
    _FLAG_TITLE = 'title'
    _FLAG_ATTR = 'get_attribute'
    # (pager container class, "next page" button class inside the pager)
    _FLAG_NEXT_PAGE = 'u-page', 'zbtn.znxt'
    # ``name`` of the frame that holds the page content.
    _FLAG_FRAME = 'contentFrame'
    # Class of the album title links on the listing page.
    _FLAG_ALBUM = 'tit.s-fc0'
    # (class of a song cell, tag inside the song link that carries the title)
    _FLAG_SONG = 'txt', 'b'
    # (id of the element clicked before reading, id of the lyric element,
    #  attribute read from the lyric element)
    _FLAG_LYRIC = 'flag_ctrl', 'lyric-content', 'innerText'
    # (mu, sigma) in seconds of the gaussian pause taken before each page load.
    _SLEEP_PARAM = 7, 1

    def __init__(self, artist_id=5771, web_driver='Chrome'):
        '''Initialize NetEase (NetEase Cloud Music) and open the artist page.

        :Argument:
            - artist_id: [str, int], id of artist, default is 5771 (Xu Song)
            - web_driver:
                - str, which web driver to use, default is 'Chrome'
                - selenium.webdriver.*.webdriver.WebDriver
        :Return:
            - None
        :Raise:
            - TypeError: `web_driver` is neither a str nor a web driver.
        '''
        if isinstance(web_driver, str):
            self._browser = getattr(webdriver, web_driver)()  # policy: let it crash
        elif isinstance(web_driver, webdriver.Remote):
            self._browser = web_driver
        else:
            raise TypeError('Argument `web_driver` has wrong type.')
        self._artist_id = artist_id
        # Set up all state before the first navigation, which can fail.
        self._result = defaultdict(dict)
        self._goto(self._HOME.format(artist_id))

    def __repr__(self):
        return f'<NetEase(artist_id={self._artist_id}) @ {hash(self):#x}>'

    @property
    def result(self):
        '''Crawled data: ``{album_name: {song_name: lyric}}``.'''
        return self._result

    @property
    def current_url(self):
        '''URL currently loaded in the browser.'''
        return self._browser.current_url

    def to_pickle(self, filename='result.pickle'):
        '''Save `self.result` to pickle format.

        :Argument:
            - filename: str
        '''
        with open(filename, 'wb') as f:
            pickle.dump(self._result, f)

    def to_json(self, filename='result.json'):
        '''Save `self.result` to json format.

        :Argument:
            - filename: str
        '''
        with open(filename, 'w', encoding=self._ENCODE) as f:
            json.dump(self._result, f,
                      ensure_ascii=False,
                      sort_keys=True,
                      indent=4)

    def doit(self):
        '''Crawl every listing page: each album, each song, each lyric.

        Named after `sympy.core.basic.Basic.doit`.  Results accumulate in
        `self.result`, so whatever was collected before an exception is
        still available afterwards.

        :Argument:
            - None
        :Return:
            - None
        '''
        while True:
            current_url = self.current_url
            self._switch_to_frame()
            # Album and song links are collected as plain strings before
            # navigating away, so leaving the page does not invalidate them.
            for album, album_name in self._find_albums():
                if not album:
                    # `_get_href` yields '' when no link could be read;
                    # there is nothing to load for such an entry.
                    continue
                self._goto(album, frame=True)
                songs = self._result[album_name]  # creates the album entry
                for song, song_name in self._find_songs():
                    if not song:
                        continue
                    self._goto(song, frame=True)
                    songs[song_name] = self._find_lyric()
            # The pager lives on the listing page, so return there before
            # looking for the "next page" link.
            self._goto(current_url, frame=True)
            if not self._next_page():
                break

    def _switch_to_frame(self, frame_name=''):
        '''Enter the content frame.

        NetEase Cloud Music renders its content inside a frame, so
        `switch_to.frame` is required before elements can be located.

        :Argument:
            - frame_name: str, deprecated; defaults to `_FLAG_FRAME`
        :Return:
            - None
        '''
        frame_name = frame_name or self._FLAG_FRAME
        frame = self._browser.find_element(By.NAME, frame_name)
        self._browser.switch_to.frame(frame)

    def _click(self, web_element):
        '''Follow the link of `web_element` by loading its `href`.

        `web_element.click` may raise `ElementClickInterceptedException`
        unless one waits 5-10 seconds, so the address is read from the
        element and loaded directly instead.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
        :Return:
            - True: Successful.
            - False: No usable address, or some exception occurred.
        :Reference:
            - [StackOverflow](https://stackoverflow.com/questions/56779520)
        '''
        target = self._get_href(web_element)
        if not target:
            return False
        # A `javascript:` pseudo-URL is not a page address: loading it would
        # leave the browser where it is while reporting success, and `doit`
        # would crawl the same page again.
        # NOTE(review): presumably this is what a disabled pager button
        # carries - confirm against the live markup.
        if target.strip().lower().startswith('javascript:'):
            return False
        try:
            self._goto(target)
        except Exception as e:
            print(e)
            return False
        return True

    def _read_attribute(self, web_element, name):
        '''Read attribute `name` from `web_element`, never raising on absence.

        :Argument:
            - web_element: any object; it is only queried when it offers
              the method named by `_FLAG_ATTR`
            - name: str, attribute name
        :Return:
            - the attribute value, or '' when the object has no such
              method or the value is None.
        '''
        getter = getattr(web_element, self._FLAG_ATTR, None)
        if getter is None:
            return ''
        value = getter(name)
        return '' if value is None else value

    def _get_href(self, web_element):
        '''Get the `href` attribute of `web_element`.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
        :Return:
            - str, the attribute if successful, otherwise ''.
        '''
        return self._read_attribute(web_element, self._FLAG_HREF)

    def _get_innertext(self, web_element):
        '''Get the `innerText` attribute of `web_element`.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
        :Return:
            - str, the attribute if successful, otherwise ''.
        '''
        return self._read_attribute(web_element, self._FLAG_INNER_TEXT)

    def _get_title(self, web_element):
        '''Get the `title` attribute of `web_element`.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
        :Return:
            - str, the attribute if successful, otherwise ''.
        '''
        return self._read_attribute(web_element, self._FLAG_TITLE)

    def _goto(self, url, frame=False):
        '''Pause for a random delay, then load a web page in the browser.

        :Argument:
            - url: str
            - frame: bool, whether to switch into the content frame afterwards
        '''
        mu, sigma = self._SLEEP_PARAM
        # A gaussian sample can be negative; clamp it so `sleep` accepts it.
        sleep(max(0, gauss(mu, sigma)))
        self._browser.get(url)
        if frame:
            self._switch_to_frame()

    def _back(self, frame=False):
        '''Go one step backward in the browser history.

        :Argument:
            - frame: bool, whether to switch into the content frame afterwards
        '''
        self._browser.back()
        if frame:
            self._switch_to_frame()

    def _wait(self, timeout=5, poll_frequency=0.5, **kwargs):
        '''Block until every given expected condition holds.

        Each keyword names a factory in
        `selenium.webdriver.support.expected_conditions`; its value is
        passed to that factory as the single argument.

        :Argument:
            - timeout: [int, float]
            - poll_frequency: [int, float]
        :Example:
            >>> self._wait(element_to_be_clickable=(By.ID, '...'))
        '''
        EC = expected_conditions
        wait = WebDriverWait(self._browser, timeout, poll_frequency)
        for key, val in kwargs.items():
            wait.until(getattr(EC, key)(val))

    def _next_page(self):
        '''Go to the next listing page.

        :Argument:
            - None
        :Return:
            - True: Successful.
            - False: No pager / next button was found, or `self._click`
              could not follow it.
        :TODO:
            1. Building the page URL directly would avoid the bug, e.g.:
               [URL](https://music.163.com/#/artist/album?id=5771&limit=12&offset=24)
        '''
        pager_class, next_class = self._FLAG_NEXT_PAGE
        # Use the plural finders so a missing pager means "no next page"
        # instead of an exception (same pattern as `_find_lyric`).
        pagers = self._browser.find_elements(By.CLASS_NAME, pager_class)
        if not pagers:
            return False
        buttons = pagers[0].find_elements(By.CLASS_NAME, next_class)
        if not buttons:
            return False
        return self._click(buttons[0])

    def _find_albums(self, mask_name=''):
        '''Find the albums on the page currently shown in `self._browser`.

        :Argument:
            - mask_name: str, deprecated; defaults to `_FLAG_ALBUM`
        :Return:
            - list of `(album_url, album_name)` tuples.
        '''
        mask_name = mask_name or self._FLAG_ALBUM
        albums = self._browser.find_elements(By.CLASS_NAME, mask_name)
        return [
            (self._get_href(album), self._get_innertext(album))
            for album in albums
        ]

    def _find_songs(self):
        '''Find the songs on the page currently shown in `self._browser`.

        :Argument:
            - None
        :Return:
            - list of `(song_url, song_name)` tuples.
        '''
        cell_class, title_tag = self._FLAG_SONG
        cells = self._browser.find_elements(By.CLASS_NAME, cell_class)
        links = [
            cell.find_element(By.TAG_NAME, self._FLAG_LINK) for cell in cells
        ]
        songs = []
        for link in links:
            # The song name is the `title` of the element nested in the link.
            title_holder = link.find_element(By.TAG_NAME, title_tag)
            songs.append((self._get_href(link), self._get_title(title_holder)))
        return songs

    def _find_lyric(self):
        '''Find the lyric on the page currently shown in `self._browser`.

        :Argument:
            - None
        :Return:
            - str, lyric; '' when the page has no element with the
              control id from `_FLAG_LYRIC`.
        :TODO:
            1. [API](http://music.163.com/api/song/lyric?id=411214279&lv=1&kv=1&tv=-1)
            2. DO NOT use `execute_script`, and avoid `ElementClickInterceptedException`
        '''
        toggle_id, lyric_id, text_attr = self._FLAG_LYRIC
        if not self._browser.find_elements(By.ID, toggle_id):
            return ''
        self._wait(element_to_be_clickable=(By.ID, toggle_id))
        # Click through JavaScript rather than `WebElement.click` (see TODO 2).
        self._browser.execute_script(
            f'document.getElementById("{toggle_id}").click()')
        lyric = self._browser.find_element(By.ID, lyric_id)
        return lyric.get_attribute(text_attr)
if __name__ == "__main__":
    # Meant to be run interactively:  IPython >>> %run -i crawler.py
    #
    # Webdriver cache: if the namespace this script runs in already holds a
    # `browser`, hand it to the crawler; otherwise start Firefox and store
    # the new driver under that name so it can be found next time.
    _locals = locals()
    if 'browser' not in _locals:
        # The extra `&limit=32` travels with the id into the URL that is
        # built from `NetEase._HOME`.
        n = NetEase(artist_id='5771&limit=32', web_driver='Firefox')
        _locals['browser'] = n._browser
    else:
        browser = _locals['browser']
        n = NetEase(web_driver=browser)

    # Crawl.  On any failure drop into an IPython shell for inspection;
    # whatever was collected is written to pickle and JSON in every case.
    try:
        n.doit()
    except Exception as e:
        from IPython import embed
        embed()
    finally:
        n.to_pickle()
        n.to_json()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment