Created
November 17, 2019 14:25
-
-
Save iydon/25e1ac04f2ab97d98c7893e6a3a174dd to your computer and use it in GitHub Desktop.
网易云歌词爬虫 (低效)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- encoding: utf-8 -*- | |
''' | |
@File : netease_music.py | |
@Time : 2019/11/13 | |
@Author : Vian Lee | |
@Contact : <no contact> | |
@Docstring : <no docstring> | |
''' | |
from collections import defaultdict | |
from random import gauss | |
from time import sleep | |
import pickle | |
import json | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.wait import WebDriverWait | |
from selenium.webdriver.support import expected_conditions | |
class NetEase:
    '''NetEase Cloud Music lyric crawler that paces itself like a human visitor.

    :Public API:
        - doit(): start crawling
        - result: collected data, ``{album_name: {song_name: lyric}}``
        - current_url: URL currently loaded in the browser
        - to_pickle(filename): save the data as pickle
        - to_json(filename): save the data as JSON
    '''
    _HOME = 'https://music.163.com/#/artist/album?id={}'
    _ENCODE = 'utf-8'
    # Attribute / tag names read from web elements.
    _FLAG_HREF = 'href'
    _FLAG_LINK = 'a'
    _FLAG_INNER_TEXT = 'innerText'
    _FLAG_TITLE = 'title'
    _FLAG_ATTR = 'get_attribute'
    # Locators: (pager class, next-button class), frame name, album link class,
    # (song cell class, title tag) and (toggle id, lyric id, lyric attribute).
    _FLAG_NEXT_PAGE = 'u-page', 'zbtn.znxt'
    _FLAG_FRAME = 'contentFrame'
    _FLAG_ALBUM = 'tit.s-fc0'
    _FLAG_SONG = 'txt', 'b'
    _FLAG_LYRIC = 'flag_ctrl', 'lyric-content', 'innerText'
    # (mu, sigma) in seconds of the Gaussian pause taken before each page load.
    _SLEEP_PARAM = 7, 1

    def __init__(self, artist_id=5771, web_driver='Chrome'):
        '''Initialize NetEase and open the artist's album listing.

        :Argument:
            - artist_id: [str, int], id of artist, default is 5771
            - web_driver:
                - str, name of a driver class in `selenium.webdriver`,
                  default is 'Chrome'
                - selenium.webdriver.*.webdriver.WebDriver, an existing driver
        :Return:
            - None
        :Raise:
            - TypeError: `web_driver` is neither a str nor a WebDriver.
        '''
        if isinstance(web_driver, str):
            # Policy: let it crash if the driver name is unknown or the
            # driver cannot be started.
            self._browser = getattr(webdriver, web_driver)()
        elif isinstance(web_driver, webdriver.Remote):
            self._browser = web_driver
        else:
            raise TypeError('Argument `web_driver` has wrong type.')
        self._artist_id = artist_id
        # Create the result store before navigating so that `result`,
        # `to_pickle` and `to_json` work even if the first page load fails.
        self._result = defaultdict(dict)
        self._goto(self._HOME.format(artist_id))

    def __repr__(self):
        return f'<NetEase(artist_id={self._artist_id}) @ {hash(self):#x}>'

    @property
    def result(self):
        '''Collected lyrics, ``{album_name: {song_name: lyric}}``.'''
        return self._result

    @property
    def current_url(self):
        '''URL currently loaded in the browser.'''
        return self._browser.current_url

    def to_pickle(self, filename='result.pickle'):
        '''Save `self.result` to pickle format.

        :Argument:
            - filename: str
        '''
        with open(filename, 'wb') as f:
            pickle.dump(self._result, f)

    def to_json(self, filename='result.json'):
        '''Save `self.result` to json format.

        :Argument:
            - filename: str
        '''
        with open(filename, 'w', encoding=self._ENCODE) as f:
            json.dump(self._result, f,
                      ensure_ascii=False,
                      sort_keys=True,
                      indent=4)

    def doit(self):
        '''Crawl every album page of the artist and collect all lyrics.

        Named after `sympy.core.basic.Basic.doit`.

        :Argument:
            - None
        :Return:
            - None
        '''
        while True:
            current_url = self.current_url
            self._switch_to_frame()
            # `_find_albums` / `_find_songs` return plain strings, so
            # navigating away inside the loops cannot invalidate them.
            for album, album_name in self._find_albums():
                self._goto(album, frame=True)
                # Indexing the defaultdict registers the album even when it
                # turns out to contain no songs.
                lyrics = self._result[album_name]
                for song, song_name in self._find_songs():
                    self._goto(song, frame=True)
                    lyrics[song_name] = self._find_lyric()
            # Return to the album listing so the pager can be located.
            self._goto(current_url, frame=True)
            if not self._next_page():
                break

    def _switch_to_frame(self, frame_name=''):
        '''Switch into the content frame; the site renders pages inside it.

        :Argument:
            - frame_name: str, DeprecationWarning
        :Return:
            - None
        '''
        frame_name = frame_name or self._FLAG_FRAME
        frame = self._browser.find_element(By.NAME, frame_name)
        self._browser.switch_to.frame(frame)

    def _click(self, web_element):
        '''Follow the link of `web_element` by loading its `href` directly.

        `web_element.click` may raise `ElementClickInterceptedException`
        unless one waits 5-10 seconds, so the address is read and visited
        manually instead.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
        :Return:
            - True: Successful.
            - False: No usable link, or some exception occurred.
        :Reference:
            - [StackOverflow](https://stackoverflow.com/questions/56779520)
        '''
        next_page = self._get_href(web_element)
        # A `javascript:` pseudo-link never navigates anywhere; treating it
        # as "no link" keeps `doit` from looping on the same page forever.
        # NOTE(review): presumably the disabled next button carries such an
        # href on the last page -- confirm against the live site.
        if not next_page or next_page.lower().startswith('javascript:'):
            return False
        try:
            self._goto(next_page)
        except Exception as e:
            # Deliberate best effort: report the problem and let the caller
            # stop paging.
            print(e)
            return False
        return True

    def _get_attribute(self, web_element, name):
        '''Read attribute `name` of `web_element`, never returning None.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
            - name: str, attribute name
        :Return:
            - str, the attribute if present, otherwise ''.
        '''
        get_attribute = getattr(web_element, self._FLAG_ATTR, None)
        if get_attribute is None:
            return ''
        value = get_attribute(name)
        return '' if value is None else value

    def _get_href(self, web_element):
        '''Get the `href` attribute of `web_element`.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
        :Return:
            - str, the attribute if successful, otherwise ''.
        '''
        return self._get_attribute(web_element, self._FLAG_HREF)

    def _get_innertext(self, web_element):
        '''Get the `innerText` attribute of `web_element`.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
        :Return:
            - str, the attribute if successful, otherwise ''.
        '''
        return self._get_attribute(web_element, self._FLAG_INNER_TEXT)

    def _get_title(self, web_element):
        '''Get the `title` attribute of `web_element`.

        :Argument:
            - web_element: selenium.webdriver.remote.webelement.WebElement
        :Return:
            - str, the attribute if successful, otherwise ''.
        '''
        return self._get_attribute(web_element, self._FLAG_TITLE)

    def _goto(self, url, frame=False):
        '''Pause for a random time, then load `url` in the current session.

        :Argument:
            - url: str
            - frame: bool, whether to switch into the content frame afterwards
        '''
        mu, sigma = self._SLEEP_PARAM
        # `gauss` can return a negative number; `sleep` rejects those.
        sleep(max(0, gauss(mu, sigma)))
        self._browser.get(url)
        if frame:
            self._switch_to_frame()

    def _back(self, frame=False):
        '''Go one step backward in the browser history.

        :Argument:
            - frame: bool, whether to switch into the content frame afterwards
        '''
        self._browser.back()
        if frame:
            self._switch_to_frame()

    def _wait(self, timeout=5, poll_frequency=0.5, **kwargs):
        '''Block until every given expected condition holds.

        Each keyword is the name of a condition in
        `selenium.webdriver.support.expected_conditions`; its value is passed
        to that condition as the single argument.

        :Argument:
            - timeout: [int, float]
            - poll_frequency: [int, float]
        :Example:
            >>> self._wait(element_to_be_clickable=(By.ID, '...'))
        '''
        wait = WebDriverWait(self._browser, timeout, poll_frequency)
        for condition, argument in kwargs.items():
            wait.until(getattr(expected_conditions, condition)(argument))

    def _next_page(self):
        '''Go to the next page of the album listing.

        :Argument:
            - None
        :Return:
            - True: Successful.
            - False: There is no pager / next button, or `self._click` failed.
        :TODO:
            1. Building the link directly would avoid bugs, e.g.:
               [URL](https://music.163.com/#/artist/album?id=5771&limit=12&offset=24)
        '''
        pager_class, button_class = self._FLAG_NEXT_PAGE
        # The plural lookups return [] instead of raising, so a listing
        # without a pager simply ends the crawl.
        pagers = self._browser.find_elements(By.CLASS_NAME, pager_class)
        if not pagers:
            return False
        buttons = pagers[0].find_elements(By.CLASS_NAME, button_class)
        if not buttons:
            return False
        return self._click(buttons[0])

    def _find_albums(self, mask_name=''):
        '''Find albums in `self._browser`.

        :Argument:
            - mask_name: str, DeprecationWarning
        :Return:
            - list of (album url, album name) tuples.
        '''
        mask_name = mask_name or self._FLAG_ALBUM
        albums = self._browser.find_elements(By.CLASS_NAME, mask_name)
        return [(self._get_href(album), self._get_innertext(album))
                for album in albums]

    def _find_songs(self):
        '''Find songs in `self._browser`.

        :Argument:
            - None
        :Return:
            - list of (song url, song title) tuples.
        '''
        cell_class, title_tag = self._FLAG_SONG
        cells = self._browser.find_elements(By.CLASS_NAME, cell_class)
        # Resolve every link before reading any attribute, as before.
        songs = [cell.find_element(By.TAG_NAME, self._FLAG_LINK)
                 for cell in cells]
        return [(self._get_href(song),
                 self._get_title(song.find_element(By.TAG_NAME, title_tag)))
                for song in songs]

    def _find_lyric(self):
        '''Find lyric in `self._browser`.

        :Argument:
            - None
        :Return:
            - str, lyric; '' when the page has no lyric toggle.
        :TODO:
            1. [API](http://music.163.com/api/song/lyric?id=411214279&lv=1&kv=1&tv=-1)
            2. DO NOT use `execute_script`, and avoid `ElementClickInterceptedException`
        '''
        toggle_id, lyric_id, attribute = self._FLAG_LYRIC
        if not self._browser.find_elements(By.ID, toggle_id):
            return ''
        self._wait(element_to_be_clickable=(By.ID, toggle_id))
        # Click through JavaScript: a native click on the toggle can raise
        # `ElementClickInterceptedException` (see the TODO above).
        self._browser.execute_script(
            f'document.getElementById("{toggle_id}").click()')
        lyric = self._browser.find_element(By.ID, lyric_id)
        return lyric.get_attribute(attribute)
if __name__ == "__main__":
    # Meant to be re-run from IPython with `%run -i crawler.py`: the web
    # driver is kept under the name `browser` in the namespace so that a
    # later run can reuse the already-open browser window.
    _locals = locals()
    if 'browser' not in _locals:
        # First run: start Firefox. The extra `&limit=32` rides along in the
        # artist id and therefore ends up in the query string of the URL.
        n = NetEase(artist_id='5771&limit=32', web_driver='Firefox')
        _locals['browser'] = n._browser
    else:
        n = NetEase(web_driver=_locals['browser'])
    # Crawl, then always save whatever was collected (pickle and JSON).
    try:
        n.doit()
    except Exception as e:
        # Drop into an interactive shell for post-mortem inspection; `e` is
        # bound so that the failure can be examined there.
        from IPython import embed
        embed()
    finally:
        n.to_pickle()
        n.to_json()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment