Scraping helper class. Requires BeautifulSoup and chardet.
easy_install beautifulsoup4, OR pip install beautifulsoup4
easy_install chardet, OR pip install chardet
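A minimal usage sketch, assuming the file below is saved as scraper.py; the example URLs and the pick_title helper are illustrative only, but the flow matches the class docstring below.

# Minimal sketch (Python 2, like the gist). Assumes the code below is saved as scraper.py;
# the URLs and the pick_title helper are illustrative, not part of the gist itself.
from scraper import Scraper

def pick_title(soup_obj):
    # soup_obj is the BeautifulSoup object built for each fetched page
    return soup_obj.title.string

scraper = Scraper(['http://www.google.com/', 'http://yahoo.co.jp'])
scraper.set_action(pick_title)   # pass the function that receives the BeautifulSoup object
for page in scraper.run():       # fetches each URL and returns a list of Html objects
    print page.url, page.status_code
    print page.data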
#!/usr/bin/env python
#-*- coding:utf-8 -*-
u"""
Author : wg.koro
Update : 2013/10/29

Page scraping helper class.
Override def scrape(soup_obj) to get the most out of it.
Args:
    soup_obj: BeautifulSoup object.
See the documentation below for how to scrape with BeautifulSoup:
http://www.crummy.com/software/BeautifulSoup/bs4/doc/

Required packages:
    easy_install beautifulsoup4 (OR pip install beautifulsoup4)
    easy_install chardet (OR pip install chardet)

Basic usage:
    scraper = Scraper(url_list, charset)  # Omit the second argument to auto-detect the charset of each URL
    scraper.set_action(scrape_function)   # Pass a function that performs the scraping
    result = scraper.run()                # Start processing and collect the results

    # Print the results
    for page in result:
        print page.url
        print page.data
        print page.status_code

Options =====
Note: set these before calling run()
    show_status_message(True/False)  # Print which URL is being processed. True: print, False: silent (default: silent)
    set_dummy_ua(new_ua)             # Change the User-agent string sent with page requests to new_ua (default: IE10)
    set_interval(new_sec)            # Set the interval in seconds between fetching multiple pages (default: 3 seconds)
"""
import urllib2
import chardet
import traceback
import time
from bs4 import BeautifulSoup
from urlparse import urlparse


class Scraper:
    def __init__(self, url_list=[], charset=None):
        self._url_list = url_list
        self._ua = None
        self._show_status = False
        self._interval_sec = 3
        self._result_list = []
        self._scrape_action = None
        self._charset = None
        if charset:
            self._charset = charset

    def set_action(self, action):
        self._scrape_action = action

    def show_status_message(self, flg=True):
        self._show_status = flg

    def set_dummy_ua(self, new_ua):
        """
        Set a new User-agent string.
        Args:
            new_ua: User-agent string
        Returns:
            None
        """
        if not new_ua:
            return
        self._ua = new_ua

    def set_interval(self, new_sec):
        """
        Set a new fetch interval (seconds).
        Args:
            new_sec: Interval in seconds
        Returns:
            None
        """
        # Accept an int or a digit string; store an int so time.sleep() works
        if isinstance(new_sec, int) and new_sec >= 0:
            self._interval_sec = new_sec
        elif isinstance(new_sec, basestring) and new_sec.isdigit():
            self._interval_sec = int(new_sec)

    def scrape(self, soup_obj):
        """
        Override this method to pull data out of each page.
        Args:
            soup_obj: BeautifulSoup object containing the page HTML.
        Returns:
            Any object you want to extract.
        """
        if not self._scrape_action:
            return None
        return self._scrape_action(soup_obj)

    def run(self):
        """
        Start crawling and scraping.
        Returns:
            List of Html objects, one per URL, with url, status_code and data set.
        """
        if not self._url_list:
            print 'ERROR: URL not defined.'
            return []

        for url in self._url_list:
            self._print_status('Fetching %s ...' % url)
            html = Html(url, self._charset)
            html.fetch(self._ua)
            # Only parse pages that were fetched successfully
            if html.status_code and html.status_code < 400:
                soup = BeautifulSoup(html.html)
                html.data = self.scrape(soup)
            self._result_list.append(html)
            self._print_status('...Done.')
            if len(self._url_list) > 1:
                time.sleep(self._interval_sec)
        return self._result_list

    def _print_status(self, status_str):
        if not self._show_status:
            return
        print status_str


class Html:
    def __init__(self, url, charset=None):
        self.url = url
        self.charset = charset
        self.data = None
        self.error = None
        self.status_code = None
        self.html = u''
        self._ua = 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; ARM; Trident/6.0)'
        self._parse_url()

    def _parse_url(self):
        parsed = urlparse(self.url)
        self.domain = parsed.netloc
        self.https = (parsed.scheme == 'https')

    def fetch(self, ua_string=None):
        if ua_string:
            self._ua = ua_string
        self._fetch_html()

    def _fetch_html(self):
        opener = urllib2.build_opener()
        opener.addheaders = [('User-agent', self._ua)]
        urllib2.install_opener(opener)
        try:
            page_obj = urllib2.urlopen(self.url)
        except urllib2.URLError, e:
            self.error = e
            # HTTPError carries a status code; other URLErrors (e.g. DNS failure) do not
            self.status_code = getattr(e, 'code', None)
        else:
            self.status_code = page_obj.code
            raw_html = page_obj.read()
            self.html = self._str2unicode(raw_html)

    def _str2unicode(self, raw_string):
        """
        Detect the charset and convert the raw bytes to unicode.
        Returns:
            Unicode HTML source string.
        """
        if not self.charset:
            self.charset = chardet.detect(raw_string)['encoding']
        if self.charset is not None:
            return unicode(raw_string, self.charset, 'ignore')
        raise ValueError('Could not detect encoding!')


if __name__ == '__main__':
    """
    Example Usage
    """
    url_list = ['http://www.google.com/', 'http://yahoo.co.jp']

    """
    # Override the scrape() method
    class Example(Scraper):
        def __init__(self, url_list=[], charset=None):
            Scraper.__init__(self, url_list, charset)

        def scrape(self, soup_obj):
            return soup_obj.title

    ex = Example(url_list)
    for result in ex.run():
        print result.status_code
        print result.https
        print result.domain
        print result.charset
        print result.data
        print '====='
    """

    # Add a scrape action
    def action(soup_obj=None):
        if not soup_obj:
            return None
        return soup_obj.title.string

    scraper = Scraper(url_list)
    scraper.set_action(action)          # Register the scraping action (pass the function itself, not its result)
    # scraper.show_status_message(True) # Status display option
    for result in scraper.run():
        print result.status_code
        print result.https
        print result.domain
        print result.charset
        print result.data
        print '====='